mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
Several tests on catalogd HA failover have a loop of the following pattern: - Do some operations - Kill the active catalogd - Verify some results - Start the killed catalogd. After starting the killed catalogd, the test gets the new active and standby catalogds and checks their /healthz pages immediately. This could fail if the web pages are not registered yet. The cause is that when starting catalogd, we only wait for its 'statestore-subscriber.connected' metric to be True, which doesn't guarantee that the web pages are initialized. This patch adds a wait for them, i.e. when fetching a web page hits a 404 (Not Found) error, wait and retry. Another flaky issue of these failover tests is that cleaning up unique_database could fail because impalad keeps using the old active catalogd address even in RPC failure retries (IMPALA-14228). This patch adds a retry on the DROP DATABASE statement to work around this. Sets disable_log_buffering to True so the killed catalogd has complete logs. Sets catalog_client_connection_num_retries to 2 to save time in coordinator retrying RPCs to the killed catalogd. This reduces the duration of test_warmed_up_metadata_failover_catchup from 100s to 50s. Tests: - Ran all (15) failover tests in test_catalogd_ha.py 10 times (each round takes 450s). Change-Id: Iad42a55ed7c357ed98d85c69e16ff705a8cae89d Reviewed-on: http://gerrit.cloudera.org:8080/23235 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Quanlong Huang <huangquanlong@gmail.com>
821 lines
33 KiB
Python
821 lines
33 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
# Basic object model of an Impala cluster (set of Impala processes).
|
|
|
|
from __future__ import absolute_import, division, print_function
|
|
from builtins import map, range
|
|
import json
|
|
import logging
|
|
import os
|
|
import pipes
|
|
import psutil
|
|
import socket
|
|
import time
|
|
import requests
|
|
from getpass import getuser
|
|
from random import choice
|
|
from signal import SIGKILL, SIGRTMIN
|
|
from subprocess import check_call, check_output
|
|
from time import sleep
|
|
|
|
import tests.common.environ
|
|
from tests.common.impala_service import (
|
|
AdmissiondService,
|
|
CatalogdService,
|
|
ImpaladService,
|
|
StateStoredService)
|
|
from tests.util.shell_util import exec_process
|
|
|
|
# Module logger. DEBUG is enabled on it because refresh() reports the discovered
# process counts at DEBUG level.
LOG = logging.getLogger('impala_cluster')
LOG.setLevel(logging.DEBUG)

# Root of the Impala checkout. Reading it with [] means importing this module fails
# with KeyError when IMPALA_HOME is not set.
IMPALA_HOME = os.environ['IMPALA_HOME']
# Wrapper script that run_daemon() puts in front of every daemon binary.
START_DAEMON_PATH = os.path.join(IMPALA_HOME, 'bin/start-daemon.sh')

# --- impalad protocol ports (beeswax, HS2, HS2-over-HTTP, external frontend) ---
DEFAULT_BEESWAX_PORT = 21000
DEFAULT_HS2_PORT = 21050
DEFAULT_EXTERNAL_FE_PORT = 21150
DEFAULT_HS2_HTTP_PORT = 28000

# --- KRPC, catalog service and statestore-related ports ---
DEFAULT_KRPC_PORT = 27000
DEFAULT_CATALOG_SERVICE_PORT = 26000
DEFAULT_STATE_STORE_SUBSCRIBER_PORT = 23000
DEFAULT_CATALOGD_STATE_STORE_SUBSCRIBER_PORT = 23020
DEFAULT_STATESTORE_SERVICE_PORT = 24000
DEFAULT_STATESTORE_HA_SERVICE_PORT = 24020
DEFAULT_PEER_STATESTORE_HA_SERVICE_PORT = 24021

# --- Debug webserver ports, one per daemon type ---
DEFAULT_IMPALAD_WEBSERVER_PORT = 25000
DEFAULT_STATESTORED_WEBSERVER_PORT = 25010
DEFAULT_CATALOGD_WEBSERVER_PORT = 25020
DEFAULT_ADMISSIOND_WEBSERVER_PORT = 25030

# --- JVM debug ports ---
DEFAULT_IMPALAD_JVM_DEBUG_PORT = 30000
DEFAULT_CATALOGD_JVM_DEBUG_PORT = 30030

# How long (in seconds) to wait for a cluster to start up. Deliberately generous to
# avoid test flakiness.
CLUSTER_WAIT_TIMEOUT_IN_SECONDS = 240

# URL template ({0}=webserver host, {1}=webserver port) for changing a JVM log level.
SET_JAVA_LOGLEVEL_URL = "http://{0}:{1}/set_java_loglevel"
|
|
|
|
|
|
def post_data(url, data):
  """Helper that POSTs 'data' to 'url', after first checking the URL with a HEAD.

  Both the HEAD and the POST must answer HTTP 200; otherwise an AssertionError that
  includes the URL and response body is raised. Returns the POST response."""
  def check_ok(resp):
    # Shared status check so the HEAD and POST failures are reported identically.
    assert resp.status_code == requests.codes.ok, "URL: {0} Resp:{1}".format(
        url, resp.text)
    return resp

  check_ok(requests.head(url))
  return check_ok(requests.post(url, data=data))
