IMPALA-12395: Override scan cardinality for optimized count star

The cardinality estimate in HdfsScanNode.java for count queries does not
account for the fact that the count optimization only scans metadata and
not the actual columns. An optimized count star scan returns only 1 row
per Parquet row group.

This patch overrides the scan cardinality with the total number of files,
which is the closest estimate to the number of row groups. A similar
override already exists in IcebergScanNode.java.

Testing:
- Add count query testcases in test_query_cpu_count_divisor_default
- Pass core tests

Change-Id: Id5ce967657208057d50bd80adadac29ebb51cbc5
Reviewed-on: http://gerrit.cloudera.org:8080/20406
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Riza Suminto
2023-08-22 17:51:28 -07:00
committed by Impala Public Jenkins
parent bda3280cb4
commit 0c8fc997ef
4 changed files with 73 additions and 23 deletions

View File

@@ -79,6 +79,11 @@ class TestQueryRetries(CustomClusterTestSuite):
union all
select count(*) from functional.alltypes where bool_col = sleep(50)"""
# A simple count query with predicate. The predicate is needed so that the planner does
# not create the optimized count(star) query plan.
_count_query = "select count(*) from tpch_parquet.lineitem where l_orderkey < 50"
_count_query_result = "55"
@classmethod
def get_workload(cls):
return 'functional-query'
@@ -252,7 +257,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Kill an impalad, and run a query. The query should be retried.
killed_impalad = self.__kill_random_impalad()
query = "select count(*) from tpch_parquet.lineitem"
query = self._count_query
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true'})
self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)
@@ -264,7 +269,7 @@ class TestQueryRetries(CustomClusterTestSuite):
results = self.client.fetch(query, handle)
assert results.success
assert len(results.data) == 1
assert "6001215" in results.data[0]
assert self._count_query_result in results.data[0]
# The runtime profile of the retried query.
retried_runtime_profile = self.client.get_runtime_profile(handle)
@@ -312,7 +317,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# and the query should be retried. Add delay before admission so that the 2nd node
# is removed from the blacklist before scheduler makes schedule for the retried
# query.
query = "select count(*) from tpch_parquet.lineitem"
query = self._count_query
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true',
'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'})
@@ -325,7 +330,7 @@ class TestQueryRetries(CustomClusterTestSuite):
results = self.client.fetch(query, handle)
assert results.success
assert len(results.data) == 1
assert "6001215" in results.data[0]
assert self._count_query_result in results.data[0]
# The runtime profile of the retried query.
retried_runtime_profile = self.client.get_runtime_profile(handle)
@@ -375,7 +380,7 @@ class TestQueryRetries(CustomClusterTestSuite):
rpc_not_accessible_impalad = self.cluster.impalads[1]
assert rpc_not_accessible_impalad.service.krpc_port == FAILED_KRPC_PORT
query = "select count(*) from tpch_parquet.lineitem"
query = self._count_query
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true',
'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'})
@@ -698,7 +703,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Kill an impalad, and run a query. The query should be retried.
self.cluster.impalads[1].kill()
query = "select count(*) from tpch_parquet.lineitem"
query = self._count_query
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true'})
self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)
@@ -737,7 +742,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Kill an impalad, and run a query. The query should be retried.
self.cluster.impalads[1].kill()
query = "select count(*) from tpch_parquet.lineitem"
query = self._count_query
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true'})
self.__wait_until_retry_state(handle, 'RETRYING')
@@ -767,7 +772,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Kill an impalad, and run a query. The query should be retried.
self.cluster.impalads[1].kill()
query = "select count(*) from tpch_parquet.lineitem"
query = self._count_query
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true'})
@@ -791,7 +796,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Kill an impalad, and run a query. The query should be retried.
self.cluster.impalads[1].kill()
query = "select count(*) from tpch_parquet.lineitem"
query = self._count_query
self.hs2_client.set_configuration({'retry_failed_queries': 'true'})
self.hs2_client.set_configuration_option('impala.resultset.cache.size', '1024')
self.hs2_client.execute_async(query)
@@ -818,7 +823,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Kill an impalad, and run a query. The query should be retried.
self.cluster.impalads[1].kill()
query = "select count(*) from tpch_parquet.lineitem"
query = self._count_query
self.execute_query_async(query, query_options={'retry_failed_queries': 'true'})
# The number of in-flight queries is 0 at the beginning, then 1 when the original
# query is submitted. It's 2 when the retried query is registered. Although the retry
@@ -848,7 +853,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Kill an impalad, and run a query. The query should be retried.
self.cluster.impalads[1].kill()
query = "select count(*) from tpch_parquet.lineitem"
query = self._count_query
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true', 'query_timeout_s': '1'})
self.wait_for_state(handle, self.client.QUERY_STATES['EXCEPTION'], 60)
@@ -887,7 +892,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Kill an impalad, and run a query. The query should be retried.
self.cluster.impalads[1].kill()
query = "select count(*) from tpch_parquet.lineitem"
query = self._count_query
client = self.cluster.get_first_impalad().service.create_beeswax_client()
client.set_configuration({'retry_failed_queries': 'true'})
handle = client.execute_async(query)
@@ -917,7 +922,7 @@ class TestQueryRetries(CustomClusterTestSuite):
"""Test query retries with the HS2 protocol. Enable the results set cache as well and
test that query retries work with the results cache."""
self.cluster.impalads[1].kill()
query = "select count(*) from tpch_parquet.lineitem"
query = self._count_query
self.hs2_client.set_configuration({'retry_failed_queries': 'true'})
self.hs2_client.set_configuration_option('impala.resultset.cache.size', '1024')
handle = self.hs2_client.execute_async(query)
@@ -926,7 +931,7 @@ class TestQueryRetries(CustomClusterTestSuite):
results = self.hs2_client.fetch(query, handle)
assert results.success
assert len(results.data) == 1
assert int(results.data[0]) == 6001215
assert results.data[0] == self._count_query_result
# Validate the live exec summary.
retried_query_id = \