IMPALA-13437 (part 2): Implement cost-based tuple cache placement

This changes the default behavior of the tuple cache to consider
cost when placing the TupleCacheNodes. It tries to pick the best
locations within a budget. First, it eliminates unprofitable locations
via a threshold. Next, it ranks the remaining locations by their
profitability. Finally, it picks the best locations in rank order
until it reaches the budget.

The threshold is based on the ratio of processing cost for regular
execution versus the processing cost for reading from the cache.
If the ratio is below the threshold, the location is eliminated.
The threshold is specified by the tuple_cache_required_cost_reduction_factor
query option. This defaults to 3.0, which means that the cost of
reading from the cache must be less than 1/3 the cost of computing
the value normally. A higher value makes this more restrictive
about caching locations, which pushes in the direction of lower
overhead.

The ranking is based on the cost reduction per byte. This is given
by the formula:
 (regular processing cost - cost to read from cache) / estimated serialized size
This prefers locations with small results or high reduction in cost.

The budget is based on the estimated serialized size per node. This
limits the total caching that a query will do. A higher value allows more
caching, which can increase the overhead on the first run of a query. A lower
value is less aggressive and can limit the overhead at the expense of less
caching. This uses a per-node limit as the limit should scale based on the
size of the executor group as each executor brings extra capacity. The budget
is specified by the tuple_cache_budget_bytes_per_executor.

The old behavior to place the tuple cache at all eligible locations is
still available via the tuple_cache_placement_policy query option. The
default is the cost_based policy described above, but the old behavior
is available via the all_eligible policy. This is useful for correctness
testing (and the existing tuple cache test cases).

This changes the explain plan output:
 - The hash trace is only enabled at VERBOSE level. This means that the regular
   profile will not contain the hash trace, as the regular profile uses EXTENDED.
 - This adds additional information at VERBOSE to display the cost information
   for each plan node. This can help trace why a particular location was
   not picked.

Testing:
 - This adds a TPC-DS planner test with tuple caching enabled (based on the
   existing TpcdsCpuCostPlannerTest)
 - This modifies existing tests to adapt to changes in the explain plan output

Change-Id: Ifc6e7b95621a7937d892511dc879bf7c8da07cdc
Reviewed-on: http://gerrit.cloudera.org:8080/23219
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Joe McDonnell
2025-07-21 16:48:57 -07:00
parent 3181fe1800
commit ca356a8df5
125 changed files with 140399 additions and 66 deletions

View File

@@ -1405,6 +1405,26 @@ Status impala::SetQueryOption(TImpalaQueryOptions::type option, const string& va
query_options->__set_broadcast_cost_scale_factor(double_val);
break;
}
case TImpalaQueryOptions::TUPLE_CACHE_PLACEMENT_POLICY: {
TTupleCachePlacementPolicy::type enum_type;
RETURN_IF_ERROR(GetThriftEnum(value, "Tuple cache placement policy",
_TTupleCachePlacementPolicy_VALUES_TO_NAMES, &enum_type));
query_options->__set_tuple_cache_placement_policy(enum_type);
break;
}
case TImpalaQueryOptions::TUPLE_CACHE_REQUIRED_COST_REDUCTION_FACTOR: {
double double_val = 0.0f;
RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckInclusiveLowerBound<double>(
option, value, 0.0, &double_val));
query_options->__set_tuple_cache_required_cost_reduction_factor(double_val);
break;
}
case TImpalaQueryOptions::TUPLE_CACHE_BUDGET_BYTES_PER_EXECUTOR: {
MemSpec mem_spec_val{};
RETURN_IF_ERROR(QueryOptionParser::Parse<MemSpec>(option, value, &mem_spec_val));
query_options->__set_tuple_cache_budget_bytes_per_executor(mem_spec_val.value);
break;
}
default:
string key = to_string(option);
if (IsRemovedQueryOption(key)) {

View File

@@ -51,7 +51,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
// plus one. Thus, the second argument to the DCHECK has to be updated every
// time we add or remove a query option to/from the enum TImpalaQueryOptions.
constexpr unsigned NUM_QUERY_OPTIONS =
TImpalaQueryOptions::BROADCAST_COST_SCALE_FACTOR + 1;
TImpalaQueryOptions::TUPLE_CACHE_BUDGET_BYTES_PER_EXECUTOR + 1;
#define QUERY_OPTS_TABLE \
DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(), NUM_QUERY_OPTIONS); \
REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
@@ -380,6 +380,12 @@ constexpr unsigned NUM_QUERY_OPTIONS =
QUERY_OPT_FN(hide_analyzed_query, HIDE_ANALYZED_QUERY, TQueryOptionLevel::ADVANCED) \
QUERY_OPT_FN(broadcast_cost_scale_factor, BROADCAST_COST_SCALE_FACTOR, \
TQueryOptionLevel::ADVANCED) \
TUPLE_CACHE_EXEMPT_QUERY_OPT_FN(tuple_cache_placement_policy, \
TUPLE_CACHE_PLACEMENT_POLICY, TQueryOptionLevel::ADVANCED) \
TUPLE_CACHE_EXEMPT_QUERY_OPT_FN(tuple_cache_required_cost_reduction_factor, \
TUPLE_CACHE_REQUIRED_COST_REDUCTION_FACTOR, TQueryOptionLevel::ADVANCED) \
TUPLE_CACHE_EXEMPT_QUERY_OPT_FN(tuple_cache_budget_bytes_per_executor, \
TUPLE_CACHE_BUDGET_BYTES_PER_EXECUTOR, TQueryOptionLevel::ADVANCED) \
;
/// Enforce practical limits on some query options to avoid undesired query state.

