diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 37bf638fc..ae2eee623 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -225,7 +225,7 @@ struct TQueryOptions {
 
   // Policy for resolving nested array fields in Parquet files.
   54: optional TParquetArrayResolution parquet_array_resolution =
-      TParquetArrayResolution.TWO_LEVEL_THEN_THREE_LEVEL
+      TParquetArrayResolution.THREE_LEVEL
 
   // Indicates whether to read statistics from Parquet files and use them during query
   // processing. This includes skipping data based on the statistics and computing query
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 0360f6c26..1911fc66d 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -247,10 +247,6 @@ enum TImpalaQueryOptions {
   // between the two and three level encodings with index-based field resolution.
   // The ambiguity can manually be resolved using this query option, or by using
   // PARQUET_FALLBACK_SCHEMA_RESOLUTION=name.
-  // The value TWO_LEVEL_THEN_THREE_LEVEL was the default mode since Impala 2.3.
-  // It is preserved as the default for compatibility.
-  // TODO: Remove the TWO_LEVEL_THEN_THREE_LEVEL mode completely or at least make
-  // it non-default in a compatibility breaking release.
   PARQUET_ARRAY_RESOLUTION,
 
   // Indicates whether to read statistics from Parquet files and use them during query
diff --git a/tests/query_test/test_nested_types.py b/tests/query_test/test_nested_types.py
index 85ed3a45a..d078cdff5 100644
--- a/tests/query_test/test_nested_types.py
+++ b/tests/query_test/test_nested_types.py
@@ -19,6 +19,7 @@
 
 import os
 from subprocess import check_call
 
+from pytest import skip
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.impala_test_suite import ImpalaTestSuite
@@ -27,6 +28,7 @@ from tests.common.skip import (
     SkipIfS3,
     SkipIfADLS,
     SkipIfLocal)
+from tests.common.test_vector import ImpalaTestDimension
 from tests.util.filesystem_utils import WAREHOUSE, get_fs_path
 
 class TestNestedTypes(ImpalaTestSuite):
@@ -130,6 +132,8 @@ class TestParquetArrayEncodings(ImpalaTestSuite):
   TESTFILE_DIR = os.path.join(os.environ['IMPALA_HOME'],
                               "testdata/parquet_nested_types_encodings")
 
+  ARRAY_RESOLUTION_POLICIES = ["three_level", "two_level", "two_level_then_three_level"]
+
   @classmethod
   def get_workload(self):
     return 'functional-query'
@@ -137,9 +141,17 @@ class TestParquetArrayEncodings(ImpalaTestSuite):
   @classmethod
   def add_test_dimensions(cls):
     super(TestParquetArrayEncodings, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension(
+        'parquet_array_resolution', *TestParquetArrayEncodings.ARRAY_RESOLUTION_POLICIES))
     cls.ImpalaTestMatrix.add_constraint(lambda v:
         v.get_value('table_format').file_format == 'parquet')
 
+  def __init_arr_res(self, vector):
+    arr_res = vector.get_value('parquet_array_resolution')
+    qopts = vector.get_value('exec_option')
+    qopts['parquet_array_resolution'] = arr_res
+    return (arr_res, qopts)
+
   # $ parquet-tools schema SingleFieldGroupInList.parquet
   # message SingleFieldGroupInList {
   #   optional group single_element_groups (LIST) {
@@ -156,54 +168,8 @@ class TestParquetArrayEncodings(ImpalaTestSuite):
   # .single_element_group:
   # ..count = 2345
   def test_single_field_group_in_list(self, vector, unique_database):
-    tablename = "SingleFieldGroupInList"
-    full_name = "%s.%s" % (unique_database, tablename)
-    self._create_test_table(unique_database, tablename, "SingleFieldGroupInList.parquet",
-        "col1 array<struct<count: bigint>>")
-
-    result = self.client.execute("select item.count from %s.col1" % full_name)
-    assert len(result.data) == 2
-    assert result.data == ['1234', '2345']
-
-    result = self.client.execute("select item.count from %s t, t.col1" % full_name)
-    assert len(result.data) == 2
-    assert result.data == ['1234', '2345']
-
-    result = self.client.execute(
-        "select cnt from %s t, (select count(*) cnt from t.col1) v" % full_name)
-    assert len(result.data) == 1
-    assert result.data == ['2']
-
-  # $ parquet-tools schema AvroPrimitiveInList.parquet
-  # message AvroPrimitiveInList {
-  #   required group list_of_ints (LIST) {
-  #     repeated int32 array;
-  #   }
-  # }
-  #
-  # $ parquet-tools cat AvroPrimitiveInList.parquet
-  # list_of_ints:
-  # .array = 34
-  # .array = 35
-  # .array = 36
-  def test_avro_primitive_in_list(self, vector, unique_database):
-    tablename = "AvroPrimitiveInList"
-    full_name = "%s.%s" % (unique_database, tablename)
-    self._create_test_table(unique_database, tablename, "AvroPrimitiveInList.parquet",
-        "col1 array<int>")
-
-    result = self.client.execute("select item from %s.col1" % full_name)
-    assert len(result.data) == 3
-    assert result.data == ['34', '35', '36']
-
-    result = self.client.execute("select item from %s t, t.col1" % full_name)
-    assert len(result.data) == 3
-    assert result.data == ['34', '35', '36']
-
-    result = self.client.execute(
-        "select cnt from %s t, (select count(*) cnt from t.col1) v" % full_name)
-    assert len(result.data) == 1
-    assert result.data == ['3']
+    self.__test_single_field_group_in_list(unique_database, "SingleFieldGroupInList",
+        "SingleFieldGroupInList.parquet", vector)
 
   # $ parquet-tools schema AvroSingleFieldGroupInList.parquet
   # message AvroSingleFieldGroupInList {
@@ -221,25 +187,102 @@ class TestParquetArrayEncodings(ImpalaTestSuite):
   # .array:
   # ..count = 2345
   def test_avro_single_field_group_in_list(self, vector, unique_database):
-    tablename = "AvroSingleFieldGroupInList"
-    full_name = "%s.%s" % (unique_database, tablename)
-    # Note that the field name does not match the field name in the file schema.
-    self._create_test_table(unique_database, tablename,
-        "AvroSingleFieldGroupInList.parquet", "col1 array<struct<f1: bigint>>")
+    self.__test_single_field_group_in_list(unique_database, "AvroSingleFieldGroupInList",
+        "AvroSingleFieldGroupInList.parquet", vector)
 
-    result = self.client.execute("select item.f1 from %s.col1" % full_name)
-    assert len(result.data) == 2
-    assert result.data == ['1234', '2345']
+  # $ parquet-tools schema ThriftSingleFieldGroupInList.parquet
+  # message ThriftSingleFieldGroupInList {
+  #   optional group single_element_groups (LIST) {
+  #     repeated group single_element_groups_tuple {
+  #       required int64 count;
+  #     }
+  #   }
+  # }
+  #
+  # $ parquet-tools cat ThriftSingleFieldGroupInList.parquet
+  # single_element_groups:
+  # .single_element_groups_tuple:
+  # ..count = 1234
+  # .single_element_groups_tuple:
+  # ..count = 2345
+  def test_thrift_single_field_group_in_list(self, vector, unique_database):
+    self.__test_single_field_group_in_list(unique_database,
+        "ThriftSingleFieldGroupInList", "ThriftSingleFieldGroupInList.parquet", vector)
 
-    result = self.client.execute("select item.f1 from %s t, t.col1" % full_name)
-    assert len(result.data) == 2
-    assert result.data == ['1234', '2345']
+  def __test_single_field_group_in_list(self, db, tbl, parq_file, vector):
+    arr_res, qopts = self.__init_arr_res(vector)
+    full_name = "%s.%s" % (db, tbl)
+    if arr_res == "two_level" or arr_res == "two_level_then_three_level":
+      self._create_test_table(db, tbl, parq_file, "col1 array<struct<f1: bigint>>")
+      result = self.execute_query("select item.f1 from %s.col1" % full_name, qopts)
+      assert result.data == ['1234', '2345']
+      result = self.execute_query("select item.f1 from %s t, t.col1" % full_name, qopts)
+      assert result.data == ['1234', '2345']
 
-    result = self.client.execute(
-        "select cnt from %s t, (select count(*) cnt from t.col1) v" % full_name)
-    assert len(result.data) == 1
+    if arr_res == "three_level":
+      self._create_test_table(db, tbl, parq_file, "col1 array<bigint>")
+      result = self.execute_query("select item from %s.col1" % full_name, qopts)
+      assert result.data == ['1234', '2345']
+      result = self.execute_query("select item from %s t, t.col1" % full_name, qopts)
+      assert result.data == ['1234', '2345']
+
+    result = self.execute_query(
+        "select cnt from %s t, (select count(*) cnt from t.col1) v" % full_name, qopts)
     assert result.data == ['2']
 
+  # $ parquet-tools schema AvroPrimitiveInList.parquet
+  # message AvroPrimitiveInList {
+  #   required group list_of_ints (LIST) {
+  #     repeated int32 array;
+  #   }
+  # }
+  #
+  # $ parquet-tools cat AvroPrimitiveInList.parquet
+  # list_of_ints:
+  # .array = 34
+  # .array = 35
+  # .array = 36
+  def test_avro_primitive_in_list(self, vector, unique_database):
+    self.__test_primitive_in_list(unique_database, "AvroPrimitiveInList",
+        "AvroPrimitiveInList.parquet", vector)
+
+  # $ parquet-tools schema ThriftPrimitiveInList.parquet
+  # message ThriftPrimitiveInList {
+  #   required group list_of_ints (LIST) {
+  #     repeated int32 list_of_ints_tuple;
+  #   }
+  # }
+  #
+  # $ parquet-tools cat ThriftPrimitiveInList.parquet
+  # list_of_ints:
+  # .list_of_ints_tuple = 34
+  # .list_of_ints_tuple = 35
+  # .list_of_ints_tuple = 36
+  def test_thrift_primitive_in_list(self, vector, unique_database):
+    self.__test_primitive_in_list(unique_database, "ThriftPrimitiveInList",
+        "ThriftPrimitiveInList.parquet", vector)
+
+  def __test_primitive_in_list(self, db, tbl, parq_file, vector):
+    arr_res, qopts = self.__init_arr_res(vector)
+    full_name = "%s.%s" % (db, tbl)
+    self._create_test_table(db, tbl, parq_file, "col1 array<int>")
+
+    if arr_res == "two_level" or arr_res == "two_level_then_three_level":
+      result = self.execute_query("select item from %s.col1" % full_name, qopts)
+      assert result.data == ['34', '35', '36']
+      result = self.execute_query("select item from %s t, t.col1" % full_name, qopts)
+      assert result.data == ['34', '35', '36']
+
+    if arr_res == "three_level":
+      result = self.execute_query("select item from %s.col1" % full_name, qopts)
+      assert result.data == ['NULL', 'NULL', 'NULL']
+      result = self.execute_query("select item from %s t, t.col1" % full_name, qopts)
+      assert result.data == ['NULL', 'NULL', 'NULL']
+
+    result = self.execute_query(
        "select cnt from %s t, (select count(*) cnt from t.col1) v" % full_name, qopts)
+    assert result.data == ['3']
+
   # $ parquet-tools schema bad-avro.parquet
   # message org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroArrayOfArray {
   #   required group int_arrays_column (LIST) {
@@ -280,94 +323,8 @@ class TestParquetArrayEncodings(ImpalaTestSuite):
   #
   # [Same int_arrays_column repeated 8x more]
   def test_avro_array_of_arrays(self, vector, unique_database):
-    tablename = "AvroArrayOfArrays"
-    full_name = "%s.%s" % (unique_database, tablename)
-    self._create_test_table(unique_database, tablename, "bad-avro.parquet",
-        "col1 array<array<int>>")
-
-    result = self.client.execute("select item from %s.col1.item" % full_name)
-    assert len(result.data) == 9 * 10
-    assert result.data == ['0', '1', '2', '3', '4', '5', '6', '7', '8'] * 10
-
-    result = self.client.execute(
-        "select a2.item from %s t, t.col1 a1, a1.item a2" % full_name)
-    assert len(result.data) == 9 * 10
-    assert result.data == ['0', '1', '2', '3', '4', '5', '6', '7', '8'] * 10
-
-    result = self.client.execute(
-        "select cnt from %s t, (select count(*) cnt from t.col1) v" % full_name)
-    assert len(result.data) == 10
-    assert result.data == ['3'] * 10
-
-    result = self.client.execute(
-        "select cnt from %s t, t.col1 a1, (select count(*) cnt from a1.item) v" % full_name)
-    assert len(result.data) == 3 * 10
-    assert result.data == ['3', '3', '3'] * 10
-
-  # $ parquet-tools schema ThriftPrimitiveInList.parquet
-  # message ThriftPrimitiveInList {
-  #   required group list_of_ints (LIST) {
-  #     repeated int32 list_of_ints_tuple;
-  #   }
-  # }
-  #
-  # $ parquet-tools cat ThriftPrimitiveInList.parquet
-  # list_of_ints:
-  # .list_of_ints_tuple = 34
-  # .list_of_ints_tuple = 35
-  # .list_of_ints_tuple = 36
-  def test_thrift_primitive_in_list(self, vector, unique_database):
-    tablename = "ThriftPrimitiveInList"
-    full_name = "%s.%s" % (unique_database, tablename)
-    self._create_test_table(unique_database, tablename,
-        "ThriftPrimitiveInList.parquet", "col1 array<int>")
-
-    result = self.client.execute("select item from %s.col1" % full_name)
-    assert len(result.data) == 3
-    assert result.data == ['34', '35', '36']
-
-    result = self.client.execute("select item from %s t, t.col1" % full_name)
-    assert len(result.data) == 3
-    assert result.data == ['34', '35', '36']
-
-    result = self.client.execute(
-        "select cnt from %s t, (select count(*) cnt from t.col1) v" % full_name)
-    assert len(result.data) == 1
-    assert result.data == ['3']
-
-  # $ parquet-tools schema ThriftSingleFieldGroupInList.parquet
-  # message ThriftSingleFieldGroupInList {
-  #   optional group single_element_groups (LIST) {
-  #     repeated group single_element_groups_tuple {
-  #       required int64 count;
-  #     }
-  #   }
-  # }
-  #
-  # $ parquet-tools cat ThriftSingleFieldGroupInList.parquet
-  # single_element_groups:
-  # .single_element_groups_tuple:
-  # ..count = 1234
-  # .single_element_groups_tuple:
-  # ..count = 2345
-  def test_thrift_single_field_group_in_list(self, vector, unique_database):
-    tablename = "ThriftSingleFieldGroupInList"
-    full_name = "%s.%s" % (unique_database, tablename)
-    self._create_test_table(unique_database, tablename,
-        "ThriftSingleFieldGroupInList.parquet", "col1 array<struct<f1: bigint>>")
-
-    result = self.client.execute("select item.f1 from %s.col1" % full_name)
-    assert len(result.data) == 2
-    assert result.data == ['1234', '2345']
-
-    result = self.client.execute("select item.f1 from %s t, t.col1" % full_name)
-    assert len(result.data) == 2
-    assert result.data == ['1234', '2345']
-
-    result = self.client.execute(
-        "select cnt from %s t, (select count(*) cnt from t.col1) v" % full_name)
-    assert len(result.data) == 1
-    assert result.data == ['2']
+    self.__test_array_of_arrays(unique_database, "AvroArrayOfArrays",
+        "bad-avro.parquet", vector, 10)
 
   # $ parquet-tools schema bad-thrift.parquet
   # message ParquetSchema {
@@ -393,29 +350,39 @@ class TestParquetArrayEncodings(ImpalaTestSuite):
   # ..intListsColumn_tuple_tuple = 7
   # ..intListsColumn_tuple_tuple = 8
   def test_thrift_array_of_arrays(self, vector, unique_database):
-    tablename = "ThriftArrayOfArrays"
-    full_name = "%s.%s" % (unique_database, tablename)
-    self._create_test_table(unique_database, tablename, "bad-thrift.parquet",
-        "col1 array<array<int>>")
+    self.__test_array_of_arrays(unique_database, "ThriftArrayOfArrays",
+        "bad-thrift.parquet", vector, 1)
 
-    result = self.client.execute("select item from %s.col1.item" % full_name)
-    assert len(result.data) == 9
-    assert result.data == ['0', '1', '2', '3', '4', '5', '6', '7', '8']
+  def __test_array_of_arrays(self, db, tbl, parq_file, vector, mult):
+    arr_res, qopts = self.__init_arr_res(vector)
+    full_name = "%s.%s" % (db, tbl)
+    self._create_test_table(db, tbl, parq_file, "col1 array<array<int>>")
 
-    result = self.client.execute(
-        "select a2.item from %s t, t.col1 a1, a1.item a2" % full_name)
-    assert len(result.data) == 9
-    assert result.data == ['0', '1', '2', '3', '4', '5', '6', '7', '8']
+    if arr_res == "two_level" or arr_res == "two_level_then_three_level":
+      result = self.execute_query("select item from %s.col1.item" % full_name, qopts)
+      assert result.data == ['0', '1', '2', '3', '4', '5', '6', '7', '8'] * mult
+      result = self.execute_query(
+          "select a2.item from %s t, t.col1 a1, a1.item a2" % full_name, qopts)
+      assert result.data == ['0', '1', '2', '3', '4', '5', '6', '7', '8'] * mult
+      result = self.execute_query(
+          "select cnt from %s t, (select count(*) cnt from t.col1) v" % full_name, qopts)
+      assert result.data == ['3'] * mult
+      result = self.execute_query(
+          "select cnt from %s t, t.col1 a1, (select count(*) cnt from a1.item) v"\
+          % full_name, qopts)
+      assert result.data == ['3', '3', '3'] * mult
 
-    result = self.client.execute(
-        "select cnt from %s t, (select count(*) cnt from t.col1) v" % full_name)
-    assert len(result.data) == 1
-    assert result.data == ['3']
-
-    result = self.client.execute(
-        "select cnt from %s t, t.col1 a1, (select count(*) cnt from a1.item) v" % full_name)
-    assert len(result.data) == 3
-    assert result.data == ['3', '3', '3']
+    if arr_res == "three_level":
+      expected_err = "has an incompatible Parquet schema"
+      try:
+        self.execute_query("select item from %s.col1.item" % full_name, qopts)
+      except Exception, e:
+        assert expected_err in str(e)
+      try:
+        self.execute_query("select cnt from %s t, (select count(*) cnt from t.col1) v"\
+            % full_name, qopts)
+      except Exception, e:
+        assert expected_err in str(e)
 
   # $ parquet-tools schema UnannotatedListOfPrimitives.parquet
   # message UnannotatedListOfPrimitives {
@@ -427,22 +394,18 @@ class TestParquetArrayEncodings(ImpalaTestSuite):
   # list_of_ints = 35
   # list_of_ints = 36
   def test_unannotated_list_of_primitives(self, vector, unique_database):
+    arr_res, qopts = self.__init_arr_res(vector)
     tablename = "UnannotatedListOfPrimitives"
     full_name = "%s.%s" % (unique_database, tablename)
     self._create_test_table(unique_database, tablename,
         "UnannotatedListOfPrimitives.parquet", "col1 array<int>")
 
-    result = self.client.execute("select item from %s.col1" % full_name)
-    assert len(result.data) == 3
+    result = self.execute_query("select item from %s.col1" % full_name, qopts)
     assert result.data == ['34', '35', '36']
-
-    result = self.client.execute("select item from %s t, t.col1" % full_name)
-    assert len(result.data) == 3
+    result = self.execute_query("select item from %s t, t.col1" % full_name, qopts)
     assert result.data == ['34', '35', '36']
-
-    result = self.client.execute(
-        "select cnt from %s t, (select count(*) cnt from t.col1) v" % full_name)
-    assert len(result.data) == 1
+    result = self.execute_query(
+        "select cnt from %s t, (select count(*) cnt from t.col1) v" % full_name, qopts)
     assert result.data == ['3']
 
   # $ parquet-tools schema UnannotatedListOfGroups.parquet
@@ -461,22 +424,18 @@ class TestParquetArrayEncodings(ImpalaTestSuite):
   # .x = 2.0
   # .y = 2.0
   def test_unannotated_list_of_groups(self, vector, unique_database):
+    arr_res, qopts = self.__init_arr_res(vector)
     tablename = "UnannotatedListOfGroups"
     full_name = "%s.%s" % (unique_database, tablename)
     self._create_test_table(unique_database, tablename,
         "UnannotatedListOfGroups.parquet", "col1 array<struct<f1: float, f2: float>>")
 
-    result = self.client.execute("select f1, f2 from %s.col1" % full_name)
-    assert len(result.data) == 2
+    result = self.execute_query("select f1, f2 from %s.col1" % full_name, qopts)
     assert result.data == ['1\t1', '2\t2']
-
-    result = self.client.execute("select f1, f2 from %s t, t.col1" % full_name)
-    assert len(result.data) == 2
+    result = self.execute_query("select f1, f2 from %s t, t.col1" % full_name, qopts)
     assert result.data == ['1\t1', '2\t2']
-
-    result = self.client.execute(
-        "select cnt from %s t, (select count(*) cnt from t.col1) v" % full_name)
-    assert len(result.data) == 1
+    result = self.execute_query(
        "select cnt from %s t, (select count(*) cnt from t.col1) v" % full_name, qopts)
     assert result.data == ['2']
 
   # $ parquet-tools schema AmbiguousList_Modern.parquet
@@ -548,6 +507,11 @@ class TestParquetArrayEncodings(ImpalaTestSuite):
     not exactly match the data files'. The name-based policy generally does not have
     this problem because it avoids traversing incorrect schema paths.
     """
+
+    # The Parquet resolution policy is manually set in the .test files.
+    if vector.get_value('parquet_array_resolution') != "three_level":
+      skip("Test only run with three_level")
+
     ambig_modern_tbl = "ambig_modern"
     self._create_test_table(unique_database, ambig_modern_tbl,
         "AmbiguousList_Modern.parquet",
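
Usage note (illustrative, not part of the patch): the new test dimension works because __init_arr_res() copies the vector's 'parquet_array_resolution' value into the exec_option dict that execute_query() sends with each statement, so every query in the matrix runs under one explicit resolution policy rather than the new THREE_LEVEL default. A minimal stand-alone sketch of the same idea, assuming the impyla client and a hypothetical table my_db.my_tbl with a column col1 array<int>; only PARQUET_ARRAY_RESOLUTION itself comes from this patch, everything else here is an assumption:

# Sketch only: impyla connection parameters and the table name are assumptions.
from impala.dbapi import connect

conn = connect(host="localhost", port=21050)
cur = conn.cursor()
# Pin the resolution policy for this session instead of relying on the default.
cur.execute("SET PARQUET_ARRAY_RESOLUTION=three_level")
# Query the collection column directly, as the tests above do.
cur.execute("SELECT item FROM my_db.my_tbl.col1")
print(cur.fetchall())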