mirror of
https://github.com/apache/impala.git
synced 2025-12-19 09:58:28 -05:00
It's not that easy to find log files of a custom-cluster test. All custom-cluster tests use the same log dir and the test output just shows the symlink of the log files, e.g. "Starting State Store logging to .../logs/custom_cluster_tests/statestored.INFO". This patch prints the actual log file names after the cluster launchs. An example output: 15:17:19 MainThread: Starting State Store logging to /tmp/statestored.INFO 15:17:19 MainThread: Starting Catalog Service logging to /tmp/catalogd.INFO 15:17:19 MainThread: Starting Impala Daemon logging to /tmp/impalad.INFO 15:17:19 MainThread: Starting Impala Daemon logging to /tmp/impalad_node1.INFO 15:17:19 MainThread: Starting Impala Daemon logging to /tmp/impalad_node2.INFO ... 15:17:24 MainThread: Total wait: 2.54s 15:17:24 MainThread: Actual log file names: 15:17:24 MainThread: statestored.INFO -> statestored.quanlong-Precision-3680.quanlong.log.INFO.20251216-151719.1094348 15:17:24 MainThread: catalogd.INFO -> catalogd.quanlong-Precision-3680.quanlong.log.INFO.20251216-151719.1094368 15:17:24 MainThread: impalad.INFO -> impalad.quanlong-Precision-3680.quanlong.log.INFO.20251216-151719.1094466 15:17:24 MainThread: impalad_node1.INFO -> impalad.quanlong-Precision-3680.quanlong.log.INFO.20251216-151719.1094468 15:17:24 MainThread: impalad_node2.INFO -> impalad.quanlong-Precision-3680.quanlong.log.INFO.20251216-151719.1094470 15:17:24 MainThread: Impala Cluster Running with 3 nodes (3 coordinators, 3 executors). Tests - Ran the script locally. - Ran a failed custom-cluster test and verified the actual file names are printed in the output. Change-Id: Id76c0a8bdfb221ab24ee315e2e273abca4257398 Reviewed-on: http://gerrit.cloudera.org:8080/23781 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Quanlong Huang <huangquanlong@gmail.com>
1308 lines
64 KiB
Python
Executable File
1308 lines
64 KiB
Python
Executable File
#!/usr/bin/env impala-python3
|
|
#
|
|
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
# Starts up an Impala cluster (ImpalaD + State Store) with the specified number of
|
|
# ImpalaD instances. Each ImpalaD runs on a different port allowing this to be run
|
|
# on a single machine.
|
|
from __future__ import absolute_import, division, print_function
|
|
from builtins import range
|
|
import getpass
|
|
import itertools
|
|
import json
|
|
import logging
|
|
import os
|
|
import psutil
|
|
import shlex
|
|
import sys
|
|
from datetime import datetime
|
|
from getpass import getuser
|
|
from time import sleep, time
|
|
from optparse import OptionParser, SUPPRESS_HELP
|
|
from subprocess import call, check_call, check_output
|
|
from tests.common.environ import build_flavor_timeout
|
|
from tests.common.impala_cluster import (ImpalaCluster, DEFAULT_BEESWAX_PORT,
|
|
DEFAULT_HS2_PORT, DEFAULT_KRPC_PORT, DEFAULT_HS2_HTTP_PORT,
|
|
DEFAULT_STATE_STORE_SUBSCRIBER_PORT, DEFAULT_IMPALAD_WEBSERVER_PORT,
|
|
DEFAULT_STATESTORED_WEBSERVER_PORT, DEFAULT_CATALOGD_WEBSERVER_PORT,
|
|
DEFAULT_ADMISSIOND_WEBSERVER_PORT, DEFAULT_CATALOGD_JVM_DEBUG_PORT,
|
|
DEFAULT_CATALOG_SERVICE_PORT, DEFAULT_CATALOGD_STATE_STORE_SUBSCRIBER_PORT,
|
|
DEFAULT_EXTERNAL_FE_PORT, DEFAULT_IMPALAD_JVM_DEBUG_PORT,
|
|
DEFAULT_STATESTORE_SERVICE_PORT, DEFAULT_STATESTORE_HA_SERVICE_PORT,
|
|
DEFAULT_PEER_STATESTORE_HA_SERVICE_PORT,
|
|
find_user_processes, run_daemon)
|
|
|
|
LOG = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
|
|
LOG.setLevel(level=logging.DEBUG)
|
|
|
|
KUDU_MASTER_HOSTS = os.getenv("KUDU_MASTER_HOSTS", "127.0.0.1")
|
|
DEFAULT_IMPALA_MAX_LOG_FILES = os.environ.get("IMPALA_MAX_LOG_FILES", 10)
|
|
INTERNAL_LISTEN_HOST = os.getenv("INTERNAL_LISTEN_HOST", "localhost")
|
|
TARGET_FILESYSTEM = os.getenv("TARGET_FILESYSTEM") or "hdfs"
|
|
HOST_TZ = os.getenv("TZ", None)
|
|
|
|
# Options
|
|
parser = OptionParser()
|
|
parser.add_option("-s", "--cluster_size", type="int", dest="cluster_size", default=3,
|
|
help="Size of the cluster (number of impalad instances to start).")
|
|
parser.add_option("-c", "--num_coordinators", type="int", dest="num_coordinators",
|
|
default=3, help="Number of coordinators.")
|
|
parser.add_option("--use_exclusive_coordinators", dest="use_exclusive_coordinators",
|
|
action="store_true", default=False, help="If true, coordinators only "
|
|
"coordinate queries and execute coordinator fragments. If false, "
|
|
"coordinators also act as executors.")
|
|
parser.add_option("--build_type", dest="build_type", default= "latest",
|
|
help="Build type to use - debug / release / latest")
|
|
parser.add_option("--impalad_args", dest="impalad_args", action="append", type="string",
|
|
default=[],
|
|
help="Additional arguments to pass to each Impalad during startup")
|
|
parser.add_option("--state_store_args", dest="state_store_args", action="append",
|
|
type="string", default=[],
|
|
help="Additional arguments to pass to State Store during startup")
|
|
parser.add_option("--catalogd_args", dest="catalogd_args", action="append",
|
|
type="string", default=[],
|
|
help="Additional arguments to pass to the Catalog Service at startup")
|
|
parser.add_option("--admissiond_args", dest="admissiond_args",
|
|
action="append", type="string", default=[], help="Additional arguments "
|
|
"to pass to the Admission Control Service at startup")
|
|
parser.add_option("--kill", "--kill_only", dest="kill_only", action="store_true",
|
|
default=False, help="Instead of starting the cluster, just kill all"
|
|
" the running impalads and the statestored.")
|
|
parser.add_option("--force_kill", dest="force_kill", action="store_true", default=False,
|
|
help="Force kill impalad and statestore processes.")
|
|
parser.add_option("-a", "--add_executors", dest="add_executors",
|
|
action="store_true", default=False,
|
|
help="Start additional executors. The executor group name must be"
|
|
"specified using --impalad_args")
|
|
parser.add_option("--add_impalads", dest="add_impalads",
|
|
action="store_true", default=False,
|
|
help="Start additional impalad processes.")
|
|
parser.add_option("-r", "--restart_impalad_only", dest="restart_impalad_only",
|
|
action="store_true", default=False,
|
|
help="Restarts only the impalad processes")
|
|
parser.add_option("--restart_catalogd_only", dest="restart_catalogd_only",
|
|
action="store_true", default=False,
|
|
help="Restarts only the catalogd process")
|
|
parser.add_option("--restart_statestored_only", dest="restart_statestored_only",
|
|
action="store_true", default=False,
|
|
help="Restarts only the statestored process")
|
|
parser.add_option("--in-process", dest="inprocess", action="store_true", default=False,
|
|
help="Start all Impala backends and state store in a single process.")
|
|
parser.add_option("--log_dir", dest="log_dir",
|
|
default=os.environ["IMPALA_CLUSTER_LOGS_DIR"],
|
|
help="Directory to store output logs to.")
|
|
parser.add_option("--max_log_files", default=DEFAULT_IMPALA_MAX_LOG_FILES,
|
|
help="Max number of log files before rotation occurs.")
|
|
parser.add_option("--log_level", type="int", dest="log_level", default=1,
|
|
help="Set the impalad backend logging level")
|
|
parser.add_option("--ignore_pid_on_log_rotation", dest="ignore_pid_on_log_rotation",
|
|
action='store_true', default=False,
|
|
help=("Determine if log rotation should ignore or match PID in "
|
|
"log file name."))
|
|
parser.add_option("--jvm_args", dest="jvm_args", default="",
|
|
help="Additional arguments to pass to the JVM(s) during startup.")
|
|
parser.add_option("--env_vars", dest="env_vars", default="",
|
|
help="Additional environment variables for Impala to run with")
|
|
parser.add_option("--kudu_master_hosts", default=KUDU_MASTER_HOSTS,
|
|
help="The host name or address of the Kudu master. Multiple masters "
|
|
"can be specified using a comma separated list.")
|
|
parser.add_option("--docker_network", dest="docker_network", default=None,
|
|
help="If set, the cluster processes run inside docker containers "
|
|
"(which must be already built, e.g. with 'make docker_images'. "
|
|
"The containers are connected to the virtual network specified by "
|
|
"the argument value. This is currently experimental and not all "
|
|
"actions work. This mode only works on python 2.7+")
|
|
parser.add_option("--docker_auto_ports", dest="docker_auto_ports",
|
|
action="store_true", default=False,
|
|
help="Only has an effect if --docker_network is set. If true, Docker "
|
|
"will automatically allocate ports for client-facing endpoints "
|
|
"(Beewax, HS2, Web UIs, etc), which avoids collisions with other "
|
|
"running processes. If false, ports are mapped to the same ports "
|
|
"on localhost as the non-docker impala cluster.")
|
|
parser.add_option("--mount_sources", dest="mount_sources", action="store_true",
|
|
help="Mount the $IMPALA_HOME directory as /opt/impala/sources into "
|
|
"the containers for easier debugging.")
|
|
parser.add_option("--data_cache_dir", dest="data_cache_dir", default=None,
|
|
help="This specifies a base directory in which the IO data cache will "
|
|
"use.")
|
|
parser.add_option("--data_cache_size", dest="data_cache_size", default=0,
|
|
help="This specifies the maximum storage usage of the IO data cache "
|
|
"each Impala daemon can use.")
|
|
parser.add_option("--data_cache_eviction_policy", dest="data_cache_eviction_policy",
|
|
default="LRU", help="This specifies the cache eviction policy to use "
|
|
"for the data cache")
|
|
parser.add_option("--data_cache_num_async_write_threads",
|
|
dest="data_cache_num_async_write_threads", default=0,
|
|
help="This specifies the number of asynchronous write threads for the "
|
|
"data cache, with 0 set means synchronous writes.")
|
|
parser.add_option("--data_cache_enable_tracing", dest="data_cache_enable_tracing",
|
|
action="store_true", default=False,
|
|
help="If the data cache is enabled, this enables tracing accesses.")
|
|
parser.add_option("--enable_admission_service", dest="enable_admission_service",
|
|
action="store_true", default=False,
|
|
help="If true, enables the Admissison Control Service - the cluster "
|
|
"will be launched with an admissiond and all coordinators configured "
|
|
"to use it for admission control.")
|
|
parser.add_option("--enable_external_fe_support", dest="enable_external_fe_support",
|
|
action="store_true", default=False,
|
|
help="If true, impalads will start with the external_fe_port defined.")
|
|
parser.add_option("--geospatial_library", dest="geospatial_library",
|
|
action="store", default="HIVE_ESRI",
|
|
help="Sets which implementation of geospatial libraries should be "
|
|
"initialized")
|
|
parser.add_option("--enable_catalogd_ha", dest="enable_catalogd_ha",
|
|
action="store_true", default=False,
|
|
help="If true, enables CatalogD HA - the cluster will be launched "
|
|
"with two catalogd instances as Active-Passive HA pair.")
|
|
parser.add_option("--no_catalogd", dest="no_catalogd",
|
|
action="store_true", default=False,
|
|
help="If true, there will be no CatalogD.")
|
|
parser.add_option("--jni_frontend_class", dest="jni_frontend_class",
|
|
action="store", default="",
|
|
help="Use a custom java frontend interface.")
|
|
parser.add_option("--enable_statestored_ha", dest="enable_statestored_ha",
|
|
action="store_true", default=False,
|
|
help="If true, enables StatestoreD HA - the cluster will be launched "
|
|
"with two statestored instances as Active-Passive HA pair.")
|
|
parser.add_option("--reduce_disk_io_threads", default="True", type="choice",
|
|
choices=["true", "True", "false", "False"],
|
|
help="If true, reduce the number of disk io mgr threads for "
|
|
"filesystems that are not the TARGET_FILESYSTEM.")
|
|
parser.add_option("--disable_tuple_caching", default=False, action="store_true",
|
|
help="If true, sets the tuple caching feature flag "
|
|
"(allow_tuple_caching) to false. This defaults to false to enable "
|
|
"tuple caching in the development environment")
|
|
parser.add_option("--tuple_cache_dir", dest="tuple_cache_dir",
|
|
default=os.environ.get("TUPLE_CACHE_DIR", None),
|
|
help="Specifies a base directory for the result tuple cache.")
|
|
parser.add_option("--tuple_cache_capacity", dest="tuple_cache_capacity",
|
|
default=os.environ.get("TUPLE_CACHE_CAPACITY", "1GB"),
|
|
help="This specifies the maximum storage usage of the tuple cache "
|
|
"each Impala daemon can use.")
|
|
parser.add_option("--tuple_cache_debug_dump_dir", dest="tuple_cache_debug_dump_dir",
|
|
default=os.environ.get("TUPLE_CACHE_DEBUG_DUMP_DIR", None),
|
|
help="Specifies a base directory for the dumping tuple cache files "
|
|
"for debug purposes")
|
|
parser.add_option("--tuple_cache_eviction_policy", dest="tuple_cache_eviction_policy",
|
|
default="LRU", help="This specifies the cache eviction policy to use "
|
|
"for the tuple cache.")
|
|
parser.add_option("--use_calcite_planner", default="False", type="choice",
|
|
choices=["true", "True", "false", "False"],
|
|
help="If true, use the Calcite planner for query optimization "
|
|
"instead of the Impala planner")
|
|
parser.add_option("--enable_ranger_authz", dest="enable_ranger_authz",
|
|
action="store_true", default=False,
|
|
help="If true, enable Ranger authorization in Impala cluster.")
|
|
|
|
# For testing: list of comma-separated delays, in milliseconds, that delay impalad catalog
|
|
# replica initialization. The ith delay is applied to the ith impalad.
|
|
parser.add_option("--catalog_init_delays", dest="catalog_init_delays", default="",
|
|
help=SUPPRESS_HELP)
|
|
# For testing: Semi-colon separated list of startup arguments to be passed per impalad.
|
|
# The ith group of options is applied to the ith impalad.
|
|
parser.add_option("--per_impalad_args", dest="per_impalad_args", type="string"
|
|
,default="", help=SUPPRESS_HELP)
|
|
|
|
options, args = parser.parse_args()
|
|
|
|
IMPALA_HOME = os.environ["IMPALA_HOME"]
|
|
CORE_SITE_PATH = os.path.join(IMPALA_HOME, "fe/src/test/resources/core-site.xml")
|
|
KNOWN_BUILD_TYPES = ["debug", "release", "latest"]
|
|
# The location in the container where the cache is always mounted.
|
|
DATA_CACHE_CONTAINER_PATH = "/opt/impala/cache"
|
|
|
|
# Kills have a timeout to prevent automated scripts from hanging indefinitely.
|
|
# It is set to a high value to avoid failing if processes are slow to shut down.
|
|
KILL_TIMEOUT_IN_SECONDS = 240
|
|
# For build types like ASAN, modify the default Kudu rpc timeout.
|
|
KUDU_RPC_TIMEOUT = build_flavor_timeout(0, slow_build_timeout=60000)
|
|
|
|
# HTTP connections don't keep alive their associated sessions. We increase the timeout
|
|
# during builds to make spurious session expiration less likely.
|
|
DISCONNECTED_SESSION_TIMEOUT = 60 * 60 * 6
|
|
|
|
def check_process_exists(binary, attempts=1):
|
|
"""Checks if a process exists given the binary name. The `attempts` count allows us to
|
|
control the time a process needs to settle until it becomes available. After each try
|
|
the script will sleep for one second and retry. Returns True if it exists and False
|
|
otherwise.
|
|
"""
|
|
for _ in range(attempts):
|
|
for _ in find_user_processes([binary]):
|
|
return True
|
|
sleep(1)
|
|
return False
|
|
|
|
|
|
def print_actual_log_file(log_symlink, timeout=5):
|
|
"""Resolve the log symlink to get the actual log file name."""
|
|
symlink_name = os.path.basename(log_symlink)
|
|
for _ in range(timeout):
|
|
if os.path.exists(log_symlink):
|
|
try:
|
|
actual_path = os.path.realpath(log_symlink)
|
|
actual_file = os.path.basename(actual_path)
|
|
LOG.info("{0} -> {1}".format(symlink_name, actual_file))
|
|
return
|
|
except OSError as e:
|
|
LOG.error("Error resolving log symlink {0}: {1}".format(log_symlink, e))
|
|
return
|
|
sleep(1)
|
|
|
|
def run_daemon_with_options(daemon_binary, args, output_file, jvm_debug_port=None):
|
|
"""Wrapper around run_daemon() with options determined from command-line options."""
|
|
env_vars = {"JAVA_TOOL_OPTIONS": build_java_tool_options(jvm_debug_port)}
|
|
if options.env_vars is not None:
|
|
for kv in options.env_vars.split():
|
|
k, v = kv.split('=')
|
|
env_vars[k] = v
|
|
run_daemon(daemon_binary, args, build_type=options.build_type, env_vars=env_vars,
|
|
output_file=output_file)
|
|
|
|
|
|
def build_java_tool_options(jvm_debug_port=None):
|
|
"""Construct the value of the JAVA_TOOL_OPTIONS environment variable to pass to
|
|
daemons."""
|
|
java_tool_options = ""
|
|
# In a Docker container the Java error file location is always fixed.
|
|
if options.docker_network is not None:
|
|
java_tool_options = "-XX:ErrorFile=/opt/impala/java-error/hs_err_pid_%p.log"
|
|
if jvm_debug_port is not None:
|
|
java_tool_options = ("-agentlib:jdwp=transport=dt_socket,address={debug_port}," +
|
|
"server=y,suspend=n ").format(debug_port=jvm_debug_port) + java_tool_options
|
|
if options.jvm_args is not None:
|
|
java_tool_options += " " + options.jvm_args
|
|
return java_tool_options
|
|
|
|
def kill_matching_processes(binary_names, force=False):
|
|
"""Kills all processes with the given binary name, waiting for them to exit"""
|
|
# Send all the signals before waiting so that processes can clean up in parallel.
|
|
processes = [proc for _, proc in find_user_processes(binary_names)]
|
|
for process in processes:
|
|
try:
|
|
if force:
|
|
process.kill()
|
|
else:
|
|
process.terminate()
|
|
except psutil.NoSuchProcess:
|
|
pass
|
|
|
|
for process in processes:
|
|
try:
|
|
process.wait(KILL_TIMEOUT_IN_SECONDS)
|
|
except psutil.TimeoutExpired:
|
|
raise RuntimeError(("Unable to kill {process_name} (pid {process_pid}) "
|
|
"after {num_seconds} seconds.").format(
|
|
process_name=process.name,
|
|
process_pid=process.pid,
|
|
num_seconds=KILL_TIMEOUT_IN_SECONDS))
|
|
|
|
|
|
def choose_impalad_ports(instance_num):
|
|
"""Compute the ports for impalad instance num 'instance_num', returning as a map
|
|
from the argument name to the port number."""
|
|
return {'beeswax_port': DEFAULT_BEESWAX_PORT + instance_num,
|
|
'hs2_port': DEFAULT_HS2_PORT + instance_num,
|
|
'hs2_http_port': DEFAULT_HS2_HTTP_PORT + instance_num,
|
|
'krpc_port': DEFAULT_KRPC_PORT + instance_num,
|
|
'external_fe_port': DEFAULT_EXTERNAL_FE_PORT + instance_num,
|
|
'state_store_subscriber_port':
|
|
DEFAULT_STATE_STORE_SUBSCRIBER_PORT + instance_num,
|
|
'webserver_port': DEFAULT_IMPALAD_WEBSERVER_PORT + instance_num}
|
|
|
|
|
|
def build_impalad_port_args(instance_num):
|
|
IMPALAD_PORTS = (
|
|
"-beeswax_port={beeswax_port} "
|
|
"-hs2_port={hs2_port} "
|
|
"-hs2_http_port={hs2_http_port} "
|
|
"-krpc_port={krpc_port} "
|
|
"-state_store_subscriber_port={state_store_subscriber_port} "
|
|
"-webserver_port={webserver_port}")
|
|
if options.enable_external_fe_support:
|
|
IMPALAD_PORTS += " -external_fe_port={external_fe_port}"
|
|
return IMPALAD_PORTS.format(**choose_impalad_ports(instance_num))
|
|
|
|
|
|
def build_logging_args(service_name):
|
|
"""Return a list of command line arguments to pass to daemon processes to configure
|
|
logging"""
|
|
result = ["-logbufsecs=5", "-v={0}".format(options.log_level),
|
|
"-max_log_files={0}".format(options.max_log_files)]
|
|
if not options.ignore_pid_on_log_rotation:
|
|
# IMPALA-12595: ignore_pid_on_log_rotation default to False in this script.
|
|
# This is because multiple impalads still logs to the same log_dir in minicluster
|
|
# and we want to keep all logs for debugging purpose.
|
|
result += ["-log_rotation_match_pid=true"]
|
|
if options.docker_network is None:
|
|
# Impala inside a docker container should always log to the same location.
|
|
result += ["-log_filename={0}".format(service_name),
|
|
"-log_dir={0}".format(options.log_dir)]
|
|
return result
|
|
|
|
|
|
def impalad_service_name(i):
|
|
"""Return the name to use for the ith impala daemon in the cluster."""
|
|
if i == 0:
|
|
# The first impalad always logs to impalad.INFO
|
|
return "impalad"
|
|
else:
|
|
return "impalad_node{node_num}".format(node_num=i)
|
|
|
|
|
|
def choose_catalogd_ports(instance_num):
|
|
"""Compute the ports for catalogd instance num 'instance_num', returning as a map
|
|
from the argument name to the port number."""
|
|
return {'catalog_service_port': DEFAULT_CATALOG_SERVICE_PORT + instance_num,
|
|
'state_store_subscriber_port':
|
|
DEFAULT_CATALOGD_STATE_STORE_SUBSCRIBER_PORT + instance_num,
|
|
'webserver_port': DEFAULT_CATALOGD_WEBSERVER_PORT + instance_num}
|
|
|
|
|
|
def build_catalogd_port_args(instance_num):
|
|
CATALOGD_PORTS = (
|
|
"-catalog_service_port={catalog_service_port} "
|
|
"-state_store_subscriber_port={state_store_subscriber_port} "
|
|
"-webserver_port={webserver_port}")
|
|
return CATALOGD_PORTS.format(**choose_catalogd_ports(instance_num))
|
|
|
|
|
|
def catalogd_service_name(i):
|
|
"""Return the name to use for the ith catalog daemon in the cluster."""
|
|
if i == 0:
|
|
# The first catalogd always logs to catalogd.INFO
|
|
return "catalogd"
|
|
else:
|
|
return "catalogd_node{node_num}".format(node_num=i)
|
|
|
|
|
|
def choose_statestored_ports(enable_statestored_ha, instance_num):
|
|
"""Compute the ports for statestored instance num 'instance_num', returning as a map
|
|
from the argument name to the port number."""
|
|
if not enable_statestored_ha:
|
|
return {'state_store_port': DEFAULT_STATESTORE_SERVICE_PORT + instance_num,
|
|
'webserver_port': DEFAULT_STATESTORED_WEBSERVER_PORT + instance_num}
|
|
else:
|
|
# Assume two statestore instances will be launched when statestore HA is enabled
|
|
state_store_peer_ha_port =\
|
|
DEFAULT_STATESTORE_HA_SERVICE_PORT + ((instance_num + 1) % 2)
|
|
return {'state_store_port': DEFAULT_STATESTORE_SERVICE_PORT + instance_num,
|
|
'state_store_ha_port': DEFAULT_STATESTORE_HA_SERVICE_PORT + instance_num,
|
|
'state_store_peer_ha_port': state_store_peer_ha_port,
|
|
'webserver_port': DEFAULT_STATESTORED_WEBSERVER_PORT + instance_num}
|
|
|
|
|
|
def build_statestored_port_args(enable_statestored_ha, instance_num):
|
|
if not enable_statestored_ha:
|
|
STATESTORED_PORTS = (
|
|
"-state_store_port={state_store_port} "
|
|
"-webserver_port={webserver_port}")
|
|
return STATESTORED_PORTS.format(
|
|
**choose_statestored_ports(enable_statestored_ha, instance_num))
|
|
else:
|
|
STATESTORED_PORTS = (
|
|
"-state_store_port={state_store_port} "
|
|
"-state_store_ha_port={state_store_ha_port} "
|
|
"-state_store_peer_ha_port={state_store_peer_ha_port} "
|
|
"-webserver_port={webserver_port}")
|
|
return STATESTORED_PORTS.format(
|
|
**choose_statestored_ports(enable_statestored_ha, instance_num))
|
|
|
|
|
|
def statestored_service_name(i):
|
|
"""Return the name to use for the ith statestore daemon in the cluster."""
|
|
if i == 0:
|
|
# The first statestored always logs to statestored.INFO
|
|
return "statestored"
|
|
else:
|
|
return "statestored_node{node_num}".format(node_num=i)
|
|
|
|
|
|
def combine_arg_list_opts(opt_args):
|
|
"""Helper for processing arguments like impalad_args. The input is a list of strings,
|
|
each of which is the string passed into one instance of the argument, e.g. for
|
|
--impalad_args="-foo -bar" --impalad_args="-baz", the input to this function is
|
|
["-foo -bar", "-baz"]. This function combines the argument lists by tokenised each
|
|
string into separate arguments, if needed, e.g. to produce the output
|
|
["-foo", "-bar", "-baz"]"""
|
|
return list(itertools.chain(*[shlex.split(arg) for arg in opt_args]))
|
|
|
|
|
|
def build_statestored_arg_list(num_statestored, remap_ports):
|
|
"""Build a list of lists of command line arguments to pass to each statestored
|
|
instance. Build args for two statestored instances if statestored HA is enabled."""
|
|
statestored_arg_list = []
|
|
for i in range(num_statestored):
|
|
service_name = statestored_service_name(i)
|
|
args = (build_logging_args(service_name)
|
|
+ build_kerberos_args("statestored")
|
|
+ combine_arg_list_opts(options.state_store_args))
|
|
if remap_ports:
|
|
statestored_port_args =\
|
|
build_statestored_port_args(options.enable_statestored_ha, i)
|
|
args.extend(shlex.split(statestored_port_args))
|
|
if options.enable_catalogd_ha:
|
|
args.extend(["-enable_catalogd_ha=true"])
|
|
if options.enable_statestored_ha:
|
|
args.extend(["-enable_statestored_ha=true"])
|
|
statestored_arg_list.append(args)
|
|
return statestored_arg_list
|
|
|
|
|
|
def build_catalogd_arg_list(num_catalogd, remap_ports):
|
|
"""Build a list of lists of command line arguments to pass to each catalogd instance.
|
|
Build args for two catalogd instances if catalogd HA is enabled."""
|
|
catalogd_arg_list = []
|
|
for i in range(num_catalogd):
|
|
service_name = catalogd_service_name(i)
|
|
args = (build_logging_args(service_name)
|
|
+ ["-kudu_master_hosts", options.kudu_master_hosts]
|
|
+ build_kerberos_args("catalogd")
|
|
+ combine_arg_list_opts(options.catalogd_args))
|
|
if remap_ports:
|
|
catalogd_port_args = build_catalogd_port_args(i)
|
|
args.extend(shlex.split(catalogd_port_args))
|
|
if options.enable_catalogd_ha:
|
|
args.extend(["-enable_catalogd_ha=true"])
|
|
if options.enable_statestored_ha:
|
|
args.extend(["-enable_statestored_ha=true"])
|
|
state_store_port = DEFAULT_STATESTORE_SERVICE_PORT
|
|
args.extend(
|
|
["-state_store_port={0}".format(state_store_port)])
|
|
args.extend(
|
|
["-state_store_2_port={0}".format(state_store_port + 1)])
|
|
if options.enable_ranger_authz:
|
|
args.extend(["--server-name=server1", "--ranger_service_type=hive",
|
|
"--ranger_app_id=impala", "--authorization_provider=ranger"])
|
|
catalogd_arg_list.append(args)
|
|
return catalogd_arg_list
|
|
|
|
|
|
def build_admissiond_arg_list():
|
|
"""Build a list of command line arguments to pass to the admissiond."""
|
|
args = (build_logging_args("admissiond")
|
|
+ build_kerberos_args("admissiond")
|
|
+ combine_arg_list_opts(options.admissiond_args))
|
|
if options.enable_statestored_ha:
|
|
args.extend(["-enable_statestored_ha=true"])
|
|
state_store_port = DEFAULT_STATESTORE_SERVICE_PORT
|
|
args.extend(
|
|
["-state_store_port={0}".format(state_store_port)])
|
|
args.extend(
|
|
["-state_store_2_port={0}".format(state_store_port + 1)])
|
|
return args
|
|
|
|
|
|
def build_impalad_arg_lists(cluster_size, num_coordinators, use_exclusive_coordinators,
|
|
remap_ports, start_idx=0, admissiond_host=INTERNAL_LISTEN_HOST):
|
|
"""Build the argument lists for impala daemons in the cluster. Returns a list of
|
|
argument lists, one for each impala daemon in the cluster. Each argument list is
|
|
a list of strings. 'num_coordinators' and 'use_exclusive_coordinators' allow setting
|
|
up the cluster with dedicated coordinators. If 'remap_ports' is true, the impalad
|
|
ports are changed from their default values to avoid port conflicts. If the admission
|
|
service is enabled, 'admissiond_host' is the hostname for the admissiond."""
|
|
# TODO: currently we build a big string blob then split it. It would be better to
|
|
# build up the lists directly.
|
|
|
|
mem_limit_arg = ""
|
|
if options.docker_network is None:
|
|
mem_limit_arg = "-mem_limit={0}".format(compute_impalad_mem_limit(cluster_size))
|
|
else:
|
|
# For containerised impalads, set a memory limit via docker instead of directly,
|
|
# to emulate what would happen in a production container. JVM heap is included,
|
|
# so we should be able to use 100% of the detected mem_limit.
|
|
mem_limit_arg = "-mem_limit=100%"
|
|
|
|
delay_list = []
|
|
if options.catalog_init_delays != "":
|
|
delay_list = [delay.strip() for delay in options.catalog_init_delays.split(",")]
|
|
|
|
per_impalad_args = []
|
|
if options.per_impalad_args != "":
|
|
per_impalad_args = [args.strip() for args in options.per_impalad_args.split(";")]
|
|
|
|
# Build args for each each impalad instance.
|
|
impalad_args = []
|
|
for i in range(start_idx, start_idx + cluster_size):
|
|
service_name = impalad_service_name(i)
|
|
|
|
impala_port_args = ""
|
|
if remap_ports:
|
|
impala_port_args = build_impalad_port_args(i)
|
|
# impalad args from the --impalad_args flag. Also replacing '#ID' with the instance.
|
|
param_args = (" ".join(options.impalad_args)).replace("#ID", str(i))
|
|
args = ("{mem_limit_arg} "
|
|
"{impala_logging_args} "
|
|
"{impala_port_args} "
|
|
"{impala_kerberos_args} "
|
|
"{param_args}").format(
|
|
mem_limit_arg=mem_limit_arg, # Goes first so --impalad_args will override it.
|
|
impala_logging_args=" ".join(build_logging_args(service_name)),
|
|
impala_port_args=impala_port_args,
|
|
impala_kerberos_args=" ".join(build_kerberos_args("impalad")),
|
|
param_args=param_args)
|
|
if options.kudu_master_hosts:
|
|
# Must be prepended, otherwise the java options interfere.
|
|
args = "-kudu_master_hosts {kudu_master_hosts} {args}".format(
|
|
kudu_master_hosts=options.kudu_master_hosts,
|
|
args=args)
|
|
|
|
if "kudu_client_rpc_timeout" not in args:
|
|
args = "-kudu_client_rpc_timeout_ms {kudu_rpc_timeout} {args}".format(
|
|
kudu_rpc_timeout=KUDU_RPC_TIMEOUT,
|
|
args=args)
|
|
|
|
if "disconnected_session_timeout" not in args:
|
|
args = "-disconnected_session_timeout {timeout} {args}".format(
|
|
timeout=DISCONNECTED_SESSION_TIMEOUT,
|
|
args=args)
|
|
|
|
if i - start_idx >= num_coordinators:
|
|
args = "-is_coordinator=false {args}".format(args=args)
|
|
elif use_exclusive_coordinators:
|
|
# Coordinator instance that doesn't execute non-coordinator fragments
|
|
args = "-is_executor=false {args}".format(args=args)
|
|
|
|
if i < len(delay_list):
|
|
args = "-stress_catalog_init_delay_ms={delay} {args}".format(
|
|
delay=delay_list[i],
|
|
args=args)
|
|
|
|
if options.data_cache_dir:
|
|
# create the base directory
|
|
assert options.data_cache_size != 0, "--data_cache_dir must be used along " \
|
|
"with --data_cache_size"
|
|
data_cache_path = \
|
|
os.path.join(options.data_cache_dir, "impala-datacache-{0}".format(str(i)))
|
|
# Try creating the directory if it doesn't exist already. May raise exception.
|
|
if not os.path.exists(data_cache_path):
|
|
os.mkdir(data_cache_path)
|
|
if options.docker_network is None:
|
|
data_cache_path_arg = data_cache_path
|
|
else:
|
|
# The data cache directory will always be mounted at the same path inside the
|
|
# container.
|
|
data_cache_path_arg = DATA_CACHE_CONTAINER_PATH
|
|
|
|
args = "-data_cache={dir}:{quota} {args}".format(
|
|
dir=data_cache_path_arg, quota=options.data_cache_size, args=args)
|
|
|
|
# Add the eviction policy
|
|
args = "-data_cache_eviction_policy={policy} {args}".format(
|
|
policy=options.data_cache_eviction_policy, args=args)
|
|
|
|
# Add the number of async write threads.
|
|
args = "-data_cache_num_async_write_threads={num_threads} {args}".format(
|
|
num_threads=options.data_cache_num_async_write_threads, args=args)
|
|
|
|
# Add access tracing arguments if requested
|
|
if options.data_cache_enable_tracing:
|
|
tracing_args = ""
|
|
if options.docker_network is None:
|
|
# To avoid collisions in log files, use different data_cache_trace_dir values
|
|
# for different Impalads. The default directory is fine for the docker-based
|
|
# tests.
|
|
data_cache_trace_dir = "{log_dir}/data_cache_traces_{impalad_num}".format(
|
|
log_dir=options.log_dir, impalad_num=i)
|
|
tracing_args = "-data_cache_trace_dir={trace_dir} {tracing_args}".format(
|
|
trace_dir=data_cache_trace_dir, tracing_args=tracing_args)
|
|
|
|
tracing_args = "-data_cache_enable_tracing=true {tracing_args}".format(
|
|
tracing_args=tracing_args)
|
|
args = "{tracing_args} {args}".format(tracing_args=tracing_args, args=args)
|
|
|
|
if options.tuple_cache_dir:
|
|
# create the base directory
|
|
tuple_cache_path = \
|
|
os.path.join(options.tuple_cache_dir, "impala-tuplecache-{0}".format(str(i)))
|
|
# Try creating the directory if it doesn't exist already. May raise exception.
|
|
if not os.path.exists(tuple_cache_path):
|
|
os.makedirs(tuple_cache_path)
|
|
if options.docker_network is None:
|
|
tuple_cache_path_arg = tuple_cache_path
|
|
else:
|
|
# The cache directory will always be mounted at the same path inside the
|
|
# container. Reuses the data cache dedicated mount.
|
|
tuple_cache_path_arg = DATA_CACHE_CONTAINER_PATH
|
|
|
|
args = "-tuple_cache={dir}:{cap} {args}".format(
|
|
dir=tuple_cache_path_arg, cap=options.tuple_cache_capacity, args=args)
|
|
|
|
# Add the eviction policy
|
|
args = "-tuple_cache_eviction_policy={policy} {args}".format(
|
|
policy=options.tuple_cache_eviction_policy, args=args)
|
|
|
|
if options.tuple_cache_debug_dump_dir:
|
|
# create the base directory
|
|
tuple_cache_debug_dump_path = \
|
|
os.path.join(options.tuple_cache_debug_dump_dir,
|
|
"impala-tuplecache-debugdump-{0}".format(str(i)))
|
|
# Try creating the directory if it doesn't exist already. May raise exception.
|
|
if not os.path.exists(tuple_cache_debug_dump_path):
|
|
os.makedirs(tuple_cache_debug_dump_path)
|
|
if options.docker_network is None:
|
|
tuple_cache_debug_dump_path_arg = tuple_cache_debug_dump_path
|
|
else:
|
|
# The cache directory will always be mounted at the same path inside the
|
|
# container. Reuses the data cache dedicated mount.
|
|
tuple_cache_debug_dump_path_arg = DATA_CACHE_CONTAINER_PATH
|
|
args = "-tuple_cache_debug_dump_dir={dir} {args}".format(
|
|
dir=tuple_cache_debug_dump_path_arg, args=args)
|
|
|
|
if options.enable_admission_service:
|
|
args = "{args} -admission_service_host={host}".format(
|
|
args=args, host=admissiond_host)
|
|
|
|
if options.enable_statestored_ha:
|
|
state_store_port = DEFAULT_STATESTORE_SERVICE_PORT
|
|
state_store_2_port = DEFAULT_STATESTORE_SERVICE_PORT + 1
|
|
args = "{args} -enable_statestored_ha=true -state_store_port={state_store_port} "\
|
|
"-state_store_2_port={state_store_2_port}".format(
|
|
args=args, state_store_port=state_store_port,
|
|
state_store_2_port=state_store_2_port)
|
|
|
|
if options.reduce_disk_io_threads.lower() == 'true':
|
|
# This leaves the default value for the TARGET_FILESYSTEM, but it reduces the thread
|
|
# count for every other filesystem that is not the TARGET_FILESYSTEM.
|
|
if TARGET_FILESYSTEM != 'abfs':
|
|
args = "{args} -num_abfs_io_threads=1".format(args=args)
|
|
if TARGET_FILESYSTEM != 'adls':
|
|
args = "{args} -num_adls_io_threads=1".format(args=args)
|
|
if TARGET_FILESYSTEM != 'cosn':
|
|
args = "{args} -num_cos_io_threads=1".format(args=args)
|
|
if TARGET_FILESYSTEM != 'gs':
|
|
args = "{args} -num_gcs_io_threads=1".format(args=args)
|
|
if TARGET_FILESYSTEM != 'hdfs':
|
|
args = "{args} -num_remote_hdfs_file_oper_io_threads=1".format(args=args)
|
|
args = "{args} -num_remote_hdfs_io_threads=1".format(args=args)
|
|
if TARGET_FILESYSTEM != 'obs':
|
|
args = "{args} -num_obs_io_threads=1".format(args=args)
|
|
if TARGET_FILESYSTEM != 'oss':
|
|
args = "{args} -num_oss_io_threads=1".format(args=args)
|
|
if TARGET_FILESYSTEM != 'ozone':
|
|
args = "{args} -num_ozone_io_threads=1".format(args=args)
|
|
if TARGET_FILESYSTEM != 's3':
|
|
args = "{args} -num_s3_io_threads=1".format(args=args)
|
|
args = "{args} -num_s3_file_oper_io_threads=1".format(args=args)
|
|
|
|
# SFS (single-file system) doesn't have a corresponding TARGET_FILESYSTEM, and
|
|
# it can always be restricted.
|
|
args = "{args} -num_sfs_io_threads=1".format(args=args)
|
|
|
|
if "geospatial_library" not in args:
|
|
args = "{args} -geospatial_library={geospatial_library}".format(
|
|
args=args, geospatial_library=options.geospatial_library)
|
|
|
|
if options.jni_frontend_class != "":
|
|
args = "-jni_frontend_class={jni_frontend_class} {args}".format(
|
|
jni_frontend_class=options.jni_frontend_class, args=args)
|
|
|
|
if options.disable_tuple_caching:
|
|
args = "-allow_tuple_caching=false {args}".format(args=args)
|
|
else:
|
|
args = "-allow_tuple_caching=true {args}".format(args=args)
|
|
|
|
if options.use_calcite_planner.lower() == 'true':
|
|
args = "-use_calcite_planner=true {args}".format(args=args)
|
|
os.environ["USE_CALCITE_PLANNER"] = "true"
|
|
|
|
if options.enable_ranger_authz:
|
|
args = ("--server-name=server1 --ranger_service_type=hive --ranger_app_id=impala "
|
|
"--authorization_provider=ranger {args}").format(args=args)
|
|
|
|
# Appended at the end so they can override previous args.
|
|
if i < len(per_impalad_args):
|
|
args = "{args} {per_impalad_args}".format(
|
|
args=args, per_impalad_args=per_impalad_args[i])
|
|
impalad_args.append(shlex.split(args))
|
|
return impalad_args
|
|
|
|
|
|
def build_kerberos_args(daemon):
|
|
"""If the cluster is kerberized, returns arguments to pass to daemon process.
|
|
daemon should either be "impalad", "catalogd", "statestored", or "admissiond"."""
|
|
# Note: this code has probably bit-rotted but is preserved in case someone needs to
|
|
# revive the kerberized minicluster.
|
|
assert daemon in ("impalad", "catalogd", "statestored", "admissiond")
|
|
if call([os.path.join(IMPALA_HOME, "testdata/cluster/admin"), "is_kerberized"]) != 0:
|
|
return []
|
|
args = ["-keytab_file={0}".format(os.getenv("KRB5_KTNAME")),
|
|
"-krb5_conf={0}".format(os.getenv("KRB5_CONFIG"))]
|
|
if daemon == "impalad":
|
|
args += ["-principal={0}".format(os.getenv("MINIKDC_PRINC_IMPALA")),
|
|
"-be_principal={0}".format(os.getenv("MINIKDC_PRINC_IMPALA_BE"))]
|
|
else:
|
|
args.append("-principal={0}".format(os.getenv("MINIKDC_PRINC_IMPALA_BE")))
|
|
if os.getenv("MINIKDC_DEBUG", "") == "true":
|
|
args.append("-krb5_debug_file=/tmp/{0}.krb5_debug".format(daemon))
|
|
return args
|
|
|
|
|
|
def compute_impalad_mem_limit(cluster_size):
|
|
# Set mem_limit of each impalad to the smaller of 12GB or
|
|
# 1/cluster_size (typically 1/3) of 70% of available memory.
|
|
#
|
|
# The default memory limit for an impalad is 80% of the total system memory. On a
|
|
# mini-cluster with 3 impalads that means 240%. Since having an impalad be OOM killed
|
|
# is very annoying, the mem limit will be reduced. This can be overridden using the
|
|
# --impalad_args flag. virtual_memory().total returns the total physical memory.
|
|
# The exact ratio to use is somewhat arbitrary. Peak memory usage during
|
|
# tests depends on the concurrency of parallel tests as well as their ordering.
|
|
# On the other hand, to avoid using too much memory, we limit the
|
|
# memory choice here to max out at 12GB. This should be sufficient for tests.
|
|
#
|
|
# Beware that ASAN builds use more memory than regular builds.
|
|
physical_mem_gb = psutil.virtual_memory().total // 1024 // 1024 // 1024
|
|
available_mem = int(os.getenv("IMPALA_CLUSTER_MAX_MEM_GB", str(physical_mem_gb)))
|
|
mem_limit = int(0.7 * available_mem * 1024 * 1024 * 1024 / cluster_size)
|
|
return min(12 * 1024 * 1024 * 1024, mem_limit)
|
|
|
|
class MiniClusterOperations(object):
|
|
"""Implementations of operations for the non-containerized minicluster
|
|
implementation.
|
|
TODO: much of this logic could be moved into ImpalaCluster.
|
|
"""
|
|
def __init__(self):
|
|
# Track log symlinks to print after cluster is ready
|
|
self.log_symlinks = []
|
|
|
|
def get_cluster(self):
|
|
"""Return an ImpalaCluster instance."""
|
|
return ImpalaCluster(use_admission_service=options.enable_admission_service,
|
|
deploy_catalogd=not options.no_catalogd)
|
|
|
|
def kill_all_daemons(self, force=False):
|
|
kill_matching_processes(["catalogd", "impalad", "statestored", "admissiond"], force)
|
|
|
|
def kill_all_impalads(self, force=False):
|
|
kill_matching_processes(["impalad"], force=force)
|
|
|
|
def kill_all_catalogds(self, force=False):
|
|
kill_matching_processes(["catalogd"], force=force)
|
|
|
|
def kill_all_statestoreds(self, force=False):
|
|
kill_matching_processes(["statestored"], force=force)
|
|
|
|
def kill_admissiond(self, force=False):
|
|
kill_matching_processes(["admissiond"], force=force)
|
|
|
|
def start_statestore(self):
|
|
if options.enable_statestored_ha:
|
|
num_statestored = 2
|
|
else:
|
|
num_statestored = 1
|
|
statestored_arg_lists = build_statestored_arg_list(num_statestored, remap_ports=True)
|
|
for i in range(num_statestored):
|
|
service_name = statestored_service_name(i)
|
|
log_symlink = os.path.join(options.log_dir, "{0}.INFO".format(service_name))
|
|
LOG.info("Starting State Store logging to {0}".format(log_symlink))
|
|
output_file = os.path.join(
|
|
options.log_dir, "{service_name}-out.log".format(service_name=service_name))
|
|
run_daemon_with_options("statestored", statestored_arg_lists[i], output_file)
|
|
if not check_process_exists("statestored", 10):
|
|
raise RuntimeError("Unable to start statestored. Check log or file permissions"
|
|
" for more details.")
|
|
self.log_symlinks.append(log_symlink)
|
|
|
|
def start_catalogd(self):
|
|
if options.no_catalogd:
|
|
num_catalogd = 0
|
|
elif options.enable_catalogd_ha:
|
|
num_catalogd = 2
|
|
else:
|
|
num_catalogd = 1
|
|
catalogd_arg_lists = build_catalogd_arg_list(num_catalogd, remap_ports=True)
|
|
for i in range(num_catalogd):
|
|
service_name = catalogd_service_name(i)
|
|
log_symlink = os.path.join(options.log_dir, "{0}.INFO".format(service_name))
|
|
LOG.info("Starting Catalog Service logging to {0}".format(log_symlink))
|
|
output_file = os.path.join(
|
|
options.log_dir, "{service_name}-out.log".format(service_name=service_name))
|
|
run_daemon_with_options("catalogd", catalogd_arg_lists[i], output_file,
|
|
jvm_debug_port=DEFAULT_CATALOGD_JVM_DEBUG_PORT + i)
|
|
if not check_process_exists("catalogd", 10):
|
|
raise RuntimeError("Unable to start catalogd. Check log or file permissions"
|
|
" for more details.")
|
|
self.log_symlinks.append(log_symlink)
|
|
|
|
def start_admissiond(self):
|
|
log_symlink = os.path.join(options.log_dir, "admissiond.INFO")
|
|
LOG.info("Starting Admission Control Service logging to {0}".format(log_symlink))
|
|
output_file = os.path.join(options.log_dir, "admissiond-out.log")
|
|
run_daemon_with_options("admissiond", build_admissiond_arg_list(), output_file)
|
|
if not check_process_exists("admissiond", 10):
|
|
raise RuntimeError("Unable to start admissiond. Check log or file permissions"
|
|
" for more details.")
|
|
self.log_symlinks.append(log_symlink)
|
|
|
|
def start_impalads(self, cluster_size, num_coordinators, use_exclusive_coordinators,
|
|
start_idx=0):
|
|
"""Start 'cluster_size' impalad instances. The first 'num_coordinator' instances will
|
|
act as coordinators. 'use_exclusive_coordinators' specifies whether the coordinators
|
|
will only execute coordinator fragments."""
|
|
if cluster_size == 0:
|
|
# No impalad instances should be started.
|
|
return
|
|
|
|
# The current TCP port allocation of the minicluster allows up to 10 impalads before
|
|
# the backend port (25000 + idx) will collide with the statestore (25010).
|
|
assert start_idx + cluster_size <= 10, "Must not start more than 10 impalads"
|
|
|
|
impalad_arg_lists = build_impalad_arg_lists(
|
|
cluster_size, num_coordinators, use_exclusive_coordinators, remap_ports=True,
|
|
start_idx=start_idx)
|
|
assert cluster_size == len(impalad_arg_lists)
|
|
for i in range(start_idx, start_idx + cluster_size):
|
|
service_name = impalad_service_name(i)
|
|
log_symlink = os.path.join(options.log_dir, "{0}.INFO".format(service_name))
|
|
LOG.info("Starting Impala Daemon logging to {0}".format(log_symlink))
|
|
output_file = os.path.join(
|
|
options.log_dir, "{service_name}-out.log".format(service_name=service_name))
|
|
run_daemon_with_options("impalad", impalad_arg_lists[i - start_idx],
|
|
jvm_debug_port=DEFAULT_IMPALAD_JVM_DEBUG_PORT + i, output_file=output_file)
|
|
self.log_symlinks.append(log_symlink)
|
|
|
|
|
|
class DockerMiniClusterOperations(object):
|
|
"""Implementations of operations for the containerized minicluster implementation
|
|
with all processes attached to a user-defined docker bridge network.
|
|
|
|
We assume that only one impala cluster is running on the network - existing containers
|
|
created by this script (or with names that collide with those generated by this script)
|
|
can be destroyed if present.
|
|
|
|
We use a naming convention for the created docker containers so that we can easily
|
|
refer to them with docker commands:
|
|
impala-test-cluster-<network_name>-<daemon_name>[-<instance_num>],
|
|
e.g. impala-test-cluster-impala_network-catalogd or
|
|
impala-test-cluster-impala_network-impalad-0.
|
|
"""
|
|
def __init__(self, network_name):
|
|
self.network_name = network_name
|
|
# Track log symlinks to print after cluster is ready (not used in docker mode)
|
|
self.log_symlinks = []
|
|
# Make sure that the network actually exists.
|
|
check_call(["docker", "network", "inspect", network_name])
|
|
|
|
def get_cluster(self):
|
|
"""Return an ImpalaCluster instance."""
|
|
return ImpalaCluster(docker_network=self.network_name,
|
|
use_admission_service=options.enable_admission_service)
|
|
|
|
def kill_all_daemons(self, force=False):
|
|
self.kill_all_statestoreds(force=force)
|
|
self.kill_all_catalogds(force=force)
|
|
self.kill_admissiond(force=force)
|
|
self.kill_all_impalads(force=force)
|
|
|
|
def kill_all_impalads(self, force=False):
|
|
# List all running containers on the network and kill those with the impalad name
|
|
# prefix to make sure that no running container are left over from previous clusters.
|
|
container_name_prefix = self.__gen_container_name__("impalad")
|
|
for container_id, info in self.__get_network_info__()["Containers"].items():
|
|
container_name = info["Name"]
|
|
if container_name.startswith(container_name_prefix):
|
|
LOG.info("Stopping container {0}".format(container_name))
|
|
check_call(["docker", "stop", container_name])
|
|
|
|
def kill_all_catalogds(self, force=False):
|
|
# List all running containers on the network and kill those with the catalogd name
|
|
# prefix to make sure that no running container are left over from previous clusters.
|
|
container_name_prefix = self.__gen_container_name__("catalogd")
|
|
for container_id, info in self.__get_network_info__()["Containers"].items():
|
|
container_name = info["Name"]
|
|
if container_name.startswith(container_name_prefix):
|
|
LOG.info("Stopping container {0}".format(container_name))
|
|
check_call(["docker", "stop", container_name])
|
|
|
|
def kill_all_statestoreds(self, force=False):
|
|
# List all running containers on the network and kill those with the statestored name
|
|
# prefix to make sure that no running container are left over from previous clusters.
|
|
container_name_prefix = self.__gen_container_name__("statestored")
|
|
for container_id, info in self.__get_network_info__()["Containers"].items():
|
|
container_name = info["Name"]
|
|
if container_name.startswith(container_name_prefix):
|
|
LOG.info("Stopping container {0}".format(container_name))
|
|
check_call(["docker", "stop", container_name])
|
|
|
|
def kill_admissiond(self, force=False):
|
|
self.__stop_container__("admissiond")
|
|
|
|
def start_statestore(self):
|
|
if not options.enable_statestored_ha:
|
|
statestored_arg_lists =\
|
|
build_statestored_arg_list(num_statestored=1, remap_ports=False)
|
|
self.__run_container__("statestored", statestored_arg_lists[0],
|
|
{DEFAULT_STATESTORED_WEBSERVER_PORT: DEFAULT_STATESTORED_WEBSERVER_PORT})
|
|
else:
|
|
num_statestored = 2
|
|
statestored_arg_lists =\
|
|
build_statestored_arg_list(num_statestored, remap_ports=False)
|
|
for i in range(num_statestored):
|
|
chosen_ports = choose_statestored_ports(
|
|
enable_statestored_ha=True, instance_num=i)
|
|
port_map = {
|
|
DEFAULT_STATESTORE_SERVICE_PORT: chosen_ports['state_store_port'],
|
|
DEFAULT_STATESTORE_HA_SERVICE_PORT: chosen_ports['state_store_ha_port'],
|
|
DEFAULT_PEER_STATESTORE_HA_SERVICE_PORT:
|
|
chosen_ports['state_store_peer_ha_port'],
|
|
DEFAULT_STATESTORED_WEBSERVER_PORT: chosen_ports['webserver_port']}
|
|
self.__run_container__("statestored", statestored_arg_lists[i], port_map, i)
|
|
|
|
def start_catalogd(self):
|
|
if options.enable_catalogd_ha:
|
|
num_catalogd = 2
|
|
else:
|
|
num_catalogd = 1
|
|
catalogd_arg_lists = build_catalogd_arg_list(num_catalogd, remap_ports=False)
|
|
for i in range(num_catalogd):
|
|
chosen_ports = choose_catalogd_ports(i)
|
|
port_map = {DEFAULT_CATALOG_SERVICE_PORT: chosen_ports['catalog_service_port'],
|
|
DEFAULT_CATALOGD_WEBSERVER_PORT: chosen_ports['webserver_port']}
|
|
self.__run_container__("catalogd", catalogd_arg_lists[i], port_map, i)
|
|
|
|
def start_admissiond(self):
|
|
self.__run_container__("admissiond", build_admissiond_arg_list(),
|
|
{DEFAULT_ADMISSIOND_WEBSERVER_PORT: DEFAULT_ADMISSIOND_WEBSERVER_PORT})
|
|
|
|
def start_impalads(self, cluster_size, num_coordinators, use_exclusive_coordinators):
|
|
impalad_arg_lists = build_impalad_arg_lists(cluster_size, num_coordinators,
|
|
use_exclusive_coordinators, remap_ports=False, admissiond_host="admissiond")
|
|
assert cluster_size == len(impalad_arg_lists)
|
|
mem_limit = compute_impalad_mem_limit(cluster_size)
|
|
for i in range(cluster_size):
|
|
chosen_ports = choose_impalad_ports(i)
|
|
port_map = {DEFAULT_BEESWAX_PORT: chosen_ports['beeswax_port'],
|
|
DEFAULT_HS2_PORT: chosen_ports['hs2_port'],
|
|
DEFAULT_HS2_HTTP_PORT: chosen_ports['hs2_http_port'],
|
|
DEFAULT_IMPALAD_WEBSERVER_PORT: chosen_ports['webserver_port'],
|
|
DEFAULT_EXTERNAL_FE_PORT: chosen_ports['external_fe_port']}
|
|
self.__run_container__("impalad_coord_exec", impalad_arg_lists[i], port_map, i,
|
|
mem_limit=mem_limit, supports_data_cache=True)
|
|
|
|
def __gen_container_name__(self, daemon, instance=None):
|
|
"""Generate the name for the container, which should be unique among containers
|
|
managed by this script."""
|
|
return "impala-test-cluster-{0}-{1}".format(
|
|
self.network_name, self.__gen_host_name__(daemon, instance))
|
|
|
|
def __gen_host_name__(self, daemon, instance=None):
|
|
"""Generate the host name for the daemon inside the network, e.g. catalogd or
|
|
impalad-1."""
|
|
if instance is None:
|
|
return daemon
|
|
return "{0}-{1}".format(daemon, instance)
|
|
|
|
def __run_container__(self, daemon, args, port_map, instance=None, mem_limit=None,
|
|
supports_data_cache=False):
|
|
"""Launch a container with the daemon - impalad, catalogd, or statestored. If there
|
|
are multiple impalads in the cluster, a unique instance number must be specified.
|
|
'args' are command-line arguments to be appended to the end of the daemon command
|
|
line. 'port_map' determines a mapping from container ports to ports on localhost. If
|
|
--docker_auto_ports was set on the command line, 'port_map' is ignored and Docker
|
|
will automatically choose the mapping. If there is an existing running or stopped
|
|
container with the same name, it will be destroyed. If provided, mem_limit is
|
|
passed to "docker run" as a string to set the memory limit for the container.
|
|
If 'supports_data_cache' is true and the data cache is enabled via --data_cache_dir,
|
|
mount the data cache inside the container."""
|
|
self.__destroy_container__(daemon, instance)
|
|
if options.docker_auto_ports:
|
|
port_args = ["-P"]
|
|
else:
|
|
port_args = ["-p{dst}:{src}".format(src=src, dst=dst)
|
|
for src, dst in port_map.items()]
|
|
# Impersonate the current user for operations against the minicluster. This is
|
|
# necessary because the user name inside the container is "root".
|
|
# TODO: pass in the actual options
|
|
env_args = ["-e", "HADOOP_USER_NAME={0}".format(getpass.getuser()),
|
|
"-e", "JAVA_TOOL_OPTIONS={0}".format(
|
|
build_java_tool_options(DEFAULT_IMPALAD_JVM_DEBUG_PORT))]
|
|
|
|
# Calculate the timezone to pass into the container.
|
|
# Mounting /etc/localtime into the container does not work when /etc/localtime is a
|
|
# symbolic link to a real timezone file inside /usr/share/zoneinfo: Linux resolves
|
|
# the symlink before performing the bind mount, so you can't create a symlink within
|
|
# the container.
|
|
# Set the timezone by injecting the TZ environment variable with the desired timezone
|
|
# string instead. Initialize the env var from the host's TZ variable if it exists,
|
|
# or calculate the value (the timezone specifier) from the name of the timezone file
|
|
# pointed to by /etc/localtime, if it is a symlink.
|
|
# If /etc/localtime is a real file, and TZ is undefined on the host, then mount
|
|
# /etc/localtime into the container
|
|
timezone_as_env_var = True
|
|
timezone_as_mount = False
|
|
|
|
if HOST_TZ is None:
|
|
try:
|
|
if os.path.islink("/etc/localtime"):
|
|
# This is a symlink, so figure out where it points, cut the prefix, and hope
|
|
# we'll get a timezone spec. Don't confuse realpath() and relpath() here!
|
|
timezone_string = os.path.realpath("/etc/localtime")
|
|
timezone_string = os.path.relpath(timezone_string, "/usr/share/zoneinfo")
|
|
elif os.path.isfile("/etc/localtime"):
|
|
# This is a real file, and we'll just have to mount it into the container
|
|
timezone_as_env_var = False
|
|
timezone_as_mount = True
|
|
else:
|
|
timezone_as_env_var = False
|
|
timezone_as_mount = False
|
|
LOG.warning("Unable to determine local timezone, "
|
|
"containers will user their default timezones.")
|
|
except OSError as ex:
|
|
timezone_as_env_var = False
|
|
LOG.error("Unable to map /etc/localtime to a timezone name. Reported error"
|
|
"is {0}".format(ex))
|
|
if timezone_as_env_var:
|
|
env_args += ["-e", "TZ={0}".format(timezone_string)]
|
|
# The container build processes tags the generated image with the daemon name.
|
|
debug_build = options.build_type == "debug" or (options.build_type == "latest" and
|
|
os.path.basename(os.path.dirname(os.readlink("be/build/latest"))) == "debug")
|
|
if debug_build:
|
|
image_tag = daemon + "_debug"
|
|
else:
|
|
image_tag = daemon
|
|
java_versions = {"8": "", "11": "_java11", "17": "_java17"}
|
|
image_tag += java_versions[os.getenv('IMPALA_DOCKER_JAVA', '8')]
    host_name = self.__gen_host_name__(daemon, instance)
    container_name = self.__gen_container_name__(daemon, instance)
    # Mount configuration into the container so that we don't need to rebuild the
    # container for config changes to take effect.
    conf_dir = os.path.join(IMPALA_HOME, "fe/src/test/resources/")
    mount_args = ["--mount", "type=bind,src={0},dst=/opt/impala/conf".format(conf_dir)]
    # Collect container logs in a unique subdirectory per daemon to avoid any potential
    # interaction between containers, which should be isolated.
    log_dir = os.path.join(IMPALA_HOME, options.log_dir, host_name)
    if not os.path.isdir(log_dir):
      os.makedirs(log_dir)
    mount_args += ["--mount", "type=bind,src={0},dst=/opt/impala/logs".format(log_dir)]
    # Collect Java error files (hs_err_pidNNN.log) in a unique subdirectory per daemon
    # to avoid any potential interaction between containers, which should be isolated.
    java_error_dir = os.path.join(IMPALA_HOME, options.log_dir, host_name, "java-error")
    if not os.path.isdir(java_error_dir):
      os.makedirs(java_error_dir)
    mount_args += ["--mount", "type=bind,src={0},dst=/opt/impala/java-errors".format(
        java_error_dir)]
    # If /etc/localtime was found to be a real file instead of a symlink, then mount it
    # into the container to ensure consistent clocks between the host and the Impala
    # containers. This is important for logs as well as Iceberg tests.
    if timezone_as_mount:
      mount_args += ["--mount",
                     "type=bind,src=/etc/localtime,dst=/etc/localtime,readonly"]
    if options.mount_sources:
      mount_args += ["--mount",
                     "type=bind,src={0},dst=/opt/impala/sources,readonly".format(
                         IMPALA_HOME)]
    # Add entries to the container's /etc/hosts file for the Docker host and the
    # gateway from the container to the host. These are needed for stable reverse
    # name resolution of the host's and the gateway's IP addresses.
    # The local host's name is governed by convention, see
    # https://docs.docker.com/reference/cli/docker/container/run/#add-host
    local_host_arg = ["--add-host=host.docker.internal:host-gateway"]
    # For a dockerised setup INTERNAL_LISTEN_HOST stores the Docker gateway's IP
    # address, prepared by docker/configure_test_network.sh. If it contains
    # "localhost", then setup is not complete, so don't add it; it would be invalid
    # as an IP address anyway. Default to an empty list so the later list
    # concatenation into the docker command is always valid.
    local_gateway_arg = []
    if INTERNAL_LISTEN_HOST != "localhost":
      local_gateway_arg = ["--add-host=gateway.internal:{0}".format(
          INTERNAL_LISTEN_HOST)]
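    # Illustrative effect of the --add-host flags above (addresses vary by host):
    # the container's /etc/hosts gains entries such as
    #   host.docker.internal -> the host gateway address substituted by Docker for
    #                           the special "host-gateway" value
    #   gateway.internal     -> e.g. 172.18.0.1, taken from INTERNAL_LISTEN_HOST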

    # Create a data cache subdirectory for each daemon and mount at /opt/impala/cache
    # in the container.
    if options.data_cache_dir and supports_data_cache:
      data_cache_dir = os.path.join(options.data_cache_dir, host_name + "_cache")
      if not os.path.isdir(data_cache_dir):
        os.makedirs(data_cache_dir)
      mount_args += ["--mount", "type=bind,src={0},dst={1}".format(
          data_cache_dir, DATA_CACHE_CONTAINER_PATH)]

    # Run the container as the current user.
    user_args = ["--user", "{0}:{1}".format(os.getuid(), os.getgid())]
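    # Assumed rationale (not stated explicitly in this script): running as the
    # invoking uid:gid keeps files written into the bind mounts above (logs,
    # java-errors, cache) owned by the current user on the host.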

    mem_limit_args = []
    if mem_limit is not None:
      mem_limit_args = ["--memory", str(mem_limit)]
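    # Note on general "docker run --memory" behaviour (not specific to this script):
    # it accepts a plain byte count as well as suffixed values such as "512m" or
    # "4g", so mem_limit is passed through unmodified.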
    LOG.info("Running container {0}".format(container_name))
    run_cmd = (["docker", "run", "-d"] + env_args + port_args + user_args + ["--network",
        self.network_name, "--name", container_name, "--network-alias", host_name]
        + local_host_arg + local_gateway_arg
        + mount_args + mem_limit_args + [image_tag] + args)
    LOG.info("Running command {0}".format(run_cmd))
    check_call(run_cmd)
    port_mapping = check_output(["docker", "port", container_name],
                                universal_newlines=True)
    LOG.info("Launched container {0} with port mapping:\n{1}".format(
        container_name, port_mapping))
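    # "docker port <container>" prints one mapping per line, e.g.
    #   25000/tcp -> 0.0.0.0:25000
    # (illustrative ports; the actual host-side ports come from port_args).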

  def __stop_container__(self, daemon, instance=None):
    """Stop a container that was created by __run_container__()."""
    container_name = self.__gen_container_name__(daemon, instance)
    if call(["docker", "stop", container_name]) == 0:
      LOG.info("Stopped container {0}".format(container_name))

  def __destroy_container__(self, daemon, instance=None):
    """Destroy a container that was created by __run_container__()."""
    container_name = self.__gen_container_name__(daemon, instance)
    if call(["docker", "rm", "-f", container_name]) == 0:
      LOG.info("Destroyed container {0}".format(container_name))

  def __get_network_info__(self):
    """Get the output of "docker network inspect" as a python data structure."""
    output = check_output(["docker", "network", "inspect", self.network_name],
                          universal_newlines=True)
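    # "docker network inspect" emits a JSON array of network objects; each object
    # carries keys such as "Name", "Driver", "IPAM" and "Containers" (the exact
    # fields depend on the Docker version).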
    # Only one network should be present in the top level array.
    return json.loads(output)[0]


def validate_options():
  if options.build_type not in KNOWN_BUILD_TYPES:
    LOG.error("Invalid build type {0}".format(options.build_type))
    LOG.error("Valid values: {0}".format(", ".join(KNOWN_BUILD_TYPES)))
    sys.exit(1)

  if options.cluster_size < 0:
    LOG.error("Please specify a cluster size >= 0")
    sys.exit(1)

  if (options.use_exclusive_coordinators and
      options.num_coordinators >= options.cluster_size):
    LOG.info("Starting impala cluster without executors")

  if not os.path.isdir(options.log_dir):
    LOG.error("Log dir does not exist or is not a directory: {log_dir}".format(
        log_dir=options.log_dir))
    sys.exit(1)

  restart_only_count = len([opt for opt in [options.restart_impalad_only,
                                            options.restart_statestored_only,
                                            options.restart_catalogd_only,
                                            options.add_executors] if opt])
  if restart_only_count > 1:
    LOG.error("--restart_impalad_only, --restart_catalogd_only, "
              "--restart_statestored_only, and --add_executors options are mutually "
              "exclusive")
    sys.exit(1)
  elif restart_only_count == 1:
    if options.inprocess:
      LOG.error(
          "Cannot perform individual component restarts using an in-process cluster")
      sys.exit(1)


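# Illustrative invocations (assuming this file is bin/start-impala-cluster.py as in
# the Impala repo; option spellings are those defined by the OptionParser setup
# earlier in this script and are not re-verified here):
#   ./bin/start-impala-cluster.py                    # start a default-size cluster
#   ./bin/start-impala-cluster.py --cluster_size=1   # start a single-impalad cluster
#   ./bin/start-impala-cluster.py --kill_only        # only tear down running daemons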
if __name__ == "__main__":
  logging.basicConfig(level=logging.DEBUG,
                      format="%(asctime)s %(threadName)s: %(message)s",
                      datefmt="%H:%M:%S")
  validate_options()
  if options.docker_network is None:
    cluster_ops = MiniClusterOperations()
  else:
    cluster_ops = DockerMiniClusterOperations(options.docker_network)

  # If core-site.xml is missing, it likely means that we are missing config
  # files and should try regenerating them.
  if not os.path.exists(CORE_SITE_PATH):
    LOG.info("{0} is missing, regenerating cluster configs".format(CORE_SITE_PATH))
    check_call(os.path.join(IMPALA_HOME, "bin/create-test-configuration.sh"))

  # Kill existing cluster processes based on the current configuration.
  if options.restart_impalad_only:
    cluster_ops.kill_all_impalads(force=options.force_kill)
  elif options.restart_catalogd_only:
    cluster_ops.kill_all_catalogds(force=options.force_kill)
  elif options.restart_statestored_only:
    cluster_ops.kill_all_statestoreds(force=options.force_kill)
  elif options.add_executors or options.add_impalads:
    pass
  else:
    cluster_ops.kill_all_daemons(force=options.force_kill)

  if options.kill_only:
    sys.exit(0)

  if options.restart_impalad_only:
    impala_cluster = ImpalaCluster()
    if not impala_cluster.statestored or not impala_cluster.catalogd:
      LOG.info("No running statestored or catalogd detected. "
               "Restarting entire cluster.")
      options.restart_impalad_only = False

  existing_cluster_size = len(cluster_ops.get_cluster().impalads)
  expected_cluster_size = options.cluster_size
  num_coordinators = options.num_coordinators
  try:
    if options.restart_catalogd_only:
      cluster_ops.start_catalogd()
    elif options.restart_statestored_only:
      cluster_ops.start_statestore()
    elif options.restart_impalad_only:
      cluster_ops.start_impalads(options.cluster_size, options.num_coordinators,
                                 options.use_exclusive_coordinators)
    elif options.add_executors:
      num_coordinators = 0
      use_exclusive_coordinators = False
      cluster_ops.start_impalads(options.cluster_size, num_coordinators,
                                 use_exclusive_coordinators, existing_cluster_size)
      expected_cluster_size += existing_cluster_size
    elif options.add_impalads:
      cluster_ops.start_impalads(options.num_coordinators, options.num_coordinators,
                                 options.use_exclusive_coordinators,
                                 existing_cluster_size)
      expected_cluster_size += existing_cluster_size
    else:
      cluster_ops.start_statestore()
      cluster_ops.start_catalogd()
      if options.enable_admission_service:
        cluster_ops.start_admissiond()
      cluster_ops.start_impalads(options.cluster_size, options.num_coordinators,
                                 options.use_exclusive_coordinators)
    # Sleep briefly to reduce log spam: the cluster takes some time to start up.
    sleep(2)

    impala_cluster = cluster_ops.get_cluster()
    expected_catalog_delays = 0
    if options.catalog_init_delays != "":
      for delay in options.catalog_init_delays.split(","):
        if int(delay.strip()) != 0:
          expected_catalog_delays += 1
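    # For illustration (interpretation inferred from the counting logic above, not
    # re-verified against the flag's definition): a --catalog_init_delays value of
    # "0,2000,0" counts one delayed impalad, so one fewer impalad is expected to be
    # ready during the wait below.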
    # Wait for the cluster to be ready.
    expected_num_ready_impalads = expected_cluster_size - expected_catalog_delays
    if options.add_impalads:
      # TODO: This is a hack to make the waiting logic work. We'd better add a
      # dedicated option for adding a new cluster using the existing catalogd and
      # statestore. https://issues.apache.org/jira/browse/IMPALA-13755
      expected_num_ready_impalads = options.cluster_size
      expected_cluster_size = options.cluster_size
    impala_cluster.wait_until_ready(expected_cluster_size, expected_num_ready_impalads)

    # Print the actual log file names after the cluster is ready.
    if cluster_ops.log_symlinks:
      LOG.info("Actual log file names:")
      for log_symlink in cluster_ops.log_symlinks:
        print_actual_log_file(log_symlink)
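    # print_actual_log_file() is defined earlier in this script; based on its use
    # here it is assumed to resolve each *.INFO symlink in the log dir and log the
    # timestamped file it currently points to.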

  except Exception as e:
    LOG.exception("Error starting cluster")
    sys.exit(1)

  if options.use_exclusive_coordinators:
    executors = options.cluster_size - options.num_coordinators
  else:
    executors = options.cluster_size
  LOG.info(("Impala Cluster Running with {num_nodes} nodes "
            "({num_coordinators} coordinators, {num_executors} executors).").format(
                num_nodes=options.cluster_size,
                num_coordinators=min(options.cluster_size, num_coordinators),
                num_executors=executors))