IMPALA-13864: Implement ImpylaHS2ResultSet.exec_summary

This patch implements building the exec summary table for
ImpylaHS2Connection. It adds a fetch_exec_summary argument to
ImpalaConnection.execute(). If this argument is True, an exec summary
table is added to the returned result object.
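
For illustration, a minimal sketch of how a test might use the new
argument (the fixture and attribute names here are assumptions based on
the test changes below, not an exact API reference):

  # self.hs2_client is assumed to be an ImpylaHS2Connection from the test suite.
  result = self.hs2_client.execute(
      "select count(*) from functional.alltypes", fetch_exec_summary=True)
  # With fetch_exec_summary=True the result carries the exec summary table
  # built from the query's TExecSummary; by default it would be absent.
  for row in result.exec_summary:
    print(row)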

fetch_exec_summary is also implemented for BeeswaxConnection. As a
result, BeeswaxConnection no longer fetches the exec summary for every
query by default.

Tests that validate the exec summary table are updated to set
fetch_exec_summary=True and migrated to test against the HS2 protocol.
TestExecutorGroups._set_query_options() is changed to set query options
through hs2_client.set_configuration_option() instead of running SET
statements. Some flake8 issues are addressed as well.
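
The before/after shape of that helper, as a sketch (see the diff below
for the actual change):

  # Before: each option was applied by running a SET statement.
  self.execute_query_expect_success(self.client, "SET {}='{}'".format(k, v))
  # After: the option is set directly on the HS2 client configuration.
  self.hs2_client.set_configuration_option(k, v)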

Move build_exec_summary_table to a separate exec_summary.py file. Tweak
it to return early if the given TExecSummary is empty.
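
The early-return tweak looks roughly like the following sketch (the real
exec_summary.py signature and row formatting are more involved; only the
guard on an empty TExecSummary is the point here):

  def build_exec_summary_table(summary, output):
    """Append one formatted row per plan node in 'summary' to 'output'."""
    # New guard: an empty TExecSummary (e.g. from a DDL statement) has no
    # nodes, so there is nothing to build.
    if summary is None or not summary.nodes:
      return
    for node in summary.nodes:
      output.append(node.label)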

Fix a bug in ImpalaBeeswaxClient.fetch_results() where no fetch happened
at all if the discard_result argument was True.
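
The shape of the fix, as an illustration only (names are hypothetical,
not the real ImpalaBeeswaxClient internals):

  def drain_results(batches, discard_result=False):
    """The fetch loop must always run so the fetch phase completes;
    discarding only skips accumulating rows, not the fetch itself."""
    rows = []
    for batch in batches:  # 'batches' stands in for the Beeswax fetch calls
      if not discard_result:
        rows.extend(batch)
    return rows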

Testing:
- Run and pass affected tests locally.

Change-Id: I7d88f78e58eeda29ce21e7828884c7a129d7efe6
Reviewed-on: http://gerrit.cloudera.org:8080/22626
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author: Riza Suminto
Date: 2025-03-08 13:28:52 -08:00
Committer: Impala Public Jenkins
Parent: 6f949718f5
Commit: e73e2d40da
10 changed files with 344 additions and 244 deletions

tests/custom_cluster/test_executor_groups.py

@@ -987,18 +987,17 @@ class TestExecutorGroups(CustomClusterTestSuite):
         exec_group_set_prefix="root.large") == 1

   def _set_query_options(self, query_options):
-    """Set query options by running it as an SQL statement.
-    To mimic impala-shell behavior, use self.client.set_configuration() instead.
-    """
+    """Set query options by setting client configuration."""
     for k, v in query_options.items():
-      self.execute_query_expect_success(self.client, "SET {}='{}'".format(k, v))
+      self.hs2_client.set_configuration_option(k, v)

-  def _run_query_and_verify_profile(self, query,
-      expected_strings_in_profile, not_expected_in_profile=[]):
+  def _run_query_and_verify_profile(self, query, expected_strings_in_profile,
+                                    not_expected_in_profile=[],
+                                    fetch_exec_summary=False):
     """Run 'query' and assert existence of 'expected_strings_in_profile' and
     nonexistence of 'not_expected_in_profile' in query profile.
-    Caller is reponsible to close self.client at the end of test."""
-    result = self.execute_query_expect_success(self.client, query)
+    Caller is reponsible to close self.hs2_client at the end of test."""
+    result = self.hs2_client.execute(query, fetch_exec_summary=fetch_exec_summary)
     profile = str(result.runtime_profile)
     for expected_profile in expected_strings_in_profile:
       assert expected_profile in profile, (
@@ -1050,7 +1049,6 @@ class TestExecutorGroups(CustomClusterTestSuite):
   def test_query_cpu_count_divisor_default(self, unique_database):
     coordinator_test_args = "-gen_experimental_profile=true"
     self._setup_three_exec_group_cluster(coordinator_test_args)
-    self.client.clear_configuration()

     # The default query options for this test.
     # Some test case will change these options along the test, but should eventually
@@ -1091,7 +1089,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
     # Create small table based on tpcds_parquet.store_sales that will be used later
     # for COMPUTE STATS test. Forcing large parallelism to speed up CTAS.
     # Otherwise, query will go to tiny pool.
-    self.client.set_configuration({
+    self._set_query_options({
         'REQUEST_POOL': 'root.large',
         'PROCESSING_COST_MIN_THREADS': '4'})
     self._run_query_and_verify_profile(
@@ -1102,7 +1100,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
         ["Executor Group: root.large", "ExecutorGroupsConsidered: 1",
         "Verdict: query option REQUEST_POOL=root.large is set. "
         "Memory and cpu limit checking is skipped."])
-    self.client.set_configuration({
+    self._set_query_options({
         'REQUEST_POOL': '',
         'PROCESSING_COST_MIN_THREADS': ''})
@@ -1128,14 +1126,13 @@ class TestExecutorGroups(CustomClusterTestSuite):
     # Test that child queries follow REQUEST_POOL that is set through client
     # configuration. Two child queries should all run in root.small.
-    self.client.set_configuration({'REQUEST_POOL': 'root.small'})
+    self._set_query_options({'REQUEST_POOL': 'root.small'})
     self._run_query_and_verify_profile(compute_stats_query,
         ["Query Options (set by configuration): REQUEST_POOL=root.small",
         "ExecutorGroupsConsidered: 1",
         "Verdict: Assign to first group because query is not auto-scalable"],
         ["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
     self._verify_total_admitted_queries("root.small", 2)
-    self.client.clear_configuration()

     # Test that child queries follow REQUEST_POOL that is set through SQL statement.
     # Two child queries should all run in root.large.
@@ -1239,7 +1236,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
         "ExecutorGroupsConsidered: 1", "AvgAdmissionSlotsPerExecutor: 2",
         "CpuAsk: 2", "CpuAskBounded: 2"])
     self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '1'})
-    result = self.execute_query_expect_failure(self.client, CPU_TEST_QUERY)
+    result = self.execute_query_expect_failure(self.hs2_client, CPU_TEST_QUERY)
     status = (r"PROCESSING_COST_MIN_THREADS \(2\) can not be larger than "
               r"MAX_FRAGMENT_INSTANCES_PER_NODE \(1\).")
     assert re.search(status, str(result))
@@ -1387,7 +1384,6 @@ class TestExecutorGroups(CustomClusterTestSuite):
   def test_query_cpu_count_on_insert(self, unique_database):
     coordinator_test_args = "-gen_experimental_profile=true"
     self._setup_three_exec_group_cluster(coordinator_test_args)
-    self.client.clear_configuration()

     # The default query options for this test.
     # Some test case will change these options along the test, but should eventually
@@ -1404,7 +1400,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
         "select id, year from functional_parquet.alltypes"
         ).format(unique_database, "test_ctas1"),
         ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
-        "Verdict: Match", "CpuAsk: 1", "AvgAdmissionSlotsPerExecutor: 1"])
+        "Verdict: Match", "CpuAsk: 1", "AvgAdmissionSlotsPerExecutor: 1"],
+        fetch_exec_summary=True)
     self.__verify_fs_writers(result, 1, [0, 1])

     # Test unpartitioned insert, small scan, no MAX_FS_WRITER, with limit.
@@ -1414,7 +1411,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
         "select id, year from functional_parquet.alltypes limit 100000"
         ).format(unique_database, "test_ctas2"),
         ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
-        "Verdict: Match", "CpuAsk: 2", "AvgAdmissionSlotsPerExecutor: 2"])
+        "Verdict: Match", "CpuAsk: 2", "AvgAdmissionSlotsPerExecutor: 2"],
+        fetch_exec_summary=True)
     self.__verify_fs_writers(result, 1, [0, 2])

     # Test partitioned insert, small scan, no MAX_FS_WRITER.
@@ -1424,7 +1422,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
         "select id, year from functional_parquet.alltypes"
         ).format(unique_database, "test_ctas3"),
         ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
-        "Verdict: Match", "CpuAsk: 1", "AvgAdmissionSlotsPerExecutor: 1"])
+        "Verdict: Match", "CpuAsk: 1", "AvgAdmissionSlotsPerExecutor: 1"],
+        fetch_exec_summary=True)
     self.__verify_fs_writers(result, 1, [0, 1])

     store_sales_no_part_col = (
@@ -1445,7 +1444,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
         ("create table {0}.{1} as {2}").format(
             unique_database, "test_ctas4", big_select),
         ["Executor Group: root.small-group", "ExecutorGroupsConsidered: 2",
-        "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 2"])
+        "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 2"],
+        fetch_exec_summary=True)
     self.__verify_fs_writers(result, 2, [0, 1, 1])

     # Test partitioned insert, large scan, no MAX_FS_WRITER.
@@ -1453,7 +1453,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
         ("create table {0}.{1} partitioned by (ss_store_sk) as {2}").format(
             unique_database, "test_ctas5", big_select),
         ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
-        "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 9"])
+        "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 9"],
+        fetch_exec_summary=True)
     self.__verify_fs_writers(result, 3, [0, 3, 3, 3])

     # Test partitioned insert, large scan, high MAX_FS_WRITER.
@@ -1462,7 +1463,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
         ("create table {0}.{1} partitioned by (ss_store_sk) as {2}").format(
             unique_database, "test_ctas6", big_select),
         ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
-        "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 9"])
+        "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 9"],
+        fetch_exec_summary=True)
     self.__verify_fs_writers(result, 3, [0, 3, 3, 3])

     # Test partitioned insert, large scan, low MAX_FS_WRITER.
@@ -1471,7 +1473,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
         ("create table {0}.{1} partitioned by (ss_store_sk) as {2}").format(
             unique_database, "test_ctas7", big_select),
         ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
-        "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 8"])
+        "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 8"],
+        fetch_exec_summary=True)
     self.__verify_fs_writers(result, 2, [0, 2, 3, 3])

     # Test unpartitioned insert overwrite. MAX_FS_WRITER=2.
@@ -1479,7 +1482,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
         ("insert overwrite {0}.{1} {2}").format(
             unique_database, "test_ctas4", big_select),
         ["Executor Group: root.small-group", "ExecutorGroupsConsidered: 2",
-        "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 2"])
+        "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 2"],
+        fetch_exec_summary=True)
     self.__verify_fs_writers(result, 2, [0, 1, 1])

     # Test partitioned insert overwrite. MAX_FS_WRITER=2.
@@ -1487,7 +1491,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
         ("insert overwrite {0}.{1} ({2}) partition (ss_store_sk) {3}").format(
             unique_database, "test_ctas7", store_sales_no_part_col, big_select),
         ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
-        "Verdict: Match", "CpuAsk: 8", "CpuAskBounded: 8"])
+        "Verdict: Match", "CpuAsk: 8", "CpuAskBounded: 8"],
+        fetch_exec_summary=True)
     self.__verify_fs_writers(result, 2, [0, 2, 3, 3])

     # Test unpartitioned insert overwrite. MAX_FS_WRITER=1.
@@ -1496,7 +1501,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
         ("insert overwrite {0}.{1} {2}").format(
             unique_database, "test_ctas4", big_select),
         ["Executor Group: root.small-group", "ExecutorGroupsConsidered: 2",
-        "Verdict: Match", "CpuAsk: 7", "CpuAskBounded: 7"])
+        "Verdict: Match", "CpuAsk: 7", "CpuAskBounded: 7"],
+        fetch_exec_summary=True)
     self.__verify_fs_writers(result, 1, [0, 3, 4])

     # Unset MAX_FS_WRITERS.
@@ -1510,7 +1516,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
         "where ss_store_sk=1").format(
             unique_database, "test_ctas7", store_sales_no_part_col, store_sales_columns),
         ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
-        "Verdict: Match", "CpuAsk: 1", "CpuAskBounded: 1", "| partitions=6"])
+        "Verdict: Match", "CpuAsk: 1", "CpuAskBounded: 1", "| partitions=6"],
+        fetch_exec_summary=True)
     self.__verify_fs_writers(result, 1, [0, 1])

     # END testing insert + MAX_FS_WRITER
@@ -1628,7 +1635,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
     self._set_query_options({
         'COMPUTE_PROCESSING_COST': 'true',
         'SLOT_COUNT_STRATEGY': 'PLANNER_CPU_ASK'})
-    result = self.execute_query_expect_failure(self.client, CPU_TEST_QUERY)
+    result = self.execute_query_expect_failure(self.hs2_client, CPU_TEST_QUERY)
     assert ("AnalysisException: The query does not fit largest executor group sets. "
             "Reason: not enough cpu cores (require=300, max=192).") in str(result)