|
|
|
|
|
|
# Represents a set of Impala processes.
# Handles two cases:
# * The traditional minicluster with many processes running as the current user on
#   the local system. In this case various settings are detected based on command
#   line options (beeswax_port, webserver_port, etc).
# * The docker minicluster with one container per process connected to a user-defined
#   bridge network.
class ImpalaCluster(object):
  def __init__(self, docker_network=None, use_admission_service=False,
               deploy_catalogd=True):
    """Creates the cluster object and immediately discovers its processes.

    'docker_network': name of the docker network whose containers make up the
        cluster, or None to look for local processes owned by the current user.
    'use_admission_service': whether an admissiond is expected. Within this class it
        only changes the process summary that refresh() logs.
    'deploy_catalogd': whether a catalogd is expected to be running. When False, a
        missing catalogd is not treated as an error by the wait_* methods.
    """
    self.docker_network = docker_network
    self.use_admission_service = use_admission_service
    self.deploy_catalogd = deploy_catalogd
    self.refresh()

  @classmethod
  def get_e2e_test_cluster(cls):
    """Within end-to-end tests, get the cluster under test with settings detected from
    the environment."""
    return ImpalaCluster(docker_network=tests.common.environ.docker_network)

  def refresh(self, silent=False):
    """Re-loads the impalad/statestored/catalogd/admissiond processes if they exist.

    Helpful to confirm that processes have been killed. Unless 'silent' is True, logs
    (at DEBUG) how many processes of each kind were found.
    """
    if self.docker_network is None:
      self.__impalads, self.__statestoreds, self.__catalogds, self.__admissiond =\
          self.__build_impala_process_lists()
    else:
      self.__impalads, self.__statestoreds, self.__catalogds, self.__admissiond =\
          self.__find_docker_containers()
    admissiond_str = ""
    if self.use_admission_service:
      admissiond_str = "/%d admissiond" % (1 if self.__admissiond else 0)

    if not silent:
      LOG.debug("Found %d impalad/%d statestored/%d catalogd%s process(es)" %
          (len(self.__impalads), len(self.__statestoreds), len(self.__catalogds),
           admissiond_str))

  @property
  def statestored(self):
    """
    Returns the first statestore process, or None if none is running.

    Note: Currently we expect a single statestore process. In the future this might
    change, in which case this should return the "active" statestore.
    """
    return self.__statestoreds[0] if len(self.__statestoreds) > 0 else None

  def statestoreds(self):
    """Returns a list of the known statestored processes (a method, not a property)."""
    return self.__statestoreds

  def get_first_statestored(self):
    """Returns the first statestored in canonical (service port) order.

    Raises IndexError if no statestored is known."""
    # statestoreds is a plain method, so it must be called before indexing.
    return self.statestoreds()[0]

  @property
  def impalads(self):
    """Returns a list of the known impalad processes"""
    return self.__impalads

  @property
  def catalogd(self):
    """Returns the first catalogd process, or None if none is running."""
    return self.__catalogds[0] if len(self.__catalogds) > 0 else None

  def catalogds(self):
    """Returns a list of the known catalogd processes (a method, not a property)."""
    return self.__catalogds

  def get_first_catalogd(self):
    """Returns the first catalogd in canonical (service port) order.

    Raises IndexError if no catalogd is known."""
    # catalogds is a plain method, so it must be called before indexing.
    return self.catalogds()[0]

  @property
  def admissiond(self):
    """Returns the admissiond process, or None if no admissiond process was found"""
    return self.__admissiond

  def get_first_impalad(self):
    """Returns the impalad with the lowest HS2 port (see __sort_impalads)."""
    return self.impalads[0]

  def get_any_impalad(self):
    """Selects a random impalad from the list of known processes"""
    return choice(self.impalads)

  def get_different_impalad(self, other_impalad):
    """Selects a random impalad that is different from the given impalad.

    Fails with an AssertionError when at most one impalad is known."""
    if len(self.impalads) <= 1:
      assert 0, "Only %d impalads available to choose from" % len(self.impalads)
    LOG.info("other_impalad: " + str(other_impalad))
    LOG.info("Cluster: " + str(len(self.impalads)))
    LOG.info("Cluster: " + str(self.impalads))
    return choice([impalad for impalad in self.impalads if impalad != other_impalad])

  def get_all_coordinators(self):
    """Returns a list of all impalads where is_coordinator returns True. If no
    coordinators are found, returns an empty list. The returned list is sorted by krpc
    port ascending."""
    return sorted([imp for imp in self.impalads if imp.is_coordinator()],
                  key=lambda x: x.service.krpc_port)

  def num_responsive_coordinators(self):
    """Returns the number of impalads that can successfully evaluate "select 1".

    Any failure (connecting, executing, or an unsuccessful result) is printed and the
    impalad is simply not counted. Each client that was opened is closed again."""
    n = 0
    for impalad in self.impalads:
      # Reset per iteration so 'finally' never closes a client from a previous impalad
      # (or touches an unbound name) when client creation itself fails.
      client = None
      try:
        client = impalad.service.create_hs2_client()
        result = client.execute("select 1")
        assert result.success
        n += 1
      except Exception as e:
        print(e)
      finally:
        if client is not None:
          client.close()
    return n

  def wait_until_ready(self, expected_num_impalads=1, expected_num_ready_impalads=None):
    """Waits for this 'cluster' to be ready to submit queries.

    A cluster is deemed "ready" if:
      - expected_num_impalads impala processes are up (or, if not specified, at least
        one impalad is up).
      - expected_num_ready_impalads backends are registered with the statestore.
        expected_num_ready_impalads defaults to expected_num_impalads.
      - Each impalad's debug webserver is ready.
      - Each coordinator impalad's hs2/beeswax port is open (this happens after catalog
        cache is ready).
      - All impalads know about all other ready impalads.
    This information is retrieved by querying the statestore debug webpage
    and each individual impalad's metrics webpage.
    """
    start_time = time.time()
    self.wait_for_num_impalads(expected_num_impalads)

    # TODO: fix this for coordinator-only nodes as well.
    if expected_num_ready_impalads is None:
      expected_num_ready_impalads = len(self.impalads)

    def check_processes_still_running():
      """Check that the processes we waited for above (i.e. impalads, statestored,
      catalogd) are still running. Throw an exception otherwise."""
      self.refresh(silent=True)
      # The number of impalad processes may temporarily increase if breakpad forked a
      # process to write a minidump.
      assert len(self.impalads) >= expected_num_impalads
      assert self.statestored is not None
      if self.deploy_catalogd: assert self.catalogd is not None

    sleep_interval = 0.5
    # Wait for each webserver to be ready.
    for impalad in self.impalads:
      impalad.wait_for_webserver(sleep_interval, check_processes_still_running)

    # Wait for coordinators to start.
    for impalad in self.impalads:
      # Look up the /varz page. The webserver should be up already.
      flags = impalad.service.get_flag_current_values()
      if flags.get('is_coordinator', 'true') != 'true':
        continue
      if flags.get('stress_catalog_init_delay_ms', '0') != '0':
        continue
      impalad.wait_for_coordinator_services(sleep_interval, check_processes_still_running)
      # Decrease sleep_interval after first coordinator ready as the others are also
      # likely to be (nearly) ready.
      sleep_interval = 0.2

    # Restore sleep interval to avoid potential log spew. At this point it is unlikely
    # that any more sleeps are actually needed.
    sleep_interval = 0.5
    # Wait till all impalads consider all backends ready.
    for impalad in self.impalads:
      impalad.service.wait_for_num_known_live_backends(expected_num_ready_impalads,
          timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=sleep_interval,
          early_abort_fn=check_processes_still_running)
    LOG.info("Total wait: {:.2f}s".format(time.time() - start_time))

  def wait_for_num_impalads(self, num_impalads, retries=10):
    """Checks that at least 'num_impalads' impalad processes are running, along with
    the statestored and (if 'deploy_catalogd') the catalogd.

    Refreshes about once per second until the expected processes are present, or the
    retry limit is hit. Failing this, raises a RuntimeError describing what is missing.
    """
    def processes_missing():
      # Only require a catalogd when one is supposed to be deployed. This matches the
      # error reporting below and avoids pointless sleeps for catalogd-less clusters.
      return (len(self.impalads) < num_impalads or not self.statestored
              or (self.deploy_catalogd and not self.catalogd))

    for _ in range(retries):
      if not processes_missing():
        break
      sleep(1)
      self.refresh()
    msg = ""
    if len(self.impalads) < num_impalads:
      msg += "Expected {expected_num} impalad(s), only {actual_num} found\n".format(
          expected_num=num_impalads, actual_num=len(self.impalads))
    if not self.statestored:
      msg += "statestored failed to start.\n"
    if self.deploy_catalogd and not self.catalogd:
      msg += "catalogd failed to start.\n"
    if msg:
      raise RuntimeError(msg)

  def graceful_shutdown_impalads(self):
    """Sends SIGRTMIN to every known impalad, then waits until all of them have exited.

    SIGRTMIN is the signal this helper relies on to request a graceful shutdown."""
    # Refresh cluster in case cluster has changed.
    self.refresh()
    for impalad in self.impalads:
      impalad.kill(SIGRTMIN)
    for impalad in self.impalads:
      impalad.wait_for_exit()

  def __build_impala_process_lists(self):
    """
    Gets all the running Impala procs (with start arguments) on the machine.

    Returns a tuple (impalads, statestoreds, catalogds, admissiond); the lists are in
    canonical order and admissiond is None if none was found.

    Note: This currently only works for the local case. To support running in a cluster
    environment this would need to enumerate each machine in the cluster.
    """
    impalads = list()
    statestoreds = list()
    catalogds = list()
    admissiond = None
    daemons = ['impalad', 'catalogd', 'statestored', 'admissiond']
    for binary, process in find_user_processes(daemons):
      # IMPALA-6889: When a process shuts down and becomes a zombie its cmdline becomes
      # empty for a brief moment, before it gets reaped by its parent (see man proc). We
      # copy the cmdline to prevent it from changing between the following checks and
      # the construction of the *Process objects.
      cmdline = ''
      try:
        cmdline = process.cmdline()
      except psutil.NoSuchProcess:
        # IMPALA-8320: psutil.Process.cmdline is a property and the process could have
        # disappeared between the time we built the process list and now.
        continue
      if len(cmdline) == 0:
        continue
      if binary == 'impalad':
        impalads.append(ImpaladProcess(cmdline))
      elif binary == 'statestored':
        statestoreds.append(StateStoreProcess(cmdline))
      elif binary == 'catalogd':
        catalogds.append(CatalogdProcess(cmdline))
      elif binary == 'admissiond':
        admissiond = AdmissiondProcess(cmdline)

    self.__sort_impalads(impalads)
    self.__sort_statestoreds(statestoreds)
    self.__sort_catalogds(catalogds)
    return impalads, statestoreds, catalogds, admissiond

  def __find_docker_containers(self):
    """
    Gets all the running Impala containers on self.docker_network.

    Returns a tuple (impalads, statestoreds, catalogds, admissiond) like
    __build_impala_process_lists().
    """
    impalads = []
    statestoreds = []
    catalogds = []
    admissiond = None
    output = check_output(["docker", "network", "inspect", self.docker_network],
                          universal_newlines=True)
    # Only one network should be present in the top level array.
    for container_id in json.loads(output)[0]["Containers"]:
      container_info = get_container_info(container_id)
      if container_info["State"]["Status"] != "running":
        # Skip over stopped containers.
        continue
      args = container_info["Args"]
      executable = os.path.basename(args[0])
      port_map = {}
      for k, v in container_info["NetworkSettings"]["Ports"].items():
        # Value looks like [{ "HostPort": "25002", "HostIp": "" }], but docker reports
        # null for ports that are exposed but not published to the host. Those have no
        # host port to map to, so skip them instead of crashing.
        if not v:
          continue
        # Key looks like "25000/tcp"..
        port = int(k.split("/")[0])
        host_port = int(v[0]["HostPort"])
        port_map[port] = host_port
      if executable == 'impalad':
        impalads.append(ImpaladProcess(args, container_id=container_id,
                                       port_map=port_map))
      elif executable == 'statestored':
        statestoreds.append(StateStoreProcess(args, container_id=container_id,
                                              port_map=port_map))
      elif executable == 'catalogd':
        catalogds.append(CatalogdProcess(args, container_id=container_id,
                                         port_map=port_map))
      elif executable == 'admissiond':
        assert admissiond is None
        admissiond = AdmissiondProcess(args, container_id=container_id,
                                       port_map=port_map)
    self.__sort_impalads(impalads)
    self.__sort_statestoreds(statestoreds)
    self.__sort_catalogds(catalogds)
    return impalads, statestoreds, catalogds, admissiond

  def __sort_impalads(self, impalads):
    """Does an in-place sort of a list of ImpaladProcess objects into a canonical order.
    We order them by their HS2 port, so that get_first_impalad() always returns the
    first one. We need to use a port that is exposed and mapped to a host port for
    the containerised cluster."""
    impalads.sort(key=lambda i: i.service.hs2_port)

  def __sort_statestoreds(self, statestoreds):
    """Does an in-place sort of a list of StateStoreProcess objects into a canonical
    order. We order them by their service port, so that get_first_statestored() always
    returns the first one. We need to use a port that is exposed and mapped to a host
    port for the containerised cluster."""
    statestoreds.sort(key=lambda i: i.service.service_port)

  def __sort_catalogds(self, catalogds):
    """Does an in-place sort of a list of CatalogdProcess objects into a canonical order.
    We order them by their service port, so that get_first_catalogd() always returns the
    first one. We need to use a port that is exposed and mapped to a host port for
    the containerised cluster."""
    catalogds.sort(key=lambda i: i.service.service_port)