View File

@@ -301,6 +301,21 @@ DEFINE_int32(iceberg_catalog_num_threads, 16,
"Maximum number of threads to use for Iceberg catalog operations. These threads are "
"shared among concurrent Iceberg catalog operation (ie., ExpireSnapshot).");
// These coefficients have not been determined empirically. The write coefficient
// matches the coefficient for a broadcast sender in DataStreamSink. The read
// coefficient matches the coefficient for an exchange receiver in ExchandeNode.
// This includes both a bytes coefficient and a rows coefficient to allow experimentation
// and tuning.
// TODO: Tune these empirically.
DEFINE_double(tuple_cache_cost_coefficient_write_bytes, 0.0027,
"Cost coefficient for writing a byte to the tuple cache.");
DEFINE_double(tuple_cache_cost_coefficient_write_rows, 0.00,
"Cost coefficient for writing a row to the tuple cache.");
DEFINE_double(tuple_cache_cost_coefficient_read_bytes, 0.00,
"Cost coefficient for reading a byte from the tuple cache.");
DEFINE_double(tuple_cache_cost_coefficient_read_rows, 0.1329,
"Cost coefficient for reading a row from the tuple cache.");
using strings::Substitute;
namespace impala {
@@ -316,6 +331,15 @@ static bool ValidatePositiveDouble(const char* flagname, double value) {
return false;
}
static bool ValidateNonnegativeDouble(const char* flagname, double value) {
if (0.0 <= value) {
return true;
}
LOG(ERROR) << Substitute(
"$0 must be greater than or equal to 0.0, value $1 is invalid", flagname, value);
return false;
}
static bool ValidatePositiveInt64(const char* flagname, int64_t value) {
if (0 < value) {
return true;
@@ -333,6 +357,10 @@ DEFINE_validator(query_cpu_count_divisor, &ValidatePositiveDouble);
DEFINE_validator(min_processing_per_thread, &ValidatePositiveInt64);
DEFINE_validator(query_cpu_root_factor, &ValidatePositiveDouble);
DEFINE_validator(iceberg_catalog_num_threads, &ValidatePositiveInt32);
DEFINE_validator(tuple_cache_cost_coefficient_write_bytes, &ValidateNonnegativeDouble);
DEFINE_validator(tuple_cache_cost_coefficient_write_rows, &ValidateNonnegativeDouble);
DEFINE_validator(tuple_cache_cost_coefficient_read_bytes, &ValidateNonnegativeDouble);
DEFINE_validator(tuple_cache_cost_coefficient_read_rows, &ValidateNonnegativeDouble);
Status GetConfigFromCommand(const string& flag_cmd, string& result) {
result.clear();
@@ -552,6 +580,14 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
cfg.__set_warmup_tables_config_file(FLAGS_warmup_tables_config_file);
cfg.__set_keeps_warmup_tables_loaded(FLAGS_keeps_warmup_tables_loaded);
cfg.__set_truncate_external_tables_with_hms(FLAGS_truncate_external_tables_with_hms);
cfg.__set_tuple_cache_cost_coefficient_write_bytes(
FLAGS_tuple_cache_cost_coefficient_write_bytes);
cfg.__set_tuple_cache_cost_coefficient_write_rows(
FLAGS_tuple_cache_cost_coefficient_write_rows);
cfg.__set_tuple_cache_cost_coefficient_read_bytes(
FLAGS_tuple_cache_cost_coefficient_read_bytes);
cfg.__set_tuple_cache_cost_coefficient_read_rows(
FLAGS_tuple_cache_cost_coefficient_read_rows);
return Status::OK();
}