IMPALA-13620: Refresh compute_table_stats.py script

This patch refreshes the compute_table_stats.py script with the
following changes:
- Cap parallelism at IMPALA_BUILD_THREADS when the --parallelism
  argument is not set (sketched below).
- Change the default connection protocol to HS2, leveraging the
  existing ImpylaHS2Connection.
- Switch from OptionParser to ArgumentParser.
- Use impala-python3 to run the script.
- Add --exclude_table_names to skip running COMPUTE STATS on certain
  tables/views.
- Make continue_on_error default to False.
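A minimal, self-contained sketch of the new defaults, mirroring the
DEFAULT_PARALLELISM and mutually-exclusive-flag pattern in the diff
below (the parse_args([]) call and the print are illustrative only):

import multiprocessing
import os
from argparse import ArgumentParser

# Honor IMPALA_BUILD_THREADS when it is set; otherwise fall back to the
# machine's CPU count.
DEFAULT_PARALLELISM = int(
    os.environ.get('IMPALA_BUILD_THREADS', multiprocessing.cpu_count()))

parser = ArgumentParser()
# --continue_on_error/--stop_on_error toggle one flag. argparse applies the
# default of the first registered action for a shared dest, so the flag
# defaults to False (stop on error).
group = parser.add_mutually_exclusive_group()
group.add_argument("--continue_on_error", dest="continue_on_error",
                   action="store_true")
group.add_argument("--stop_on_error", dest="continue_on_error",
                   action="store_false")
parser.add_argument("--parallelism", type=int, default=DEFAULT_PARALLELISM)

args = parser.parse_args([])  # no CLI flags given: defaults apply
print(args.parallelism, args.continue_on_error)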

This patch also improves query handle logging in ImpylaHS2Connection.
A collect_profile_and_log argument is added to control whether logs and
the runtime profile are pulled at the end of __fetch_results(). The
default behavior remains unchanged.
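As an illustration, a caller can opt out of profile and log collection
with a minimal sketch like this, modeled on the updated client_factory
below (the try/finally is an addition for safety; localhost:21050 is
the default HS2 port):

from contextlib import contextmanager

from tests.common.impala_connection import create_connection


@contextmanager
def client_factory():
  # Connect over HS2 and skip pulling logs/runtime profiles after each
  # fetch; COMPUTE STATS callers do not need them.
  client = create_connection('localhost:21050', protocol='hs2',
                             collect_profile_and_log=False)
  client.connect()
  try:
    yield client
  finally:
    client.close()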

Skip COMPUTE STATS for functional_kudu.alltypesagg and
functional_kudu.manynulls because they are views, and it is invalid to
run COMPUTE STATS over a view.
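The equivalent exclusion via the script's Python entry point, as a
sketch (compute_stats and client_factory are defined in the diff
below):

compute_stats(client_factory, db_names=['functional_kudu'],
              exclude_table_names=['alltypesagg', 'manynulls'])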

Customized hive-site.xml to set datanucleus.connectionPool.maxPoolSize
to 30 and hikaricp.connectionTimeout to 60000 ms. Also set hive.log.dir
to ${IMPALA_CLUSTER_LOGS_DIR}/hive.
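Sketch of the added settings and their intent (the CONFIG placeholder
stands in for the template's existing dict; the comments reflect
standard DataNucleus/HikariCP semantics):

CONFIG = {}  # placeholder for the hive-site.xml template's existing dict
CONFIG.update({
  # Allow up to 30 pooled HMS JDBC connections so parallel COMPUTE STATS
  # statements do not exhaust the pool.
  'datanucleus.connectionPool.maxPoolSize': 30,
  # Wait up to 60000 ms for a pooled connection before HikariCP gives up.
  'hikaricp.connectionTimeout': 60000,
})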

Testing:
Repeatedly ran compute-table-stats.sh from a cold state and confirmed
that no errors occur. These are the steps, starting from an active
minicluster:

cd $IMPALA_HOME
./bin/start-impala-cluster.py --kill
./testdata/bin/kill-hive-server.sh
./testdata/bin/run-hive-server.sh
./bin/start-impala-cluster.py
./testdata/bin/compute-table-stats.sh > /tmp/compute-stats.txt 2>&1
grep error /tmp/compute-stats.txt

Core tests ran and passed.

Change-Id: I1ebf02f95b957e7dda3a30622b87e8fca3197699
Reviewed-on: http://gerrit.cloudera.org:8080/22231
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author:    Riza Suminto
Date:      2024-12-17 11:53:43 -08:00
Committer: Impala Public Jenkins
Commit:    01b8b45252 (parent: 617e99981e)
5 changed files with 167 additions and 85 deletions

File: hive-site.xml template

@@ -220,8 +220,10 @@ CONFIG.update({
   'datanucleus.autoCreateSchema': 'false',
   'datanucleus.fixedDatastore': 'false',
   'datanucleus.metadata.validate': 'false',
+  'datanucleus.connectionPool.maxPoolSize': 30,
   'javax.jdo.option.ConnectionUserName': 'hiveuser',
   'javax.jdo.option.ConnectionPassword': 'password',
+  'hikaricp.connectionTimeout': 60000,
 })
 
 if db_type == 'postgres':
   CONFIG.update({

File: testdata/bin/compute-table-stats.sh

@@ -26,9 +26,10 @@ setup_report_build_error
 . ${IMPALA_HOME}/bin/impala-config.sh > /dev/null 2>&1
 # TODO: We need a better way of managing how these get set. See IMPALA-4346
 IMPALAD=${IMPALAD:-localhost:21000}
+IMPALAD_HS2=${IMPALAD_HS2:-localhost:21050}
-COMPUTE_STATS_SCRIPT="${IMPALA_HOME}/tests/util/compute_table_stats.py --impalad=${IMPALAD}"
+COMPUTE_STATS_SCRIPT="${IMPALA_HOME}/tests/util/compute_table_stats.py\
+ --impalad=${IMPALAD_HS2}"
 
 # Run compute stats over as many of the tables used in the Planner tests as possible.
 ${COMPUTE_STATS_SCRIPT} --db_names=functional\
@@ -46,10 +47,8 @@
 fi
 ${COMPUTE_STATS_SCRIPT} --db_names=tpch,tpch_parquet,tpch_orc_def \
   --table_names=customer,lineitem,nation,orders,part,partsupp,region,supplier
-${COMPUTE_STATS_SCRIPT} --db_names=tpch_nested_parquet,tpcds,tpcds_parquet
-${COMPUTE_STATS_SCRIPT} --db_names=functional_kudu,tpch_kudu
+${COMPUTE_STATS_SCRIPT} --db_names="tpch_nested_parquet,tpch_kudu,tpcds,tpcds_parquet,\
+tpcds_partitioned_parquet_snap"
+${COMPUTE_STATS_SCRIPT} --db_names=functional_kudu \
+  --exclude_table_names="alltypesagg,manynulls"
-# Compute tables of tpcds_partitioned_parquet_snap serially
-# due to large number of partitions in some of the fact tables.
-${COMPUTE_STATS_SCRIPT} --db_names=tpcds_partitioned_parquet_snap \
-  --parallelism=1

File: testdata/bin/run-hive-server.sh

@@ -159,7 +159,8 @@ export KUDU_SKIP_HMS_PLUGIN_VALIDATION=${KUDU_SKIP_HMS_PLUGIN_VALIDATION:-1}
 # To debug log4j2 loading issues, add to HADOOP_CLIENT_OPTS:
 # -Dorg.apache.logging.log4j.simplelog.StatusLogger.level=TRACE
 if [[ ${START_METASTORE} -eq 1 && -z $HMS_PID ]]; then
-  HADOOP_CLIENT_OPTS="-Xmx2024m -Dhive.log.file=hive-metastore.log" hive \
+  HADOOP_CLIENT_OPTS="-Xmx2024m -Dhive.log.dir=${LOGDIR} \
+      -Dhive.log.file=hive-metastore.log" hive \
       --service metastore -p $HIVE_METASTORE_PORT >> ${LOGDIR}/hive-metastore.out 2>&1 &
   # Wait for the Metastore to come up because HiveServer2 relies on it being live.
@@ -194,7 +195,8 @@ if [[ ${START_HIVESERVER} -eq 1 && -z $HS2_PID ]]; then
   # Starts a HiveServer2 instance on the port specified by the HIVE_SERVER2_THRIFT_PORT
   # environment variable. HADOOP_HEAPSIZE should be set to at least 2048 to avoid OOM
   # when loading ORC tables like widerow.
-  HADOOP_CLIENT_OPTS="-Xmx2048m -Dhive.log.file=hive-server2.log" hive \
+  HADOOP_CLIENT_OPTS="-Xmx2048m -Dhive.log.dir=${LOGDIR} \
+      -Dhive.log.file=hive-server2.log" hive \
      --service hiveserver2 >> ${LOGDIR}/hive-server2.out 2>&1 &
   # Wait for the HiveServer2 service to come up because callers of this script

File: tests/common/impala_connection.py

@@ -32,7 +32,7 @@ from RuntimeProfile.ttypes import TRuntimeProfileFormat
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxClient
 
-LOG = logging.getLogger('impala_connection')
+LOG = logging.getLogger(__name__)
 console_handler = logging.StreamHandler()
 console_handler.setLevel(logging.INFO)
 # All logging needs to be either executable SQL or a SQL comment (prefix with --).
@@ -274,10 +274,12 @@ class ImpylaHS2Connection(ImpalaConnection):
   TODO: implement support for kerberos, SSL, etc.
   """
   def __init__(self, host_port, use_kerberos=False, is_hive=False,
-               use_http_transport=False, http_path=""):
+               use_http_transport=False, http_path="", use_ssl=False,
+               collect_profile_and_log=True):
     self.__host_port = host_port
     self.__use_http_transport = use_http_transport
     self.__http_path = http_path
+    self.__use_ssl = use_ssl
     if use_kerberos:
       raise NotImplementedError("Kerberos support not yet implemented")
     # Impyla connection and cursor is initialised in connect(). We need to reuse the same
@@ -289,6 +291,7 @@ class ImpylaHS2Connection(ImpalaConnection):
     # Query options to send along with each query.
     self.__query_options = {}
     self._is_hive = is_hive
+    self._collect_profile_and_log = not is_hive and collect_profile_and_log
 
   def set_configuration_option(self, name, value):
     self.__query_options[name] = str(value)
@@ -309,7 +312,8 @@ class ImpylaHS2Connection(ImpalaConnection):
       conn_kwargs['auth_mechanism'] = 'PLAIN'
     self.__impyla_conn = impyla.connect(host=host, port=int(port),
                                         use_http_transport=self.__use_http_transport,
-                                        http_path=self.__http_path, **conn_kwargs)
+                                        http_path=self.__http_path,
+                                        use_ssl=self.__use_ssl, **conn_kwargs)
     # Get the default query options for the session before any modifications are made.
     self.__cursor = self.__impyla_conn.cursor(convert_types=False)
     self.__default_query_options = {}
@@ -345,7 +349,7 @@ class ImpylaHS2Connection(ImpalaConnection):
     return self.__cursor.fetchall()
 
   def close_query(self, operation_handle):
-    LOG.info("-- closing query for operation handle: {0}".format(operation_handle))
+    self.log_handle(operation_handle, 'closing query for operation')
     operation_handle.get_handle().close_operation()
 
   def execute(self, sql_stmt, user=None, profile_format=TRuntimeProfileFormat.STRING,
@@ -392,57 +396,68 @@ class ImpylaHS2Connection(ImpalaConnection):
       raise
 
   def cancel(self, operation_handle):
-    LOG.info("-- canceling operation: {0}".format(operation_handle))
+    self.log_handle(operation_handle, 'canceling operation')
     cursor = operation_handle.get_handle()
     return cursor.cancel_operation(reset_state=False)
 
   def get_query_id(self, operation_handle):
-    """Return the string representation of the query id."""
-    guid_bytes = \
-        operation_handle.get_handle()._last_operation.handle.operationId.guid
+    """Return the string representation of the query id.
+    Return empty string if handle is already canceled or closed."""
+    last_op = operation_handle.get_handle()._last_operation
+    if last_op is None:
+      return ""
+    guid_bytes = last_op.handle.operationId.guid
     # hex_codec works on bytes, so this needs to a decode() to get back to a string
     hi_str = codecs.encode(guid_bytes[7::-1], 'hex_codec').decode()
     lo_str = codecs.encode(guid_bytes[16:7:-1], 'hex_codec').decode()
     return "{0}:{1}".format(hi_str, lo_str)
 
+  def handle_id_for_logging(self, operation_handle):
+    query_id = self.get_query_id(operation_handle)
+    return query_id if query_id else str(operation_handle)
+
+  def log_handle(self, handle, message):
+    handle_id = self.handle_id_for_logging(handle)
+    LOG.info("-- {0}: {1}".format(handle_id, message))
+
   def get_state(self, operation_handle):
-    LOG.info("-- getting state for operation: {0}".format(operation_handle))
+    handle_id = self.handle_id_for_logging(operation_handle)
+    LOG.info("-- getting state for operation: {0}".format(handle_id))
     cursor = operation_handle.get_handle()
     return cursor.status()
 
   def state_is_finished(self, operation_handle):
-    LOG.info("-- checking finished state for operation: {0}".format(operation_handle))
+    self.log_handle(operation_handle, 'checking finished state for operation')
     cursor = operation_handle.get_handle()
     # cursor.status contains a string representation of one of
     # TCLIService.TOperationState.
     return cursor.status() == "FINISHED_STATE"
 
   def get_exec_summary(self, operation_handle):
-    LOG.info("-- getting exec summary operation: {0}".format(operation_handle))
+    self.log_handle(operation_handle, 'getting exec summary operation')
     cursor = operation_handle.get_handle()
     # summary returned is thrift, not string.
     return cursor.get_summary()
 
   def get_runtime_profile(self, operation_handle, profile_format):
-    LOG.info("-- getting runtime profile operation: {0}".format(operation_handle))
+    self.log_handle(operation_handle, 'getting runtime profile operation')
     cursor = operation_handle.get_handle()
     return cursor.get_profile(profile_format=profile_format)
 
   def wait_for_finished_timeout(self, operation_handle, timeout):
-    LOG.info("-- waiting for query to reach FINISHED state: {0}".format(operation_handle))
+    self.log_handle(operation_handle, 'waiting for query to reach FINISHED state')
     raise NotImplementedError("Not yet implemented for HS2 - states differ from beeswax")
 
   def wait_for_admission_control(self, operation_handle):
-    LOG.info("-- waiting for completion of the admission control processing of the "
-             "query: {0}".format(operation_handle))
+    self.log_handle(operation_handle, 'waiting for completion of the admission control')
     raise NotImplementedError("Not yet implemented for HS2 - states differ from beeswax")
 
   def get_admission_result(self, operation_handle):
-    LOG.info("-- getting the admission result: {0}".format(operation_handle))
+    self.log_handle(operation_handle, 'getting the admission result')
     raise NotImplementedError("Not yet implemented for HS2 - states differ from beeswax")
 
   def get_log(self, operation_handle):
-    LOG.info("-- getting log for operation: {0}".format(operation_handle))
+    self.log_handle(operation_handle, 'getting log for operation')
     # HS2 includes non-error log messages that we need to filter out.
     cursor = operation_handle.get_handle()
     lines = [line for line in cursor.get_log().split('\n')
@@ -450,7 +465,7 @@ class ImpylaHS2Connection(ImpalaConnection):
     return '\n'.join(lines)
 
   def fetch(self, sql_stmt, handle, max_rows=-1):
-    LOG.info("-- fetching results from: {0}".format(handle))
+    self.log_handle(handle, 'fetching results')
     return self.__fetch_results(handle, max_rows)
 
   def __fetch_results(self, handle, max_rows=-1,
@@ -471,7 +486,7 @@ class ImpylaHS2Connection(ImpalaConnection):
     else:
       result_tuples = cursor.fetchmany(max_rows)
 
-    if not self._is_hive:
+    if not self._is_hive and self._collect_profile_and_log:
       log = self.get_log(handle)
       profile = self.get_runtime_profile(handle, profile_format=profile_format)
     else:
@@ -520,16 +535,19 @@ class ImpylaHS2ResultSet(object):
 
 def create_connection(host_port, use_kerberos=False, protocol='beeswax',
-                      is_hive=False):
+                      is_hive=False, use_ssl=False, collect_profile_and_log=True):
   if protocol == 'beeswax':
-    c = BeeswaxConnection(host_port=host_port, use_kerberos=use_kerberos)
+    c = BeeswaxConnection(host_port=host_port, use_kerberos=use_kerberos,
+                          use_ssl=use_ssl)
   elif protocol == 'hs2':
     c = ImpylaHS2Connection(host_port=host_port, use_kerberos=use_kerberos,
-                            is_hive=is_hive)
+                            is_hive=is_hive, use_ssl=use_ssl,
+                            collect_profile_and_log=collect_profile_and_log)
   else:
     assert protocol == 'hs2-http'
     c = ImpylaHS2Connection(host_port=host_port, use_kerberos=use_kerberos,
-                            is_hive=is_hive, use_http_transport=True, http_path='cliservice')
+                            is_hive=is_hive, use_http_transport=True, http_path='cliservice',
+                            use_ssl=use_ssl, collect_profile_and_log=collect_profile_and_log)
 
   # A hook in conftest sets tests.common.current_node. Skip for Hive connections since
   # Hive cannot modify client_identifier at runtime.

File: tests/util/compute_table_stats.py

@@ -1,4 +1,4 @@
-#!/usr/bin/env impala-python
+#!/usr/bin/env impala-python3
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -21,14 +21,22 @@
 from __future__ import absolute_import, division, print_function
 from contextlib import contextmanager
-from optparse import OptionParser
+from argparse import ArgumentParser
 import logging
 import multiprocessing
 import multiprocessing.pool
+import os
 
-from tests.beeswax.impala_beeswax import ImpalaBeeswaxClient
+from tests.common.impala_connection import create_connection
 
-def compute_stats_table(client_factory, db, table, continue_on_error):
+logging.basicConfig(level=logging.INFO,
+                    format='%(asctime)s %(threadName)s: %(message)s')
+LOG = logging.getLogger(__name__)
+
+DEFAULT_PARALLELISM = int(
+    os.environ.get('IMPALA_BUILD_THREADS', multiprocessing.cpu_count()))
+
+
+def compute_stats_table(client_factory, db, table):
   """
   Runs 'compute stats' on a given table. If continue_on_error is
   True, exceptions computing statistics are swallowed.
@@ -36,17 +44,27 @@ def compute_stats_table(client_factory, db, table, continue_on_error):
   with client_factory() as impala_client:
     db_table = "%s.%s" % (db, table)
     statement = "compute stats %s" % (db_table,)
-    logging.info('Executing: %s', statement)
+    LOG.info('Executing: %s', statement)
     try:
       result = impala_client.execute(statement)
-      logging.info("  %s -> %s", db_table, ' '.join(result.data).strip())
-    except:
-      logging.exception(' Failed on table %s', db_table)
-      if not continue_on_error:
-        raise
+      LOG.info("  %s -> %s", db_table, ' '.join(result.data).strip())
+      return db_table
+    except Exception as e:
+      LOG.exception(' Failed on table %s', db_table)
+      raise e
+
+
+def log_completion(completed, total_tables, error=None):
+  if error:
+    LOG.error("Completed COMPUTE STATS for %d/%d tables with error.",
+              completed, total_tables, exc_info=error)
+  else:
+    LOG.info("Completed COMPUTE STATS for %d/%d tables.", completed, total_tables)
+
+
 def compute_stats(client_factory, db_names=None, table_names=None,
-                  continue_on_error=False, parallelism=multiprocessing.cpu_count()):
+                  exclude_table_names=None, continue_on_error=False,
+                  parallelism=DEFAULT_PARALLELISM):
   """
   Runs COMPUTE STATS over the selected tables. The target tables can be filtered by
   specifying a list of databases and/or table names. If no filters are specified this will
@@ -55,68 +73,111 @@ def compute_stats(client_factory, db_names=None, table_names=None,
   parallelism controls the size of the thread pool to which compute_stats
   is sent.
   """
-  logging.info("Enumerating databases and tables for compute stats.")
+  LOG.info("Enumerating databases and tables for compute stats. "
+           "db_names={} table_names={} exclude_table_names={} parallelism={}.".format(
+             str(db_names), str(table_names), str(exclude_table_names), parallelism
+           ))
 
   pool = multiprocessing.pool.ThreadPool(processes=parallelism)
   futures = []
 
   with client_factory() as impala_client:
+    db_table_map = {}
+    total_tables = 0
     all_dbs = set(name.split('\t')[0].lower() for name
                   in impala_client.execute("show databases").data)
     selected_dbs = all_dbs if db_names is None else set(db_names)
     for db in all_dbs.intersection(selected_dbs):
-      all_tables =\
-          set([t.lower() for t in impala_client.execute("show tables in %s" % db).data])
+      all_tables = set(
+          [t.lower() for t in impala_client.execute("show tables in %s" % db).data])
       selected_tables = all_tables if table_names is None else set(table_names)
-      for table in all_tables.intersection(selected_tables):
+      excluded_tables = (set() if exclude_table_names is None
+                         else set(exclude_table_names))
+      tables_to_compute = (all_tables.intersection(selected_tables)
+                           - excluded_tables)
+      db_table_map[db] = tables_to_compute
+      total_tables += len(tables_to_compute)
+
+    for db, tables in db_table_map.items():
+      for table in tables:
         # Submit command to threadpool
-        futures.append(pool.apply_async(compute_stats_table,
-            (client_factory, db, table, continue_on_error,)))
+        futures.append(
+            pool.apply_async(compute_stats_table, (client_factory, db, table,)))
 
   # Wait for all stats commands to finish
+  completed = 0
   for f in futures:
-    f.get()
+    try:
+      f.get()
+      completed += 1
+    except Exception as e:
+      if not continue_on_error:
+        log_completion(completed, total_tables, e)
+        raise e
+  log_completion(completed, total_tables)
 
 if __name__ == "__main__":
-  logging.basicConfig(level=logging.INFO, format='%(asctime)s %(threadName)s: %(message)s')
-  parser = OptionParser()
-  parser.add_option("--continue_on_error", dest="continue_on_error",
-                    action="store_true", default=True, help="If True, continue "\
-                    "if there is an error executing the compute stats statement.")
-  parser.add_option("--stop_on_error", dest="continue_on_error",
-                    action="store_false", default=True, help="If True, stop "\
-                    "if there is an error executing the compute stats statement.")
-  parser.add_option("--impalad", dest="impalad", default="localhost:21000",
-                    help="Impala daemon <host:port> to connect to.")
-  parser.add_option("--use_kerberos", action="store_true", default=False,
-                    help="Compute stats on a kerberized cluster.")
-  parser.add_option("--use_ssl", action="store_true", default=False,
-                    help="Compute stats on a cluster with SSL enabled.")
-  parser.add_option("--parallelism", type=int, default=multiprocessing.cpu_count(),
-                    help="Number of parallel compute stats commands.")
-  parser.add_option("--db_names", dest="db_names", default=None,
-                    help="Comma-separated list of database names for which to compute "\
-                    "stats. Can be used in conjunction with the --table_names flag. "\
-                    "If not specified, compute stats will run on tables from all "\
-                    "databases.")
-  parser.add_option("--table_names", dest="table_names", default=None,
-                    help="Comma-separated list of table names to compute stats over. A"\
-                    " substring comparison is done. If no tables are specified stats "\
-                    "are computed across all tables.")
-  options, args = parser.parse_args()
+  parser = ArgumentParser()
+  group_continuation_opt = parser.add_mutually_exclusive_group()
+  group_continuation_opt.add_argument(
+      "--continue_on_error", dest="continue_on_error", action="store_true",
+      help="If True, continue if there is an error executing the compute stats statement.")
+  group_continuation_opt.add_argument(
+      "--stop_on_error", dest="continue_on_error", action="store_false",
+      help="If True, stop if there is an error executing the compute stats statement.")
+  parser.add_argument(
+      "--impalad", dest="impalad", default="localhost:21050",
+      help="Impala daemon <host:hs2_port> to connect to.")
+  parser.add_argument(
+      "--use_kerberos", action="store_true", default=False,
+      help="Compute stats on a kerberized cluster.")
+  parser.add_argument(
+      "--use_ssl", action="store_true", default=False,
+      help="Compute stats on a cluster with SSL enabled.")
+  parser.add_argument(
+      "--parallelism", type=int, default=DEFAULT_PARALLELISM,
+      help="Number of parallel compute stats commands.")
+  parser.add_argument(
+      "--db_names", dest="db_names", default=None, help=(
+          "Comma-separated list of database names for which to compute stats. "
+          "Can be used in conjunction with the --table_names or --exclude_table_names flag. "
+          "If not specified, compute stats will run on tables from all databases."))
+  group_selection_opt = parser.add_mutually_exclusive_group()
+  group_selection_opt.add_argument(
+      "--table_names", dest="table_names", default=None, help=(
+          "Comma-separated list of table names to compute stats over. "
+          "A substring comparison is done. If no tables are specified stats are computed "
+          "across all tables. Can not be used in conjunction with --exclude_table_names."))
+  group_selection_opt.add_argument(
+      "--exclude_table_names", dest="exclude_table_names", default=None, help=(
+          "Comma-separated list of table names to exclude compute stats. "
+          "A substring comparison is done. If no tables are specified stats are computed "
+          "across all tables. Can not be used in conjunction with --table_names."))
+  args = parser.parse_args()
 
   table_names = None
-  if options.table_names is not None:
-    table_names = [name.lower().strip() for name in options.table_names.split(',')]
+  if args.table_names is not None:
+    table_names = [name.lower().strip() for name in args.table_names.split(',')]
+
+  exclude_table_names = None
+  if args.exclude_table_names is not None:
+    exclude_table_names = [name.lower().strip()
+                           for name in args.exclude_table_names.split(',')]
 
   db_names = None
-  if options.db_names is not None:
-    db_names = [name.lower().strip() for name in options.db_names.split(',')]
+  if args.db_names is not None:
+    db_names = [name.lower().strip() for name in args.db_names.split(',')]
 
   @contextmanager
   def client_factory():
-    impala_client = ImpalaBeeswaxClient(options.impalad,
-        use_kerberos=options.use_kerberos, use_ssl=options.use_ssl)
+    impala_client = create_connection(args.impalad,
+        use_kerberos=args.use_kerberos, use_ssl=args.use_ssl, protocol='hs2',
+        collect_profile_and_log=False)
     impala_client.connect()
     yield impala_client
-    impala_client.close_connection()
+    impala_client.close()
 
   compute_stats(client_factory, db_names=db_names, table_names=table_names,
-                continue_on_error=options.continue_on_error, parallelism=options.parallelism)
+                exclude_table_names=exclude_table_names,
+                continue_on_error=args.continue_on_error, parallelism=args.parallelism)