IMPALA-12601: Add a fully partitioned TPC-DS database

The current tpcds dataset only has the store_sales table fully
partitioned and leaves the other fact tables unpartitioned. This is
intended to speed up data loading during tests. However, it is not an
accurate reflection of larger scale TPC-DS datasets, where all fact
tables are partitioned. The Impala planner may change the details of a
query plan when a partition column exists.

This patch adds a new dataset, tpcds_partitioned, which loads a fully
partitioned TPC-DS db in Parquet format named
tpcds_partitioned_parquet_snap. This dataset cannot be loaded
independently: it requires the base 'tpcds' db from the tpcds dataset
to be loaded first. An example of how to load this dataset can be seen
in the load-tpcds-data function in testdata/bin/create-load-data.sh.

This patch also changes PlannerTest#testProcessingCost to target
tpcds_partitioned_parquet_snap instead of tpcds_parquet. Other planner
tests that currently target tpcds_parquet will gradually be changed to
test against tpcds_partitioned_parquet_snap in follow-up patches.

This addition adds a couple of seconds to the "Computing table stats"
step, but the load itself adds negligible time since it runs in
parallel with TPC-H and functional-query. The total loading time for
the three datasets remains similar after this patch.

This patch also includes several improvements in the following files:

bin/load-data.py:
- Log elapsed time on serial steps (a sketch of the logging pattern
  follows this list).

testdata/bin/create-load-data.sh:
- Rename MSG to LOAD_MSG to avoid collision with the same variable name
  in ./testdata/bin/run-step.sh

testdata/bin/generate-schema-statements.py:
- Remove the redundant FILE_FORMAT_MAP.
- Add build_partitioned_load to simplify expressing partitioned insert
  queries in the SQL template (a sketch of the idea follows this list).

testdata/datasets/tpcds/tpcds_schema_template.sql:
- Reorder schema template to load all dimension tables before fact tables.
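
For illustration, the elapsed-time logging added to bin/load-data.py
follows a simple begin/end pattern. Below is a minimal, hypothetical
sketch of that pattern as a reusable context manager; the actual patch
inlines the equivalent start/end calls around each step (see the diff
below), and log_step is not a real helper in the file.

import logging
import time
from contextlib import contextmanager

LOG = logging.getLogger('load-data.py')

@contextmanager
def log_step(step_name):
  # Hypothetical helper (not in the actual patch): wraps a serial step
  # with the same begin/end messages and elapsed-time format as the diff.
  LOG.info('Begin step "%s".' % step_name)
  start_time = time.time()
  try:
    yield
  finally:
    total_time = time.time() - start_time
    LOG.info('End step "%s". Total time: %.2fs\n' % (step_name, total_time))

# Usage sketch mirroring one of the serial steps:
#   with log_step("HBase Create"):
#     exec_hbase_create_script()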
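
To give a feel for build_partitioned_load, here is a minimal sketch of
the kind of SQL it helps express. The function name comes from this
patch, but the signature, parameters, and generated statement below are
simplified assumptions, not the actual implementation in
generate-schema-statements.py.

def build_partitioned_load(target_table, select_cols, partition_cols,
                           source_db, source_table):
  # Hypothetical sketch: render a dynamic-partition INSERT that fills
  # target_table from an already-loaded source table. For dynamic
  # partitioning, the partition columns must come last in the select list.
  return ('INSERT OVERWRITE TABLE {target} PARTITION ({parts})\n'
          'SELECT {cols} FROM {db}.{src};').format(
              target=target_table,
              parts=', '.join(partition_cols),
              cols=', '.join(select_cols + partition_cols),
              db=source_db, src=source_table)

# Example: fill store_sales, partitioned by ss_sold_date_sk, from the
# base 'tpcds' db (column list abbreviated):
# print(build_partitioned_load('store_sales', ['ss_item_sk', 'ss_customer_sk'],
#                              ['ss_sold_date_sk'], 'tpcds', 'store_sales'))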

Testing:
- Pass core tests.

Change-Id: I3a2e66c405639554f325ae78c66628d464f6c453
Reviewed-on: http://gerrit.cloudera.org:8080/20756
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author: Riza Suminto
Date: 2022-12-06 15:34:49 -08:00
Committer: Impala Public Jenkins
Parent: 1141a6a80c
Commit: 8661f922d3
14 changed files with 2857 additions and 1987 deletions

--- a/bin/load-data.py
+++ b/bin/load-data.py

@@ -34,7 +34,7 @@ import time
 import traceback
 from optparse import OptionParser
-from tests.beeswax.impala_beeswax import *
+from tests.beeswax.impala_beeswax import ImpalaBeeswaxClient
 from multiprocessing.pool import ThreadPool

 LOG = logging.getLogger('load-data.py')
@@ -159,11 +159,17 @@ def exec_hive_query_from_file_beeline(file_name):
   return is_success

-def exec_hbase_query_from_file(file_name):
+def exec_hbase_query_from_file(file_name, step_name):
   if not os.path.exists(file_name): return
+  LOG.info('Begin step "%s".' % step_name)
+  start_time = time.time()
   hbase_cmd = "hbase shell %s" % file_name
   LOG.info('Executing HBase Command: %s' % hbase_cmd)
   exec_cmd(hbase_cmd, error_msg='Error executing hbase create commands')
+  total_time = time.time() - start_time
+  LOG.info('End step "%s". Total time: %.2fs\n' % (step_name, total_time))

 # KERBEROS TODO: fails when kerberized and impalad principal isn't "impala"
 def exec_impala_query_from_file(file_name):
@@ -263,7 +269,8 @@ def exec_hadoop_fs_cmd(args, exit_on_error=True):
   exec_cmd(cmd, error_msg="Error executing Hadoop command, exiting",
            exit_on_error=exit_on_error)

-def exec_query_files_parallel(thread_pool, query_files, execution_type):
+def exec_query_files_parallel(thread_pool, query_files, execution_type, step_name):
   """Executes the query files provided using the execution engine specified
   in parallel using the given thread pool. Aborts immediately if any execution
   encounters an error."""
@@ -274,16 +281,23 @@ def exec_query_files_parallel(thread_pool, query_files, execution_type):
   elif execution_type == 'hive':
     execution_function = exec_hive_query_from_file_beeline

+  LOG.info('Begin step "%s".' % step_name)
+  start_time = time.time()
   for result in thread_pool.imap_unordered(execution_function, query_files):
     if not result:
       thread_pool.terminate()
       sys.exit(1)
+  total_time = time.time() - start_time
+  LOG.info('End step "%s". Total time: %.2fs\n' % (step_name, total_time))

-def impala_exec_query_files_parallel(thread_pool, query_files):
-  exec_query_files_parallel(thread_pool, query_files, 'impala')
+def impala_exec_query_files_parallel(thread_pool, query_files, step_name):
+  exec_query_files_parallel(thread_pool, query_files, 'impala', step_name)

-def hive_exec_query_files_parallel(thread_pool, query_files):
-  exec_query_files_parallel(thread_pool, query_files, 'hive')
+def hive_exec_query_files_parallel(thread_pool, query_files, step_name):
+  exec_query_files_parallel(thread_pool, query_files, 'hive', step_name)

 def main():
   logging.basicConfig(format='%(asctime)s %(message)s', datefmt='%H:%M:%S')
@@ -446,38 +460,40 @@ def main():
     # so they're done at the end. Finally, the Hbase Tables that have been filled with data
     # need to be flushed.
-    impala_exec_query_files_parallel(thread_pool, impala_create_files)
+    impala_exec_query_files_parallel(thread_pool, impala_create_files, "Impala Create")

     # There should be at most one hbase creation script
     assert(len(hbase_create_files) <= 1)
     for hbase_create in hbase_create_files:
-      exec_hbase_query_from_file(hbase_create)
+      exec_hbase_query_from_file(hbase_create, "HBase Create")

     # If this is loading text tables plus multiple other formats, the text tables
     # need to be loaded first
     assert(len(hive_load_text_files) <= 1)
-    hive_exec_query_files_parallel(thread_pool, hive_load_text_files)
+    hive_exec_query_files_parallel(thread_pool, hive_load_text_files, "Hive Load Text")

     # IMPALA-9923: Run ORC serially separately from other non-text formats. This hacks
-    # around flakiness seen when loading this in parallel. This should be removed as
-    # soon as possible.
+    # around flakiness seen when loading this in parallel (see IMPALA-12630 comments for
+    # broken tests). This should be removed as soon as possible.
     assert(len(hive_load_orc_files) <= 1)
-    hive_exec_query_files_parallel(thread_pool, hive_load_orc_files)
+    hive_exec_query_files_parallel(thread_pool, hive_load_orc_files, "Hive Load ORC")

     # Load all non-text formats (goes parallel)
-    hive_exec_query_files_parallel(thread_pool, hive_load_nontext_files)
+    hive_exec_query_files_parallel(thread_pool, hive_load_nontext_files,
+                                   "Hive Load Non-Text")

     assert(len(hbase_postload_files) <= 1)
     for hbase_postload in hbase_postload_files:
-      exec_hbase_query_from_file(hbase_postload)
+      exec_hbase_query_from_file(hbase_postload, "HBase Post-Load")

     # Invalidate so that Impala sees the loads done by Hive before loading Parquet/Kudu
     # Note: This only invalidates tables for this workload.
     assert(len(invalidate_files) <= 1)
     if impala_load_files:
-      impala_exec_query_files_parallel(thread_pool, invalidate_files)
-      impala_exec_query_files_parallel(thread_pool, impala_load_files)
+      impala_exec_query_files_parallel(thread_pool, invalidate_files,
+                                       "Impala Invalidate 1")
+      impala_exec_query_files_parallel(thread_pool, impala_load_files, "Impala Load")
     # Final invalidate for this workload
-    impala_exec_query_files_parallel(thread_pool, invalidate_files)
+    impala_exec_query_files_parallel(thread_pool, invalidate_files, "Impala Invalidate 2")
     loading_time_map[workload] = time.time() - start_time

   total_time = 0.0