diff --git a/fe/src/test/resources/hive-site.xml.py b/fe/src/test/resources/hive-site.xml.py
index 9b9c2dbfd..137b55d28 100644
--- a/fe/src/test/resources/hive-site.xml.py
+++ b/fe/src/test/resources/hive-site.xml.py
@@ -220,8 +220,10 @@ CONFIG.update({
   'datanucleus.autoCreateSchema': 'false',
   'datanucleus.fixedDatastore': 'false',
   'datanucleus.metadata.validate': 'false',
+  'datanucleus.connectionPool.maxPoolSize': 30,
   'javax.jdo.option.ConnectionUserName': 'hiveuser',
   'javax.jdo.option.ConnectionPassword': 'password',
+  'hikaricp.connectionTimeout': 60000,
 })
 if db_type == 'postgres':
   CONFIG.update({
diff --git a/testdata/bin/compute-table-stats.sh b/testdata/bin/compute-table-stats.sh
index 40001eaa4..76e1ee757 100755
--- a/testdata/bin/compute-table-stats.sh
+++ b/testdata/bin/compute-table-stats.sh
@@ -26,9 +26,10 @@ setup_report_build_error
 . ${IMPALA_HOME}/bin/impala-config.sh > /dev/null 2>&1
 
 # TODO: We need a better way of managing how these get set. See IMPALA-4346
-IMPALAD=${IMPALAD:-localhost:21000}
+IMPALAD_HS2=${IMPALAD_HS2:-localhost:21050}
 
-COMPUTE_STATS_SCRIPT="${IMPALA_HOME}/tests/util/compute_table_stats.py --impalad=${IMPALAD}"
+COMPUTE_STATS_SCRIPT="${IMPALA_HOME}/tests/util/compute_table_stats.py\
+  --impalad=${IMPALAD_HS2}"
 
 # Run compute stats over as many of the tables used in the Planner tests as possible.
 ${COMPUTE_STATS_SCRIPT} --db_names=functional\
@@ -46,10 +47,8 @@ if [ "${TARGET_FILESYSTEM}" = "hdfs" ]; then
 fi
 ${COMPUTE_STATS_SCRIPT} --db_names=tpch,tpch_parquet,tpch_orc_def \
   --table_names=customer,lineitem,nation,orders,part,partsupp,region,supplier
-${COMPUTE_STATS_SCRIPT} --db_names=tpch_nested_parquet,tpcds,tpcds_parquet
-${COMPUTE_STATS_SCRIPT} --db_names=functional_kudu,tpch_kudu
+${COMPUTE_STATS_SCRIPT} --db_names="tpch_nested_parquet,tpch_kudu,tpcds,tpcds_parquet,\
+  tpcds_partitioned_parquet_snap"
+${COMPUTE_STATS_SCRIPT} --db_names=functional_kudu \
+  --exclude_table_names="alltypesagg,manynulls"
 
-# Compute tables of tpcds_partitioned_parquet_snap serially
-# due to large number of partitions in some of the fact tables.
-${COMPUTE_STATS_SCRIPT} --db_names=tpcds_partitioned_parquet_snap \
-  --parallelism=1
diff --git a/testdata/bin/run-hive-server.sh b/testdata/bin/run-hive-server.sh
index f64d4cdd5..dccf4e6a2 100755
--- a/testdata/bin/run-hive-server.sh
+++ b/testdata/bin/run-hive-server.sh
@@ -159,7 +159,8 @@ export KUDU_SKIP_HMS_PLUGIN_VALIDATION=${KUDU_SKIP_HMS_PLUGIN_VALIDATION:-1}
 # To debug log4j2 loading issues, add to HADOOP_CLIENT_OPTS:
 # -Dorg.apache.logging.log4j.simplelog.StatusLogger.level=TRACE
 if [[ ${START_METASTORE} -eq 1 && -z $HMS_PID ]]; then
-  HADOOP_CLIENT_OPTS="-Xmx2024m -Dhive.log.file=hive-metastore.log" hive \
+  HADOOP_CLIENT_OPTS="-Xmx2024m -Dhive.log.dir=${LOGDIR} \
+      -Dhive.log.file=hive-metastore.log" hive \
       --service metastore -p $HIVE_METASTORE_PORT >> ${LOGDIR}/hive-metastore.out 2>&1 &
 
 # Wait for the Metastore to come up because HiveServer2 relies on it being live.
@@ -194,7 +195,8 @@ if [[ ${START_HIVESERVER} -eq 1 && -z $HS2_PID ]]; then
   # Starts a HiveServer2 instance on the port specified by the HIVE_SERVER2_THRIFT_PORT
   # environment variable. HADOOP_HEAPSIZE should be set to at least 2048 to avoid OOM
   # when loading ORC tables like widerow.
-  HADOOP_CLIENT_OPTS="-Xmx2048m -Dhive.log.file=hive-server2.log" hive \
+  HADOOP_CLIENT_OPTS="-Xmx2048m -Dhive.log.dir=${LOGDIR} \
+      -Dhive.log.file=hive-server2.log" hive \
       --service hiveserver2 >> ${LOGDIR}/hive-server2.out 2>&1 &
 
 # Wait for the HiveServer2 service to come up because callers of this script
diff --git a/tests/common/impala_connection.py b/tests/common/impala_connection.py
index 2ebaacc2a..0bdf2a8ed 100644
--- a/tests/common/impala_connection.py
+++ b/tests/common/impala_connection.py
@@ -32,7 +32,7 @@
 from RuntimeProfile.ttypes import TRuntimeProfileFormat
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxClient
 
-LOG = logging.getLogger('impala_connection')
+LOG = logging.getLogger(__name__)
 console_handler = logging.StreamHandler()
 console_handler.setLevel(logging.INFO)
 # All logging needs to be either executable SQL or a SQL comment (prefix with --).
@@ -274,10 +274,12 @@ class ImpylaHS2Connection(ImpalaConnection):
   TODO: implement support for kerberos, SSL, etc.
   """
   def __init__(self, host_port, use_kerberos=False, is_hive=False,
-               use_http_transport=False, http_path=""):
+               use_http_transport=False, http_path="", use_ssl=False,
+               collect_profile_and_log=True):
     self.__host_port = host_port
     self.__use_http_transport = use_http_transport
     self.__http_path = http_path
+    self.__use_ssl = use_ssl
     if use_kerberos:
       raise NotImplementedError("Kerberos support not yet implemented")
     # Impyla connection and cursor is initialised in connect(). We need to reuse the same
@@ -289,6 +291,7 @@ class ImpylaHS2Connection(ImpalaConnection):
     # Query options to send along with each query.
     self.__query_options = {}
     self._is_hive = is_hive
+    self._collect_profile_and_log = not is_hive and collect_profile_and_log
 
   def set_configuration_option(self, name, value):
     self.__query_options[name] = str(value)
@@ -309,7 +312,8 @@ class ImpylaHS2Connection(ImpalaConnection):
       conn_kwargs['auth_mechanism'] = 'PLAIN'
     self.__impyla_conn = impyla.connect(host=host, port=int(port),
                                         use_http_transport=self.__use_http_transport,
-                                        http_path=self.__http_path, **conn_kwargs)
+                                        http_path=self.__http_path,
+                                        use_ssl=self.__use_ssl, **conn_kwargs)
     # Get the default query options for the session before any modifications are made.
     self.__cursor = self.__impyla_conn.cursor(convert_types=False)
     self.__default_query_options = {}
@@ -345,7 +349,7 @@ class ImpylaHS2Connection(ImpalaConnection):
     return self.__cursor.fetchall()
 
   def close_query(self, operation_handle):
-    LOG.info("-- closing query for operation handle: {0}".format(operation_handle))
+    self.log_handle(operation_handle, 'closing query for operation')
     operation_handle.get_handle().close_operation()
 
   def execute(self, sql_stmt, user=None, profile_format=TRuntimeProfileFormat.STRING,
@@ -392,57 +396,68 @@ class ImpylaHS2Connection(ImpalaConnection):
       raise
 
   def cancel(self, operation_handle):
-    LOG.info("-- canceling operation: {0}".format(operation_handle))
+    self.log_handle(operation_handle, 'canceling operation')
     cursor = operation_handle.get_handle()
     return cursor.cancel_operation(reset_state=False)
 
   def get_query_id(self, operation_handle):
-    """Return the string representation of the query id."""
-    guid_bytes = \
-        operation_handle.get_handle()._last_operation.handle.operationId.guid
+    """Return the string representation of the query id.
+    Return empty string if handle is already canceled or closed."""
+    last_op = operation_handle.get_handle()._last_operation
+    if last_op is None:
+      return ""
+    guid_bytes = last_op.handle.operationId.guid
     # hex_codec works on bytes, so this needs to a decode() to get back to a string
     hi_str = codecs.encode(guid_bytes[7::-1], 'hex_codec').decode()
     lo_str = codecs.encode(guid_bytes[16:7:-1], 'hex_codec').decode()
     return "{0}:{1}".format(hi_str, lo_str)
 
+  def handle_id_for_logging(self, operation_handle):
+    query_id = self.get_query_id(operation_handle)
+    return query_id if query_id else str(operation_handle)
+
+  def log_handle(self, handle, message):
+    handle_id = self.handle_id_for_logging(handle)
+    LOG.info("-- {0}: {1}".format(handle_id, message))
+
   def get_state(self, operation_handle):
-    LOG.info("-- getting state for operation: {0}".format(operation_handle))
+    handle_id = self.handle_id_for_logging(operation_handle)
+    LOG.info("-- getting state for operation: {0}".format(handle_id))
     cursor = operation_handle.get_handle()
     return cursor.status()
 
   def state_is_finished(self, operation_handle):
-    LOG.info("-- checking finished state for operation: {0}".format(operation_handle))
+    self.log_handle(operation_handle, 'checking finished state for operation')
     cursor = operation_handle.get_handle()
     # cursor.status contains a string representation of one of
     # TCLIService.TOperationState.
     return cursor.status() == "FINISHED_STATE"
 
   def get_exec_summary(self, operation_handle):
-    LOG.info("-- getting exec summary operation: {0}".format(operation_handle))
+    self.log_handle(operation_handle, 'getting exec summary operation')
     cursor = operation_handle.get_handle()
     # summary returned is thrift, not string.
     return cursor.get_summary()
 
   def get_runtime_profile(self, operation_handle, profile_format):
-    LOG.info("-- getting runtime profile operation: {0}".format(operation_handle))
+    self.log_handle(operation_handle, 'getting runtime profile operation')
     cursor = operation_handle.get_handle()
     return cursor.get_profile(profile_format=profile_format)
 
   def wait_for_finished_timeout(self, operation_handle, timeout):
-    LOG.info("-- waiting for query to reach FINISHED state: {0}".format(operation_handle))
+    self.log_handle(operation_handle, 'waiting for query to reach FINISHED state')
     raise NotImplementedError("Not yet implemented for HS2 - states differ from beeswax")
 
   def wait_for_admission_control(self, operation_handle):
-    LOG.info("-- waiting for completion of the admission control processing of the "
-             "query: {0}".format(operation_handle))
+    self.log_handle(operation_handle, 'waiting for completion of the admission control')
    raise NotImplementedError("Not yet implemented for HS2 - states differ from beeswax")
 
   def get_admission_result(self, operation_handle):
-    LOG.info("-- getting the admission result: {0}".format(operation_handle))
+    self.log_handle(operation_handle, 'getting the admission result')
     raise NotImplementedError("Not yet implemented for HS2 - states differ from beeswax")
 
   def get_log(self, operation_handle):
-    LOG.info("-- getting log for operation: {0}".format(operation_handle))
+    self.log_handle(operation_handle, 'getting log for operation')
     # HS2 includes non-error log messages that we need to filter out.
     cursor = operation_handle.get_handle()
     lines = [line for line in cursor.get_log().split('\n')
@@ -450,7 +465,7 @@ class ImpylaHS2Connection(ImpalaConnection):
     return '\n'.join(lines)
 
   def fetch(self, sql_stmt, handle, max_rows=-1):
-    LOG.info("-- fetching results from: {0}".format(handle))
+    self.log_handle(handle, 'fetching results')
     return self.__fetch_results(handle, max_rows)
 
   def __fetch_results(self, handle, max_rows=-1,
@@ -471,7 +486,7 @@ class ImpylaHS2Connection(ImpalaConnection):
     else:
       result_tuples = cursor.fetchmany(max_rows)
 
-    if not self._is_hive:
+    if not self._is_hive and self._collect_profile_and_log:
       log = self.get_log(handle)
       profile = self.get_runtime_profile(handle, profile_format=profile_format)
     else:
@@ -520,16 +535,19 @@ class ImpylaHS2ResultSet(object):
 
 def create_connection(host_port, use_kerberos=False, protocol='beeswax',
-                      is_hive=False):
+                      is_hive=False, use_ssl=False, collect_profile_and_log=True):
   if protocol == 'beeswax':
-    c = BeeswaxConnection(host_port=host_port, use_kerberos=use_kerberos)
+    c = BeeswaxConnection(host_port=host_port, use_kerberos=use_kerberos,
+                          use_ssl=use_ssl)
   elif protocol == 'hs2':
     c = ImpylaHS2Connection(host_port=host_port, use_kerberos=use_kerberos,
-                            is_hive=is_hive)
+                            is_hive=is_hive, use_ssl=use_ssl,
+                            collect_profile_and_log=collect_profile_and_log)
   else:
     assert protocol == 'hs2-http'
     c = ImpylaHS2Connection(host_port=host_port, use_kerberos=use_kerberos,
-                            is_hive=is_hive, use_http_transport=True, http_path='cliservice')
+                            is_hive=is_hive, use_http_transport=True, http_path='cliservice',
+                            use_ssl=use_ssl, collect_profile_and_log=collect_profile_and_log)
 
   # A hook in conftest sets tests.common.current_node. Skip for Hive connections since
   # Hive cannot modify client_identifier at runtime.
diff --git a/tests/util/compute_table_stats.py b/tests/util/compute_table_stats.py
index bb25e6c67..206a70479 100755
--- a/tests/util/compute_table_stats.py
+++ b/tests/util/compute_table_stats.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env impala-python
+#!/usr/bin/env impala-python3
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements. See the NOTICE file
@@ -21,14 +21,22 @@
 from __future__ import absolute_import, division, print_function
 from contextlib import contextmanager
-from optparse import OptionParser
+from argparse import ArgumentParser
 import logging
 import multiprocessing
 import multiprocessing.pool
+import os
 
-from tests.beeswax.impala_beeswax import ImpalaBeeswaxClient
+from tests.common.impala_connection import create_connection
 
 
-def compute_stats_table(client_factory, db, table, continue_on_error):
+logging.basicConfig(level=logging.INFO,
+    format='%(asctime)s %(threadName)s: %(message)s')
+LOG = logging.getLogger(__name__)
+DEFAULT_PARALLELISM = int(
+    os.environ.get('IMPALA_BUILD_THREADS', multiprocessing.cpu_count()))
+
+
+def compute_stats_table(client_factory, db, table):
   """
   Runs 'compute stats' on a given table. If continue_on_error is True, exceptions
   computing statistics are swallowed.
@@ -36,17 +44,27 @@ def compute_stats_table(client_factory, db, table, continue_on_error):
   with client_factory() as impala_client:
     db_table = "%s.%s" % (db, table)
     statement = "compute stats %s" % (db_table,)
-    logging.info('Executing: %s', statement)
+    LOG.info('Executing: %s', statement)
     try:
       result = impala_client.execute(statement)
-      logging.info(" %s -> %s", db_table, ' '.join(result.data).strip())
-    except:
-      logging.exception(' Failed on table %s', db_table)
-      if not continue_on_error:
-        raise
+      LOG.info(" %s -> %s", db_table, ' '.join(result.data).strip())
+      return db_table
+    except Exception as e:
+      LOG.exception(' Failed on table %s', db_table)
+      raise e
+
+
+def log_completion(completed, total_tables, error=None):
+  if error:
+    LOG.error("Completed COMPUTE STATS for %d/%d tables with error.",
+        completed, total_tables, exc_info=error)
+  else:
+    LOG.info("Completed COMPUTE STATS for %d/%d tables.", completed, total_tables)
+
 
 def compute_stats(client_factory, db_names=None, table_names=None,
-                  continue_on_error=False, parallelism=multiprocessing.cpu_count()):
+                  exclude_table_names=None, continue_on_error=False,
+                  parallelism=DEFAULT_PARALLELISM):
   """
   Runs COMPUTE STATS over the selected tables. The target tables can be filtered by
   specifying a list of databases and/or table names. If no filters are specified this will
@@ -55,68 +73,111 @@ def compute_stats(client_factory, db_names=None, table_names=None,
   parallelism controls the size of the thread pool to which compute_stats is sent.
   """
-  logging.info("Enumerating databases and tables for compute stats.")
+  LOG.info("Enumerating databases and tables for compute stats. "
+      "db_names={} table_names={} exclude_table_names={} parallelism={}.".format(
+          str(db_names), str(table_names), str(exclude_table_names), parallelism
+      ))
   pool = multiprocessing.pool.ThreadPool(processes=parallelism)
   futures = []
+
   with client_factory() as impala_client:
+    db_table_map = {}
+    total_tables = 0
     all_dbs = set(name.split('\t')[0].lower()
         for name in impala_client.execute("show databases").data)
     selected_dbs = all_dbs if db_names is None else set(db_names)
     for db in all_dbs.intersection(selected_dbs):
-      all_tables =\
-          set([t.lower() for t in impala_client.execute("show tables in %s" % db).data])
+      all_tables = set(
+          [t.lower() for t in impala_client.execute("show tables in %s" % db).data])
       selected_tables = all_tables if table_names is None else set(table_names)
-      for table in all_tables.intersection(selected_tables):
+      excluded_tables = (set() if exclude_table_names is None
+          else set(exclude_table_names))
+      tables_to_compute = (all_tables.intersection(selected_tables)
+          - excluded_tables)
+      db_table_map[db] = tables_to_compute
+      total_tables += len(tables_to_compute)
+
+    for db, tables in db_table_map.items():
+      for table in tables:
         # Submit command to threadpool
-        futures.append(pool.apply_async(compute_stats_table,
-            (client_factory, db, table, continue_on_error,)))
+        futures.append(
+            pool.apply_async(compute_stats_table, (client_factory, db, table,)))
+
   # Wait for all stats commands to finish
+  completed = 0
   for f in futures:
-    f.get()
+    try:
+      f.get()
+      completed += 1
+    except Exception as e:
+      if not continue_on_error:
+        log_completion(completed, total_tables, e)
+        raise e
+  log_completion(completed, total_tables)
+
 
 if __name__ == "__main__":
-  logging.basicConfig(level=logging.INFO, format='%(asctime)s %(threadName)s: %(message)s')
-  parser = OptionParser()
-  parser.add_option("--continue_on_error", dest="continue_on_error",
action="store_true", default=True, help="If True, continue "\ - "if there is an error executing the compute stats statement.") - parser.add_option("--stop_on_error", dest="continue_on_error", - action="store_false", default=True, help="If True, stop "\ - "if there is an error executing the compute stats statement.") - parser.add_option("--impalad", dest="impalad", default="localhost:21000", - help="Impala daemon to connect to.") - parser.add_option("--use_kerberos", action="store_true", default=False, - help="Compute stats on a kerberized cluster.") - parser.add_option("--use_ssl", action="store_true", default=False, - help="Compute stats on a cluster with SSL enabled.") - parser.add_option("--parallelism", type=int, default=multiprocessing.cpu_count(), - help="Number of parallel compute stats commands.") - parser.add_option("--db_names", dest="db_names", default=None, - help="Comma-separated list of database names for which to compute "\ - "stats. Can be used in conjunction with the --table_names flag. "\ - "If not specified, compute stats will run on tables from all "\ - "databases.") - parser.add_option("--table_names", dest="table_names", default=None, - help="Comma-separated list of table names to compute stats over. A"\ - " substring comparison is done. If no tables are specified stats "\ - "are computed across all tables.") - options, args = parser.parse_args() + parser = ArgumentParser() + group_continuation_opt = parser.add_mutually_exclusive_group() + group_continuation_opt.add_argument( + "--continue_on_error", dest="continue_on_error", action="store_true", + help="If True, continue if there is an error executing the compute stats statement.") + group_continuation_opt.add_argument( + "--stop_on_error", dest="continue_on_error", action="store_false", + help="If True, stop if there is an error executing the compute stats statement.") + parser.add_argument( + "--impalad", dest="impalad", default="localhost:21050", + help="Impala daemon to connect to.") + parser.add_argument( + "--use_kerberos", action="store_true", default=False, + help="Compute stats on a kerberized cluster.") + parser.add_argument( + "--use_ssl", action="store_true", default=False, + help="Compute stats on a cluster with SSL enabled.") + parser.add_argument( + "--parallelism", type=int, default=DEFAULT_PARALLELISM, + help="Number of parallel compute stats commands.") + parser.add_argument( + "--db_names", dest="db_names", default=None, help=( + "Comma-separated list of database names for which to compute stats. " + "Can be used in conjunction with the --table_names or --exclude_table_names flag. " + "If not specified, compute stats will run on tables from all databases.")) + group_selection_opt = parser.add_mutually_exclusive_group() + group_selection_opt.add_argument( + "--table_names", dest="table_names", default=None, help=( + "Comma-separated list of table names to compute stats over. " + "A substring comparison is done. If no tables are specified stats are computed " + "across all tables. Can not be used in conjunction with --exclude_table_names.")) + group_selection_opt.add_argument( + "--exclude_table_names", dest="exclude_table_names", default=None, help=( + "Comma-separated list of table names to exclude compute stats. " + "A substring comparison is done. If no tables are specified stats are computed " + "across all tables. 
+          "are excluded. Cannot be used in conjunction with --table_names."))
+  args = parser.parse_args()
+
   table_names = None
-  if options.table_names is not None:
-    table_names = [name.lower().strip() for name in options.table_names.split(',')]
+  if args.table_names is not None:
+    table_names = [name.lower().strip() for name in args.table_names.split(',')]
+
+  exclude_table_names = None
+  if args.exclude_table_names is not None:
+    exclude_table_names = [name.lower().strip()
+        for name in args.exclude_table_names.split(',')]
 
   db_names = None
-  if options.db_names is not None:
-    db_names = [name.lower().strip() for name in options.db_names.split(',')]
+  if args.db_names is not None:
+    db_names = [name.lower().strip() for name in args.db_names.split(',')]
 
   @contextmanager
   def client_factory():
-    impala_client = ImpalaBeeswaxClient(options.impalad,
-        use_kerberos=options.use_kerberos, use_ssl=options.use_ssl)
+    impala_client = create_connection(args.impalad,
+        use_kerberos=args.use_kerberos, use_ssl=args.use_ssl, protocol='hs2',
+        collect_profile_and_log=False)
     impala_client.connect()
     yield impala_client
-    impala_client.close_connection()
+    impala_client.close()
 
   compute_stats(client_factory, db_names=db_names, table_names=table_names,
-      continue_on_error=options.continue_on_error, parallelism=options.parallelism)
+      exclude_table_names=exclude_table_names,
+      continue_on_error=args.continue_on_error, parallelism=args.parallelism)
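
Note (editorial, not part of the patch): compute_stats() now selects tables per
database as the intersection of the discovered tables with --table_names (when
given), minus --exclude_table_names. A minimal standalone sketch of that set
arithmetic, using illustrative inputs (the select_tables helper is hypothetical,
not in the patch):

  # Sketch of the per-database selection in compute_stats(); 'all_tables'
  # stands in for the output of "show tables in <db>".
  def select_tables(all_tables, table_names=None, exclude_table_names=None):
    all_tables = set(t.lower() for t in all_tables)
    selected = all_tables if table_names is None else set(table_names)
    excluded = set() if exclude_table_names is None else set(exclude_table_names)
    return all_tables.intersection(selected) - excluded

  # Mirrors the new functional_kudu invocation in compute-table-stats.sh:
  print(select_tables(["alltypes", "alltypesagg", "manynulls"],
                      exclude_table_names=["alltypesagg", "manynulls"]))
  # -> {'alltypes'}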
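The script's client_factory also switches from Beeswax to HS2 and passes
collect_profile_and_log=False, so __fetch_results() skips fetching the runtime
profile and log after each statement. A rough standalone equivalent of that
factory, as a sketch: it assumes the Impala test environment is importable and
an impalad is listening on the script's default HS2 port, and the hs2_client
name is hypothetical:

  from contextlib import contextmanager

  from tests.common.impala_connection import create_connection

  @contextmanager
  def hs2_client(impalad="localhost:21050"):
    # Same arguments the patched client_factory passes to create_connection().
    client = create_connection(impalad, protocol='hs2',
                               collect_profile_and_log=False)
    client.connect()
    try:
      yield client
    finally:
      client.close()

  with hs2_client() as client:
    print(client.execute("show databases").data)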