impala/tests/query_test/test_tuple_cache_tpc_queries.py
Joe McDonnell ca356a8df5 IMPALA-13437 (part 2): Implement cost-based tuple cache placement
This changes the default behavior of the tuple cache to consider
cost when placing the TupleCacheNodes. It tries to pick the best
locations within a budget. First, it eliminates unprofitable locations
via a threshold. Next, it ranks the remaining locations by their
profitability. Finally, it picks the best locations in rank order
until it reaches the budget.

The threshold is based on the ratio of processing cost for regular
execution versus the processing cost for reading from the cache.
If the ratio is below the threshold, the location is eliminated.
The threshold is specified by the tuple_cache_required_cost_reduction_factor
query option. This defaults to 3.0, which means that the cost of
reading from the cache must be less than 1/3 the cost of computing
the value normally. A higher value is more restrictive about which
locations qualify for caching, which tends to lower the caching
overhead.
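
As a rough illustration of the threshold step, the Python sketch below applies
the rule described above. The function name and the cost values are
hypothetical and are not taken from the planner code; only the 3.0 default
comes from the description.

```python
def passes_threshold(regular_cost, cache_read_cost, required_reduction_factor=3.0):
    """Return True if a location survives the threshold check.

    Hypothetical helper: a location is eliminated when the ratio of regular
    processing cost to cache read cost is below the required factor.
    """
    if regular_cost <= 0 or cache_read_cost <= 0:
        raise ValueError("costs must be positive in this sketch")
    return regular_cost / cache_read_cost >= required_reduction_factor


# Made-up costs: a ratio of 10.0 passes, a ratio of 2.0 is eliminated.
print(passes_threshold(1000, 100))
print(passes_threshold(200, 100))
```

Raising the factor in this sketch (for example to 5.0) eliminates more
locations, which matches the "more restrictive" behavior described above.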

The ranking is based on the cost reduction per byte. This is given
by the formula:
 (regular processing cost - cost to read from cache) / estimated serialized size
This prefers locations with small results or high reduction in cost.
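
The ranking formula can be sketched in Python as follows. The candidate names
and numbers are invented for illustration, and the function is a hypothetical
helper rather than the planner's actual code.

```python
def cost_reduction_per_byte(regular_cost, cache_read_cost, serialized_bytes):
    """Ranking score: cost saved by reading from the cache, per cached byte."""
    return (regular_cost - cache_read_cost) / serialized_bytes


# Hypothetical candidates: (regular cost, cache read cost, serialized bytes).
candidates = {
    "small_agg": (400, 100, 10),
    "mid_join": (1000, 100, 100),
    "big_scan": (5000, 500, 9000),
}

# Highest cost reduction per byte first.
ranked = sorted(
    candidates,
    key=lambda name: cost_reduction_per_byte(*candidates[name]),
    reverse=True,
)
print(ranked)
```

In this example the small aggregation ranks first even though the large scan
saves the most cost in absolute terms, because its result is far smaller.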

The budget is based on the estimated serialized size per node. This
limits the total caching that a query will do. A higher value allows more
caching, which can increase the overhead on the first run of a query. A lower
value is less aggressive and can limit the overhead at the expense of less
caching. The limit is per node because the total should scale with the
size of the executor group: each executor brings extra capacity. The budget
is specified by the tuple_cache_budget_bytes_per_executor query option.
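
The final step, picking locations in rank order until the budget is reached,
might look like the Python sketch below. It assumes the candidates are already
filtered and ranked, that selection stops at the first location that would
exceed the budget, and that sizes and budget are compared in the same
per-executor units. These are assumptions for illustration; the planner may
handle these details differently.

```python
def pick_within_budget(ranked_locations, budget_bytes):
    """Pick locations in rank order until the next one would exceed the budget.

    ranked_locations: list of (name, estimated_serialized_bytes), best first.
    Hypothetical helper; stopping at the first overflow is an assumption.
    """
    chosen = []
    used_bytes = 0
    for name, size_bytes in ranked_locations:
        if used_bytes + size_bytes > budget_bytes:
            break
        chosen.append(name)
        used_bytes += size_bytes
    return chosen


# Made-up sizes with a budget of 100 bytes: the third location does not fit.
ranked_locations = [("scan_a", 40), ("join_b", 50), ("agg_c", 30)]
print(pick_within_budget(ranked_locations, 100))
```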

The old behavior of placing the tuple cache at all eligible locations is
still available by setting the tuple_cache_placement_policy query option
to all_eligible. The default is the cost_based policy described above.
The all_eligible policy is useful for correctness testing (and for the
existing tuple cache test cases).

This changes the explain plan output:
 - The hash trace is only enabled at VERBOSE level. This means that the regular
   profile will not contain the hash trace, as the regular profile uses EXTENDED.
 - This adds cost information for each plan node at VERBOSE level. This can
   help trace why a particular location was not picked.

Testing:
 - This adds a TPC-DS planner test with tuple caching enabled (based on the
   existing TpcdsCpuCostPlannerTest)
 - This modifies existing tests to adapt to changes in the explain plan output

Change-Id: Ifc6e7b95621a7937d892511dc879bf7c8da07cdc
Reviewed-on: http://gerrit.cloudera.org:8080/23219
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-09-18 21:02:51 +00:00


# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Functional tests running the TPCH and TPCDS workload twice to test tuple cache.

from __future__ import absolute_import, division, print_function
import pytest

from tests.common.environ import IS_TUPLE_CACHE_CORRECT_CHECK
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIf
from tests.common.test_dimensions import create_single_exec_option_dimension
from tests.util.test_file_parser import load_tpc_queries_name_sorted

MT_DOP_VALUES = [0, 4]


def run_tuple_cache_test(self, vector, query, mtdop):
  vector.get_value('exec_option')['enable_tuple_cache'] = True
  # Use a long runtime filter wait time (1 minute) to ensure filters arrive before
  # generating the tuple cache for correctness check.
  if IS_TUPLE_CACHE_CORRECT_CHECK:
    vector.get_value('exec_option')['runtime_filter_wait_time_ms'] = 60000
    vector.get_value('exec_option')['enable_tuple_cache_verification'] = True
  # Use the all_eligible placement policy (the behavior before cost-based placement).
  vector.get_value('exec_option')['tuple_cache_placement_policy'] = 'all_eligible'
  vector.get_value('exec_option')['mt_dop'] = mtdop
  # Run twice to test write and read the tuple cache.
  self.run_test_case(query, vector)
  self.run_test_case(query, vector)


@SkipIf.not_tuple_cache
class TestTupleCacheTpchQuery(ImpalaTestSuite):
  @classmethod
  def get_workload(cls):
    return 'tpch'

  @classmethod
  def add_test_dimensions(cls):
    super(TestTupleCacheTpchQuery, cls).add_test_dimensions()
    if cls.exploration_strategy() != 'exhaustive':
      cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('table_format').file_format == 'parquet'
        and v.get_value('table_format').compression_codec == 'none')

  @pytest.mark.parametrize("query", load_tpc_queries_name_sorted('tpch'))
  @pytest.mark.parametrize("mtdop", MT_DOP_VALUES)
  def test_tpch(self, vector, query, mtdop):
    run_tuple_cache_test(self, vector, query, mtdop)


@SkipIf.not_tuple_cache
class TestTupleCacheTpcdsQuery(ImpalaTestSuite):
  @classmethod
  def get_workload(cls):
    return 'tpcds'

  @classmethod
  def add_test_dimensions(cls):
    super(TestTupleCacheTpcdsQuery, cls).add_test_dimensions()
    if cls.exploration_strategy() != 'exhaustive':
      cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('table_format').file_format == 'parquet'
        and v.get_value('table_format').compression_codec == 'none')

  @pytest.mark.parametrize("query", load_tpc_queries_name_sorted('tpcds'))
  @pytest.mark.parametrize("mtdop", MT_DOP_VALUES)
  def test_tpcds(self, vector, query, mtdop):
    run_tuple_cache_test(self, vector, query, mtdop)