|
|
|
|
|
|
# Represents a process running on a machine and common actions that can be performed
# on a process such as restarting or killing. The process may be the main process in
# a Docker container, if the cluster is containerised (in this case container_id must
# be provided). Note that containerised processes are really just processes running
# on the local system with some additional virtualisation, so some operations are
# the same for both containerised and non-containerised cases.
#
# For containerised processes, 'port_map' should be provided to map from the container's
# ports to ports on the host. Methods from this class always return the host port.
class Process(object):
  def __init__(self, cmd, container_id=None, port_map=None):
    """'cmd' is the full command line as a list (binary first). 'container_id' and
    'port_map' must be given together for containerised processes."""
    assert cmd is not None and len(cmd) >= 1,\
        'Process object must be created with valid command line argument list'
    assert container_id is None or port_map is not None,\
        "Must provide port_map for containerised process"
    self.cmd = cmd
    self.container_id = container_id
    self.port_map = port_map

  def __class_name(self):
    return self.__class__.__name__

  def __str__(self):
    return "<%s PID: %s (%s)>" % (self.__class_name(), self.__get_pid(),
                                  ' '.join(self.cmd))

  def __repr__(self):
    return str(self)

  def get_pid(self):
    """Gets the PID of the process. Returns None if the PID cannot be determined"""
    pid = self.__get_pid()
    if pid:
      LOG.info("Found PID %s for %s" % (pid, " ".join(self.cmd)))
    else:
      LOG.info("No PID found for process cmdline: %s. Process is dead?" %
               " ".join(self.cmd))
    return pid

  def get_pids(self):
    """Gets the PIDs of the process. In some circumstances, a process can run multiple
    times, e.g. when it forks in the Breakpad crash handler. Returns an empty list if no
    PIDs can be determined."""
    pids = [proc['pid'] for proc in self.__get_procs()]
    if pids:
      LOG.info("Found PIDs %s for %s" % (", ".join(map(str, pids)), " ".join(self.cmd)))
    else:
      LOG.info("No PID found for process cmdline: %s. Process is dead?" %
               " ".join(self.cmd))
    return pids

  def __procs_str(self, procs):
    return "\n".join([str(proc) for proc in procs])

  def __get_pid(self):
    """Returns the PID of the original (non-child) matching process, or None."""
    procs = self.__get_procs()
    # A single match is the process itself; this is always the case for containers.
    if len(procs) == 1:
      return procs[0]['pid']

    result = None
    # In some circumstances - notably ubsan tests - child processes can be slow to exit.
    # Only return the original process, i.e. one whose parent has a different cmd.
    pids = [proc['pid'] for proc in procs]
    for process in procs:
      if process['ppid'] not in pids:
        assert result is None,\
            "Multiple non-child processes:\n%s" % self.__procs_str(procs)
        result = process['pid']
      else:
        LOG.info("Child process active:\n%s" % self.__procs_str(procs))

    return result

  def __get_procs(self):
    """
    Returns a list of dicts containing {pid, ppid, cmdline} for related processes.
    """
    if self.container_id is not None:
      container_info = get_container_info(self.container_id)
      if container_info["State"]["Status"] != "running":
        return []
      return [{'pid': container_info["State"]["Pid"], 'ppid': 0, 'cmdline': self.cmd}]

    # In non-containerised case, search for process based on matching command lines.
    procs = []
    for process in psutil.process_iter(['pid', 'ppid', 'cmdline']):
      # Use info because it won't throw NoSuchProcess exceptions. psutil stores None in
      # 'info' for values it could not read (e.g. AccessDenied), so treat a missing
      # cmdline as empty instead of crashing in set().
      cmdline = process.info['cmdline'] or []
      if set(self.cmd) == set(cmdline):
        procs.append(process.info)
    return procs

  def kill(self, signal=SIGKILL):
    """
    Kills the process with 'signal' (stops the container if containerised).
    """
    if self.container_id is None:
      pid = self.get_pid()
      assert pid is not None, "No processes for %s" % self
      LOG.info('Killing %s with signal %s' % (self, signal))
      exec_process("kill -%d %d" % (signal, pid))
    else:
      LOG.info("Stopping container: {0}".format(self.container_id))
      check_call(["docker", "container", "stop", self.container_id])

  def start(self):
    """Start the process with the same arguments after it was stopped."""
    if self.container_id is None:
      binary = os.path.basename(self.cmd[0])
      restart_args = self.cmd[1:]
      LOG.info("Starting {0} with arguments {1}".format(binary, restart_args))
      run_daemon(binary, restart_args)
    else:
      LOG.info("Starting container: {0}".format(self.container_id))
      check_call(["docker", "container", "start", self.container_id])

  def restart(self, signal=SIGKILL):
    """Kills and restarts the process"""
    self.kill(signal=signal)
    self.wait_for_exit()
    self.start()

  def wait_for_exit(self):
    """Wait until the process exits (or return immediately if it already has exited).
    Polls every 10ms without a timeout."""
    LOG.info('Waiting for exit: {0} (PID: {1})'.format(
        ' '.join(self.cmd), self.get_pid()))
    while self.__get_pid() is not None:
      sleep(0.01)

  def kill_and_wait_for_exit(self, signal=SIGKILL):
    """Kill the process and wait for it to exit"""
    self.kill(signal)
    self.wait_for_exit()

  def modify_argument(self, argument, new_value):
    """Modify the 'argument' in start args with new_value.
    If no 'argument' in start args, add it.
    If new_value is None, add or remove 'argument'.
    'argument' is compared with the text before '=' of each arg, so it must include
    any leading dashes."""
    for i in range(1, len(self.cmd)):
      if self.cmd[i].split('=')[0] == argument:
        if new_value is None:
          del self.cmd[i]
        else:
          self.cmd[i] = (argument + '=' + new_value)
        return
    self.cmd.append(argument if new_value is None else (argument + '=' + new_value))
