IMPALA-9068: Use different directories for external vs managed warehouse

Hive 3 changed the typical storage model for tables to split them
between two directories:
 - hive.metastore.warehouse.dir stores managed tables (which is now
   defined to be only transactional tables)
 - hive.metastore.warehouse.external.dir stores external tables
   (everything that is not a transactional table)
More recent Hive commits add validation that external tables cannot
be stored in the managed directory. To adopt these newer versions of
Hive, we need to use separate directories for the external and
managed warehouses.

Most of our test tables are not transactional, so they would reside
in the external directory. To keep the test changes small, this uses
/test-warehouse for the external directory and /test-warehouse/managed
for the managed directory. Having the managed directory be a subdirectory
of /test-warehouse means that the data snapshot code should not need to
change.
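
A minimal sketch (the dictionary name and the idea of passing these as
hive-site overrides are illustrative, not part of this change) of how the
two warehouse properties map onto this layout:

  # Hypothetical hive-site overrides matching the layout described above.
  warehouse_dirs = {
      # External (non-transactional) tables stay directly under /test-warehouse.
      "hive.metastore.warehouse.external.dir": "/test-warehouse",
      # Managed (transactional) tables go to a subdirectory of /test-warehouse,
      # so snapshot tooling that archives /test-warehouse still covers them.
      "hive.metastore.warehouse.dir": "/test-warehouse/managed",
  }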

The Hive 2 configuration doesn't change, as Hive 2 does not have this concept.

Since this changes the dataload layout, this also sets CDH_MAJOR_VERSION
to 7 for USE_CDP_HIVE=true. This means that dataload will use a separate
data location from USE_CDP_HIVE=false, which should reduce conflicts
between the two configurations.

Testing:
 - Ran exhaustive tests with USE_CDP_HIVE=false
 - Ran exhaustive tests with USE_CDP_HIVE=true (with current Hive version)
 - Verified that dataload succeeds and tests are able to run with a newer
   Hive version.

Change-Id: I3db69f1b8ca07ae98670429954f5f7a1a359eaec
Reviewed-on: http://gerrit.cloudera.org:8080/15026
Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>

@@ -85,14 +85,23 @@ def load():
   with cluster.impala.cursor() as impala:
     impala.ensure_empty_db(target_db)
     impala.execute("USE %s" % target_db)
+    external = ""
+    tblproperties = "'{0}' = '{1}'".format(compression_key, compression_value)
+    # For Hive 3+, workaround for HIVE-22371 (CTAS puts files in the wrong place) by
+    # explicitly creating an external table so that files are in the external warehouse
+    # directory. Use external.table.purge=true so that it is equivalent to a Hive 2
+    # managed table.
+    if HIVE_MAJOR_VERSION >= 3:
+      external = "EXTERNAL"
+      tblproperties += ",'external.table.purge'='TRUE'"
     sql_params = {
         "source_db": source_db,
         "target_db": target_db,
         "file_format": file_format,
         "compression_key": compression_key,
         "compression_value": compression_value,
         "chunks": chunks,
-        "warehouse_dir": cluster.hive.warehouse_dir}
+        "warehouse_dir": cluster.hive.warehouse_dir,
+        "tblproperties": tblproperties,
+        "external": external}
     # Split table creation into multiple queries or "chunks" so less memory is needed.
     for chunk_idx in xrange(chunks):
@@ -285,9 +294,9 @@ def load():
   # For ORC format, we create the 'part' table by Hive
   with cluster.hive.cursor(db_name=target_db) as hive:
     hive.execute("""
-        CREATE TABLE part
+        CREATE {external} TABLE part
         STORED AS ORC
-        TBLPROPERTIES('{compression_key}'='{compression_value}')
+        TBLPROPERTIES({tblproperties})
         AS SELECT * FROM {source_db}.part""".format(**sql_params))
   # Hive is used to convert the data into parquet/orc and drop all the temp tables.
@@ -298,14 +307,14 @@ def load():
       SET parquet.block.size=10737418240;
       SET dfs.block.size=1073741824;
-      CREATE TABLE region
+      CREATE {external} TABLE region
       STORED AS {file_format}
-      TBLPROPERTIES('{compression_key}'='{compression_value}')
+      TBLPROPERTIES({tblproperties})
       AS SELECT * FROM tmp_region;
-      CREATE TABLE supplier
+      CREATE {external} TABLE supplier
       STORED AS {file_format}
-      TBLPROPERTIES('{compression_key}'='{compression_value}')
+      TBLPROPERTIES({tblproperties})
       AS SELECT * FROM tmp_supplier;"""
   # A simple CTAS for tpch_nested_parquet.customer would create 3 files with Hive3 vs
@@ -316,9 +325,9 @@ def load():
   # TODO: find a less hacky way to ensure a fix number of files
   if HIVE_MAJOR_VERSION >= 3 and file_format == "parquet":
     create_final_tables_sql += """
-      CREATE TABLE customer
+      CREATE {external} TABLE customer
       STORED AS {file_format}
-      TBLPROPERTIES('{compression_key}'='{compression_value}')
+      TBLPROPERTIES({tblproperties})
       AS SELECT * FROM tmp_customer
       WHERE c_custkey >= 10;
@@ -327,9 +336,9 @@ def load():
       WHERE c_custkey < 10;"""
   else:
     create_final_tables_sql += """
-      CREATE TABLE customer
+      CREATE {external} TABLE customer
       STORED AS {file_format}
-      TBLPROPERTIES('{compression_key}'='{compression_value}')
+      TBLPROPERTIES({tblproperties})
       AS SELECT * FROM tmp_customer;"""
   with cluster.hive.cursor(db_name=target_db) as hive:
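
For illustration, a hedged sketch (not part of the patch) of how one of the
templated statements above renders once sql_params is applied with
HIVE_MAJOR_VERSION >= 3; the compression key/value below are placeholders
for whatever the caller passes in:

  # Hypothetical rendering of the region CTAS for a Hive 3 parquet load.
  rendered = """
      CREATE {external} TABLE region
      STORED AS {file_format}
      TBLPROPERTIES({tblproperties})
      AS SELECT * FROM tmp_region;""".format(
          external="EXTERNAL",
          file_format="parquet",
          tblproperties="'<compression_key>' = '<compression_value>',"
                        "'external.table.purge'='TRUE'")
  # The rendered SQL creates an external table whose files land under
  # hive.metastore.warehouse.external.dir, while external.table.purge=true
  # keeps DROP TABLE semantics equivalent to a Hive 2 managed table.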