diff --git a/bin/impala-flake8 b/bin/impala-flake8
index 1d321c86d..4ec6a329e 100755
--- a/bin/impala-flake8
+++ b/bin/impala-flake8
@@ -17,5 +17,5 @@
 # specific language governing permissions and limitations
 # under the License.
 
-source "$(dirname "$0")/impala-python-common.sh"
+source "$(dirname "$0")/impala-python3-common.sh"
 exec "$PY_ENV_DIR/bin/flake8" "$@"
diff --git a/bin/jenkins/critique-gerrit-review.py b/bin/jenkins/critique-gerrit-review.py
index fbdaa8f5f..56aaf1b6b 100755
--- a/bin/jenkins/critique-gerrit-review.py
+++ b/bin/jenkins/critique-gerrit-review.py
@@ -51,6 +51,7 @@ import venv
 FLAKE8_VERSION = "7.1.1"
 FLAKE8_DIFF_VERSION = "0.2.2"
 PYPARSING_VERSION = "3.1.4"
+FLAKE8_UNUSED_ARG_VERSION = "0.0.13"
 
 VENV_PATH = "gerrit_critic_venv"
 VENV_BIN = os.path.join(VENV_PATH, "bin")
@@ -131,7 +132,8 @@ def setup_virtualenv():
       "wheel",
       f"flake8=={FLAKE8_VERSION}",
       f"flake8-diff=={FLAKE8_DIFF_VERSION}",
-      f"pyparsing=={PYPARSING_VERSION}"])
+      f"pyparsing=={PYPARSING_VERSION}",
+      f"flake8-unused-arguments=={FLAKE8_UNUSED_ARG_VERSION}"])
   # Add the libpath of the installed venv to import pyparsing
   sys.path.append(os.path.join(VENV_PATH, f"lib/python{sys.version_info.major}."
                                f"{sys.version_info.minor}/site-packages/"))
@@ -346,7 +348,7 @@
   return extract_thrift_defs(contents)
 
 
-def get_catalog_compatibility_comments(base_revision, revision, dryrun=False):
+def get_catalog_compatibility_comments(base_revision, revision):
   """Get comments on Thrift/FlatBuffers changes that might break the communication
   between impalad and catalogd/statestore"""
   comments = defaultdict(lambda: [])
@@ -451,7 +453,7 @@ if __name__ == "__main__":
   comments = get_flake8_comments(base_revision, revision)
   merge_comments(comments, get_misc_comments(base_revision, revision, args.dryrun))
   merge_comments(
-      comments, get_catalog_compatibility_comments(base_revision, revision, args.dryrun))
+      comments, get_catalog_compatibility_comments(base_revision, revision))
   merge_comments(comments, get_planner_tests_comments())
   review_input = {"comments": comments}
   if len(comments) > 0:
diff --git a/infra/python/deps/py3-requirements.txt b/infra/python/deps/py3-requirements.txt
index b61bc461c..c21bb3e92 100644
--- a/infra/python/deps/py3-requirements.txt
+++ b/infra/python/deps/py3-requirements.txt
@@ -30,3 +30,7 @@ pylint == 2.10.2
 platformdirs == 2.4.1
 typing-extensions == 3.10.0.2
 k5test==0.10.3
+
+# Below are needed only for dev
+flake8==3.9.2
+flake8-unused-arguments==0.0.13
diff --git a/tests/common/test_vector.py b/tests/common/test_vector.py
index 3ccaa2dc2..bdab6cf5b 100644
--- a/tests/common/test_vector.py
+++ b/tests/common/test_vector.py
@@ -199,28 +199,32 @@ class ImpalaTestMatrix(object):
     else:
       raise ValueError('Unknown exploration strategy: %s' % exploration_strategy)
 
-  def embed_independent_exec_options(self, vector_values):
-    if not self.independent_exec_option_names:
-      return vector_values
+  def __deepcopy_vector_values(self, vector_values):
+    """Return a deepcopy of vector_values and merge exec options declared through
+    add_exec_option_dimension() into the 'exec_option' dimension."""
     values = []
     exec_values = []
     exec_option = None
     for val in vector_values:
       if val.name == EXEC_OPTION_KEY:
+        # 'exec_option' is a map. We need to deepcopy the value for each vector.
         exec_option = deepcopy(val.value)
       elif val.name in self.independent_exec_option_names:
+        # Save this to merge into exec_option later.
         exec_values.append(val)
       else:
         values.append(val)
-    assert exec_option is not None, (
-        "Must have '" + EXEC_OPTION_KEY + "' dimension previously declared!")
-    for val in exec_values:
-      exec_option[val.name] = val.value
-    values.append(ImpalaTestVector.Value(EXEC_OPTION_KEY, exec_option))
+    if self.independent_exec_option_names:
+      assert exec_option is not None, (
+          "Must have '" + EXEC_OPTION_KEY + "' dimension previously declared!")
+      for val in exec_values:
+        exec_option[val.name] = val.value
+    if exec_option:
+      values.append(ImpalaTestVector.Value(EXEC_OPTION_KEY, exec_option))
     return values
 
   def __generate_exhaustive_combinations(self):
-    return [ImpalaTestVector(self.embed_independent_exec_options(vec))
+    return [ImpalaTestVector(self.__deepcopy_vector_values(vec))
            for vec in product(*self.__extract_vector_values())
            if self.is_valid(vec)]
 
   def __generate_pairwise_combinations(self):
@@ -231,7 +235,7 @@ class ImpalaTestMatrix(object):
     # results will be the same.
     if len(self.dimensions) == 1:
       return self.__generate_exhaustive_combinations()
-    return [ImpalaTestVector(self.embed_independent_exec_options(vec))
+    return [ImpalaTestVector(self.__deepcopy_vector_values(vec))
            for vec in all_pairs(self.__extract_vector_values(),
                                 filter_func=self.is_valid)]
 
   def add_constraint(self, constraint_func):
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 4da8977da..c8b703381 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -398,7 +398,7 @@ class TestWideTable(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension("num_cols", *cls.NUM_COLS))
     # To cut down on test execution time, only run in exhaustive.
     if cls.exploration_strategy() != 'exhaustive':
-      cls.ImpalaTestMatrix.add_constraint(lambda v: False)
+      pytest.skip("Only run in exhaustive")
 
   def test_wide_table(self, vector):
     if vector.get_value('table_format').file_format == 'kudu':
@@ -437,6 +437,7 @@ class TestHdfsScannerSkew(ImpalaTestSuite):
   @classmethod
   def add_test_dimensions(cls):
     super(TestHdfsScannerSkew, cls).add_test_dimensions()
+    add_mandatory_exec_option(cls, 'mt_dop', 2)
     cls.ImpalaTestMatrix.add_constraint(lambda v:
         v.get_value('table_format').file_format in ('text')
        and v.get_value('table_format').compression_codec == 'none')
@@ -481,7 +482,7 @@ class TestHdfsScannerSkew(ImpalaTestSuite):
     tbl_name = unique_database + ".lineitem_skew"
 
     with self.create_impala_client() as imp_client:
-      imp_client.set_configuration_option('mt_dop', '2')
+      imp_client.set_configuration(vector.get_value('exec_option'))
       imp_client.execute("""create table {} like tpch.lineitem""".format(tbl_name))
       # Create a couple of small data files
       for i in range(1, 11):
@@ -661,16 +662,18 @@ class TestParquet(ImpalaTestSuite):
        repetition level is set to REPEATED succeeds without errors."""
     create_table_from_parquet(self.client, unique_database, "repeated_root_schema")
-    result = self.client.execute(
-        "select * from %s.repeated_root_schema" % unique_database)
+    result = self.execute_query(
+        "select * from %s.repeated_root_schema" % unique_database,
+        vector.get_value('exec_option'))
     assert len(result.data) == 300
 
   def test_huge_num_rows(self, vector, unique_database):
     """IMPALA-5021: Tests that a zero-slot scan on a file with a huge num_rows in the
     footer succeeds without errors."""
     create_table_from_parquet(self.client, unique_database, "huge_num_rows")
-    result = self.client.execute("select count(*) from %s.huge_num_rows"
-                                 % unique_database)
+    result = self.execute_query(
"select count(*) from %s.huge_num_rows" % unique_database, + vector.get_value('exec_option')) assert len(result.data) == 1 assert "4294967294" in result.data @@ -811,30 +814,31 @@ class TestParquet(ImpalaTestSuite): # functional.parquet.alltypes is well-formatted. 'NumScannersWithNoReads' counters are # set to 0. table_name = 'functional_parquet.alltypes' - self._misaligned_parquet_row_groups_helper(table_name, 7300) + self._misaligned_parquet_row_groups_helper(vector, table_name, 7300) # lineitem_multiblock_parquet/000000_0 is ill-formatted but every scanner reads some # row groups. 'NumScannersWithNoReads' counters are set to 0. table_name = 'functional_parquet.lineitem_multiblock' - self._misaligned_parquet_row_groups_helper(table_name, 20000) + self._misaligned_parquet_row_groups_helper(vector, table_name, 20000) # lineitem_sixblocks.parquet is ill-formatted but every scanner reads some row groups. # 'NumScannersWithNoReads' counters are set to 0. table_name = 'functional_parquet.lineitem_sixblocks' - self._misaligned_parquet_row_groups_helper(table_name, 40000) + self._misaligned_parquet_row_groups_helper(vector, table_name, 40000) # Scanning lineitem_one_row_group.parquet finds two scan ranges that end up doing no # reads because the file is poorly formatted. table_name = 'functional_parquet.lineitem_multiblock_one_row_group' self._misaligned_parquet_row_groups_helper( - table_name, 40000, num_scanners_with_no_reads=2) + vector, table_name, 40000, num_scanners_with_no_reads=2) def _misaligned_parquet_row_groups_helper( - self, table_name, rows_in_table, num_scanners_with_no_reads=0, log_prefix=None): + self, vector, table_name, rows_in_table, num_scanners_with_no_reads=0, + log_prefix=None): """Checks if executing a query logs any warnings and if there are any scanners that end up doing no reads. 'log_prefix' specifies the prefix of the expected warning. 'num_scanners_with_no_reads' indicates the expected number of scanners that don't read anything because the underlying file is poorly formatted """ query = 'select * from %s' % table_name - result = self.client.execute(query) + result = self.execute_query(query, vector.get_value('exec_option')) assert len(result.data) == rows_in_table assert (not result.log and not log_prefix) or \ (log_prefix and result.log.startswith(log_prefix)) @@ -864,8 +868,9 @@ class TestParquet(ImpalaTestSuite): instead we verify sum of ranges read on a backend is 2.""" query = 'select count(l_orderkey) from functional_parquet.lineitem_sixblocks' try: - self.client.set_configuration_option('mt_dop', '2') - result = self.client.execute(query) + options = vector.get_value('exec_option') + options['mt_dop'] = 2 + result = self.execute_query(query, options) TOTAL_ROWS = 40000 ranges_complete_list = re.findall(r'ScanRangesComplete: ([0-9]*)', result.runtime_profile) @@ -905,11 +910,11 @@ class TestParquet(ImpalaTestSuite): # For IMPALA-1881. The table functional_parquet.lineitem_multiblock has 3 blocks, so # each impalad should read 1 scan range. table_name = 'functional_parquet.lineitem_multiblock' - self._multiple_blocks_helper(table_name, 20000, ranges_per_node=1) + self._multiple_blocks_helper(vector, table_name, 20000, ranges_per_node=1) table_name = 'functional_parquet.lineitem_sixblocks' # 2 scan ranges per node should be created to read 'lineitem_sixblocks' because # there are 6 blocks and 3 scan nodes. 
-    self._multiple_blocks_helper(table_name, 40000, ranges_per_node=2)
+    self._multiple_blocks_helper(vector, table_name, 40000, ranges_per_node=2)
 
   @SkipIfFS.hdfs_small_block
   @SkipIfLocal.multiple_impalad
@@ -919,10 +924,10 @@ class TestParquet(ImpalaTestSuite):
     # scan range reads everything from this row group.
     table_name = 'functional_parquet.lineitem_multiblock_one_row_group'
     self._multiple_blocks_helper(
-        table_name, 40000, one_row_group=True, ranges_per_node=1)
+        vector, table_name, 40000, one_row_group=True, ranges_per_node=1)
 
   def _multiple_blocks_helper(
-      self, table_name, rows_in_table, one_row_group=False, ranges_per_node=1):
+      self, vector, table_name, rows_in_table, one_row_group=False, ranges_per_node=1):
     """ This function executes a simple SELECT query on a multiblock parquet table and
     verifies the number of ranges issued per node and verifies that at least one row
     group was read. If 'one_row_group' is True, then one scan range is expected to read
     the data of the entire row group. 'ranges_per_node' indicates how many scan ranges
     we expect to be issued per node. """
     query = 'select count(l_orderkey) from %s' % table_name