|
|
|
|
|
|
# Base class for all Impala processes
class BaseImpalaProcess(Process):
  def __init__(self, cmd, container_id=None, port_map=None):
    """Records the hostname and webserver interface parsed from 'cmd'."""
    super(BaseImpalaProcess, self).__init__(cmd, container_id, port_map)
    self.hostname = self._get_hostname()
    self.webserver_interface = self._get_webserver_interface()

  def get_webserver_port(self):
    """Return the (host) port for the webserver of this process."""
    return int(self._get_port('webserver_port', self._get_default_webserver_port()))

  def set_jvm_log_level(self, class_name, level="info"):  # noqa: U100
    """Helper method to set JVM log level for certain class name.
    Some daemon might not have JVM in it."""
    raise NotImplementedError()

  def _get_default_webserver_port(self):
    """Different daemons have different defaults. Subclasses must override."""
    raise NotImplementedError()

  def _get_webserver_certificate_file(self):
    # TODO: if this is containerised, the path will likely not be the same on the host.
    # TODO: what we need in the client is the CA, not the server cert
    return self._get_arg_value("webserver_certificate_file", "")

  def _get_ssl_client_ca_certificate(self):
    return self._get_arg_value("ssl_client_ca_certificate", "")

  def _get_hostname(self):
    return self._get_arg_value("hostname", socket.gethostname())

  def _get_webserver_interface(self):
    return self._get_arg_value("webserver_interface", socket.gethostname())

  def _get_external_interface(self):
    return self._get_arg_value("external_interface", socket.gethostname())

  def _get_arg_value(self, arg_name, default=None):
    """Gets the value of '-arg_name=value' / '--arg_name=value' from the command line.

    Returns 'default' if the argument is absent; fails with an AssertionError when it
    is absent and no default was given."""
    prefix = '%s=' % arg_name
    for arg in self.cmd:
      if arg.strip().lstrip('-').startswith(prefix):
        # Split only on the first '=', so values that themselves contain '=' (e.g.
        # key=value lists) are returned intact rather than truncated.
        return arg.split('=', 1)[1]
    if default is None:
      assert 0, "Argument '{0}' not found in cmd '{1}'.".format(arg_name, self.cmd)
    return default

  def _get_port(self, arg_name, default):
    """Return the host port specified by the command line argument 'arg_name'.
    If 'self.port_map' is set, maps from container ports to host ports."""
    port = int(self._get_arg_value(arg_name, default))
    if self.port_map is not None:
      port = self.port_map.get(port, port)
    return port
