IMPALA-14553: Run schema eval concurrently

The majority of the time spent in generate-schema-statements.py is in
eval_section, handling schema operations that shell out, often to upload
files via the hadoop CLI or to generate data files. These operations
should be independent of one another, so they can run concurrently.

Runs eval_section up front so it is not repeated for each row in
test_vectors, and executes the resulting shell commands in parallel via
a ThreadPool. Defaults to NUM_CONCURRENT_TESTS threads because the
underlying operations already have some concurrency of their own (such
as HDFS mirroring writes).
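
The core pattern looks roughly like the following condensed, self-contained
sketch; the names eval_shell and the literal inputs are illustrative, the
real functions live in generate-schema-statements.py as shown in the diff
below:

  import subprocess
  from multiprocessing.pool import AsyncResult, ThreadPool

  def eval_shell(pool, text):
      # Sections that do not start with a backtick pass through unchanged;
      # shell sections come back as an AsyncResult to be resolved later.
      if not text.startswith('`'):
          return text
      return pool.apply_async(subprocess.check_output,
                              (['/bin/bash', '-c', text[1:]],),
                              {'universal_newlines': True})

  def unwrap(result):
      # Block until the command finishes; return '\n' for empty output so
      # the section still reads as having been set.
      if isinstance(result, AsyncResult):
          return result.get() or '\n'
      return result

  pool = ThreadPool(processes=4)  # the script uses options.num_processes
  sections = [eval_shell(pool, s) for s in ('`echo hello', 'literal text')]
  pool.close()
  pool.join()
  print([unwrap(s) for s in sections])

Dispatching every shell-backed section before calling unwrap() is what lets
the pool overlap the slow hadoop CLI invocations.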

Also collects existing tables into a set so that lookups are constant
time instead of scanning a list.
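
For example (table names here are made up), membership tests against a
set avoid a per-table scan:

  existing_tables = {'alltypes', 'alltypes_parquet'}  # now a set, was a list
  if 'alltypes' in existing_tables:                    # O(1) average lookup
      print('data already present, skip reload')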

Reduces generate-schema-statements.py runtime by ~60%, from 2m30s to
1m. Confirmed that the contents of logs/data_loading/sql/functional are
identical before and after this change.

Change-Id: I2a78d05fd6a0005c83561978713237da2dde6af2
Reviewed-on: http://gerrit.cloudera.org:8080/23627
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Michael Smith <michael.smith@cloudera.com>
Author: Michael Smith
Date: 2025-11-03 16:46:05 -08:00
Parent: bc99705252
Commit: 166b39547e

@@ -104,6 +104,8 @@ import shutil
import subprocess
import sys
import tempfile
import time
from multiprocessing.pool import AsyncResult, ThreadPool
from optparse import OptionParser
from tests.common.environ import HIVE_MAJOR_VERSION
from tests.util.test_file_parser import parse_table_constraints, parse_test_file
@@ -136,6 +138,9 @@ parser.add_option("--table_formats", dest="table_formats", default=None,
"formats. Ex. --table_formats=seq/snap/block,text/none")
parser.add_option("--hdfs_namenode", dest="hdfs_namenode", default="localhost:20500",
help="HDFS name node for Avro schema URLs, default localhost:20500")
parser.add_option("--num_processes", type="int", dest="num_processes",
default=os.environ['NUM_CONCURRENT_TESTS'],
help="Number of parallel processes to use.")
(options, args) = parser.parse_args()
if options.workload is None:
@@ -699,13 +704,13 @@ def build_hbase_create_stmt(db_name, table_name, column_families, region_splits)
def get_hdfs_subdirs_with_data(path):
tmp_file = tempfile.TemporaryFile("w+")
cmd = "hadoop fs -du %s | grep -v '^0' | awk '{print $3}'" % path
subprocess.call([cmd], shell=True, stderr=open('/dev/null'), stdout=tmp_file)
subprocess.check_call([cmd], shell=True, stderr=open('/dev/null'), stdout=tmp_file)
tmp_file.seek(0)
# Results look like:
# <acls> - <user> <group> <date> /directory/subdirectory
# So to get subdirectory names just return everything after the last '/'
return [line[line.rfind('/') + 1:].strip() for line in tmp_file.readlines()]
return set([line[line.rfind('/') + 1:].strip() for line in tmp_file.readlines()])
class Statements(object):
@@ -727,20 +732,108 @@ class Statements(object):
return bool(self.create or self.load or self.load_base)
def eval_section(section_str):
def eval_section(pool, section_str):
"""section_str should be the contents of a section (i.e. a string). If section_str
starts with `, evaluates section_str as a shell command and returns the
output. Otherwise returns section_str."""
starts with `, evaluates section_str as a shell command in pool and returns an
AsyncResult to produce the output. Otherwise returns section_str. Results of this
function should be passed to unwrap() to get the actual value."""
if not section_str.startswith('`'): return section_str
cmd = section_str[1:]
# Use bash explicitly instead of setting shell=True so we get more advanced shell
# features (e.g. "for i in {1..n}")
p = subprocess.Popen(['/bin/bash', '-c', cmd], stdout=subprocess.PIPE,
universal_newlines=True)
stdout, stderr = p.communicate()
if stderr: print(stderr)
assert p.returncode == 0
return stdout.strip()
return pool.apply_async(subprocess.check_output,
(['/bin/bash', '-c', cmd],), {'universal_newlines': True})
def unwrap(result):
"""If result is an AsyncResult, get the actual value from it. Otherwise return
result."""
if type(result) is AsyncResult:
# If the command produced no output, return a newline so this section is still
# treated as having been set.
return result.get() or '\n'
return result
def eval_sections(test_vectors, sections, fails_only_constraint,
fails_include_constraint, fails_exclude_constraint):
"""Evaluates all sections that are shell commands in parallel using a thread pool.
Returns a new list of sections with all shell commands evaluated."""
new_sections = list()
table_names = None
if options.table_names:
table_names = [name.lower() for name in options.table_names.split(',')]
file_formats = [row.file_format for row in test_vectors]
table_formats = [f"{row.file_format}/{row.compression_codec}/{row.compression_type}"
for row in test_vectors]
# Sections are re-used for multiple test vectors, but eval_section is only needed once.
# Use a threadpool to execute eval_section in parallel as they shell out.
pool = ThreadPool(processes=options.num_processes)
for section in sections:
table_name = section['BASE_TABLE_NAME'].strip().lower()
if table_names and (table_name.lower() not in table_names):
print(f"Skipping table '{table_name}': table is not in specified table list")
continue
# Check Hive version requirement, if present.
if section['HIVE_MAJOR_VERSION'] and \
section['HIVE_MAJOR_VERSION'].strip() != \
os.environ['IMPALA_HIVE_MAJOR_VERSION'].strip():
print(f"Skipping table '{table_name}': wrong Hive major version")
continue
if all([fails_only_constraint(table_name, format) for format in table_formats]):
print(f"Skipping table '{table_name}': 'only' constraint for formats did not "
"include this table.")
continue
if all([fails_include_constraint(table_name, format) for format in table_formats]):
print(f"Skipping '{table_name}' due to include constraint matches.")
continue
if all([fails_exclude_constraint(table_name, format) for format in table_formats]):
print(f"Skipping '{table_name}' due to exclude constraint matches.")
continue
assert not (section['CREATE'] and section['CREATE_HIVE']), \
"Can't set both CREATE and CREATE_HIVE"
assert not (section['DEPENDENT_LOAD'] and section['DEPENDENT_LOAD_HIVE']), \
"Can't set both DEPENDENT_LOAD and DEPENDENT_LOAD_HIVE"
section['LOAD'] = eval_section(pool, section['LOAD'])
section['DEPENDENT_LOAD'] = eval_section(pool, section['DEPENDENT_LOAD'])
section['DEPENDENT_LOAD_HIVE'] = eval_section(pool, section['DEPENDENT_LOAD_HIVE'])
if 'kudu' in file_formats and section['DEPENDENT_LOAD_KUDU']:
section['DEPENDENT_LOAD_KUDU'] = eval_section(pool, section['DEPENDENT_LOAD_KUDU'])
if 'orc' in file_formats and section["DEPENDENT_LOAD_ACID"]:
section["DEPENDENT_LOAD_ACID"] = eval_section(pool, section["DEPENDENT_LOAD_ACID"])
if 'json' in file_formats and section["DEPENDENT_LOAD_JSON"]:
section["DEPENDENT_LOAD_JSON"] = eval_section(pool, section["DEPENDENT_LOAD_JSON"])
section['COLUMNS'] = eval_section(pool, section['COLUMNS'])
new_sections.append(section)
# Close the pool to new tasks and collect the results.
pool.close()
pool.join()
for section in new_sections:
# Ensure all async commands are done.
section['LOAD'] = unwrap(section['LOAD'])
section['DEPENDENT_LOAD'] = unwrap(section['DEPENDENT_LOAD'])
section['DEPENDENT_LOAD_HIVE'] = unwrap(section['DEPENDENT_LOAD_HIVE'])
section['DEPENDENT_LOAD_KUDU'] = unwrap(section['DEPENDENT_LOAD_KUDU'])
section["DEPENDENT_LOAD_ACID"] = unwrap(section["DEPENDENT_LOAD_ACID"])
section["DEPENDENT_LOAD_JSON"] = unwrap(section["DEPENDENT_LOAD_JSON"])
section['COLUMNS'] = unwrap(section['COLUMNS'])
return new_sections
def generate_statements(output_name, test_vectors, sections,
@@ -748,14 +841,27 @@ def generate_statements(output_name, test_vectors, sections,
schema_only_constraints, convert_orc_to_full_acid):
# TODO: This method has become very unwieldy. It has to be re-factored sooner than
# later.
# Parquet statements to be executed separately by Impala
def fails_only_constraint(table_name, table_format):
constraint = schema_only_constraints.get(table_format)
return constraint is not None and table_name not in constraint
def fails_include_constraint(table_name, table_format):
constraint = schema_include_constraints.get(table_name)
return constraint is not None and table_format not in constraint
def fails_exclude_constraint(table_name, table_format):
constraint = schema_exclude_constraints.get(table_name)
return constraint is not None and table_format in constraint
start = time.time()
sections = eval_sections(test_vectors, sections, fails_only_constraint,
fails_include_constraint, fails_exclude_constraint)
print(f"Evaluating sections took {time.time() - start:.3f} seconds")
hbase_output = Statements()
hbase_post_load = Statements()
impala_invalidate = Statements()
table_names = None
if options.table_names:
table_names = [name.lower() for name in options.table_names.split(',')]
existing_tables = get_hdfs_subdirs_with_data(options.hive_warehouse_dir)
for row in test_vectors:
impala_create = Statements()
@@ -772,62 +878,43 @@ def generate_statements(output_name, test_vectors, sections,
for section in sections:
table_name = section['BASE_TABLE_NAME'].strip()
if table_names and (table_name.lower() not in table_names):
print('Skipping table: %s.%s, table is not in specified table list' %
(db, table_name))
if fails_only_constraint(table_name.lower(), table_format):
print(f"Skipping table: {db}.{table_name}, 'only' constraint for format did not "
"include this table.")
continue
# Check Hive version requirement, if present.
if section['HIVE_MAJOR_VERSION'] and \
section['HIVE_MAJOR_VERSION'].strip() != \
os.environ['IMPALA_HIVE_MAJOR_VERSION'].strip():
print("Skipping table '{0}.{1}': wrong Hive major version".format(db, table_name))
if fails_include_constraint(table_name.lower(), table_format):
print(f"Skipping '{db}.{table_name}' due to include constraint match.")
continue
if table_format in schema_only_constraints and \
table_name.lower() not in schema_only_constraints[table_format]:
print(('Skipping table: %s.%s, \'only\' constraint for format did not '
'include this table.') % (db, table_name))
if fails_exclude_constraint(table_name.lower(), table_format):
print(f"Skipping '{db}.{table_name}' due to exclude constraint match.")
continue
if schema_include_constraints[table_name.lower()] and \
table_format not in schema_include_constraints[table_name.lower()]:
print('Skipping \'%s.%s\' due to include constraint match.' % (db, table_name))
continue
if schema_exclude_constraints[table_name.lower()] and\
table_format in schema_exclude_constraints[table_name.lower()]:
print('Skipping \'%s.%s\' due to exclude constraint match.' % (db, table_name))
continue
alter = section.get('ALTER')
alter = section['ALTER']
create = section['CREATE']
create_hive = section['CREATE_HIVE']
assert not (create and create_hive), "Can't set both CREATE and CREATE_HIVE"
table_properties = section['TABLE_PROPERTIES']
insert = eval_section(section['DEPENDENT_LOAD'])
insert_hive = eval_section(section['DEPENDENT_LOAD_HIVE'])
assert not (insert and insert_hive),\
"Can't set both DEPENDENT_LOAD and DEPENDENT_LOAD_HIVE"
load = eval_section(section['LOAD'])
insert = section['DEPENDENT_LOAD']
insert_hive = section['DEPENDENT_LOAD_HIVE']
load = section['LOAD']
if file_format == 'kudu':
create_kudu = section["CREATE_KUDU"]
if section['DEPENDENT_LOAD_KUDU']:
insert = eval_section(section['DEPENDENT_LOAD_KUDU'])
insert = section['DEPENDENT_LOAD_KUDU']
else:
create_kudu = None
if file_format == 'orc' and section["DEPENDENT_LOAD_ACID"]:
insert = None
insert_hive = eval_section(section["DEPENDENT_LOAD_ACID"])
insert_hive = section["DEPENDENT_LOAD_ACID"]
if file_format == 'json' and section["DEPENDENT_LOAD_JSON"]:
insert = None
insert_hive = eval_section(section["DEPENDENT_LOAD_JSON"])
insert_hive = section["DEPENDENT_LOAD_JSON"]
columns = eval_section(section['COLUMNS']).strip()
columns = section['COLUMNS'].strip()
partition_columns = section['PARTITION_COLUMNS'].strip()
row_format = section['ROW_FORMAT'].strip()
table_comment = section['COMMENT'].strip()