IMPALA-13075: Cap memory usage for ExprValuesCache at 256KB

ExprValuesCache uses BATCH_SIZE as a deciding factor when setting its
capacity: it bounds the capacity so that the memory usage of
expr_values_array_ stays below 256KB. This patch tightens that limit to
cover all memory accounted by ExprValuesCache::MemUsage() rather than
expr_values_array_ alone, so setting a very high BATCH_SIZE can no
longer push the total memory usage of ExprValuesCache beyond 256KB.
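
For intuition, the new bound can be read as the following sketch
(illustrative Python, not code from this patch; mem_usage_fn is a
hypothetical stand-in for ExprValuesCache::MemUsage(), and the 8-byte
slack and 64-row sample mirror the Bitmap granularity noted in the
patch comment, 8 bytes per 64 entries):

    # Illustrative sketch of the capacity formula introduced by this patch.
    # mem_usage_fn(num_rows) stands in for ExprValuesCache::MemUsage().
    MAX_EXPR_VALUES_CACHE_BYTES = 256 << 10  # 256KB = 262144 bytes

    def pick_capacity(batch_size, mem_usage_fn):
      # Sample MemUsage at 64 rows so Bitmap::MemUsage (8 bytes per 64
      # entries) divides evenly into whole chunks.
      sample_size = 64
      mem_per_row = mem_usage_fn(sample_size) / float(sample_size)
      max_capacity = (MAX_EXPR_VALUES_CACHE_BYTES - 8) / mem_per_row
      return max(1, min(batch_size, int(max_capacity)))

    # E.g. if mem_usage_fn(64) returned 2048 bytes (32 bytes per cached
    # row), capacity would be min(batch_size, 8191), keeping the total
    # usage under 256KB.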

Simplify the table dimension creation methods and fix a few flake8
warnings in test_dimensions.py.

Testing:
- Add test_join_queries.py::TestExprValueCache.
- Pass core tests.

Change-Id: Iee27cbbe8d3100301d05a6516b62c45975a8d0e0
Reviewed-on: http://gerrit.cloudera.org:8080/21455
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
commit b1320bd1d6 (parent 9f5fbcd841)
Author:    Riza Suminto
Committer: Impala Public Jenkins
Date:      2024-05-23 11:19:33 -07:00

6 changed files with 70 additions and 22 deletions

--- a/be/src/exec/hash-table.cc
+++ b/be/src/exec/hash-table.cc

@@ -366,13 +366,24 @@ Status HashTableCtx::ExprValuesCache::Init(RuntimeState* state, MemTracker* trac
     return Status::OK();
   }
   DCHECK_GT(expr_values_bytes_per_row_, 0);
-  // Compute the maximum number of cached rows which can fit in the memory budget.
+  // Compute the maximum number of cached rows which can fit in the memory budget,
+  // which is 256KB (MAX_EXPR_VALUES_CACHE_BYTES). 'sample_size' is 64 because MemUsage
+  // account for Bitmap::MemUsage as well, which cost 8 bytes per 64 entries.
   // TODO: Find the optimal prefetch batch size. This may be something
   // processor dependent so we may need calibration at Impala startup time.
-  capacity_ = std::max(1, std::min(state->batch_size(),
-      MAX_EXPR_VALUES_ARRAY_SIZE / expr_values_bytes_per_row_));
+  const int sample_size = 64;
+  double mem_per_row =
+      (double)MemUsage(sample_size, expr_values_bytes_per_row_, num_exprs_) / sample_size;
+  double max_capacity = (MAX_EXPR_VALUES_CACHE_BYTES - 8) / mem_per_row;
+  capacity_ = std::max(1, std::min(state->batch_size(), (int)max_capacity));
+  // TODO: Add 'mem_usage' into minimum reservation of PlanNode that use HashTableCtx?
   int mem_usage = MemUsage(capacity_, expr_values_bytes_per_row_, num_exprs_);
+  if (UNLIKELY(mem_usage > MAX_EXPR_VALUES_CACHE_BYTES)) {
+    LOG(WARNING) << "HashTableCtx::ExprValuesCache mem_usage (" << mem_usage
+                 << ") exceed MAX_EXPR_VALUES_CACHE_BYTES ("
+                 << MAX_EXPR_VALUES_CACHE_BYTES << "). capacity: " << capacity_;
+  }
   if (UNLIKELY(!tracker->TryConsume(mem_usage))) {
     capacity_ = 0;
     string details = Substitute("HashTableCtx::ExprValuesCache failed to allocate $0 bytes.",

--- a/be/src/exec/hash-table.h
+++ b/be/src/exec/hash-table.h

@@ -377,7 +377,7 @@ class HashTableCtx {
     }
 
     /// Max amount of memory in bytes for caching evaluated expression values.
-    static const int MAX_EXPR_VALUES_ARRAY_SIZE = 256 << 10;
+    static const int MAX_EXPR_VALUES_CACHE_BYTES = 256 << 10;
 
     /// Maximum number of rows of expressions evaluation states which this
     /// ExprValuesCache can cache.

--- a/bin/rat_exclude_files.txt
+++ b/bin/rat_exclude_files.txt

@@ -219,6 +219,9 @@
 tests/shell/test_file_comments.sql
 tests/shell/test_file_no_comments.sql
 tests/shell/test_var_substitution.sql
+
+# symlink to testdata/workloads/tpcds/queries
+testdata/workloads/tpcds_partitioned/queries
 
 # Generated by Apache-licensed software:
 be/src/transport/config.h

--- /dev/null
+++ b/testdata/workloads/tpcds_partitioned/queries

@@ -0,0 +1 @@
+../tpcds/queries

--- a/tests/common/test_dimensions.py
+++ b/tests/common/test_dimensions.py

@@ -45,6 +45,7 @@ FILE_FORMAT_TO_STORED_AS_MAP = {
   'json': "JSONFILE",
 }
 
+
 # Describes the configuration used to execute a single tests. Contains both the details
 # of what specific table format to target along with the exec options (num_nodes, etc)
 # to use when running the query.
@@ -116,39 +117,34 @@ class TableFormatInfo(object):
     return '_%s_%s' % (self.file_format, self.compression_codec)
 
 
-def create_uncompressed_text_dimension(workload):
+def create_table_format_dimension(workload, table_format_string):
   dataset = get_dataset_from_workload(workload)
   return ImpalaTestDimension('table_format',
-      TableFormatInfo.create_from_string(dataset, 'text/none'))
+      TableFormatInfo.create_from_string(dataset, table_format_string))
+
+
+def create_uncompressed_text_dimension(workload):
+  return create_table_format_dimension(workload, 'text/none')
 
 
 def create_uncompressed_json_dimension(workload):
-  dataset = get_dataset_from_workload(workload)
-  return ImpalaTestDimension('table_format',
-      TableFormatInfo.create_from_string(dataset, 'json/none'))
+  return create_table_format_dimension(workload, 'json/none')
 
 
 def create_parquet_dimension(workload):
-  dataset = get_dataset_from_workload(workload)
-  return ImpalaTestDimension('table_format',
-      TableFormatInfo.create_from_string(dataset, 'parquet/none'))
+  return create_table_format_dimension(workload, 'parquet/none')
 
 
 def create_orc_dimension(workload):
-  dataset = get_dataset_from_workload(workload)
-  return ImpalaTestDimension('table_format',
-      TableFormatInfo.create_from_string(dataset, 'orc/def'))
+  return create_table_format_dimension(workload, 'orc/def')
 
 
 def create_avro_snappy_dimension(workload):
-  dataset = get_dataset_from_workload(workload)
-  return ImpalaTestDimension('table_format',
-      TableFormatInfo.create_from_string(dataset, 'avro/snap/block'))
+  return create_table_format_dimension(workload, 'avro/snap/block')
 
 
 def create_kudu_dimension(workload):
-  dataset = get_dataset_from_workload(workload)
-  return ImpalaTestDimension('table_format',
-      TableFormatInfo.create_from_string(dataset, 'kudu/none'))
+  return create_table_format_dimension(workload, 'kudu/none')
 
 
 def create_client_protocol_dimension():
@@ -200,6 +196,7 @@ def orc_schema_resolution_constraint(v):
   orc_schema_resolution = v.get_value('orc_schema_resolution')
   return file_format == 'orc' or orc_schema_resolution == 0
 
+
 # Common sets of values for the exec option vectors
 ALL_BATCH_SIZES = [0]
@@ -210,6 +207,7 @@ SINGLE_NODE_ONLY = [1]
 ALL_NODES_ONLY = [0]
 ALL_DISABLE_CODEGEN_OPTIONS = [True, False]
 
+
 def create_single_exec_option_dimension(num_nodes=0, disable_codegen_rows_threshold=5000):
   """Creates an exec_option dimension that will produce a single test vector"""
   return create_exec_option_dimension(cluster_sizes=[num_nodes],
@@ -243,6 +241,7 @@ def create_exec_option_dimension(cluster_sizes=ALL_NODES_ONLY,
   exec_option_dimensions['debug_action'] = debug_action_options
   return create_exec_option_dimension_from_dict(exec_option_dimensions)
 
+
 def create_exec_option_dimension_from_dict(exec_option_dimensions):
   """
   Builds a query exec option test dimension
@@ -297,11 +296,13 @@ def extend_exec_option_dimension(test_suite, key, value):
dim.extend(new_value)
test_suite.ImpalaTestMatrix.add_dimension(dim)
def get_dataset_from_workload(workload):
# TODO: We need a better way to define the workload -> dataset mapping so we can
# extract it without reading the actual test vector file
return load_table_info_dimension(workload, 'exhaustive')[0].value.dataset
def load_table_info_dimension(workload_name, exploration_strategy, file_formats=None,
compression_codecs=None):
"""Loads test vector corresponding to the given workload and exploration strategy"""
@@ -319,7 +320,7 @@ def load_table_info_dimension(workload_name, exploration_strategy, file_formats=
       continue
 
     # Extract each test vector and add them to a dictionary
-    vals = dict((key.strip(), value.strip()) for key, value in\
+    vals = dict((key.strip(), value.strip()) for key, value in
         (item.split(':') for item in line.split(',')))
 
     # If only loading specific file formats skip anything that doesn't match
@@ -332,6 +333,7 @@ def load_table_info_dimension(workload_name, exploration_strategy, file_formats=
   return ImpalaTestDimension('table_format', *vector_values)
 
+
 def is_supported_insert_format(table_format):
   # Returns true if the given table_format is a supported Impala INSERT format
   return table_format.compression_codec == 'none' and\
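
Each existing helper keeps its public signature and now delegates to the
new create_table_format_dimension(), so any format string understood by
TableFormatInfo.create_from_string() can be requested directly. A
hypothetical usage sketch (the call below matches what the new test in
test_join_queries.py does):

    # Hypothetical caller: build a Snappy-compressed Parquet table_format
    # dimension for the tpcds_partitioned workload.
    from tests.common.test_dimensions import create_table_format_dimension

    dim = create_table_format_dimension('tpcds_partitioned', 'parquet/snap/block')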

--- a/tests/query_test/test_join_queries.py
+++ b/tests/query_test/test_join_queries.py

@@ -24,6 +24,11 @@ from copy import deepcopy
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIf, SkipIfFS
 from tests.common.test_vector import ImpalaTestDimension
+from tests.common.test_dimensions import (
+    add_mandatory_exec_option,
+    create_single_exec_option_dimension,
+    create_table_format_dimension)
+
 
 class TestJoinQueries(ImpalaTestSuite):
   BATCH_SIZES = [0, 1]
@@ -217,3 +222,29 @@ class TestSpillingHashJoin(ImpalaTestSuite):
       self.run_test_case('QueryTest/create-tables-impala-13138', vector, unique_database)
       for i in range(0, 5):
         self.run_test_case('QueryTest/query-impala-13138', vector, unique_database)
+
+
+class TestExprValueCache(ImpalaTestSuite):
+  # Test that HashTableCtx::ExprValueCache memory usage stays under 256KB.
+  # Run TPC-DS Q97 with bare minimum memory limit, MT_DOP=1, and max BATCH_SIZE.
+  # Before IMPALA-13075, the test query will pass Planner and Admission Control,
+  # but later failed during backend execution due to memory limit exceeded.
+
+  @classmethod
+  def get_workload(cls):
+    return 'tpcds_partitioned'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestExprValueCache, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(
+        create_single_exec_option_dimension())
+    cls.ImpalaTestMatrix.add_dimension(
+        create_table_format_dimension(cls.get_workload(), 'parquet/snap/block'))
+    add_mandatory_exec_option(cls, 'runtime_filter_mode', 'OFF')
+    add_mandatory_exec_option(cls, 'mem_limit', '149mb')
+    add_mandatory_exec_option(cls, 'mt_dop', 1)
+    add_mandatory_exec_option(cls, 'batch_size', 65536)
+
+  def test_expr_value_cache_fits(self, vector):
+    self.run_test_case('tpcds-q97', vector)
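
For reference, assuming a standard Impala development environment, the
new test class can be selected on its own with pytest's -k filter,
typically through the repo's py.test wrapper script. A minimal Python
equivalent of that invocation (a sketch, not part of this patch):

    # Run only TestExprValueCache from the join-query test module.
    import pytest

    pytest.main(['tests/query_test/test_join_queries.py', '-k', 'TestExprValueCache'])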