-    result = self.client.execute(query)
+    result = self.execute_query(query, vector.get_value('exec_option'))
     assert len(result.data) == 1
     assert result.data[0] == str(rows_in_table)
@@ -984,17 +989,19 @@ class TestParquet(ImpalaTestSuite):
     if self.exploration_strategy() != 'exhaustive':
       pytest.skip("Only run in exhaustive")
     # Create table
+    options = vector.get_value('exec_option')
     TABLE_NAME = "parquet_annotate_utf8_test"
     qualified_table_name = "%s.%s" % (unique_database, TABLE_NAME)
     query = 'create table %s (a string, b char(10), c varchar(10), d string) ' \
         'stored as parquet' % qualified_table_name
-    self.client.execute(query)
+    self.execute_query(query, options)
 
     # Insert data that should have UTF8 annotation
     query = 'insert overwrite table %s '\
         'values("a", cast("b" as char(10)), cast("c" as varchar(10)), "d")' \
         % qualified_table_name
-    self.execute_query(query, {'parquet_annotate_strings_utf8': True})
+    options['parquet_annotate_strings_utf8'] = True
+    self.execute_query(query, options)
 
     def get_schema_elements():
       # Copy the created file to the local filesystem and parse metadata
@@ -1026,7 +1033,8 @@ class TestParquet(ImpalaTestSuite):
     assert d_schema_elt.converted_type == ConvertedType.UTF8
 
     # Create table and insert data that should not have UTF8 annotation for strings
