IMPALA-13961: Remove usage of ImpalaBeeswaxResult.schema

An equivalent of ImpalaBeeswaxResult.schema is not implemented in
ImpylaHS2ResultSet. However, the column_labels and column_types fields
are implemented for both.

This patch removes usage of ImpalaBeeswaxResult.schema and replaces it
with either column_labels or column_types field. Tests that used to
access ImpalaBeeswaxResult.schema are migrated to test using hs2
protocol by default. Also fix flake8 issues in modified test files.

Testing:
Ran the modified test files in exhaustive exploration; all passed.

Change-Id: I060fe2d3cded1470fd09b86675cb22442c19fbee
Reviewed-on: http://gerrit.cloudera.org:8080/22776
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Riza Suminto
2025-04-13 22:08:03 -07:00
committed by Impala Public Jenkins
parent c5a0ec8bdf
commit b46d541501
8 changed files with 58 additions and 23 deletions

View File

@@ -956,6 +956,11 @@ class ImpylaHS2ResultSet(object):
"""Return the raw HS2 result set, which is a list of tuples."""
return self.__result_tuples
def get_data(self):
    """Return all result rows joined into one newline-separated string.

    Mirrors the beeswax client's get_data() contract: each element of
    self.data is one tab-separated row string. Returns an empty string
    when the result set has no rows (self.data is empty or None).
    """
    # Guard-clause form: bail out early on an empty/missing result set.
    if not self.data:
        return ''
    return '\n'.join(self.data)
def __convert_result_row(self, result_tuple):
"""Take primitive values from a result tuple and construct the tab-separated string
that would have been returned via beeswax."""

View File

@@ -569,10 +569,10 @@ class ImpalaTestSuite(BaseTestSuite):
'include_fields'. Field names are compared case-insensitively.
"""
exec_result = self.client.execute('show partitions %s' % table_name)
fieldSchemas = exec_result.schema.fieldSchemas
column_labels = exec_result.column_labels
fields_dict = {}
for idx, fs in enumerate(fieldSchemas):
fields_dict[fs.name.lower()] = idx
for idx, name in enumerate(column_labels):
fields_dict[name.lower()] = idx
rows = exec_result.get_data().split('\n')
rows.pop()

View File

@@ -18,11 +18,16 @@
from __future__ import absolute_import, division, print_function
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.skip import SkipIfFS
from tests.common.test_vector import HS2
@SkipIfFS.hive
class TestMetadataNoEventsProcessing(CustomClusterTestSuite):
@classmethod
def default_test_protocol(cls):
return HS2
@CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=0")
def test_refresh_updated_partitions(self, unique_database):
"""
@@ -121,7 +126,7 @@ class TestMetadataNoEventsProcessing(CustomClusterTestSuite):
assert len(result) == 6
@CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=0")
def test_add_overlapping_partitions(self, vector, unique_database):
def test_add_overlapping_partitions(self, unique_database):
"""
IMPALA-1670, IMPALA-4141: Test interoperability with Hive when adding overlapping
partitions to a table
@@ -183,7 +188,7 @@ class TestMetadataNoEventsProcessing(CustomClusterTestSuite):
assert x1_location.endswith("/x=1")
@CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=0")
def test_add_preexisting_partitions_with_data(self, unique_database, vector):
def test_add_preexisting_partitions_with_data(self, unique_database):
"""
IMPALA-1670, IMPALA-4141: After adding partitions that already exist in HMS, Impala
can access the partition data.
@@ -223,7 +228,7 @@ class TestMetadataNoEventsProcessing(CustomClusterTestSuite):
'select x, a from %s order by x, a' % table_name).get_data().split('\n')
@CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=0")
def test_refresh_invalid_partition(self, vector, unique_database):
def test_refresh_invalid_partition(self, unique_database):
"""
Trying to refresh a partition that does not exist does not modify anything
either in impala or hive.
@@ -241,7 +246,7 @@ class TestMetadataNoEventsProcessing(CustomClusterTestSuite):
assert ['y=333/z=5309'] == self.hive_partition_names(table_name)
@CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=0")
def test_add_data_and_refresh(self, vector, unique_database):
def test_add_data_and_refresh(self, unique_database):
"""
Data added through hive is visible in impala after refresh of partition.
"""
@@ -266,7 +271,7 @@ class TestMetadataNoEventsProcessing(CustomClusterTestSuite):
'select * from %s' % table_name).get_data()
@CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=0")
def test_refresh_partition_num_rows(self, vector, unique_database):
def test_refresh_partition_num_rows(self, unique_database):
"""Refreshing a partition should not change it's numRows stat."""
# Create a partitioned table and add data to it.
tbl = unique_database + ".t1"

