IMPALA-11123: Optimize count(star) for ORC scans

This patch provides count(star) optimization for ORC scans, similar to
the work done in IMPALA-5036 for Parquet scans. We use the stripes num
rows statistics when computing the count star instead of materializing
empty rows. The aggregate function changed from a count to a special sum
function initialized to 0.

This count(star) optimization is disabled for the full ACID table
because the scanner might need to read and validate the
'currentTransaction' column in table's special schema.

This patch drops 'parquet' from names related to the count star
optimization. It also improves the count(star) operation in general by
serving the result just from the file's footer stats for both Parquet
and ORC. We unify the optimized count star and zero slot scan functions
into HdfsColumnarScanner.

The following table shows a performance comparison before and after the
patch. primitive_count_star query target tpch10_parquet.lineitem
table (10GB scale TPC-H). Meanwhile, count_star_parq and count_star_orc
query is a modified primitive_count_star query that targets
tpch_parquet.lineitem and tpch_orc_def.lineitem table accordingly.

+-------------------+----------------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| Workload          | Query                | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%)  | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | Tval  |
+-------------------+----------------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| tpch_parquet      | count_star_parq      | parquet / none / none | 0.06   | 0.07        |   -10.45%  |   2.87%    | * 25.51% *     | 9     |   -1.47%       | -1.26   | -1.22 |
| tpch_orc_def      | count_star_orc       | orc / def / none      | 0.06   | 0.08        |   -22.37%  |   6.22%    | * 30.95% *     | 9     |   -1.85%       | -1.16   | -2.14 |
| TARGETED-PERF(10) | primitive_count_star | parquet / none / none | 0.06   | 0.08        | I -30.40%  |   2.68%    | * 29.63% *     | 9     | I -7.20%       | -2.42   | -3.07 |
+-------------------+----------------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+

Testing:
- Add PlannerTest.testOrcStatsAgg
- Add TestAggregationQueries::test_orc_count_star_optimization
- Exercise count(star) in TestOrc::test_misaligned_orc_stripes
- Pass core tests

Change-Id: I0fafa1182f97323aeb9ee39dd4e8ecd418fa6091
Reviewed-on: http://gerrit.cloudera.org:8080/18327
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Riza Suminto
2022-03-14 18:07:05 -07:00
committed by Impala Public Jenkins
parent 85ddd27b64
commit f932d78ad0
31 changed files with 1046 additions and 256 deletions

View File

@@ -76,6 +76,11 @@ class TestQueryRetries(CustomClusterTestSuite):
union all
select count(*) from functional.alltypes where bool_col = sleep(50)"""
# A simple count query with predicate. The predicate is needed so that the planner does
# not create the optimized count(star) query plan.
_count_query = "select count(*) from tpch_parquet.lineitem where l_orderkey < 50"
_count_query_result = "55"
@classmethod
def get_workload(cls):
return 'functional-query'
@@ -246,7 +251,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Kill an impalad, and run a query. The query should be retried.
killed_impalad = self.__kill_random_impalad()
query = "select count(*) from tpch_parquet.lineitem"
query = self._count_query
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true'})
self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)
@@ -258,7 +263,7 @@ class TestQueryRetries(CustomClusterTestSuite):
results = self.client.fetch(query, handle)
assert results.success
assert len(results.data) == 1
assert "6001215" in results.data[0]
assert self._count_query_result in results.data[0]
# The runtime profile of the retried query.
retried_runtime_profile = self.client.get_runtime_profile(handle)
@@ -306,7 +311,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# and the query should be retried. Add delay before admission so that the 2nd node
# is removed from the blacklist before scheduler makes schedule for the retried
# query.
query = "select count(*) from tpch_parquet.lineitem"
query = self._count_query
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true',
'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'})
@@ -319,7 +324,7 @@ class TestQueryRetries(CustomClusterTestSuite):
results = self.client.fetch(query, handle)
assert results.success
assert len(results.data) == 1
assert "6001215" in results.data[0]
assert self._count_query_result in results.data[0]
# The runtime profile of the retried query.
retried_runtime_profile = self.client.get_runtime_profile(handle)
@@ -370,7 +375,7 @@ class TestQueryRetries(CustomClusterTestSuite):
rpc_not_accessible_impalad = self.cluster.impalads[1]
assert rpc_not_accessible_impalad.service.krpc_port == FAILED_KRPC_PORT
query = "select count(*) from tpch_parquet.lineitem"
query = self._count_query
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true',
'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'})
@@ -692,7 +697,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Kill an impalad, and run a query. The query should be retried.
self.cluster.impalads[1].kill()
query = "select count(*) from tpch_parquet.lineitem"
query = self._count_query
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true'})
self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)
@@ -730,7 +735,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Kill an impalad, and run a query. The query should be retried.
self.cluster.impalads[1].kill()
query = "select count(*) from tpch_parquet.lineitem"
query = self._count_query
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true'})
self.__wait_until_retry_state(handle, 'RETRYING')
@@ -759,7 +764,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Kill an impalad, and run a query. The query should be retried.
self.cluster.impalads[1].kill()
query = "select count(*) from tpch_parquet.lineitem"
query = self._count_query
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true'})
@@ -782,7 +787,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Kill an impalad, and run a query. The query should be retried.
self.cluster.impalads[1].kill()
query = "select count(*) from tpch_parquet.lineitem"
query = self._count_query
self.hs2_client.set_configuration({'retry_failed_queries': 'true'})
self.hs2_client.set_configuration_option('impala.resultset.cache.size', '1024')
self.hs2_client.execute_async(query)
@@ -808,7 +813,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Kill an impalad, and run a query. The query should be retried.
self.cluster.impalads[1].kill()
query = "select count(*) from tpch_parquet.lineitem"
query = self._count_query
self.execute_query_async(query, query_options={'retry_failed_queries': 'true'})
# The number of in-flight queries is 0 at the beginning, then 1 when the original
# query is submitted. It's 2 when the retried query is registered. Although the retry
@@ -837,7 +842,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Kill an impalad, and run a query. The query should be retried.
self.cluster.impalads[1].kill()
query = "select count(*) from tpch_parquet.lineitem"
query = self._count_query
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true', 'query_timeout_s': '1'})
self.wait_for_state(handle, self.client.QUERY_STATES['EXCEPTION'], 60)
@@ -876,7 +881,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Kill an impalad, and run a query. The query should be retried.
self.cluster.impalads[1].kill()
query = "select count(*) from tpch_parquet.lineitem"
query = self._count_query
client = self.cluster.get_first_impalad().service.create_beeswax_client()
client.set_configuration({'retry_failed_queries': 'true'})
handle = client.execute_async(query)
@@ -906,7 +911,7 @@ class TestQueryRetries(CustomClusterTestSuite):
"""Test query retries with the HS2 protocol. Enable the results set cache as well and
test that query retries work with the results cache."""
self.cluster.impalads[1].kill()
query = "select count(*) from tpch_parquet.lineitem"
query = self._count_query
self.hs2_client.set_configuration({'retry_failed_queries': 'true'})
self.hs2_client.set_configuration_option('impala.resultset.cache.size', '1024')
handle = self.hs2_client.execute_async(query)
@@ -915,7 +920,7 @@ class TestQueryRetries(CustomClusterTestSuite):
results = self.hs2_client.fetch(query, handle)
assert results.success
assert len(results.data) == 1
assert int(results.data[0]) == 6001215
assert results.data[0] == self._count_query_result
# Validate the live exec summary.
retried_query_id = \