Mirror of https://github.com/apache/impala.git (synced 2025-12-19 18:12:08 -05:00)
To remove the dependency on Python 2, existing scripts need to use python3 rather
than python. These commands find those locations (for impala-python and regular
python):

  git grep impala-python | grep -v impala-python3 | grep -v impala-python-common | grep -v init-impala-python
  git grep bin/python | grep -v python3

This removes or switches most of these locations by various means:

1. If a python file has a #!/bin/env impala-python (or python) hash-bang but doesn't
   have a main function, it removes the hash-bang and makes sure that the file is
   not executable.
2. Most scripts can simply switch from impala-python to impala-python3 (or python
   to python3) with minimal changes.
3. The cm-api pypi package (which doesn't support Python 3) has been replaced by the
   cm-client pypi package, and its interfaces have changed. Rather than migrating
   the code (which hasn't been used in years), this deletes the old code and stops
   installing cm-api into the virtualenv. The code can be restored and revamped if
   there is any interest in interacting with CM clusters.
4. This switches tests/comparison over to impala-python3, but this code has
   bit-rotted. Some pieces can be run manually, but it can't be fully verified with
   Python 3. It shouldn't hold back the migration on its own.
5. This also replaces mentions of impala-python in comments / documentation /
   READMEs.
6. kazoo (used for interacting with HBase) needed to be upgraded to a version that
   supports Python 3. The newest version of kazoo requires upgrades of other
   component versions, so this uses kazoo 2.8.0 to avoid needing other upgrades.

The two remaining uses of impala-python are:
 - bin/cmake_aux/create_virtualenv.sh
 - bin/impala-env-versioned-python

These will be removed separately when we drop Python 2 support completely. In
particular, they are useful for testing impala-shell with Python 2 until we stop
supporting Python 2 for impala-shell.

The docker-based tests still use /usr/bin/python, but this can be switched over
independently (and doesn't impact impala-python).

Testing:
 - Ran core job
 - Ran build + dataload on Centos 7, Redhat 8
 - Manual testing of individual scripts (except some bitrotted areas like the
   random query generator)

Change-Id: If209b761290bc7e7c716c312ea757da3e3bca6dc
Reviewed-on: http://gerrit.cloudera.org:8080/23468
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Tested-by: Michael Smith <michael.smith@cloudera.com>
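As a rough illustration of step 1, here is a minimal, hypothetical sketch (not the
actual tooling used for this change): it reports, for each file given on the command
line, whether the hash-bang should be switched to python3 or dropped because the
file has no main entry point.

#!/usr/bin/env python3
# Hypothetical triage helper: classify scripts by their hash-bang and main guard.
import os
import re
import stat
import sys


def classify(path):
  with open(path) as f:
    content = f.read()
  first_line = content.split("\n", 1)[0]
  # Only look at files whose hash-bang still references a python 2 interpreter.
  if not first_line.startswith("#!") or "python3" in first_line:
    return
  if "python" not in first_line:
    return
  has_main = re.search(r"^if __name__ == .__main__.", content, re.M) is not None
  is_executable = bool(os.stat(path).st_mode & stat.S_IXUSR)
  if has_main:
    print("%s: switch hash-bang to python3" % path)
  else:
    print("%s: drop hash-bang (executable bit set: %s)" % (path, is_executable))


if __name__ == "__main__":
  for p in sys.argv[1:]:
    classify(p)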
342 lines · 14 KiB · Python · Executable File
#!/usr/bin/env impala-python3
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import, division, print_function
import argparse
import logging
import os
import pipes
import time
from subprocess import check_call
from threading import Event, Thread

from tests.common.impala_cluster import ImpalaCluster
from tests.util.filesystem_utils import IS_EC

IMPALA_HOME = os.environ["IMPALA_HOME"]


class AutoScaler(object):
  """This class implements a simple autoscaling algorithm: if queries queue up for a
  configurable duration, a new executor group is started. Likewise, if the number of
  concurrently running queries indicates that an executor group can be removed, the
  group that was added last is shut down.

  Users of this class can start an auto scaler by calling start() and must call stop()
  before exiting (see main() below for an example).

  This class only uses the default admission control pool.
  """
  DEFAULT_POOL_NAME = "default-pool"

  def __init__(self, executor_slots, group_size, start_batch_size=0, max_groups=0,
               wait_up_s=0, wait_down_s=0, coordinator_slots=128):
    # Number of queries that can run concurrently on each executor
    self.executor_slots = executor_slots
    self.coordinator_slots = coordinator_slots
    # Number of executors per executor group
    self.group_size = group_size
    # New executor groups will be started in increments of this size
    self.start_batch_size = group_size
    if start_batch_size > 0:
      self.start_batch_size = start_batch_size
    # Maximum number of executor groups. We only have 10 TCP ports free on our
    # miniclusters and we need one for the dedicated coordinator.
    self.max_groups = 9 // self.group_size
    # max_groups can further bound the maximum number of groups we are going to start,
    # but we won't start more than possible.
    if max_groups > 0 and max_groups < self.max_groups:
      self.max_groups = max_groups
    # Number of seconds to wait before scaling up/down
    self.scale_wait_up_s = 5
    if wait_up_s > 0:
      self.scale_wait_up_s = wait_up_s
    self.scale_wait_down_s = 5
    if wait_down_s > 0:
      self.scale_wait_down_s = wait_down_s
    self.groups = []
    self.num_groups = 0
    # Stopwatches to track how long the conditions for scaling up/down have been met.
    self.scale_up_sw = time.time()
    self.scale_down_sw = time.time()

    self.loop_thread = None
    # Event to signal that the control loop should exit
    self.stop_ev = Event()

  def get_cluster(self):
    return ImpalaCluster.get_e2e_test_cluster()

  def get_coordinator(self):
    cluster = self.get_cluster()
    assert len(cluster.impalads) > 0
    return cluster.get_first_impalad()

  def get_service(self):
    return self.get_coordinator().service

  def get_client(self):
    return self.get_coordinator().service.create_hs2_client()

  def group_name(self, idx):
    # By convention, group names must start with their associated resource pool name
    # followed by a "-".
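    # For example, the first group in the default pool is named "default-pool-group-1".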
    return "%s-group-%s" % (self.DEFAULT_POOL_NAME, idx)

  def start_base_cluster(self):
    """Starts the base cluster consisting of an exclusive coordinator, catalog, and
    statestore. Does not add any executors."""
    logging.info("Starting base cluster (coordinator, catalog, statestore)")
    cluster_args = ["--impalad_args=-executor_groups=coordinator"]
    self._start_impala_cluster(cluster_args, cluster_size=1,
                               executor_slots=self.coordinator_slots,
                               expected_num_executors=0, add_executors=False)
    logging.info("Done, number of running executor groups: %s" % self.num_groups)

  def start_group(self):
    """Starts an executor group. The name of the group is automatically determined
    from the current total number of executor groups. Executors in the group are
    started in batches of 'start_batch_size'."""
    self.num_groups += 1
    name = self.group_name(self.num_groups)
    desc = "%s:%s" % (name, self.group_size)
    logging.info("Starting executor group %s with %s members" % (name, self.group_size))
    cluster_args = ["--impalad_args=-executor_groups=%s" % desc]
    batch_size = self.start_batch_size
    num_started = 0
    num_expected = (self.num_groups - 1) * self.group_size
    while num_started < self.group_size:
      to_start = min(batch_size, self.group_size - num_started)
      num_expected += to_start
      if to_start == 1:
        start_msg = "Starting executor %s" % (num_started + 1)
      else:
        start_msg = "Starting executors %s-%s" % (num_started + 1,
                                                  num_started + to_start)
      logging.info(start_msg)
      self._start_impala_cluster(cluster_args, cluster_size=to_start,
                                 executor_slots=self.executor_slots,
                                 expected_num_executors=num_expected,
                                 add_executors=True)
      num_started += to_start
    logging.info("Done, number of running executor groups: %s" % self.num_groups)

  def stop_group(self):
    """Stops the executor group that was added last."""
    name = self.group_name(self.num_groups)
    group_hosts = self.get_groups()[name]
    logging.info("Stopping executor group %s" % name)
    for host in group_hosts:
      logging.debug("Stopping host %s" % host)
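      # Graceful shutdown via Impala's built-in :shutdown() function; the short
      # -shutdown_grace_period_s set in _start_impala_cluster() keeps this quick.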
      query = ":shutdown('%s');" % host
      self.execute(query)
    self.wait_for_group_gone(name)
    self.num_groups -= 1
    logging.info("Done, number of running executor groups: %s" % self.num_groups)

  def wait_for_group_gone(self, group_name, timeout=120):
    """Waits until all executors in group 'group_name' have unregistered themselves
    from the coordinator's cluster membership view."""
    end = time.time() + timeout
    while time.time() < end:
      groups = self.get_groups()
      if group_name not in groups:
        return
      time.sleep(0.5)
    assert False, "Timeout waiting for group %s to shut down" % group_name

  def get_groups(self):
    return self.get_service().get_executor_groups()

  def execute(self, query):
    return self.get_client().execute(query)

  def get_num_queued_queries(self):
    """Returns the number of queries currently queued in the default pool on the
    coordinator."""
    return self.get_service().get_num_queued_queries(pool_name=self.DEFAULT_POOL_NAME)

  def get_num_running_queries(self):
    """Returns the number of queries currently running in the default pool on the
    coordinator."""
    return self.get_service().get_num_running_queries(self.DEFAULT_POOL_NAME)

  def loop(self):
    """Decides whether new executor groups need to be started or existing ones need
    to be stopped, based on the number of queries that are currently queued and
    running.
    """
    while not self.stop_ev.is_set():
      now = time.time()
      num_queued = self.get_num_queued_queries()
      num_running = self.get_num_running_queries()
      capacity = self.executor_slots * self.num_groups

      logging.debug("queued: %s, running: %s, capacity: %s" % (num_queued, num_running,
                                                               capacity))

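      # The scale-up stopwatch measures how long queries have been queued without
      # interruption; an empty queue keeps resetting it to 'now'.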
      if num_queued == 0:
        self.scale_up_sw = now

      scale_up = self.scale_up_sw < now - self.scale_wait_up_s
      if scale_up and self.num_groups < self.max_groups:
        self.start_group()
        self.scale_up_sw = time.time()
        self.scale_down_sw = self.scale_up_sw
        continue

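      # The scale-down stopwatch only runs while at least one executor group's worth
      # of query slots has been idle; falling below that surplus resets it.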
      surplus = capacity - num_running
      if surplus < self.executor_slots:
        self.scale_down_sw = now

      if self.scale_down_sw < now - self.scale_wait_down_s:
        self.stop_group()
        self.scale_up_sw = time.time()
        self.scale_down_sw = self.scale_up_sw
        continue

      time.sleep(1)

  def start(self):
    """Starts a base cluster with coordinator and statestore and the control loop to
    start and stop additional executor groups."""
    self.start_base_cluster()
    assert self.loop_thread is None
    self.loop_thread = Thread(target=self.loop)
    self.loop_thread.start()

  def stop(self):
    """Stops the AutoScaler and its cluster."""
    if self.stop_ev.is_set():
      return
    self.stop_ev.set()
    if self.loop_thread:
      self.loop_thread.join()
      self.loop_thread = None
    self._kill_whole_cluster()

  def _start_impala_cluster(self, options, cluster_size, executor_slots,
                            expected_num_executors, add_executors):
    """Starts an Impala cluster and waits for all impalads to come online.

    If 'add_executors' is True, new executors will be added to the cluster and the
    existing daemons will not be restarted. In that case 'cluster_size' must specify
    the number of nodes that will be added and 'expected_num_executors' must be the
    total expected number of executors after the additional ones have started.

    If 'add_executors' is False, 'cluster_size' must be 1 and a single exclusive
    coordinator will be started (together with catalog and statestore).
    """
    assert cluster_size > 0, "cluster_size cannot be 0"
    impala_log_dir = os.getenv("LOG_DIR", "/tmp/")
    cmd = [os.path.join(IMPALA_HOME, "bin/start-impala-cluster.py"),
           "--cluster_size=%d" % cluster_size,
           "--log_dir=%s" % impala_log_dir,
           "--log_level=1"]
    if add_executors:
      cmd.append("--add_executors")
    else:
      assert expected_num_executors == 0
      assert cluster_size == 1
      cmd.append("--use_exclusive_coordinators")

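    # Every daemon gets verbose admission-control and cluster-membership logging, a
    # bounded number of admission slots, and a short shutdown grace period so that
    # scale-downs complete quickly.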
    impalad_args = [
        "-vmodule=admission-controller=3,cluster-membership-mgr=3",
        "-admission_control_slots=%s" % executor_slots,
        "-shutdown_grace_period_s=2"]

    options += ["--impalad_args=%s" % a for a in impalad_args]

    logging.debug("Starting cluster with command: %s" %
                  " ".join(pipes.quote(arg) for arg in cmd + options))
    log_debug = logging.getLogger().getEffectiveLevel() == logging.DEBUG
    log_file = None
    if not log_debug:
      log_file = open("/dev/null", "w")

    check_call(cmd + options, close_fds=True, stdout=log_file, stderr=log_file)

    # The number of statestore subscribers is the number of impalads plus one for
    # catalogd. When executors have been added, the cluster also contains the
    # dedicated coordinator, which accounts for the +2 below.
    if expected_num_executors > 0:
      expected_subscribers = expected_num_executors + 2
      expected_backends = expected_num_executors + 1
    else:
      expected_subscribers = cluster_size + 1
      expected_backends = 1

    cluster = self.get_cluster()
    statestored = cluster.statestored
    if statestored is None:
      raise Exception("statestored was not found")

    logging.debug("Waiting for %s subscribers to come online" % expected_subscribers)
    statestored.service.wait_for_live_subscribers(expected_subscribers, timeout=60)
    for impalad in cluster.impalads:
      logging.debug("Waiting for %s backends to come online" % expected_backends)
      impalad.service.wait_for_num_known_live_backends(expected_backends, timeout=60)

  def _kill_whole_cluster(self):
    """Terminates the whole cluster, i.e. all impalads, catalogd, and statestored."""
    logging.info("terminating cluster")
    check_call([os.path.join(IMPALA_HOME, "bin/start-impala-cluster.py"), "--kill_only"])


def main():
  parser = argparse.ArgumentParser()
  parser.add_argument("-n", "--executor_slots", help="Concurrent queries per executor "
                      "group", type=int, default=3)
  parser.add_argument("-g", "--group_size", help="Number of executors per group",
                      type=int, default=2)
  parser.add_argument("-b", "--batch_size", help="Start executors of a group in "
                      "batches of this size instead of all at once", type=int,
                      default=0)
  parser.add_argument("-m", "--max_groups", help="Maximum number of groups to start",
                      type=int, default=0)
  parser.add_argument("-d", "--wait_down", help="Time to wait before scaling down (s)",
                      type=int, default=5)
  parser.add_argument("-u", "--wait_up", help="Time to wait before scaling up (s)",
                      type=int, default=5)
  parser.add_argument("-v", "--verbose", help="Verbose logging", action="store_true")
  args = parser.parse_args()

  # Restrict some logging for command line usage
  logging.getLogger("impala_cluster").setLevel(logging.INFO)
  logging.getLogger("requests").setLevel(logging.WARNING)
  if args.verbose:
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger("impala.hiveserver2").setLevel(logging.INFO)
  else:
    logging.basicConfig(level=logging.INFO)
    # Also restrict other modules' debug output
    logging.getLogger("impala_connection").setLevel(logging.WARNING)
    logging.getLogger("impala_service").setLevel(logging.WARNING)
    logging.getLogger("impala.hiveserver2").setLevel(logging.WARNING)

  a = AutoScaler(executor_slots=args.executor_slots, group_size=args.group_size,
                 start_batch_size=args.batch_size, max_groups=args.max_groups,
                 wait_up_s=args.wait_up, wait_down_s=args.wait_down)
  a.start()
  try:
    while True:
      time.sleep(1)
  except KeyboardInterrupt:
    logging.debug("Caught KeyboardInterrupt, stopping autoscaler")
    a.stop()


if __name__ == "__main__":
  main()