View File

@@ -19,15 +19,20 @@ from __future__ import absolute_import, division, print_function
import pytest
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.test_dimensions import (
create_exec_option_dimension,
create_single_exec_option_dimension,
create_uncompressed_text_dimension)
from tests.common.test_vector import HS2
class TestStatsExtrapolation(CustomClusterTestSuite):
"""Minimal end-to-end test for the --enable_stats_extrapolation impalad flag. This test
primarily checks that the flag is propagated to the FE. More testing is done in FE unit
tests and metadata/test_stats_extrapolation.py."""
@classmethod
def default_test_protocol(cls):
return HS2
@classmethod
def add_test_dimensions(cls):
super(TestStatsExtrapolation, cls).add_test_dimensions()
@@ -52,13 +57,13 @@ class TestStatsExtrapolation(CustomClusterTestSuite):
"compute stats {0} tablesample system (13)".format(part_test_tbl))
# Check that table stats were set.
table_stats = self.client.execute("show table stats {0}".format(part_test_tbl))
col_names = [fs.name.upper() for fs in table_stats.schema.fieldSchemas]
col_names = table_stats.column_labels
extrap_rows_idx = col_names.index("EXTRAP #ROWS")
for row in table_stats.data:
assert int(row.split("\t")[extrap_rows_idx]) >= 0
# Check that column stats were set.
col_stats = self.client.execute("show column stats {0}".format(part_test_tbl))
col_names = [fs.name.upper() for fs in col_stats.schema.fieldSchemas]
col_names = col_stats.column_labels
ndv_col_idx = col_names.index("#DISTINCT VALUES")
for row in col_stats.data:
assert int(row.split("\t")[ndv_col_idx]) >= 0

View File

@@ -25,6 +25,7 @@ from tests.common.test_dimensions import (
add_mandatory_exec_option)
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIfFS, SkipIfHive2, SkipIfCatalogV2
from tests.common.test_vector import HS2
from tests.metadata.test_event_processing_base import TestEventProcessingBase
from tests.util.event_processor_utils import EventProcessorUtils
@@ -37,6 +38,10 @@ class TestEventProcessing(ImpalaTestSuite):
"""This class contains tests that exercise the event processing mechanism in the
catalog."""
@classmethod
def default_test_protocol(cls):
return HS2
@SkipIfHive2.acid
def test_transactional_insert_events(self, unique_database):
"""Executes 'run_test_insert_events' for transactional tables.

View File

@@ -36,6 +36,7 @@ from tests.common.skip import SkipIfFS, SkipIfHive2, SkipIfHive3
from tests.common.test_dimensions import (
create_single_exec_option_dimension,
create_uncompressed_text_dimension)
from tests.common.test_vector import HS2
from tests.util.event_processor_utils import EventProcessorUtils
from tests.util.hive_utils import HiveDbWrapper, HiveTableWrapper
@@ -133,6 +134,10 @@ class TestHmsIntegrationSanity(ImpalaTestSuite):
@SkipIfFS.hive
class TestHmsIntegration(ImpalaTestSuite):
@classmethod
def default_test_protocol(cls):
return HS2
@classmethod
def add_test_dimensions(cls):
super(TestHmsIntegration, cls).add_test_dimensions()

View File

@@ -19,6 +19,7 @@ from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.test_dimensions import create_single_exec_option_dimension
from tests.common.test_dimensions import create_uncompressed_text_dimension
from tests.common.skip import SkipIfFS
from tests.common.test_vector import HS2
from tests.util.filesystem_utils import get_fs_path
@@ -29,6 +30,10 @@ class TestRefreshPartition(ImpalaTestSuite):
for a table in HDFS
"""
@classmethod
def default_test_protocol(cls):
return HS2
@classmethod
def add_test_dimensions(cls):
super(TestRefreshPartition, cls).add_test_dimensions()
@@ -162,4 +167,4 @@ class TestRefreshPartition(ImpalaTestSuite):
# Check that data is visible for the second partition after refresh
self.client.execute("refresh %s partition (year=2010, month=2)" % table_name)
result = self.client.execute("select count(*) from %s" % table_name)
assert result.data == [str(file_num_rows*2)]
assert result.data == [str(file_num_rows * 2)]