-    self.execute_query(query, {'parquet_annotate_strings_utf8': False})
+    options['parquet_annotate_strings_utf8'] = False
+    self.execute_query(query, options)
 
     # Check that the schema does not use the UTF8 annotation except for CHAR and VARCHAR
     # columns
@@ -1071,7 +1079,8 @@ class TestParquet(ImpalaTestSuite):
     TABLE_NAME = "dict_encoding_with_large_bit_width"
     create_table_from_parquet(self.client, unique_database, TABLE_NAME)
     result = self.execute_query(
-        "select * from {0}.{1}".format(unique_database, TABLE_NAME))
+        "select * from {0}.{1}".format(unique_database, TABLE_NAME),
+        vector.get_value('exec_option'))
     assert(len(result.data) == 33)
 
   def test_rle_dictionary_encoding(self, vector, unique_database):
@@ -1168,12 +1177,13 @@ class TestParquet(ImpalaTestSuite):
     """IMPALA-6964: Test that the counter Parquet[Un]compressedPageSize is updated
     when reading [un]compressed Parquet files, and that the counter
     ParquetCompressedPageSize is not updated when reading uncompressed files."""
+    options = vector.get_value('exec_option')
     # lineitem_sixblocks is not compressed so ParquetCompressedPageSize should be empty,
     # but ParquetUncompressedPageSize should have been updated. Query needs an order by
     # so that all rows are read. Only access a couple of columns to reduce query runtime.
-    result = self.client.execute("select l_orderkey"
-        " from functional_parquet.lineitem_sixblocks"
-        " order by l_orderkey limit 10")
+    result = self.execute_query("select l_orderkey"
+        " from functional_parquet.lineitem_sixblocks"
+        " order by l_orderkey limit 10", options)
 
     compressed_page_size_summaries = get_bytes_summary_stats_counter(
         "ParquetCompressedPageSize", result.runtime_profile)