|
|
|
|
|
|
# Represents an impalad process
class ImpaladProcess(BaseImpalaProcess):
  def __init__(self, cmd, container_id=None, port_map=None):
    """Parses the impalad's ports/interfaces from 'cmd' and builds its ImpaladService."""
    super(ImpaladProcess, self).__init__(cmd, container_id, port_map)
    self.external_interface = self._get_external_interface()
    self.service = ImpaladService(self.hostname, self.webserver_interface,
        self.external_interface,
        self.get_webserver_port(), self.__get_beeswax_port(),
        self.__get_krpc_port(), self.__get_hs2_port(), self.__get_hs2_http_port(),
        self._get_webserver_certificate_file(), self._get_ssl_client_ca_certificate())

  def _get_default_webserver_port(self):
    return DEFAULT_IMPALAD_WEBSERVER_PORT

  def __get_beeswax_port(self):
    return int(self._get_port('beeswax_port', DEFAULT_BEESWAX_PORT))

  def __get_krpc_port(self):
    return int(self._get_port('krpc_port', DEFAULT_KRPC_PORT))

  def __get_hs2_port(self):
    return int(self._get_port('hs2_port', DEFAULT_HS2_PORT))

  def __get_hs2_http_port(self):
    return int(self._get_port('hs2_http_port', DEFAULT_HS2_HTTP_PORT))

  def is_coordinator(self):
    """Returns boolean True or False depending on whether or not the current process is
    a coordinator. Both exclusive and non-exclusive coordinators will return true."""
    return self._get_arg_value("is_coordinator", "true") == "true"

  def start(self, wait_until_ready=True, timeout=30):
    """Starts the impalad and waits until the service is ready to accept connections.
    'timeout' is the amount of time to wait for the Impala server to be in the
    ready state."""
    restart_args = self.cmd[1:]
    LOG.info("Starting Impalad process with args: {0}".format(restart_args))
    run_daemon("impalad", restart_args)
    if wait_until_ready:
      self.service.wait_for_metric_value('impala-server.ready',
                                         expected_value=1, timeout=timeout)

  def wait_for_webserver(self, sleep_interval, early_abort_fn):
    """Polls until the debug webserver port is open.

    'early_abort_fn' is called between polls and is expected to raise if waiting is
    pointless. Raises RuntimeError if the port is still closed after
    CLUSTER_WAIT_TIMEOUT_IN_SECONDS, consistent with wait_for_coordinator_services()."""
    start_time = time.time()
    while time.time() - start_time < CLUSTER_WAIT_TIMEOUT_IN_SECONDS:
      LOG.info("Waiting for Impalad webserver port %s", self.service.webserver_port)
      if self.service.webserver_port_is_open(): return
      early_abort_fn()
      sleep(sleep_interval)

    # Previously this fell through silently, which only deferred the failure to a
    # confusing error in the next step that talks to the webserver.
    raise RuntimeError(
        "Unable to open webserver port {port} within {num_seconds} seconds.".format(
            port=self.service.webserver_port,
            num_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS))

  def wait_for_coordinator_services(self, sleep_interval, early_abort_fn):
    """Waits for client ports to be opened. Assumes that the webservice ports are open.
    Raises RuntimeError after CLUSTER_WAIT_TIMEOUT_IN_SECONDS."""
    start_time = time.time()
    LOG.info(
        "Waiting for coordinator client services "
        + "- hs2 port: %d hs2-http port: %d beeswax port: %d",
        self.service.hs2_port, self.service.hs2_http_port, self.service.beeswax_port)
    while time.time() - start_time < CLUSTER_WAIT_TIMEOUT_IN_SECONDS:
      beeswax_port_is_open = self.service.beeswax_port_is_open()
      hs2_port_is_open = self.service.hs2_port_is_open()
      hs2_http_port_is_open = self.service.hs2_http_port_is_open()
      if beeswax_port_is_open and hs2_port_is_open and hs2_http_port_is_open:
        return
      early_abort_fn()
      # The coordinator is likely waiting for the catalog update. Display currently
      # known catalog version to show progress.
      metric_check_time = time.time()
      catalog_version = self.service.get_metric_value("catalog.curr-version")
      LOG.info(("Client services not ready. Might be waiting for catalog topic update: "
                "(catalog.curr-version={catalog_version}). Trying again ...").format(
                    catalog_version=catalog_version))
      # Include metric_check_time in sleep_interval.
      sleep(max(0, sleep_interval - (time.time() - metric_check_time)))

    raise RuntimeError(
        "Unable to open client ports within {num_seconds} seconds.".format(
            num_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS))

  def set_jvm_log_level(self, class_name, level="info"):
    """Helper method to set JVM log level for certain class name.
    'level' defaults to "info", matching BaseImpalaProcess.set_jvm_log_level."""
    url = SET_JAVA_LOGLEVEL_URL.format(
        self.webserver_interface, self.get_webserver_port())
    return post_data(url, {"class": class_name, "level": level})
