Parquet data loading.
.gitignore (3 changed lines)
@@ -22,9 +22,12 @@ CMakeFiles
cmake_install.cmake
CTestTestfile.cmake
!CMakeLists.txt

Testing/
llvm-ir/
shell/gen-py/
shell/build/
tests/results
hdfs-data/
avro_schemas/
@@ -75,13 +75,24 @@ def exec_hive_query_from_file(file_name):
    sys.exit(ret_val)

def exec_impala_query_from_file(file_name):
  impala_refresh_cmd = "%s --impalad=%s -q 'refresh'" %\
      (IMPALA_SHELL_CMD, options.impala_shell_args)
  impala_cmd = "%s --impalad=%s -f %s" %\
      (IMPALA_SHELL_CMD, options.impala_shell_args, file_name)
  # Refresh catalog before and after
  ret_val = subprocess.call(impala_refresh_cmd, shell = True)
  if ret_val != 0:
    print 'Error executing refresh from Impala.'
    sys.exit(ret_val)
  print 'Executing Impala Command: ' + impala_cmd
  ret_val = subprocess.call(impala_cmd, shell = True)
  if ret_val != 0:
    print 'Error executing file from Impala: ' + file_name
    sys.exit(ret_val)
  ret_val = subprocess.call(impala_refresh_cmd, shell = True)
  if ret_val != 0:
    print 'Error executing refresh from Impala.'
    sys.exit(ret_val)

def exec_bash_script(file_name):
  bash_cmd = "bash %s" % file_name
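A condensed, hypothetical sketch of the refresh-wrap pattern used by exec_impala_query_from_file above: the catalog is refreshed before and after the load (the script does not say why, but presumably so Impala sees tables created through Hive and then the newly loaded data). The run_or_exit helper and the function signature are illustrative only; the real script builds its commands from IMPALA_SHELL_CMD and options.impala_shell_args as shown.

import subprocess
import sys

def run_or_exit(cmd, err_msg):
  # Run a shell command and abort the data load on the first failure.
  ret_val = subprocess.call(cmd, shell=True)
  if ret_val != 0:
    sys.stderr.write(err_msg + '\n')
    sys.exit(ret_val)

def load_via_impala(impala_shell, impalad, sql_file):
  refresh_cmd = "%s --impalad=%s -q 'refresh'" % (impala_shell, impalad)
  load_cmd = "%s --impalad=%s -f %s" % (impala_shell, impalad, sql_file)
  run_or_exit(refresh_cmd, 'Error executing refresh from Impala.')
  run_or_exit(load_cmd, 'Error executing file from Impala: ' + sql_file)
  run_or_exit(refresh_cmd, 'Error executing refresh from Impala.')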
@@ -156,18 +167,31 @@ if __name__ == "__main__":
    dataset_dir = os.path.join(DATASET_DIR, dataset)
    os.chdir(dataset_dir)
    generate_schema_statements(workload)
    # We load Avro tables separately due to bugs in the Avro SerDe.
    # generate-schema-statements.py separates the avro statements into a
    # separate file to get around this.
    # See https://issues.apache.org/jira/browse/HIVE-4195.

    generated_file = 'load-%s-%s-generated.sql' % (workload, options.exploration_strategy)
    if os.path.exists(generated_file):
      exec_hive_query_from_file(os.path.join(dataset_dir, generated_file))

    generated_avro_file = \
        'load-%s-%s-avro-generated.sql' % (workload, options.exploration_strategy)
    if os.path.exists(generated_avro_file):
      # We load Avro tables separately due to bugs in the Avro SerDe.
      # generate-schema-statements.py separates the avro statements into a
      # separate file to get around this.
      # See https://issues.apache.org/jira/browse/HIVE-4195.
      copy_avro_schemas_to_hdfs(AVRO_SCHEMA_DIR)
      exec_hive_query_from_file(os.path.join(dataset_dir, generated_avro_file))

    generated_parquet_file = \
        'load-%s-%s-parquet-generated.sql' % (workload, options.exploration_strategy)
    if os.path.exists(generated_parquet_file):
      if workload == 'functional-query':
        # TODO This needs IMPALA-156
        print "Functional query is not yet working with parquet. Skipping"
        continue
      # For parquet, the data loading is run through impala instead of hive
      exec_impala_query_from_file(os.path.join(dataset_dir, generated_parquet_file))

    loading_time_map[workload] = time.time() - start_time

  total_time = 0.0
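The naming convention makes the dispatch above mechanical: plain and Avro SQL files go through Hive, while the parquet file goes through impala-shell. A hypothetical condensed view (the Avro schema upload to HDFS and the functional-query skip pending IMPALA-156 are omitted; hive_loader and impala_loader stand in for exec_hive_query_from_file and exec_impala_query_from_file):

import os

def load_generated_files(dataset_dir, workload, strategy, hive_loader, impala_loader):
  # Generated files follow load-<workload>-<strategy>[-avro|-parquet]-generated.sql.
  loaders = [
      ('load-%s-%s-generated.sql' % (workload, strategy), hive_loader),
      ('load-%s-%s-avro-generated.sql' % (workload, strategy), hive_loader),
      ('load-%s-%s-parquet-generated.sql' % (workload, strategy), impala_loader),
  ]
  for file_name, loader in loaders:
    path = os.path.join(dataset_dir, file_name)
    if os.path.exists(path):
      loader(path)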
@@ -389,7 +389,8 @@ public class Catalog {
    LOG.info(String.format("Dropping table %s.%s", dbName, tableName));
    synchronized (metastoreCreateDropLock) {
      getMetaStoreClient().getHiveClient().dropTable(dbName, tableName, true, ifExists);
      dbs.get(dbName).removeTable(tableName);
      Db db = dbs.get(dbName);
      if (db != null) db.removeTable(tableName);
    }
  }
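Design note: the replaced line called removeTable() directly on the result of dbs.get(dbName), which would throw a NullPointerException when the database is unknown to the catalog. The new null-checked lookup appears intended to make dropping a table in a non-existent database a no-op, matching the "drop table if exists non_existent_db.tbl" test case added further down.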
testdata/bin/generate-schema-statements.py (25 changed lines)
@@ -103,9 +103,7 @@ FILE_FORMAT_MAP = {
  'text': 'TEXTFILE',
  'seq': 'SEQUENCEFILE',
  'rc': 'RCFILE',
  'parquet':
    "\nINPUTFORMAT 'com.cloudera.impala.hive.serde.ParquetInputFormat'" +
    "\nOUTPUTFORMAT 'com.cloudera.impala.hive.serde.ParquetOutputFormat'",
  'parquet': 'PARQUETFILE',
  'text_lzo':
    "\nINPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'" +
    "\nOUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'",
@@ -151,8 +149,7 @@ def build_table_template(file_format, columns, partition_columns, row_format,
                         avro_schema_dir):
  partitioned_by = str()
  if partition_columns:
    partitioned_by = 'PARTITIONED BY (%s)' % \
        ', '.join(partition_columns.split('\n'))
    partitioned_by = 'PARTITIONED BY (%s)' % ', '.join(partition_columns.split('\n'))

  row_format_stmt = str()
  if row_format:
@@ -165,17 +162,19 @@ def build_table_template(file_format, columns, partition_columns, row_format,
        % (options.hdfs_namenode, avro_schema_dir)
    # Override specified row format
    row_format_stmt = "ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'"
  elif file_format == 'parquet':
    row_format_stmt = str()

  # Note: columns are ignored but allowed if a custom serde is specified
  # (e.g. Avro)
  return """
  stmt = """
CREATE EXTERNAL TABLE {{db_name}}{{db_suffix}}.{{table_name}} (
{columns})
{partitioned_by}
{row_format}
STORED AS {{file_format}}
LOCATION '{hive_warehouse_dir}/{{hdfs_location}}'
{tblproperties};
{tblproperties}
""".format(
    row_format=row_format_stmt,
    columns=',\n'.join(columns.split('\n')),
@@ -184,6 +183,12 @@ LOCATION '{hive_warehouse_dir}/{{hdfs_location}}'
    tblproperties=tblproperties
  ).strip()

  # Remove empty lines from the stmt string. There is an empty line for
  # each of the sections that didn't have anything (e.g. partitioned_by)
  stmt = os.linesep.join([s for s in stmt.splitlines() if s])
  stmt += ';'
  return stmt

def avro_schema(columns):
  record = {
    "name": "a", # doesn't matter
@@ -313,7 +318,11 @@ def generate_statements(output_name, test_vectors, sections,
    file_format, data_set, codec, compression_type =\
        [row.file_format, row.dataset, row.compression_codec, row.compression_type]
    table_format = '%s/%s/%s' % (file_format, codec, compression_type)
    output = default_output if 'avro' not in table_format else avro_output
    output = default_output
    if file_format == 'avro':
      output = avro_output
    elif file_format == 'parquet':
      output = parquet_output

    for section in sections:
      alter = section.get('ALTER')
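To make the two-stage templating concrete, here is a hypothetical second-stage substitution for a parquet table. build_table_template() returns a string whose remaining {db_name}, {table_name}, {file_format} and {hdfs_location} placeholders are presumably filled in by the caller; every identifier and path below is an example only, and PARQUETFILE comes from the FILE_FORMAT_MAP change above.

# Example second-stage fill-in of the generated template (values are made up).
template = (
  "CREATE EXTERNAL TABLE {db_name}{db_suffix}.{table_name} (\n"
  "id INT,\n"
  "name STRING)\n"
  "STORED AS {file_format}\n"
  "LOCATION '/example/warehouse/{hdfs_location}';"
)
ddl = template.format(
    db_name='tpch', db_suffix='_parquet', table_name='example_table',
    file_format='PARQUETFILE',  # FILE_FORMAT_MAP['parquet']
    hdfs_location='tpch.example_table_parquet')
print(ddl)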
@@ -175,6 +175,10 @@ show tables
string
====
---- QUERY
drop table if exists non_existent_db.tbl
---- RESULTS
====
---- QUERY
# Need to switch databases before dropping
use default
---- RESULTS
testdata/workloads/tpcds/tpcds_core.csv (1 changed line)
@@ -1,3 +1,4 @@
# Generated File.
file_format: text, dataset: tpcds, compression_codec: none, compression_type: none
file_format: seq, dataset: tpcds, compression_codec: snap, compression_type: block
file_format: parquet, dataset: tpcds, compression_codec: none, compression_type: none
@@ -1,4 +1,4 @@
file_format: text,seq,rc,avro
file_format: text,seq,rc,avro,parquet
dataset: tpcds
compression_codec: none,def,gzip,bzip,snap,lzo
compression_type: none,block,record
@@ -16,4 +16,8 @@ file_format: rc, dataset: tpcds, compression_codec: gzip, compression_type: bloc
file_format: rc, dataset: tpcds, compression_codec: bzip, compression_type: block
file_format: rc, dataset: tpcds, compression_codec: snap, compression_type: block
file_format: avro, dataset: tpcds, compression_codec: none, compression_type: none
file_format: avro, dataset: tpcds, compression_codec: def, compression_type: block
file_format: avro, dataset: tpcds, compression_codec: snap, compression_type: block
file_format: parquet, dataset: tpcds, compression_codec: none, compression_type: none
file_format: parquet, dataset: tpcds, compression_codec: def, compression_type: block
file_format: parquet, dataset: tpcds, compression_codec: snap, compression_type: block
testdata/workloads/tpcds/tpcds_pairwise.csv (9 changed lines)
@@ -3,8 +3,13 @@ file_format: text, dataset: tpcds, compression_codec: none, compression_type: no
file_format: seq, dataset: tpcds, compression_codec: def, compression_type: block
file_format: rc, dataset: tpcds, compression_codec: gzip, compression_type: block
file_format: avro, dataset: tpcds, compression_codec: snap, compression_type: block
file_format: avro, dataset: tpcds, compression_codec: none, compression_type: none
file_format: parquet, dataset: tpcds, compression_codec: snap, compression_type: block
file_format: parquet, dataset: tpcds, compression_codec: def, compression_type: block
file_format: avro, dataset: tpcds, compression_codec: def, compression_type: block
file_format: rc, dataset: tpcds, compression_codec: bzip, compression_type: block
file_format: seq, dataset: tpcds, compression_codec: none, compression_type: none
file_format: seq, dataset: tpcds, compression_codec: snap, compression_type: record
file_format: text, dataset: tpcds, compression_codec: lzo, compression_type: block
file_format: rc, dataset: tpcds, compression_codec: def, compression_type: block
file_format: avro, dataset: tpcds, compression_codec: none, compression_type: none
file_format: parquet, dataset: tpcds, compression_codec: none, compression_type: none
file_format: rc, dataset: tpcds, compression_codec: none, compression_type: none
testdata/workloads/tpch/tpch_core.csv (1 changed line)
@@ -5,3 +5,4 @@ file_format:seq, dataset:tpch, compression_codec:snap, compression_type:block
file_format:rc, dataset:tpch, compression_codec:none, compression_type:none
file_format:avro, dataset:tpch, compression_codec: none, compression_type: none
file_format:avro, dataset:tpch, compression_codec: snap, compression_type: block
file_format:parquet, dataset:tpch, compression_codec: none, compression_type: none
testdata/workloads/tpch/tpch_dimensions.csv (2 changed lines)
@@ -1,4 +1,4 @@
file_format: text,seq
file_format: text,seq,rc,avro,parquet
dataset: tpch
compression_codec: none,def,gzip,bzip,snap,lzo
compression_type: none,block,record
testdata/workloads/tpch/tpch_exhaustive.csv (11 changed lines)
@@ -10,3 +10,14 @@ file_format: seq, dataset: tpch, compression_codec: bzip, compression_type: bloc
file_format: seq, dataset: tpch, compression_codec: bzip, compression_type: record
file_format: seq, dataset: tpch, compression_codec: snap, compression_type: block
file_format: seq, dataset: tpch, compression_codec: snap, compression_type: record
file_format: rc, dataset: tpch, compression_codec: none, compression_type: none
file_format: rc, dataset: tpch, compression_codec: def, compression_type: block
file_format: rc, dataset: tpch, compression_codec: gzip, compression_type: block
file_format: rc, dataset: tpch, compression_codec: bzip, compression_type: block
file_format: rc, dataset: tpch, compression_codec: snap, compression_type: block
file_format: avro, dataset: tpch, compression_codec: none, compression_type: none
file_format: avro, dataset: tpch, compression_codec: def, compression_type: block
file_format: avro, dataset: tpch, compression_codec: snap, compression_type: block
file_format: parquet, dataset: tpch, compression_codec: none, compression_type: none
file_format: parquet, dataset: tpch, compression_codec: def, compression_type: block
file_format: parquet, dataset: tpch, compression_codec: snap, compression_type: block
testdata/workloads/tpch/tpch_pairwise.csv (12 changed lines)
@@ -1,5 +1,15 @@
# Generated File.
file_format: text, dataset: tpch, compression_codec: none, compression_type: none
file_format: seq, dataset: tpch, compression_codec: def, compression_type: block
file_format: seq, dataset: tpch, compression_codec: gzip, compression_type: record
file_format: rc, dataset: tpch, compression_codec: gzip, compression_type: block
file_format: avro, dataset: tpch, compression_codec: snap, compression_type: block
file_format: parquet, dataset: tpch, compression_codec: snap, compression_type: block
file_format: parquet, dataset: tpch, compression_codec: def, compression_type: block
file_format: avro, dataset: tpch, compression_codec: def, compression_type: block
file_format: rc, dataset: tpch, compression_codec: bzip, compression_type: block
file_format: seq, dataset: tpch, compression_codec: snap, compression_type: record
file_format: text, dataset: tpch, compression_codec: lzo, compression_type: block
file_format: rc, dataset: tpch, compression_codec: def, compression_type: block
file_format: avro, dataset: tpch, compression_codec: none, compression_type: none
file_format: parquet, dataset: tpch, compression_codec: none, compression_type: none
file_format: rc, dataset: tpch, compression_codec: none, compression_type: none
@@ -32,6 +32,8 @@ class TestCancellation(ImpalaTestSuite):
    cls.TestMatrix.add_dimension(TestDimension('query', *QUERIES))
    cls.TestMatrix.add_dimension(TestDimension('cancel_delay', *CANCEL_DELAY_IN_SECONDS))
    cls.TestMatrix.add_constraint(lambda v: v.get_value('exec_option')['batch_size'] == 0)
    cls.TestMatrix.add_constraint(lambda v:\
        v.get_value('table_format').file_format != 'parquet')
    if cls.exploration_strategy() != 'core':
      NUM_CANCELATION_ITERATIONS = 3
@@ -35,6 +35,9 @@ class TestLimit(ImpalaTestSuite):
    # network traffic and makes the machine run very slowly.
    cls.TestMatrix.add_constraint(lambda v:\
        v.get_value('limit_value') < 100 or v.get_value('exec_option')['batch_size'] == 0)

    cls.TestMatrix.add_constraint(lambda v:\
        v.get_value('table_format').file_format != 'parquet')

  def test_limit(self, vector):
    # We can't validate the rows that are returned since that is non-deterministic.
@@ -18,6 +18,7 @@ class TestTpcdsQuery(ImpalaTestSuite):
    # Cut down on the execution time for these tests
    cls.TestMatrix.add_constraint(lambda v:\
        v.get_value('table_format').file_format != 'rc' and\
        v.get_value('table_format').file_format != 'parquet' and\
        v.get_value('table_format').compression_codec in ['none', 'snap'] and\
        v.get_value('table_format').compression_type != 'record')
    cls.TestMatrix.add_constraint(lambda v:\
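These three test suites all add a constraint that filters parquet out of the test matrix, presumably until parquet coverage is ready (the load script above likewise skips functional-query parquet loading pending IMPALA-156). A rough, hypothetical sketch of how such constraints prune test vectors; the real TestMatrix implementation is not part of this diff:

# Each constraint is a predicate over a test vector; only vectors that satisfy
# every constraint survive.
vectors = [{'file_format': 'text'}, {'file_format': 'seq'}, {'file_format': 'parquet'}]
constraints = [lambda v: v['file_format'] != 'parquet']
surviving = [v for v in vectors if all(c(v) for c in constraints)]
# surviving == [{'file_format': 'text'}, {'file_format': 'seq'}]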