@@ -1195,8 +1205,8 @@ class TestParquet(ImpalaTestSuite):
     # alltypestiny is compressed so both ParquetCompressedPageSize and
     # ParquetUncompressedPageSize should have been updated
     # Query needs an order by so that all rows are read.
-    result = self.client.execute("select int_col from functional_parquet.alltypestiny"
-        " order by int_col limit 10")
+    result = self.execute_query("select int_col from functional_parquet.alltypestiny"
+        " order by int_col limit 10", options)
 
     for summary_name in ("ParquetCompressedPageSize", "ParquetUncompressedPageSize"):
       page_size_summaries = get_bytes_summary_stats_counter(
@@ -1211,13 +1221,14 @@ class TestParquet(ImpalaTestSuite):
     """IMPALA-6964: Test that the counter Parquet[Un]compressedBytesReadPerColumn is
     updated when reading [un]compressed Parquet files, and that the counter
     ParquetCompressedBytesReadPerColumn is not updated when reading uncompressed
     files."""
+    options = vector.get_value('exec_option')
     # lineitem_sixblocks is not compressed so ParquetCompressedBytesReadPerColumn should
     # be empty, but ParquetUncompressedBytesReadPerColumn should have been updated
     # Query needs an order by so that all rows are read. Only access a couple of
     # columns to reduce query runtime.
-    result = self.client.execute("select l_orderkey, l_partkey "
-        "from functional_parquet.lineitem_sixblocks "
-        " order by l_orderkey limit 10")
+    result = self.execute_query("select l_orderkey, l_partkey "
+        "from functional_parquet.lineitem_sixblocks "
+        " order by l_orderkey limit 10", options)
 
     compressed_bytes_read_per_col_summaries = get_bytes_summary_stats_counter(
         "ParquetCompressedBytesReadPerColumn", result.runtime_profile)
@@ -1244,8 +1255,8 @@ class TestParquet(ImpalaTestSuite):
     # alltypestiny is compressed so both ParquetCompressedBytesReadPerColumn and
     # ParquetUncompressedBytesReadPerColumn should have been updated
     # Query needs an order by so that all rows are read.
-    result = self.client.execute("select * from functional_parquet.alltypestiny"
-        " order by int_col limit 10")
+    result = self.execute_query("select * from functional_parquet.alltypestiny"
+        " order by int_col limit 10", options)
 
     for summary_name in ("ParquetCompressedBytesReadPerColumn",
                          "ParquetUncompressedBytesReadPerColumn"):
@@ -1433,7 +1444,7 @@ class TestParquetScanRangeAssigment(ImpalaTestSuite):
     result = self.execute_query("select sum(l_extendedprice * l_discount) as revenue "
         "from tpch_parquet.lineitem where l_shipdate >= '1994-01-01' and "
         "l_shipdate < '1995-01-01' and l_discount between 0.05 and 0.07 and "
-        "l_quantity < 24")
+        "l_quantity < 24", vector.get_value('exec_option'))
     # NumRowGroups tracks the number of row groups actually read, not necessarily the
     # number assigned. Assert that each fragment processed exactly one row group.
