IMPALA-13642: Fix unused test vector in test_scanners.py

Several test vectors were ignored in test_scanners.py. This caused the
same test to be repeated without actually varying its exec_option or
debug_action.

This patch fixes the issue by:
- Using execute_query() instead of client.execute().
- Passing vector.get_value('exec_option') when executing test queries.
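
For example, a call that previously ignored the vector now runs under the
vector's exec_option map (a minimal sketch of the pattern with a made-up
table name, not an excerpt from the diff):

    # Before: the query always runs with default options, so the test
    # vector's exec_option and debug_action settings are never exercised.
    result = self.client.execute("select * from some_table")
    # After: the query runs with the options generated for this vector.
    result = self.execute_query("select * from some_table",
                                vector.get_value('exec_option'))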

It also repurposes ImpalaTestMatrix.embed_independent_exec_options to
deepcopy the 'exec_option' dimension during vector generation, so each
test execution gets its own unique copy of 'exec_option'.
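
The deepcopy matters because 'exec_option' is a map shared by reference:
without a private copy per vector, a test that mutates its options would
leak settings into later tests. A minimal sketch of the now-safe pattern
(hypothetical test body):

    # Safe only because each vector owns a private 'exec_option' copy.
    options = vector.get_value('exec_option')
    options['mt_dop'] = 2  # would taint other vectors without the deepcopy
    result = self.execute_query(query, options)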

This patch also adds the flake8-unused-arguments plugin to
critique-gerrit-review.py and py3-requirements.txt so this kind of issue
can be caught during code review. impala-flake8 is also updated to use
impala-python3-common.sh. flake8==3.9.2 is added to py3-requirements.txt;
it is the highest flake8 version whose dependencies are compatible with
pylint==2.10.2.
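
For instance, the plugin flags a test method that declares the 'vector'
fixture but never reads it (illustrative snippet; the plugin reports
unused arguments under code U100):

    def test_example(self, vector):  # U100 Unused argument 'vector'
        self.client.execute("select 1")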

Drop the unused 'dryrun' parameter from the get_catalog_compatibility_comments
function of critique-gerrit-review.py.

Testing:
- Ran impala-flake8 against test_scanners.py and confirmed there are no
  more unused-argument warnings.
- Ran test_scanners.py in core exploration and it passed.
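
For reference, the flake8 check can be invoked directly (paths assumed
from the standard Impala repository layout):

    bin/impala-flake8 tests/query_test/test_scanners.py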

Change-Id: I3b78736327c71323d10bcd432e162400b7ed1d9d
Reviewed-on: http://gerrit.cloudera.org:8080/22301
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author: Riza Suminto
Date: 2025-01-03 19:35:55 -08:00
Committed-by: Impala Public Jenkins
Parent: d23ba87d46
Commit: 134de01a59

5 changed files with 116 additions and 92 deletions

impala-flake8

@@ -17,5 +17,5 @@
 # specific language governing permissions and limitations
 # under the License.
-source "$(dirname "$0")/impala-python-common.sh"
+source "$(dirname "$0")/impala-python3-common.sh"
 exec "$PY_ENV_DIR/bin/flake8" "$@"

critique-gerrit-review.py

@@ -51,6 +51,7 @@ import venv
 FLAKE8_VERSION = "7.1.1"
 FLAKE8_DIFF_VERSION = "0.2.2"
 PYPARSING_VERSION = "3.1.4"
+FLAKE8_UNUSED_ARG_VERSION = "0.0.13"
 VENV_PATH = "gerrit_critic_venv"
 VENV_BIN = os.path.join(VENV_PATH, "bin")
@@ -131,7 +132,8 @@ def setup_virtualenv():
       "wheel",
       f"flake8=={FLAKE8_VERSION}",
       f"flake8-diff=={FLAKE8_DIFF_VERSION}",
-      f"pyparsing=={PYPARSING_VERSION}"])
+      f"pyparsing=={PYPARSING_VERSION}",
+      f"flake8-unused-arguments=={FLAKE8_UNUSED_ARG_VERSION}"])
   # Add the libpath of the installed venv to import pyparsing
   sys.path.append(os.path.join(VENV_PATH, f"lib/python{sys.version_info.major}."
                                f"{sys.version_info.minor}/site-packages/"))
@@ -346,7 +348,7 @@ def extract_thrift_defs_of_revision(revision, file_name):
   return extract_thrift_defs(contents)

-def get_catalog_compatibility_comments(base_revision, revision, dryrun=False):
+def get_catalog_compatibility_comments(base_revision, revision):
   """Get comments on Thrift/FlatBuffers changes that might break the communication
   between impalad and catalogd/statestore"""
   comments = defaultdict(lambda: [])
@@ -451,7 +453,7 @@ if __name__ == "__main__":
   comments = get_flake8_comments(base_revision, revision)
   merge_comments(comments, get_misc_comments(base_revision, revision, args.dryrun))
   merge_comments(
-      comments, get_catalog_compatibility_comments(base_revision, revision, args.dryrun))
+      comments, get_catalog_compatibility_comments(base_revision, revision))
   merge_comments(comments, get_planner_tests_comments())
   review_input = {"comments": comments}
   if len(comments) > 0:

py3-requirements.txt

@@ -30,3 +30,7 @@ pylint == 2.10.2
 platformdirs == 2.4.1
 typing-extensions == 3.10.0.2
 k5test==0.10.3
+# Below are needed only for dev
+flake8==3.9.2
+flake8-unused-arguments==0.0.13

test_vector.py

@@ -199,28 +199,32 @@ class ImpalaTestMatrix(object):
     else:
       raise ValueError('Unknown exploration strategy: %s' % exploration_strategy)

-  def embed_independent_exec_options(self, vector_values):
-    if not self.independent_exec_option_names:
-      return vector_values
+  def __deepcopy_vector_values(self, vector_values):
+    """Return a deepcopy of vector_values and merge exec options declared through
+    add_exec_option_dimension() into 'exec_option' dimension."""
     values = []
     exec_values = []
     exec_option = None
     for val in vector_values:
       if val.name == EXEC_OPTION_KEY:
+        # 'exec_option' is a map. We need to deepcopy the value for each vector.
         exec_option = deepcopy(val.value)
       elif val.name in self.independent_exec_option_names:
+        # save this to merge into exec_option later.
         exec_values.append(val)
       else:
         values.append(val)
-    assert exec_option is not None, (
-      "Must have '" + EXEC_OPTION_KEY + "' dimension previously declared!")
-    for val in exec_values:
-      exec_option[val.name] = val.value
-    values.append(ImpalaTestVector.Value(EXEC_OPTION_KEY, exec_option))
+    if self.independent_exec_option_names:
+      assert exec_option is not None, (
+        "Must have '" + EXEC_OPTION_KEY + "' dimension previously declared!")
+      for val in exec_values:
+        exec_option[val.name] = val.value
+    if exec_option:
+      values.append(ImpalaTestVector.Value(EXEC_OPTION_KEY, exec_option))
     return values

   def __generate_exhaustive_combinations(self):
-    return [ImpalaTestVector(self.embed_independent_exec_options(vec))
+    return [ImpalaTestVector(self.__deepcopy_vector_values(vec))
       for vec in product(*self.__extract_vector_values()) if self.is_valid(vec)]

   def __generate_pairwise_combinations(self):
@@ -231,7 +235,7 @@ class ImpalaTestMatrix(object):
     # results will be the same.
     if len(self.dimensions) == 1:
       return self.__generate_exhaustive_combinations()
-    return [ImpalaTestVector(self.embed_independent_exec_options(vec))
+    return [ImpalaTestVector(self.__deepcopy_vector_values(vec))
       for vec in all_pairs(self.__extract_vector_values(), filter_func=self.is_valid)]

   def add_constraint(self, constraint_func):

test_scanners.py

@@ -398,7 +398,7 @@ class TestWideTable(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension("num_cols", *cls.NUM_COLS))
     # To cut down on test execution time, only run in exhaustive.
     if cls.exploration_strategy() != 'exhaustive':
-      cls.ImpalaTestMatrix.add_constraint(lambda v: False)
+      pytest.skip("Only run in exhaustive")

   def test_wide_table(self, vector):
     if vector.get_value('table_format').file_format == 'kudu':
@@ -437,6 +437,7 @@ class TestHdfsScannerSkew(ImpalaTestSuite):
   @classmethod
   def add_test_dimensions(cls):
     super(TestHdfsScannerSkew, cls).add_test_dimensions()
+    add_mandatory_exec_option(cls, 'mt_dop', 2)
     cls.ImpalaTestMatrix.add_constraint(lambda v:
         v.get_value('table_format').file_format in ('text') and
         v.get_value('table_format').compression_codec == 'none')
@@ -481,7 +482,7 @@ class TestHdfsScannerSkew(ImpalaTestSuite):
     tbl_name = unique_database + ".lineitem_skew"
     with self.create_impala_client() as imp_client:
-      imp_client.set_configuration_option('mt_dop', '2')
+      imp_client.set_configuration(vector.get_value('exec_option'))
       imp_client.execute("""create table {} like tpch.lineitem""".format(tbl_name))
       # Create a couple of small data files
       for i in range(1, 11):
@@ -661,16 +662,18 @@ class TestParquet(ImpalaTestSuite):
     repetetion level is set to REPEATED succeeds without errors."""
     create_table_from_parquet(self.client, unique_database, "repeated_root_schema")
-    result = self.client.execute(
-        "select * from %s.repeated_root_schema" % unique_database)
+    result = self.execute_query(
+        "select * from %s.repeated_root_schema" % unique_database,
+        vector.get_value('exec_option'))
     assert len(result.data) == 300

   def test_huge_num_rows(self, vector, unique_database):
     """IMPALA-5021: Tests that a zero-slot scan on a file with a huge num_rows in the
     footer succeeds without errors."""
     create_table_from_parquet(self.client, unique_database, "huge_num_rows")
-    result = self.client.execute("select count(*) from %s.huge_num_rows"
-                                 % unique_database)
+    result = self.execute_query(
+        "select count(*) from %s.huge_num_rows" % unique_database,
+        vector.get_value('exec_option'))
     assert len(result.data) == 1
     assert "4294967294" in result.data
@@ -811,30 +814,31 @@ class TestParquet(ImpalaTestSuite):
     # functional.parquet.alltypes is well-formatted. 'NumScannersWithNoReads' counters are
     # set to 0.
     table_name = 'functional_parquet.alltypes'
-    self._misaligned_parquet_row_groups_helper(table_name, 7300)
+    self._misaligned_parquet_row_groups_helper(vector, table_name, 7300)
     # lineitem_multiblock_parquet/000000_0 is ill-formatted but every scanner reads some
     # row groups. 'NumScannersWithNoReads' counters are set to 0.
     table_name = 'functional_parquet.lineitem_multiblock'
-    self._misaligned_parquet_row_groups_helper(table_name, 20000)
+    self._misaligned_parquet_row_groups_helper(vector, table_name, 20000)
     # lineitem_sixblocks.parquet is ill-formatted but every scanner reads some row groups.
     # 'NumScannersWithNoReads' counters are set to 0.
     table_name = 'functional_parquet.lineitem_sixblocks'
-    self._misaligned_parquet_row_groups_helper(table_name, 40000)
+    self._misaligned_parquet_row_groups_helper(vector, table_name, 40000)
     # Scanning lineitem_one_row_group.parquet finds two scan ranges that end up doing no
     # reads because the file is poorly formatted.
     table_name = 'functional_parquet.lineitem_multiblock_one_row_group'
     self._misaligned_parquet_row_groups_helper(
-        table_name, 40000, num_scanners_with_no_reads=2)
+        vector, table_name, 40000, num_scanners_with_no_reads=2)

   def _misaligned_parquet_row_groups_helper(
-      self, table_name, rows_in_table, num_scanners_with_no_reads=0, log_prefix=None):
+      self, vector, table_name, rows_in_table, num_scanners_with_no_reads=0,
+      log_prefix=None):
     """Checks if executing a query logs any warnings and if there are any scanners that
     end up doing no reads. 'log_prefix' specifies the prefix of the expected warning.
     'num_scanners_with_no_reads' indicates the expected number of scanners that don't read
     anything because the underlying file is poorly formatted
     """
     query = 'select * from %s' % table_name
-    result = self.client.execute(query)
+    result = self.execute_query(query, vector.get_value('exec_option'))
     assert len(result.data) == rows_in_table
     assert (not result.log and not log_prefix) or \
         (log_prefix and result.log.startswith(log_prefix))
@@ -864,8 +868,9 @@ class TestParquet(ImpalaTestSuite):
     instead we verify sum of ranges read on a backend is 2."""
     query = 'select count(l_orderkey) from functional_parquet.lineitem_sixblocks'
     try:
-      self.client.set_configuration_option('mt_dop', '2')
-      result = self.client.execute(query)
+      options = vector.get_value('exec_option')
+      options['mt_dop'] = 2
+      result = self.execute_query(query, options)
       TOTAL_ROWS = 40000
       ranges_complete_list = re.findall(r'ScanRangesComplete: ([0-9]*)',
           result.runtime_profile)
@@ -905,11 +910,11 @@ class TestParquet(ImpalaTestSuite):
     # For IMPALA-1881. The table functional_parquet.lineitem_multiblock has 3 blocks, so
     # each impalad should read 1 scan range.
     table_name = 'functional_parquet.lineitem_multiblock'
-    self._multiple_blocks_helper(table_name, 20000, ranges_per_node=1)
+    self._multiple_blocks_helper(vector, table_name, 20000, ranges_per_node=1)
     table_name = 'functional_parquet.lineitem_sixblocks'
     # 2 scan ranges per node should be created to read 'lineitem_sixblocks' because
     # there are 6 blocks and 3 scan nodes.
-    self._multiple_blocks_helper(table_name, 40000, ranges_per_node=2)
+    self._multiple_blocks_helper(vector, table_name, 40000, ranges_per_node=2)

   @SkipIfFS.hdfs_small_block
   @SkipIfLocal.multiple_impalad
@@ -919,10 +924,10 @@ class TestParquet(ImpalaTestSuite):
     # scan range reads everything from this row group.
     table_name = 'functional_parquet.lineitem_multiblock_one_row_group'
     self._multiple_blocks_helper(
-        table_name, 40000, one_row_group=True, ranges_per_node=1)
+        vector, table_name, 40000, one_row_group=True, ranges_per_node=1)

   def _multiple_blocks_helper(
-      self, table_name, rows_in_table, one_row_group=False, ranges_per_node=1):
+      self, vector, table_name, rows_in_table, one_row_group=False, ranges_per_node=1):
     """ This function executes a simple SELECT query on a multiblock parquet table and
     verifies the number of ranges issued per node and verifies that at least one row group
     was read. If 'one_row_group' is True, then one scan range is expected to read the data
@@ -930,7 +935,7 @@ class TestParquet(ImpalaTestSuite):
     how many scan ranges we expect to be issued per node. """
     query = 'select count(l_orderkey) from %s' % table_name
-    result = self.client.execute(query)
+    result = self.execute_query(query, vector.get_value('exec_option'))
     assert len(result.data) == 1
     assert result.data[0] == str(rows_in_table)
@@ -984,17 +989,19 @@ class TestParquet(ImpalaTestSuite):
     if self.exploration_strategy() != 'exhaustive': pytest.skip("Only run in exhaustive")
     # Create table
+    options = vector.get_value('exec_option')
     TABLE_NAME = "parquet_annotate_utf8_test"
     qualified_table_name = "%s.%s" % (unique_database, TABLE_NAME)
     query = 'create table %s (a string, b char(10), c varchar(10), d string) ' \
         'stored as parquet' % qualified_table_name
-    self.client.execute(query)
+    self.execute_query(query, options)
     # Insert data that should have UTF8 annotation
     query = 'insert overwrite table %s '\
         'values("a", cast("b" as char(10)), cast("c" as varchar(10)), "d")' \
         % qualified_table_name
-    self.execute_query(query, {'parquet_annotate_strings_utf8': True})
+    options['parquet_annotate_strings_utf8'] = True
+    self.execute_query(query, options)

     def get_schema_elements():
       # Copy the created file to the local filesystem and parse metadata
@@ -1026,7 +1033,8 @@ class TestParquet(ImpalaTestSuite):
     assert d_schema_elt.converted_type == ConvertedType.UTF8

     # Create table and insert data that should not have UTF8 annotation for strings
-    self.execute_query(query, {'parquet_annotate_strings_utf8': False})
+    options['parquet_annotate_strings_utf8'] = False
+    self.execute_query(query, options)

     # Check that the schema does not use the UTF8 annotation except for CHAR and VARCHAR
     # columns
@@ -1071,7 +1079,8 @@ class TestParquet(ImpalaTestSuite):
     TABLE_NAME = "dict_encoding_with_large_bit_width"
     create_table_from_parquet(self.client, unique_database, TABLE_NAME)
     result = self.execute_query(
-        "select * from {0}.{1}".format(unique_database, TABLE_NAME))
+        "select * from {0}.{1}".format(unique_database, TABLE_NAME),
+        vector.get_value('exec_option'))
     assert(len(result.data) == 33)

   def test_rle_dictionary_encoding(self, vector, unique_database):
@@ -1168,12 +1177,13 @@ class TestParquet(ImpalaTestSuite):
     """IMPALA-6964: Test that the counter Parquet[Un]compressedPageSize is updated
     when reading [un]compressed Parquet files, and that the counter
     Parquet[Un]compressedPageSize is not updated."""
+    options = vector.get_value('exec_option')
     # lineitem_sixblocks is not compressed so ParquetCompressedPageSize should be empty,
     # but ParquetUncompressedPageSize should have been updated. Query needs an order by
     # so that all rows are read. Only access a couple of columns to reduce query runtime.
-    result = self.client.execute("select l_orderkey"
+    result = self.execute_query("select l_orderkey"
         " from functional_parquet.lineitem_sixblocks"
-        " order by l_orderkey limit 10")
+        " order by l_orderkey limit 10", options)

     compressed_page_size_summaries = get_bytes_summary_stats_counter(
         "ParquetCompressedPageSize", result.runtime_profile)
@@ -1195,8 +1205,8 @@ class TestParquet(ImpalaTestSuite):
     # alltypestiny is compressed so both ParquetCompressedPageSize and
     # ParquetUncompressedPageSize should have been updated
     # Query needs an order by so that all rows are read.
-    result = self.client.execute("select int_col from functional_parquet.alltypestiny"
-        " order by int_col limit 10")
+    result = self.execute_query("select int_col from functional_parquet.alltypestiny"
+        " order by int_col limit 10", options)

     for summary_name in ("ParquetCompressedPageSize", "ParquetUncompressedPageSize"):
       page_size_summaries = get_bytes_summary_stats_counter(
@@ -1211,13 +1221,14 @@ class TestParquet(ImpalaTestSuite):
     """IMPALA-6964: Test that the counter Parquet[Un]compressedBytesReadPerColumn is
     updated when reading [un]compressed Parquet files, and that the counter
     Parquet[Un]CompressedBytesReadPerColumn is not updated."""
+    options = vector.get_value('exec_option')
     # lineitem_sixblocks is not compressed so ParquetCompressedBytesReadPerColumn should
     # be empty, but ParquetUncompressedBytesReadPerColumn should have been updated
     # Query needs an order by so that all rows are read. Only access a couple of
     # columns to reduce query runtime.
-    result = self.client.execute("select l_orderkey, l_partkey "
+    result = self.execute_query("select l_orderkey, l_partkey "
         "from functional_parquet.lineitem_sixblocks "
-        " order by l_orderkey limit 10")
+        " order by l_orderkey limit 10", options)

     compressed_bytes_read_per_col_summaries = get_bytes_summary_stats_counter(
         "ParquetCompressedBytesReadPerColumn", result.runtime_profile)
@@ -1244,8 +1255,8 @@ class TestParquet(ImpalaTestSuite):
     # alltypestiny is compressed so both ParquetCompressedBytesReadPerColumn and
     # ParquetUncompressedBytesReadPerColumn should have been updated
     # Query needs an order by so that all rows are read.
-    result = self.client.execute("select * from functional_parquet.alltypestiny"
-        " order by int_col limit 10")
+    result = self.execute_query("select * from functional_parquet.alltypestiny"
+        " order by int_col limit 10", options)

     for summary_name in ("ParquetCompressedBytesReadPerColumn",
         "ParquetUncompressedBytesReadPerColumn"):
@@ -1433,7 +1444,7 @@ class TestParquetScanRangeAssigment(ImpalaTestSuite):
     result = self.execute_query("select sum(l_extendedprice * l_discount) as revenue "
         "from tpch_parquet.lineitem where l_shipdate >= '1994-01-01' and "
         "l_shipdate < '1995-01-01' and l_discount between 0.05 and 0.07 and "
-        "l_quantity < 24")
+        "l_quantity < 24", vector.get_value('exec_option'))
     # NumRowGroups tracks the number of row groups actually read, not necessarily the
     # number assigned. Assert that each fragment processed exactly one row group.
@@ -1523,11 +1534,10 @@ class TestTextSplitDelimiters(ImpalaTestSuite):
     - materializes (no)
     """
    DATA = "abc\r\nde\r\nfg\rhij\rklm\r\nno\r"
-    max_scan_range_length = 4
+    vector.get_value('exec_option')['max_scan_range_length'] = 4
     expected_result = ['abc', 'de', 'fg', 'hij', 'klm', 'no']
-    self._create_and_query_test_table(
-        vector, unique_database, DATA, max_scan_range_length, expected_result)
+    self._create_and_query_test_table(vector, unique_database, DATA, expected_result)

   def test_text_split_across_buffers_delimiter(self, vector, unique_database):
     """Creates and queries a datafile that exercises a split "\r\n" across io buffers (but
@@ -1545,31 +1555,29 @@ class TestTextSplitDelimiters(ImpalaTestSuite):
     max_scan_range_length = DEFAULT_IO_BUFFER_SIZE * 2
     expected_result = data.split("\r\n")

-    new_vector = deepcopy(vector)
-    new_vector.get_value('exec_option')['max_row_size'] = 9 * 1024 * 1024
-    self._create_and_query_test_table(
-        new_vector, unique_database, data, max_scan_range_length, expected_result)
+    vector.get_value('exec_option')['max_row_size'] = 9 * 1024 * 1024
+    vector.get_value('exec_option')['max_scan_range_length'] = max_scan_range_length
+    self._create_and_query_test_table(vector, unique_database, data, expected_result)

-  def _create_and_query_test_table(self, vector, unique_database, data,
-      max_scan_range_length, expected_result):
+  def _create_and_query_test_table(self, vector, unique_database, data, expected_result):
+    options = vector.get_value('exec_option')
     TABLE_NAME = "test_text_split_delimiters"
     qualified_table_name = "%s.%s" % (unique_database, TABLE_NAME)
     location = get_fs_path("/test-warehouse/%s_%s" % (unique_database, TABLE_NAME))
     query = "create table %s (s string) location '%s'" % (qualified_table_name, location)
-    self.client.execute(query)
+    self.execute_query(query, options)

     # Passing "w+" to NamedTemporaryFile prevents it from opening the file in bytes mode
     with tempfile.NamedTemporaryFile(mode="w+") as f:
       f.write(data)
       f.flush()
       self.filesystem_client.copy_from_local(f.name, location)
-    self.client.execute("refresh %s" % qualified_table_name)
+    self.execute_query("refresh %s" % qualified_table_name, options)

-    vector.get_value('exec_option')['max_scan_range_length'] = max_scan_range_length
     query = "select * from %s" % qualified_table_name
     result = self.execute_query_expect_success(
-        self.client, query, vector.get_value('exec_option'))
+        self.client, query, options)

     assert sorted(result.data) == sorted(expected_result)
@@ -1627,22 +1635,23 @@ class TestScanTruncatedFiles(ImpalaTestSuite):
       cls.ImpalaTestMatrix.add_dimension(
           create_uncompressed_text_dimension(cls.get_workload()))
     else:
-      cls.ImpalaTestMatrix.add_constraint(lambda v: False)
+      pytest.skip("Only run in exhaustive")

   def test_scan_truncated_file_empty(self, vector, unique_database):
-    self.scan_truncated_file(0, unique_database)
+    self.scan_truncated_file(vector, 0, unique_database)

   def test_scan_truncated_file(self, vector, unique_database):
-    self.scan_truncated_file(10, unique_database)
+    self.scan_truncated_file(vector, 10, unique_database)

-  def scan_truncated_file(self, num_rows, db_name):
+  def scan_truncated_file(self, vector, num_rows, db_name):
     fq_tbl_name = db_name + ".truncated_file_test"
-    self.execute_query("create table %s (s string)" % fq_tbl_name)
+    options = vector.get_value('exec_option')
+    self.execute_query("create table %s (s string)" % fq_tbl_name, options)
     self.run_stmt_in_hive("insert overwrite table %s select string_col from "
         "functional.alltypes" % fq_tbl_name)

     # Update the Impala metadata
-    self.execute_query("refresh %s" % fq_tbl_name)
+    self.execute_query("refresh %s" % fq_tbl_name, options)

     # Insert overwrite with a truncated file
     self.run_stmt_in_hive("insert overwrite table %s select string_col from "
@@ -1650,9 +1659,9 @@ class TestScanTruncatedFiles(ImpalaTestSuite):
     # The file will not exist if the table is empty and the insert is done by Hive 3, so
     # another refresh is needed.
-    self.execute_query("refresh %s" % fq_tbl_name)
+    self.execute_query("refresh %s" % fq_tbl_name, options)

-    result = self.execute_query("select count(*) from %s" % fq_tbl_name)
+    result = self.execute_query("select count(*) from %s" % fq_tbl_name, options)
     assert(len(result.data) == 1)
     assert(result.data[0] == str(num_rows))
@@ -1691,49 +1700,52 @@ class TestOrc(ImpalaTestSuite):
   @SkipIfFS.hdfs_small_block
   @SkipIfLocal.multiple_impalad
   def test_misaligned_orc_stripes(self, vector, unique_database):
-    self._build_lineitem_table_helper(unique_database, 'lineitem_threeblocks',
+    self._build_lineitem_table_helper(vector, unique_database, 'lineitem_threeblocks',
         'lineitem_threeblocks.orc')
-    self._build_lineitem_table_helper(unique_database, 'lineitem_sixblocks',
+    self._build_lineitem_table_helper(vector, unique_database, 'lineitem_sixblocks',
         'lineitem_sixblocks.orc')
-    self._build_lineitem_table_helper(unique_database,
+    self._build_lineitem_table_helper(vector, unique_database,
         'lineitem_orc_multiblock_one_stripe',
         'lineitem_orc_multiblock_one_stripe.orc')

     # functional_orc.alltypes is well-formatted. 'NumScannersWithNoReads' counters are
     # set to 0.
     table_name = 'functional_orc_def.alltypes'
-    self._misaligned_orc_stripes_helper(table_name, 7300)
+    self._misaligned_orc_stripes_helper(vector, table_name, 7300)
     # lineitem_threeblock.orc is ill-formatted but every scanner reads some stripes.
     # 'NumScannersWithNoReads' counters are set to 0.
     table_name = unique_database + '.lineitem_threeblocks'
-    self._misaligned_orc_stripes_helper(table_name, 16000)
+    self._misaligned_orc_stripes_helper(vector, table_name, 16000)
     # lineitem_sixblocks.orc is ill-formatted but every scanner reads some stripes.
     # 'NumScannersWithNoReads' counters are set to 0.
     table_name = unique_database + '.lineitem_sixblocks'
-    self._misaligned_orc_stripes_helper(table_name, 30000)
+    self._misaligned_orc_stripes_helper(vector, table_name, 30000)
     # Scanning lineitem_orc_multiblock_one_stripe.orc finds two scan ranges that end up
     # doing no reads because the file is poorly formatted.
     table_name = unique_database + '.lineitem_orc_multiblock_one_stripe'
     self._misaligned_orc_stripes_helper(
-        table_name, 16000, num_scanners_with_no_reads=2)
+        vector, table_name, 16000, num_scanners_with_no_reads=2)

-  def _build_lineitem_table_helper(self, db, tbl, file):
-    self.client.execute("create table %s.%s like tpch.lineitem stored as orc" % (db, tbl))
+  def _build_lineitem_table_helper(self, vector, db, tbl, file):
+    options = vector.get_value('exec_option')
+    self.execute_query(
+        "create table %s.%s like tpch.lineitem stored as orc" % (db, tbl), options)
     tbl_loc = get_fs_path("/test-warehouse/%s.db/%s" % (db, tbl))
     # set block size to 156672 so lineitem_threeblocks.orc occupies 3 blocks,
     # lineitem_sixblocks.orc occupies 6 blocks.
     check_call(['hdfs', 'dfs', '-Ddfs.block.size=156672', '-copyFromLocal', '-d', '-f',
         os.environ['IMPALA_HOME'] + "/testdata/LineItemMultiBlock/" + file, tbl_loc])
-    self.client.execute("refresh %s.%s" % (db, tbl))
+    self.execute_query("refresh %s.%s" % (db, tbl), options)

   def _misaligned_orc_stripes_helper(
-      self, table_name, rows_in_table, num_scanners_with_no_reads=0):
+      self, vector, table_name, rows_in_table, num_scanners_with_no_reads=0):
     """Checks if 'num_scanners_with_no_reads' indicates the expected number of scanners
     that don't read anything because the underlying file is poorly formatted.
     Additionally, test that select count(star) match with expected number of rows.
     """
+    options = vector.get_value('exec_option')
     query = 'select * from %s' % table_name
-    result = self.client.execute(query)
+    result = self.execute_query(query, options)
     assert len(result.data) == rows_in_table
     num_scanners_with_no_reads_list = re.findall(
@@ -1753,7 +1765,7 @@ class TestOrc(ImpalaTestSuite):
     # Test that select count(star) match with expected number of rows.
     query = 'select count(*) from %s' % table_name
-    result = self.client.execute(query)
+    result = self.execute_query(query, options)
     assert int(result.data[0]) == rows_in_table

   # Skip this test on non-HDFS filesystems, because orc-type-check.test contains Hive
@@ -1886,7 +1898,7 @@ class TestOrc(ImpalaTestSuite):
     self.run_test_case("QueryTest/orc_timestamp_with_local_timezone", vector,
         unique_database)

-  def _run_invalid_schema_test(self, unique_database, test_name, expected_error):
+  def _run_invalid_schema_test(self, vector, unique_database, test_name, expected_error):
     """Copies 'test_name'.orc to a table and runs a simple query. These tests should
     cause an error during the processing of the ORC schema, so the file's columns do
     not have to match with the table's columns.
@@ -1896,14 +1908,15 @@ class TestOrc(ImpalaTestSuite):
         "CREATE TABLE {db}.{tbl} (id BIGINT) STORED AS ORC",
         unique_database, test_name, test_files)
     err = self.execute_query_expect_failure(self.client,
-        "select count(id) from {0}.{1}".format(unique_database, test_name))
+        "select count(id) from {0}.{1}".format(unique_database, test_name),
+        vector.get_value('exec_option'))
     assert expected_error in str(err)

   def test_invalid_schema(self, vector, unique_database):
     """Test scanning of ORC file with malformed schema."""
-    self._run_invalid_schema_test(unique_database, "corrupt_schema",
+    self._run_invalid_schema_test(vector, unique_database, "corrupt_schema",
         "Encountered parse error in tail of ORC file")
-    self._run_invalid_schema_test(unique_database, "corrupt_root_type",
+    self._run_invalid_schema_test(vector, unique_database, "corrupt_root_type",
         "Root of the selected type returned by the ORC lib is not STRUCT: boolean.")

   def test_date_out_of_range_orc(self, vector, unique_database):
@@ -2076,29 +2089,30 @@ class TestSingleFileTable(ImpalaTestSuite):
   def test_single_file_table(self, vector, unique_database):
     # Create a simple table with one column.
+    options = vector.get_value('exec_option')
     params = {"db": unique_database, "tbl": "single_file_table"}
     create_tbl_ddl = ("create external table {db}.{tbl} (c1 int) "
         "stored as textfile").format(**params)
-    self.execute_query_expect_success(self.client, create_tbl_ddl)
+    self.execute_query_expect_success(self.client, create_tbl_ddl, options)
     # Insert one value to the table.
     insert_stmt = "insert into {db}.{tbl} values (1)".format(**params)
-    self.execute_query_expect_success(self.client, insert_stmt)
+    self.execute_query_expect_success(self.client, insert_stmt, options)
     # Show files and get the path to the first data file.
     show_files_stmt = "show files in {db}.{tbl}".format(**params)
-    res = self.execute_query_expect_success(self.client, show_files_stmt)
+    res = self.execute_query_expect_success(self.client, show_files_stmt, options)
    assert len(res.data) == 1
     hdfs_file_path = res.data[0].split("\t")[0]
     params['new_location'] = hdfs_file_path
     # Alter location to point a data file.
     alter_stmt = "alter table {db}.{tbl} set location '{new_location}'".format(**params)
-    self.execute_query_expect_success(self.client, alter_stmt)
+    self.execute_query_expect_success(self.client, alter_stmt, options)
     # Show files and count star should still work.
-    res = self.execute_query_expect_success(self.client, show_files_stmt)
+    res = self.execute_query_expect_success(self.client, show_files_stmt, options)
     assert res.data[0].split("\t")[0] == (hdfs_file_path + '/')
     select_stmt = "select count(*) from {db}.{tbl}".format(**params)
-    res = self.execute_query_expect_success(self.client, select_stmt)
+    res = self.execute_query_expect_success(self.client, select_stmt, options)
     assert res.data[0].split("\t")[0] == '1'