IMPALA-13469: Deflake test_query_cpu_count_on_insert

A new test case from IMPALA-13445 revealed a pre-existing bug where
cost-based planning may raise expectedNumInputInstance above
inputFragment.getNumInstances(), which leads to a Precondition
violation. The following scenario held in every case where the
Precondition was hit (a hypothetical sketch follows the list):

1. The environment is either Erasure Coded HDFS or Ozone.
2. The source table has neither stats nor a numRows table property.
3. Before the DML fragment is added, the plan tree has only one
   fragment, consisting of a ScanNode.
4. Byte-based cardinality estimation logic kicks in.
5. The byte-based cardinality yields a high scan cost, which pushes
   maxScanThread above inputFragment.getNumInstances().
6. expectedNumInputInstance is assigned equal to maxScanThread.
7. The Precondition expectedNumInputInstance <=
   inputFragment.getNumInstances() is violated.
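
The sketch below is illustrative only: the class name, constants, and
values are invented, and only the shape of the logic follows the
description above, not Impala's actual planner code.

  // Hypothetical, self-contained Java sketch of steps 4-7.
  public class CpuAskSketch {
    public static void main(String[] args) {
      long totalScanBytes = 64L << 30;  // no stats: fall back to bytes scanned
      long estimatedRowWidth = 8;       // guessed average row width in bytes
      // Step 4: byte-based cardinality estimate, which can be huge.
      long cardinality = totalScanBytes / estimatedRowWidth;
      // Step 5: scan cost grows with cardinality, and so does the thread ask.
      int maxScanThread = (int) Math.min(cardinality / (1L << 20), 1024);
      int numInputInstances = 3;        // one small scan fragment, e.g. 3 hosts
      // Step 6: the buggy assignment applies no cap.
      int expectedNumInputInstance = maxScanThread;
      // Step 7: the equivalent of the Precondition fires (1024 > 3).
      if (expectedNumInputInstance > numInputInstances) {
        throw new IllegalStateException(expectedNumInputInstance
            + " input instances expected, but the fragment has only "
            + numInputInstances);
      }
    }
  }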

This scenario triggers a special code path that attempts to lower
expectedNumInputInstance. But instead of lowering it, that logic raises
it further, because the byte-based cardinality estimate is so high.
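
For illustration, one plausible shape of such a cap, reusing the
hypothetical names from the sketch above (the actual patched code may
differ):

  // Clamp so the adjustment can only lower the value, never raise it
  // past the fragment's real instance count.
  int expectedNumInputInstance = Math.min(maxScanThread, numInputInstances);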

There is also a new bug where DistributedPlanner.java mistakenly passes
root.getInputCardinality() instead of root.getCardinality().
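
In sketch form (the variable name numRowsToWrite is invented; the real
call site has more context):

  // Size the DML fragment from the rows the plan root emits, not the
  // rows it reads.
  long numRowsToWrite = root.getCardinality();          // the fix
  // long numRowsToWrite = root.getInputCardinality();  // the bug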

This patch fixes both issues and does minor refactoring to rename
variables to camelCase. It also relaxes validation of the last test
case of test_query_cpu_count_on_insert so that it passes in Erasure
Coded HDFS and Ozone setups.

Testing:
- Made several assertions in test_executor_groups.py more verbose.
- Passed test_executor_groups.py in Erasure Coded HDFS and Ozone setups.
- Added new Planner tests with unknown cardinality estimation.
- Passed core tests in a regular setup.

Change-Id: I834eb6bf896752521e733cd6b77a03f746e6a447
Reviewed-on: http://gerrit.cloudera.org:8080/21966
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author: Riza Suminto
Date: 2024-10-21 20:19:57 -07:00
Committed-by: Impala Public Jenkins
Parent: c83e5d9769
Commit: b07bd6ddeb
5 changed files with 161 additions and 43 deletions


@@ -944,27 +944,51 @@ class TestExecutorGroups(CustomClusterTestSuite):
        nonexistence of 'not_expected_in_profile' in query profile.
        Caller is responsible to close self.client at the end of test."""
     result = self.execute_query_expect_success(self.client, query)
+    profile = str(result.runtime_profile)
     for expected_profile in expected_strings_in_profile:
-      assert expected_profile in str(result.runtime_profile)
+      assert expected_profile in profile, (
+        "Expect '{0}' IN query profile but can not find it.\n{1}".format(
+          expected_profile, profile
+        )
+      )
     for not_expected in not_expected_in_profile:
-      assert not_expected not in str(result.runtime_profile)
+      assert not_expected not in profile, (
+        "Expect '{0}' NOT IN query profile but found it.\n{1}".format(
+          not_expected, profile
+        )
+      )
     return result
 
   def __verify_fs_writers(self, result, expected_num_writers,
                           expected_instances_per_host):
     assert 'HDFS WRITER' in result.exec_summary[0]['operator'], result.runtime_profile
     num_writers = int(result.exec_summary[0]['num_instances'])
-    assert num_writers == expected_num_writers
+    assert num_writers == expected_num_writers, (
+      "Expect {0} num_writers but got {1}.\n{2}".format(
+        expected_num_writers, num_writers, result.runtime_profile)
+    )
     num_hosts = len(expected_instances_per_host)
-    regex = (r'Per Host Number of Fragment Instances:'
-             + (num_hosts * r'.*?\((.*?)\)') + r'.*?\n')
+    instance_count_key = 'Per Host Number of Fragment Instances:'
+    regex = (instance_count_key + (num_hosts * r'.*?\((.*?)\)') + r'.*?\n')
     matches = re.findall(regex, result.runtime_profile)
-    assert len(matches) == 1
-    assert len(matches[0]) == num_hosts
+    assert len(matches) == 1, (
+      "Expect {0} info string matching '{1}' but got {2}.\n{3}".format(
+        1, instance_count_key, len(matches), result.runtime_profile
+      )
+    )
+    assert len(matches[0]) == num_hosts, (
+      "Expect {0} hosts in '{1}' info string but got {2}.\n{3}".format(
+        num_hosts, instance_count_key, len(matches[0]), result.runtime_profile
+      )
+    )
     num_instances_per_host = [int(i) for i in matches[0]]
     num_instances_per_host.sort()
     expected_instances_per_host.sort()
-    assert num_instances_per_host == expected_instances_per_host
+    assert num_instances_per_host == expected_instances_per_host, (
+      "Expect {0} instance distribution but got {1}.\n{2}".format(
+        expected_instances_per_host, num_instances_per_host, result.runtime_profile
+      )
+    )
 
   @UniqueDatabase.parametrize(sync_ddl=True)
   @pytest.mark.execute_serially
@@ -1363,25 +1387,28 @@ class TestExecutorGroups(CustomClusterTestSuite):
       ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
        "Verdict: Match", "CpuAsk: 1", "CpuAskBounded: 1", "| partitions=6"])
     self.__verify_fs_writers(result, 1, [0, 1])
+    # END testing insert + MAX_FS_WRITER
+
+    self._verify_query_num_for_resource_pool("root.tiny", 1)
+    self._verify_query_num_for_resource_pool("root.small", 2)
+    self._verify_query_num_for_resource_pool("root.large", 1)
+    self._verify_total_admitted_queries("root.tiny", 4)
+    self._verify_total_admitted_queries("root.small", 3)
+    self._verify_total_admitted_queries("root.large", 4)
+
+    # Starting from this point, do not validate request pool assignment and fs writers
+    # distribution, because different target file system may come up with different
+    # cardinality, cost, and parallelism. Successful query execution is sufficient.
 
     # Test partitioned insert overwrite, with unknown partition estimate.
-    result = self._run_query_and_verify_profile(
+    # Cardinality is calculated using byte-based estimation.
+    self._run_query_and_verify_profile(
       ("insert overwrite {0}.{1} ({2}) partition (ss_store_sk) "
        "select {3} from {0}.{4} "
        "where ss_store_sk=1").format(
         unique_database, "test_ctas7", store_sales_no_part_col, store_sales_columns,
         "test_ctas4"),
-      ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
-       "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 9", "| partitions=unavailable"])
-    self.__verify_fs_writers(result, 3, [0, 3, 3, 3])
-    # END testing insert + MAX_FS_WRITER
-
-    self._verify_query_num_for_resource_pool("root.tiny", 1)
-    self._verify_query_num_for_resource_pool("root.small", 2)
-    self._verify_query_num_for_resource_pool("root.large", 2)
-    self._verify_total_admitted_queries("root.tiny", 4)
-    self._verify_total_admitted_queries("root.small", 3)
-    self._verify_total_admitted_queries("root.large", 5)
+      ["| partitions=unavailable"])
 
   @pytest.mark.execute_serially
   def test_query_cpu_count_divisor_two(self):