IMPALA-6372: Go parallel for Hive dataload

This changes generate-schema-statements.py to produce
separate SQL files per file format for Hive, and changes
load-data.py to run these separate Hive SQL files in
parallel. For correctness, the text version of every
table must be loaded before any of the other file
formats, since the other formats are populated from the
text tables.
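
As a rough sketch of that split (the helper and naming scheme here are
illustrative, not the script's actual code), statements can be bucketed
by file format so each bucket becomes its own Hive SQL file:

import collections

def write_hive_sql_files(statements_by_format, output_prefix):
  # statements_by_format: dict mapping a file format (e.g. "text",
  # "parquet") to the list of SQL statements that load tables of that
  # format. Each format gets its own file so it can be run independently.
  for file_format, stmts in statements_by_format.items():
    path = "{0}-{1}.sql".format(output_prefix, file_format)
    with open(path, "w") as f:
      f.write(";\n".join(stmts) + ";\n")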

load-data.py runs DDLs to create the tables in Impala,
and these also go parallel. Currently, there are some
minor dependencies that force the text tables to be
created before the other table formats. This changes
the definitions of some tables in
testdata/datasets/functional/functional_schema_template.sql
to remove those dependencies, so the DDLs for the text
tables can now run in parallel with the other file formats.
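
As a hedged illustration (the table name and columns below are
hypothetical, not the ones the patch touches), a dependency of this
shape forces the text DDL to run first, and spelling the schema out
removes the ordering edge:

# Before: the non-text section copies its schema from the text table,
# so the text DDL must exist first. (Hypothetical table name; the
# {db_name}/{db_suffix} placeholders follow the template's convention.)
DEPENDENT_DDL = "CREATE TABLE {db_name}{db_suffix}.some_table LIKE {db_name}.some_table"

# After: the schema is spelled out, so this DDL has no dependency on
# the text table and can run in parallel with it.
STANDALONE_DDL = "CREATE TABLE {db_name}{db_suffix}.some_table (id INT, name STRING)"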

To unify the parallelism for Impala and Hive, load-data.py
now uses a single fixed-size pool of processes to run all
SQL files rather than spawning a thread per SQL file.
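
A minimal sketch of that scheduling, assuming a hypothetical
run_hive_sql() helper that shells out to beeline (the real load-data.py
logic is more involved):

from multiprocessing import Pool
import subprocess

POOL_SIZE = 4  # one fixed-size pool shared by every SQL file

def run_hive_sql(sql_file):
  # Placeholder for however a single SQL file actually gets executed.
  return sql_file, subprocess.call(["beeline", "-f", sql_file])

def load_all(text_sql_files, other_format_sql_files):
  pool = Pool(POOL_SIZE)
  # The text tables back the other formats' loads, so every text file
  # must finish before any other format starts.
  for sql_file, rc in pool.imap_unordered(run_hive_sql, text_sql_files):
    assert rc == 0, "Loading {0} failed".format(sql_file)
  # With the text data in place, the remaining formats load in parallel.
  for sql_file, rc in pool.imap_unordered(run_hive_sql,
                                          other_format_sql_files):
    assert rc == 0, "Loading {0} failed".format(sql_file)
  pool.close()
  pool.join()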

This also modifies the locations that issue INVALIDATE
METADATA to use REFRESH where possible, eliminating
global invalidates.
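
A short sketch of the substitution, reusing the cursor API and the
cluster/target_db names that appear in the diff below (the specific
statements are illustrative):

with cluster.impala.cursor(db_name=target_db) as impala:
  # Before: a catalog-wide invalidate, which throws away metadata for
  # every table and forces expensive reloads.
  # impala.invalidate_metadata()

  # After: reload only what changed. REFRESH picks up new data files for
  # a table Impala already knows about; table-level INVALIDATE covers
  # tables created outside Impala (e.g. by Hive).
  impala.execute("REFRESH customer")
  impala.invalidate_metadata(table_name="part")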

For debuggability, each SQL execution now writes to its
own log file rather than to standard out. If an error
occurs, the error message points to the relevant log
file.
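
A sketch of that per-file logging (the log directory and helper name
are hypothetical):

import os
import subprocess

LOG_DIR = "logs/data_loading"  # hypothetical location

def run_with_log(sql_file):
  log_path = os.path.join(LOG_DIR, os.path.basename(sql_file) + ".log")
  with open(log_path, "w") as log:
    rc = subprocess.call(["beeline", "-f", sql_file],
                         stdout=log, stderr=log)
  if rc != 0:
    # Point at the one relevant log instead of interleaving all output.
    print("Error executing {0}; see {1}".format(sql_file, log_path))
  return rc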

This saves about 10-15 minutes on dataload (including
for GVO).

Change-Id: I34b71e6df3c8f23a5a31451280e35f4dc015a2fd
Reviewed-on: http://gerrit.cloudera.org:8080/8894
Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author: Joe McDonnell
Date: 2017-12-20 10:29:10 -08:00
Committer: Impala Public Jenkins
Commit: d481cd4842 (parent bc6c3c7447)
4 changed files with 397 additions and 179 deletions

@@ -263,32 +263,43 @@ def load():
       TBLPROPERTIES('parquet.compression'='SNAPPY')
       AS SELECT * FROM tmp_customer;
-      DROP TABLE tmp_orders_string;
-      DROP TABLE tmp_customer_string;
-      DROP TABLE tmp_customer;
       CREATE TABLE region
       STORED AS PARQUET
       TBLPROPERTIES('parquet.compression'='SNAPPY')
       AS SELECT * FROM tmp_region;
-      DROP TABLE tmp_region_string;
-      DROP TABLE tmp_region;
       CREATE TABLE supplier
       STORED AS PARQUET
       TBLPROPERTIES('parquet.compression'='SNAPPY')
-      AS SELECT * FROM tmp_supplier;
-      DROP TABLE tmp_supplier;
-      DROP TABLE tmp_supplier_string;""".split(";"):
+      AS SELECT * FROM tmp_supplier;""".split(";"):
     if not stmt.strip():
       continue
     LOG.info("Executing: {0}".format(stmt))
     hive.execute(stmt)
 
   with cluster.impala.cursor(db_name=target_db) as impala:
-    impala.invalidate_metadata()
+    # Drop the temporary tables. These temporary tables were created
+    # in Impala, so they exist in Impala's metadata. This drop is executed by
+    # Impala so that the metadata is automatically updated.
+    for stmt in """
+        DROP TABLE tmp_orders_string;
+        DROP TABLE tmp_customer_string;
+        DROP TABLE tmp_customer;
+        DROP TABLE tmp_region_string;
+        DROP TABLE tmp_region;
+        DROP TABLE tmp_supplier;
+        DROP TABLE tmp_supplier_string;""".split(";"):
+      if not stmt.strip():
+        continue
+      LOG.info("Executing: {0}".format(stmt))
+      impala.execute(stmt)
+    impala.invalidate_metadata(table_name="customer")
+    impala.invalidate_metadata(table_name="part")
+    impala.invalidate_metadata(table_name="region")
+    impala.invalidate_metadata(table_name="supplier")
     impala.compute_stats()
   LOG.info("Done loading nested TPCH data")