|
|
|
|
|
|
# Represents a statestored process
class StateStoreProcess(BaseImpalaProcess):
  """A statestored daemon plus the StateStoredService used to talk to it."""

  def __init__(self, cmd, container_id=None, port_map=None):
    super(StateStoreProcess, self).__init__(cmd, container_id, port_map)
    webserver_port = self.get_webserver_port()
    cert_file = self._get_webserver_certificate_file()
    ca_cert = self._get_ssl_client_ca_certificate()
    service_port = self.__get_port()
    self.service = StateStoredService(self.hostname, self.webserver_interface,
                                      webserver_port, cert_file, ca_cert, service_port)

  def _get_default_webserver_port(self):
    """statestored's webserver defaults to DEFAULT_STATESTORED_WEBSERVER_PORT."""
    return DEFAULT_STATESTORED_WEBSERVER_PORT

  def __get_port(self):
    """Host port of the statestore service ('state_store_port' flag)."""
    return int(self._get_port('state_store_port', DEFAULT_STATESTORE_SERVICE_PORT))

  def start(self, wait_until_ready=True, additional_args=None):
    """Re-launches statestored with its original arguments.

    'additional_args', if given, is a single extra argument string appended to the
    command line. When 'wait_until_ready' is set, blocks (up to 30s) until the
    'statestore.service-started' metric reports 1."""
    args = list(self.cmd[1:])
    if additional_args:
      args.append(additional_args)
    LOG.info("Starting Statestored process: {0}".format(args))
    run_daemon("statestored", args)
    if not wait_until_ready:
      return
    self.service.wait_for_metric_value('statestore.service-started',
                                       expected_value=1, timeout=30)