View File

@@ -17,12 +17,12 @@
from __future__ import absolute_import, division, print_function
from builtins import range
from os import path
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIfEC
from tests.common.test_dimensions import (
create_single_exec_option_dimension,
create_uncompressed_text_dimension)
from tests.common.test_vector import HS2
class TestStatsExtrapolation(ImpalaTestSuite):
@@ -30,6 +30,10 @@ class TestStatsExtrapolation(ImpalaTestSuite):
enabled via table property and not via the impalad startup flag so these tests can be
run as regular tests (non-custom-cluster) and in parallel with other tests."""
@classmethod
def default_test_protocol(cls):
return HS2
@classmethod
def add_test_dimensions(cls):
super(TestStatsExtrapolation, cls).add_test_dimensions()
@@ -70,8 +74,8 @@ class TestStatsExtrapolation(ImpalaTestSuite):
# Test unpartitioned table.
nopart_test_tbl = unique_database + ".alltypesnopart"
self.client.execute("create table {0} as select * from functional.alltypes"\
.format(nopart_test_tbl))
self.client.execute(
"create table {0} as select * from functional.alltypes".format(nopart_test_tbl))
# Clone to use as a baseline. We run the regular COMPUTE STATS on this table.
nopart_test_tbl_base = unique_database + ".alltypesnopart_base"
self.clone_table(nopart_test_tbl, nopart_test_tbl_base, False, vector)
@@ -118,7 +122,7 @@ class TestStatsExtrapolation(ImpalaTestSuite):
def __set_extrapolation_tblprop(self, tbl):
"""Alters the given table to enable stats extrapolation via tblproperty."""
self.client.execute("alter table {0} set "\
self.client.execute("alter table {0} set "
"tblproperties('impala.enable.stats.extrapolation'='true')".format(tbl))
def __run_sampling_test(self, tbl, cols, expected_tbl, perc, seed):
@@ -127,8 +131,9 @@ class TestStatsExtrapolation(ImpalaTestSuite):
the resulting table and column stats are reasonably close to those of
'expected_tbl'."""
self.client.execute("drop stats {0}".format(tbl))
self.client.execute("compute stats {0}{1} tablesample system ({2}) repeatable ({3})"\
.format(tbl, cols, perc, seed))
self.client.execute(
"compute stats {0}{1} tablesample system ({2}) repeatable ({3})".format(
tbl, cols, perc, seed))
self.__check_table_stats(tbl, expected_tbl)
self.__check_column_stats(cols, tbl, expected_tbl)
@@ -139,8 +144,8 @@ class TestStatsExtrapolation(ImpalaTestSuite):
actual = self.client.execute("show table stats {0}".format(tbl))
expected = self.client.execute("show table stats {0}".format(expected_tbl))
assert len(actual.data) == len(expected.data)
assert len(actual.schema.fieldSchemas) == len(expected.schema.fieldSchemas)
col_names = [fs.name.upper() for fs in actual.schema.fieldSchemas]
assert len(actual.column_labels) == len(expected.column_labels)
col_names = actual.column_labels
rows_col_idx = col_names.index("#ROWS")
extrap_rows_col_idx = col_names.index("EXTRAP #ROWS")
for i in range(0, len(actual.data)):
@@ -149,7 +154,7 @@ class TestStatsExtrapolation(ImpalaTestSuite):
assert int(exp_cols[rows_col_idx]) >= 0
# The expected_tbl is expected to have valid extrapolated #rows for every partition.
assert int(act_cols[extrap_rows_col_idx]) >= 0
self.appx_equals(\
self.appx_equals(
int(act_cols[extrap_rows_col_idx]), int(exp_cols[rows_col_idx]), 1.0)
# Only the table-level row count is stored. The partition row counts
# are extrapolated.
@@ -167,8 +172,8 @@ class TestStatsExtrapolation(ImpalaTestSuite):
actual = self.client.execute("show column stats {0}".format(tbl))
expected = self.client.execute("show column stats {0}".format(expected_tbl))
assert len(actual.data) == len(expected.data)
assert len(actual.schema.fieldSchemas) == len(expected.schema.fieldSchemas)
col_names = [fs.name.upper() for fs in actual.schema.fieldSchemas]
assert len(actual.column_labels) == len(expected.column_labels)
col_names = actual.column_labels
ndv_col_idx = col_names.index("#DISTINCT VALUES")
for i in range(0, len(actual.data)):
act_cols = actual.data[i].split("\t")