@@ -1523,11 +1534,10 @@ class TestTextSplitDelimiters(ImpalaTestSuite):
     - materializes (no)
     """
     DATA = "abc\r\nde\r\nfg\rhij\rklm\r\nno\r"
-    max_scan_range_length = 4
+    vector.get_value('exec_option')['max_scan_range_length'] = 4
     expected_result = ['abc', 'de', 'fg', 'hij', 'klm', 'no']
 
-    self._create_and_query_test_table(
-        vector, unique_database, DATA, max_scan_range_length, expected_result)
+    self._create_and_query_test_table(vector, unique_database, DATA, expected_result)
 
   def test_text_split_across_buffers_delimiter(self, vector, unique_database):
     """Creates and queries a datafile that exercises a split "\r\n" across io buffers (but
@@ -1545,31 +1555,29 @@ class TestTextSplitDelimiters(ImpalaTestSuite):
     max_scan_range_length = DEFAULT_IO_BUFFER_SIZE * 2
     expected_result = data.split("\r\n")
 
-    new_vector = deepcopy(vector)
-    new_vector.get_value('exec_option')['max_row_size'] = 9 * 1024 * 1024
+    vector.get_value('exec_option')['max_row_size'] = 9 * 1024 * 1024
+    vector.get_value('exec_option')['max_scan_range_length'] = max_scan_range_length
 
-    self._create_and_query_test_table(
-        new_vector, unique_database, data, max_scan_range_length, expected_result)
+    self._create_and_query_test_table(vector, unique_database, data, expected_result)
 
-  def _create_and_query_test_table(self, vector, unique_database, data,
-      max_scan_range_length, expected_result):
+  def _create_and_query_test_table(self, vector, unique_database, data, expected_result):
+    options = vector.get_value('exec_option')
     TABLE_NAME = "test_text_split_delimiters"
     qualified_table_name = "%s.%s" % (unique_database, TABLE_NAME)
     location = get_fs_path("/test-warehouse/%s_%s" % (unique_database, TABLE_NAME))
 
     query = "create table %s (s string) location '%s'" % (qualified_table_name, location)
-    self.client.execute(query)
+    self.execute_query(query, options)
 
     # Passing "w+" to NamedTemporaryFile prevents it from opening the file in bytes mode
     with tempfile.NamedTemporaryFile(mode="w+") as f:
       f.write(data)
       f.flush()
       self.filesystem_client.copy_from_local(f.name, location)
-    self.client.execute("refresh %s" % qualified_table_name)
+    self.execute_query("refresh %s" % qualified_table_name, options)
 
-    vector.get_value('exec_option')['max_scan_range_length'] = max_scan_range_length
     query = "select * from %s" % qualified_table_name
     result = self.execute_query_expect_success(
-        self.client, query, vector.get_value('exec_option'))
+        self.client, query, options)
     assert sorted(result.data) == sorted(expected_result)
@@ -1627,22 +1635,23 @@ class TestScanTruncatedFiles(ImpalaTestSuite):
       cls.ImpalaTestMatrix.add_dimension(
           create_uncompressed_text_dimension(cls.get_workload()))
     else:
-      cls.ImpalaTestMatrix.add_constraint(lambda v: False)
+      pytest.skip("Only run in exhaustive")
 
   def test_scan_truncated_file_empty(self, vector, unique_database):
-    self.scan_truncated_file(0, unique_database)
+    self.scan_truncated_file(vector, 0, unique_database)
 
   def test_scan_truncated_file(self, vector, unique_database):
-    self.scan_truncated_file(10, unique_database)
+    self.scan_truncated_file(vector, 10, unique_database)
 
-  def scan_truncated_file(self, num_rows, db_name):
+  def scan_truncated_file(self, vector, num_rows, db_name):
     fq_tbl_name = db_name + ".truncated_file_test"
-    self.execute_query("create table %s (s string)" % fq_tbl_name)
+    options = vector.get_value('exec_option')
+    self.execute_query("create table %s (s string)" % fq_tbl_name, options)
     self.run_stmt_in_hive("insert overwrite table %s select string_col from "
        "functional.alltypes" % fq_tbl_name)
 
     # Update the Impala metadata