|
|
|
|
|
|
# Represents a catalogd process
class CatalogdProcess(BaseImpalaProcess):
  def __init__(self, cmd, container_id=None, port_map=None):
    """Parses the catalogd's ports from 'cmd' and builds its CatalogdService."""
    super(CatalogdProcess, self).__init__(cmd, container_id, port_map)
    self.service = CatalogdService(self.hostname, self.webserver_interface,
        self.get_webserver_port(), self._get_webserver_certificate_file(),
        self._get_ssl_client_ca_certificate(),
        self.__get_port())

  def _get_default_webserver_port(self):
    return DEFAULT_CATALOGD_WEBSERVER_PORT

  def __get_port(self):
    return int(self._get_port('catalog_service_port', DEFAULT_CATALOG_SERVICE_PORT))

  def start(self, wait_until_ready=True, additional_args=None):
    """Starts catalogd and waits until the service is ready to accept connections.

    'additional_args', if given, is a single extra argument string appended to the
    original command line."""
    restart_args = self.cmd[1:]
    if additional_args:
      restart_args = restart_args + [additional_args]
    LOG.info("Starting Catalogd process: {0}".format(restart_args))
    run_daemon("catalogd", restart_args)
    if wait_until_ready:
      self.service.wait_for_metric_value('statestore-subscriber.connected',
                                         expected_value=1, timeout=30)
      # Being connected to the statestore does not imply the web pages are
      # registered, so also wait until they are initialized.
      self.service.wait_for_page_ready("healthz")

  def set_jvm_log_level(self, class_name, level="info"):
    """Helper method to set JVM log level for certain class name.
    'level' defaults to "info", matching BaseImpalaProcess.set_jvm_log_level."""
    url = SET_JAVA_LOGLEVEL_URL.format(
        self.webserver_interface, self.get_webserver_port())
    return post_data(url, {"class": class_name, "level": level})
|
|
|
|
|
|
# Represents an admission control process.
class AdmissiondProcess(BaseImpalaProcess):
  """An admissiond daemon plus the AdmissiondService used to talk to it."""

  def __init__(self, cmd, container_id=None, port_map=None):
    super(AdmissiondProcess, self).__init__(cmd, container_id, port_map)
    webserver_port = self.get_webserver_port()
    cert_file = self._get_webserver_certificate_file()
    ca_cert = self._get_ssl_client_ca_certificate()
    self.service = AdmissiondService(self.hostname, self.webserver_interface,
                                     webserver_port, cert_file, ca_cert)

  def _get_default_webserver_port(self):
    """admissiond's webserver defaults to DEFAULT_ADMISSIOND_WEBSERVER_PORT."""
    return DEFAULT_ADMISSIOND_WEBSERVER_PORT
|
|
|
|
|
|
def find_user_processes(binaries):
  """Returns an iterator over all processes owned by the current user with a matching
  binary name from the provided list. Yields tuples of (binary name,
  psutil.Process object).

  Processes that vanish while being inspected, or whose details cannot be read
  (AccessDenied), are skipped."""
  for pid in psutil.pids():
    try:
      process = psutil.Process(pid)
      cmdline = process.cmdline()
      # getuser() stays inside the try: it can raise KeyError("... uid not found ...")
      # when the current uid has no passwd entry, which is tolerated below.
      if process.username() != getuser() or len(cmdline) == 0:
        continue
      # IMPALA-8820 - sometimes the process name does not reflect the executed binary
      # because the process can change its own name at runtime. Checking the command
      # line is more robust.
      binary_name = os.path.basename(cmdline[0])
    except KeyError as e:
      if "uid not found" not in str(e):
        raise
      continue
    except psutil.NoSuchProcess:
      # Ignore the case when a process no longer exists.
      continue
    except psutil.AccessDenied:
      # Details of some processes (typically other users') may be unreadable; those
      # can't be ours to match anyway, so skip them rather than abort the scan.
      continue
    if binary_name in binaries:
      yield binary_name, process
|
|
|
|
|
|
def run_daemon(daemon_binary, args, build_type="latest", env_vars=None, output_file=None):
  """Starts the Impala daemon binary 'daemon_binary' (e.g. impalad, catalogd,
  statestored) in the background with the specified command line arguments.

  'args' must be a list of strings. 'build_type' selects the build directory under
  be/build/ to take the binary from. Any values in 'env_vars' (a dict, may be None)
  override environment variables inherited from this process. If 'output_file' is
  specified, stdout and stderr are redirected to that file. Returns without waiting
  for the daemon to become ready.
  """
  bin_path = os.path.join(IMPALA_HOME, "be", "build", build_type, "service",
                          daemon_binary)
  redirect = ""
  if output_file is not None:
    # Quote the path so file names with spaces or shell metacharacters work; plain
    # paths are left unchanged by quote().
    redirect = "1>{0} 2>&1".format(pipes.quote(output_file))
  cmd = [START_DAEMON_PATH, bin_path] + args
  # Default to None instead of a shared mutable {} default argument.
  set_cmds = ''.join(["export {0}={1};".format(k, pipes.quote(v))
                      for k, v in (env_vars or {}).items()])
  # Use os.system() to start 'cmd' in the background via a shell so its parent will be
  # init after the shell exits. Otherwise, the parent of 'cmd' will be py.test and we
  # cannot cleanly kill it until py.test exits. In theory, Popen(shell=True) should
  # achieve the same thing but it doesn't work on some platforms for some reasons.
  sys_cmd = ("{set_cmds} {cmd} {redirect} &".format(
      set_cmds=set_cmds,
      cmd=' '.join([pipes.quote(tok) for tok in cmd]),
      redirect=redirect))
  os.system(sys_cmd)
|
|
|
|
|
|
def get_container_info(container_id):
  """Runs "docker container inspect <container_id>" and returns the parsed description.

  The command prints a JSON array; exactly one entry is expected (asserted), and that
  entry is returned."""
  inspect_cmd = ["docker", "container", "inspect", container_id]
  raw_json = check_output(inspect_cmd, universal_newlines=True)
  containers = json.loads(raw_json)
  assert len(containers) == 1, json.dumps(containers, indent=4)
  return containers[0]
|