-    self.execute_query("refresh %s" % fq_tbl_name)
+    self.execute_query("refresh %s" % fq_tbl_name, options)
 
     # Insert overwrite with a truncated file
     self.run_stmt_in_hive("insert overwrite table %s select string_col from "
@@ -1650,9 +1659,9 @@ class TestScanTruncatedFiles(ImpalaTestSuite):
 
     # The file will not exist if the table is empty and the insert is done by Hive 3, so
     # another refresh is needed.
-    self.execute_query("refresh %s" % fq_tbl_name)
+    self.execute_query("refresh %s" % fq_tbl_name, options)
 
-    result = self.execute_query("select count(*) from %s" % fq_tbl_name)
+    result = self.execute_query("select count(*) from %s" % fq_tbl_name, options)
     assert(len(result.data) == 1)
     assert(result.data[0] == str(num_rows))
 
@@ -1691,49 +1700,52 @@ class TestOrc(ImpalaTestSuite):
   @SkipIfFS.hdfs_small_block
   @SkipIfLocal.multiple_impalad
   def test_misaligned_orc_stripes(self, vector, unique_database):
-    self._build_lineitem_table_helper(unique_database, 'lineitem_threeblocks',
+    self._build_lineitem_table_helper(vector, unique_database, 'lineitem_threeblocks',
        'lineitem_threeblocks.orc')
-    self._build_lineitem_table_helper(unique_database, 'lineitem_sixblocks',
+    self._build_lineitem_table_helper(vector, unique_database, 'lineitem_sixblocks',
        'lineitem_sixblocks.orc')
-    self._build_lineitem_table_helper(unique_database,
+    self._build_lineitem_table_helper(vector, unique_database,
        'lineitem_orc_multiblock_one_stripe', 'lineitem_orc_multiblock_one_stripe.orc')
 
     # functional_orc.alltypes is well-formatted. 'NumScannersWithNoReads' counters are
     # set to 0.
     table_name = 'functional_orc_def.alltypes'
-    self._misaligned_orc_stripes_helper(table_name, 7300)
+    self._misaligned_orc_stripes_helper(vector, table_name, 7300)
     # lineitem_threeblocks.orc is ill-formatted but every scanner reads some stripes.
     # 'NumScannersWithNoReads' counters are set to 0.
     table_name = unique_database + '.lineitem_threeblocks'
-    self._misaligned_orc_stripes_helper(table_name, 16000)
+    self._misaligned_orc_stripes_helper(vector, table_name, 16000)
     # lineitem_sixblocks.orc is ill-formatted but every scanner reads some stripes.
     # 'NumScannersWithNoReads' counters are set to 0.
     table_name = unique_database + '.lineitem_sixblocks'
-    self._misaligned_orc_stripes_helper(table_name, 30000)
+    self._misaligned_orc_stripes_helper(vector, table_name, 30000)
     # Scanning lineitem_orc_multiblock_one_stripe.orc finds two scan ranges that end up
     # doing no reads because the file is poorly formatted.
     table_name = unique_database + '.lineitem_orc_multiblock_one_stripe'
     self._misaligned_orc_stripes_helper(
-        table_name, 16000, num_scanners_with_no_reads=2)
+        vector, table_name, 16000, num_scanners_with_no_reads=2)
 
-  def _build_lineitem_table_helper(self, db, tbl, file):
-    self.client.execute("create table %s.%s like tpch.lineitem stored as orc" % (db, tbl))
+  def _build_lineitem_table_helper(self, vector, db, tbl, file):
+    options = vector.get_value('exec_option')
+    self.execute_query(
+        "create table %s.%s like tpch.lineitem stored as orc" % (db, tbl), options)
     tbl_loc = get_fs_path("/test-warehouse/%s.db/%s" % (db, tbl))
     # set block size to 156672 so lineitem_threeblocks.orc occupies 3 blocks,
     # lineitem_sixblocks.orc occupies 6 blocks.
     check_call(['hdfs', 'dfs', '-Ddfs.block.size=156672', '-copyFromLocal', '-d', '-f',
        os.environ['IMPALA_HOME'] + "/testdata/LineItemMultiBlock/" + file, tbl_loc])
-    self.client.execute("refresh %s.%s" % (db, tbl))
+    self.execute_query("refresh %s.%s" % (db, tbl), options)
 
   def _misaligned_orc_stripes_helper(
-      self, table_name, rows_in_table, num_scanners_with_no_reads=0):
+      self, vector, table_name, rows_in_table, num_scanners_with_no_reads=0):
     """Checks if 'num_scanners_with_no_reads' indicates the expected number of scanners
     that don't read anything because the underlying file is poorly formatted.
     Additionally, test that select count(star) matches the expected number of rows.
     """
+    options = vector.get_value('exec_option')
     query = 'select * from %s' % table_name
-    result = self.client.execute(query)
+    result = self.execute_query(query, options)
     assert len(result.data) == rows_in_table
 
     num_scanners_with_no_reads_list = re.findall(
@@ -1753,7 +1765,7 @@ class TestOrc(ImpalaTestSuite):
 
     # Test that select count(star) matches the expected number of rows.
     query = 'select count(*) from %s' % table_name
-    result = self.client.execute(query)
+    result = self.execute_query(query, options)
     assert int(result.data[0]) == rows_in_table
 
   # Skip this test on non-HDFS filesystems, because orc-type-check.test contains Hive
@@ -1886,7 +1898,7 @@ class TestOrc(ImpalaTestSuite):
     self.run_test_case("QueryTest/orc_timestamp_with_local_timezone", vector,
        unique_database)
 
-  def _run_invalid_schema_test(self, unique_database, test_name, expected_error):
+  def _run_invalid_schema_test(self, vector, unique_database, test_name, expected_error):
     """Copies 'test_name'.orc to a table and runs a simple query. These tests should
     cause an error during the processing of the ORC schema, so the file's columns do not
     have to match with the table's columns.
     """
        "CREATE TABLE {db}.{tbl} (id BIGINT) STORED AS ORC",
        unique_database, test_name, test_files)
     err = self.execute_query_expect_failure(self.client,
-        "select count(id) from {0}.{1}".format(unique_database, test_name))
+        "select count(id) from {0}.{1}".format(unique_database, test_name),
+        vector.get_value('exec_option'))
     assert expected_error in str(err)
 
   def test_invalid_schema(self, vector, unique_database):
     """Test scanning of ORC file with malformed schema."""
-    self._run_invalid_schema_test(unique_database, "corrupt_schema",
+    self._run_invalid_schema_test(vector, unique_database, "corrupt_schema",
        "Encountered parse error in tail of ORC file")
-    self._run_invalid_schema_test(unique_database, "corrupt_root_type",
+    self._run_invalid_schema_test(vector, unique_database, "corrupt_root_type",
        "Root of the selected type returned by the ORC lib is not STRUCT: boolean.")
 
   def test_date_out_of_range_orc(self, vector, unique_database):
@@ -2076,29 +2089,30 @@ class TestSingleFileTable(ImpalaTestSuite):
 
   def test_single_file_table(self, vector, unique_database):
     # Create a simple table with one column.
+    options = vector.get_value('exec_option')
     params = {"db": unique_database, "tbl": "single_file_table"}
     create_tbl_ddl = ("create external table {db}.{tbl} (c1 int) "
                       "stored as textfile").format(**params)
-    self.execute_query_expect_success(self.client, create_tbl_ddl)
+    self.execute_query_expect_success(self.client, create_tbl_ddl, options)
 
     # Insert one value to the table.
insert_stmt = "insert into {db}.{tbl} values (1)".format(**params) - self.execute_query_expect_success(self.client, insert_stmt) + self.execute_query_expect_success(self.client, insert_stmt, options) # Show files and get the path to the first data file. show_files_stmt = "show files in {db}.{tbl}".format(**params) - res = self.execute_query_expect_success(self.client, show_files_stmt) + res = self.execute_query_expect_success(self.client, show_files_stmt, options) assert len(res.data) == 1 hdfs_file_path = res.data[0].split("\t")[0] params['new_location'] = hdfs_file_path # Alter location to point a data file. alter_stmt = "alter table {db}.{tbl} set location '{new_location}'".format(**params) - self.execute_query_expect_success(self.client, alter_stmt) + self.execute_query_expect_success(self.client, alter_stmt, options) # Show files and count star should still work. - res = self.execute_query_expect_success(self.client, show_files_stmt) + res = self.execute_query_expect_success(self.client, show_files_stmt, options) assert res.data[0].split("\t")[0] == (hdfs_file_path + '/') select_stmt = "select count(*) from {db}.{tbl}".format(**params) - res = self.execute_query_expect_success(self.client, select_stmt) + res = self.execute_query_expect_success(self.client, select_stmt, options) assert res.data[0].split("\t")[0] == '1'