impala/tests/custom_cluster/test_tuple_cache.py
Joe McDonnell 78a27c56fe IMPALA-13898: Incorporate partition information into tuple cache keys
Currently, the tuple cache keys do not include partition
information in either the planner key or the fragment instance
key. However, partition information actually matters for correctness.

First, there are settings defined on the table and partition that
can impact the results. For example, for processing text files,
the separator, escape character, etc. are specified at the table
level, which affects the rows produced from a given file. There
are other such settings stored at the partition level (e.g.
the JSON binary format).
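
As an illustration (a hypothetical sketch; the table names and
location are made up), two text tables reading the same directory
with different field terminators produce different rows from the
same file:

  client.execute("CREATE EXTERNAL TABLE t_comma (c1 STRING, c2 STRING) "
                 "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' "
                 "LOCATION '/warehouse/shared_dir'")
  client.execute("CREATE EXTERNAL TABLE t_pipe (c1 STRING, c2 STRING) "
                 "ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' "
                 "LOCATION '/warehouse/shared_dir'")
  # A cache key built only from file paths would wrongly share
  # entries between scans of t_comma and t_pipe.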

Second, it is possible to have two partitions pointed at the same
filesystem location. For example, scale_db.num_partitions_1234_blocks_per_partition_1
is a table that has all partitions pointing to the same
location. In that case, the cache can't tell the partitions
apart based on the files alone. This is an exotic configuration.
Incorporating an identifier of the partition (e.g. the partition
keys/values) allows the cache to tell the difference.
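
A hypothetical sketch of such a layout (names made up):

  client.execute("ALTER TABLE t ADD PARTITION (p=1) "
                 "LOCATION '/warehouse/shared_dir'")
  client.execute("ALTER TABLE t ADD PARTITION (p=2) "
                 "LOCATION '/warehouse/shared_dir'")
  # Scans of p=1 and p=2 read identical files; only the partition
  # keys/values distinguish their results.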

To fix this, we incorporate partition information into the
key. At planning time, when incorporating the scan range
information, we also incorporate information about the associated
partitions. The scan range hashing code moves to HdfsScanNode and
now iterates over the partitions, hashing both the partition
information and the scan ranges. At runtime, the TupleCacheNode
looks up the partition associated with a scan node and hashes the
additional information from the HdfsPartitionDescriptor.
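
A rough sketch of the keying scheme described above (pseudocode
only; the real logic lives in HdfsScanNode and TupleCacheNode, and
the helper names here are invented):

  def fragment_instance_key(scan_node):
    h = new_hasher()
    for partition in scan_node.scanned_partitions():
      # Partition descriptor covers delimiters, formats, keys/values.
      h.update(serialize(partition.descriptor()))
      for scan_range in partition.scan_ranges():
        h.update(serialize(scan_range))
    return h.hexdigest()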

This includes some test-only changes to make it possible to run the
TestBinaryType::test_json_binary_format test case with tuple caching.
ImpalaTestSuite::_get_table_location() (used by clone_table()) now
detects a fully-qualified table name and extracts the database from
it, only using the vector to calculate the database when the table
is not fully qualified (see the sketch after this paragraph). This
allows a test to clone a table without needing to manipulate its
vector to match the right database. _get_table_location() also no
longer switches into the database, which required reworking
test_scanners_fuzz.py to use absolute paths for queries. It turns
out that some tests in test_scanners_fuzz.py were running in the
wrong database and therefore against uncorrupted tables. Once
corrected, some of those tests can crash Impala, so they are
xfailed until the underlying issue is fixed (tracked by
IMPALA-14219).
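
A minimal sketch of the new _get_table_location() lookup order
(helper names here are illustrative, not the actual implementation):

  def _get_table_location(table_name, vector):
    if "." in table_name:
      # Fully-qualified name: take the database directly.
      db = table_name.split(".")[0]
    else:
      # Fall back to deriving the database from the test vector.
      db = db_from_vector(vector)
    # Look up the location without switching into the database.
    ...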

Testing:
 - Added a frontend test in TupleCacheTest for a table with
   multiple partitions pointed at the same place.
 - Added custom cluster tests covering both issues

Change-Id: I3a7109fcf8a30bf915bb566f7d642f8037793a8c
Reviewed-on: http://gerrit.cloudera.org:8080/23074
Reviewed-by: Yida Wu <wydbaggio000@gmail.com>
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Tested-by: Joe McDonnell <joemcdonnell@cloudera.com>
2025-07-17 01:07:44 +00:00


# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function

import os
import random
import re
import string
import time

from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.skip import SkipIfDockerizedCluster, SkipIf
from tests.common.test_dimensions import (
    add_exec_option_dimension, add_mandatory_exec_option)
from tests.util.parse_util import (
    match_memory_estimate, parse_mem_to_mb, match_cache_key)

TABLE_LAYOUT = 'name STRING, age INT, address STRING'
CACHE_START_ARGS = "--tuple_cache_dir=/tmp --log_level=2"
NUM_HITS = 'NumTupleCacheHits'
NUM_HALTED = 'NumTupleCacheHalted'
NUM_SKIPPED = 'NumTupleCacheSkipped'
# Indentation used for TUPLE_CACHE_NODE in specific fragments (not averaged fragment).
NODE_INDENT = ' - '


# Generates a random table entry of at least 15 bytes.
def table_value(seed):
  r = random.Random(seed)
  name = "".join([r.choice(string.ascii_letters) for _ in range(r.randint(5, 20))])
  age = r.randint(1, 90)
  address = "{0} {1}".format(r.randint(1, 9999),
      "".join([r.choice(string.ascii_letters) for _ in range(r.randint(4, 12))]))
  return '"{0}", {1}, "{2}"'.format(name, age, address)


def getCounterValues(profile, key):
  # This matches lines like these:
  #  - NumTupleCacheHits: 1 (1)
  #  - TupleCacheBytesWritten: 123.00 B (123)
  # The regex extracts the value inside the parentheses to get a simple numeric value
  # rather than a pretty print of the same value.
  counter_str_list = re.findall(r"{0}{1}: .* \((.*)\)".format(NODE_INDENT, key), profile)
  return [int(v) for v in counter_str_list]
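
# Example: on a profile with two TUPLE_CACHE_NODEs where the first hit and the second
# missed, getCounterValues(profile, NUM_HITS) returns [1, 0] (values illustrative).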


def assertCounterOrder(profile, key, vals):
  values = getCounterValues(profile, key)
  assert values == vals, values


def assertCounter(profile, key, val, num_matches):
  if not isinstance(num_matches, list):
    num_matches = [num_matches]
  values = getCounterValues(profile, key)
  assert len([v for v in values if v == val]) in num_matches, values


def assertCounters(profile, num_hits, num_halted, num_skipped, num_matches=1):
  assertCounter(profile, NUM_HITS, num_hits, num_matches)
  assertCounter(profile, NUM_HALTED, num_halted, num_matches)
  assertCounter(profile, NUM_SKIPPED, num_skipped, num_matches)


def get_cache_keys(profile):
  cache_keys = {}
  last_node_id = -1
  matcher = re.compile(r'TUPLE_CACHE_NODE \(id=([0-9]*)\)')
  for line in profile.splitlines():
    if "Combined Key:" in line:
      key = line.split(":")[1].strip()
      cache_keys[last_node_id].append(key)
      continue
    match = matcher.search(line)
    if match:
      last_node_id = int(match.group(1))
      if last_node_id not in cache_keys:
        cache_keys[last_node_id] = []
  # Sort cache keys: with multiple nodes, order in the profile may change.
  for _, val in cache_keys.items():
    val.sort()
  return next(iter(cache_keys.values())) if len(cache_keys) == 1 else cache_keys
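
# Each combined key has the form "<compile key>_<fragment instance key>"; tests below
# split on "_" to examine the two halves separately, e.g. (key value illustrative):
#   compile_key, finst_key = "3f2a9c_b417".split("_")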


def assert_deterministic_scan(vector, profile):
  if vector.get_value('exec_option')['mt_dop'] > 0:
    assert "deterministic scan range assignment: true" in profile


class TestTupleCacheBase(CustomClusterTestSuite):
  @classmethod
  def setup_class(cls):
    super(TestTupleCacheBase, cls).setup_class()
    # Unset this environment variable to ensure it doesn't affect
    # tests like test_cache_disabled.
    cls.org_tuple_cache_dir = os.getenv("TUPLE_CACHE_DIR")
    if cls.org_tuple_cache_dir is not None:
      os.unsetenv("TUPLE_CACHE_DIR")

  @classmethod
  def teardown_class(cls):
    if cls.org_tuple_cache_dir is not None:
      os.environ["TUPLE_CACHE_DIR"] = cls.org_tuple_cache_dir
    super(TestTupleCacheBase, cls).teardown_class()

  @classmethod
  def add_test_dimensions(cls):
    super(TestTupleCacheBase, cls).add_test_dimensions()
    add_mandatory_exec_option(cls, 'enable_tuple_cache', 'true')

  # Generates a table containing at least <scale> KB of data.
  def create_table(self, fq_table, scale=1):
    self.execute_query("CREATE TABLE {0} ({1})".format(fq_table, TABLE_LAYOUT))
    # To make the rows distinct, we keep using a different seed for table_value.
    global_index = 0
    for _ in range(scale):
      values = [table_value(i) for i in range(global_index, global_index + 70)]
      self.execute_query("INSERT INTO {0} VALUES ({1})".format(
          fq_table, "), (".join(values)))
      global_index += 70

  # Helper function to get a tuple cache metric from a single impalad.
  def get_tuple_cache_metric(self, impalaservice, suffix):
    return impalaservice.get_metric_value('impala.tuple-cache.' + suffix)


class TestTupleCacheOptions(TestTupleCacheBase):
  """Tests Impala with different tuple cache startup options."""

  @classmethod
  def add_test_dimensions(cls):
    super(TestTupleCacheOptions, cls).add_test_dimensions()
    add_mandatory_exec_option(cls, 'mt_dop', 1)

  @CustomClusterTestSuite.with_args(cluster_size=1)
  def test_cache_disabled(self, vector, unique_database):
    self.client.set_configuration(vector.get_value('exec_option'))
    fq_table = "{0}.cache_disabled".format(unique_database)
    self.create_table(fq_table)
    result1 = self.execute_query("SELECT * from {0}".format(fq_table))
    result2 = self.execute_query("SELECT * from {0}".format(fq_table))
    assert result1.success
    assert result2.success
    assert result1.data == result2.data
    assertCounters(result1.runtime_profile, num_hits=0, num_halted=0, num_skipped=1)
    assertCounters(result2.runtime_profile, num_hits=0, num_halted=0, num_skipped=1)

  @CustomClusterTestSuite.with_args(
      start_args=CACHE_START_ARGS + " --tuple_cache_capacity=64MB", cluster_size=1,
      impalad_args="--cache_force_single_shard")
  def test_cache_halted_select(self, vector):
    # The cache is set to the minimum cache size, so run a query that produces enough
    # data to exceed the cache size and halt caching.
    self.client.set_configuration(vector.get_value('exec_option'))
    big_enough_query = "SELECT o_comment from tpch.orders"
    result1 = self.execute_query(big_enough_query)
    result2 = self.execute_query(big_enough_query)
    assert result1.success
    assert result2.success
    assert result1.data == result2.data
    assertCounters(result1.runtime_profile, num_hits=0, num_halted=1, num_skipped=0)
    bytes_written = getCounterValues(result1.runtime_profile, "TupleCacheBytesWritten")
    # This is running on a single node, so there should be a single location where
    # TupleCacheBytesWritten exceeds 0.
    assert len([v for v in bytes_written if v > 0]) == 1
    assertCounters(result2.runtime_profile, num_hits=0, num_halted=0, num_skipped=1)

  @CustomClusterTestSuite.with_args(
      start_args=CACHE_START_ARGS, cluster_size=1,
      impalad_args="--tuple_cache_ignore_query_options=true")
  def test_failpoints(self, vector, unique_database):
    fq_table = "{0}.failpoints".format(unique_database)
    # Scale 20 gets us enough rows to force multiple RowBatches (needed for the
    # reader GetNext() cases).
    self.create_table(fq_table, scale=20)
    query = "SELECT * from {0}".format(fq_table)

    def execute_debug(query, action):
      exec_options = dict(vector.get_value('exec_option'))
      exec_options['debug_action'] = action
      return self.execute_query(query, exec_options)

    # Fail when writing a cache entry. All of these are handled and will not fail
    # the query.
    # Case 1: fail during Open()
    result = execute_debug(query, "TUPLE_FILE_WRITER_OPEN:FAIL@1.0")
    assert result.success
    assertCounters(result.runtime_profile, num_hits=0, num_halted=0, num_skipped=1)
    # Case 2: fail during Write()
    result = execute_debug(query, "TUPLE_FILE_WRITER_WRITE:FAIL@1.0")
    assert result.success
    assertCounters(result.runtime_profile, num_hits=0, num_halted=0, num_skipped=0)
    # Case 3: fail during Commit()
    result = execute_debug(query, "TUPLE_FILE_WRITER_COMMIT:FAIL@1.0")
    assert result.success
    assertCounters(result.runtime_profile, num_hits=0, num_halted=0, num_skipped=0)
    # Now, successfully add a cache entry
    result1 = self.execute_query(query, vector.get_value('exec_option'))
    assert result1.success
    assertCounters(result1.runtime_profile, num_hits=0, num_halted=0, num_skipped=0)
    # Fail when reading a cache entry
    # Case 1: fail during Open()
    result = execute_debug(query, "TUPLE_FILE_READER_OPEN:FAIL@1.0")
    assert result.success
    # Do an unordered compare (the rows are unique)
    assert set(result.data) == set(result1.data)
    # Not a hit
    assertCounters(result.runtime_profile, num_hits=0, num_halted=0, num_skipped=1)
    # Case 2: fail during the first GetNext() call
    result = execute_debug(query, "TUPLE_FILE_READER_FIRST_GETNEXT:FAIL@1.0")
    assert result.success
    # Do an unordered compare (the rows are unique)
    assert set(result.data) == set(result1.data)
    # Technically, this is a hit
    assertCounters(result.runtime_profile, num_hits=1, num_halted=0, num_skipped=0)
    # Case 3: fail during the second GetNext() call
    # This one must fail for correctness, as it cannot fall back to the child if it
    # has already returned cached rows
    hit_error = False
    try:
      result = execute_debug(query, "TUPLE_FILE_READER_SECOND_GETNEXT:FAIL@1.0")
    except Exception:
      hit_error = True
    assert hit_error

  @CustomClusterTestSuite.with_args(
      start_args=CACHE_START_ARGS, cluster_size=1,
      impalad_args='--tuple_cache_exempt_query_options=max_errors,exec_time_limit_s')
  def test_custom_exempt_query_options(self, vector, unique_database):
    """A custom list of exempt query options shares a cache entry"""
    fq_table = "{0}.query_options".format(unique_database)
    self.create_table(fq_table)
    query = "SELECT * from {0}".format(fq_table)
    errors_10 = dict(vector.get_value('exec_option'))
    errors_10['max_errors'] = '10'
    exec_time_limit = dict(vector.get_value('exec_option'))
    exec_time_limit['exec_time_limit_s'] = '30'
    exempt1 = self.execute_query(query, query_options=errors_10)
    exempt2 = self.execute_query(query, query_options=exec_time_limit)
    exempt3 = self.execute_query(query, query_options=vector.get_value('exec_option'))
    assert exempt1.success
    assert exempt2.success
    assert exempt1.data == exempt2.data
    assert exempt1.data == exempt3.data
    assertCounters(exempt1.runtime_profile, num_hits=0, num_halted=0, num_skipped=0)
    assertCounters(exempt2.runtime_profile, num_hits=1, num_halted=0, num_skipped=0)
    assertCounters(exempt3.runtime_profile, num_hits=1, num_halted=0, num_skipped=0)


@CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS, cluster_size=1)
class TestTupleCacheSingle(TestTupleCacheBase):
  """Tests Impala with a single executor and mt_dop=1."""

  @classmethod
  def add_test_dimensions(cls):
    super(TestTupleCacheSingle, cls).add_test_dimensions()
    add_mandatory_exec_option(cls, 'mt_dop', 1)

  def test_create_and_select(self, vector, unique_database):
    self.client.set_configuration(vector.get_value('exec_option'))
    fq_table = "{0}.create_and_select".format(unique_database)
    self.create_table(fq_table)
    result1 = self.execute_query("SELECT * from {0}".format(fq_table))
    result2 = self.execute_query("SELECT * from {0}".format(fq_table))
    assert result1.success
    assert result2.success
    assert result1.data == result2.data
    assertCounters(result1.runtime_profile, num_hits=0, num_halted=0, num_skipped=0)
    assertCounters(result2.runtime_profile, num_hits=1, num_halted=0, num_skipped=0)
    # Verify that the bytes written by the first profile are the same as the bytes
    # read by the second profile.
    bytes_written = getCounterValues(result1.runtime_profile, "TupleCacheBytesWritten")
    bytes_read = getCounterValues(result2.runtime_profile, "TupleCacheBytesRead")
    assert sorted(bytes_written) == sorted(bytes_read)

  def test_non_exempt_query_options(self, vector, unique_database):
    """Non-exempt query options result in different cache entries"""
    fq_table = "{0}.query_options".format(unique_database)
    self.create_table(fq_table)
    query = "SELECT * from {0}".format(fq_table)
    strict_true = dict(vector.get_value('exec_option'))
    strict_true['strict_mode'] = 'true'
    strict_false = dict(vector.get_value('exec_option'))
    strict_false['strict_mode'] = 'false'
    noexempt1 = self.execute_query(query, query_options=strict_false)
    noexempt2 = self.execute_query(query, query_options=strict_true)
    noexempt3 = self.execute_query(query, query_options=strict_false)
    noexempt4 = self.execute_query(query, query_options=strict_true)
    noexempt5 = self.execute_query(query, query_options=vector.get_value('exec_option'))
    assert noexempt1.success
    assert noexempt2.success
    assert noexempt3.success
    assert noexempt4.success
    assert noexempt5.success
    assert noexempt1.data == noexempt2.data
    assert noexempt1.data == noexempt3.data
    assert noexempt1.data == noexempt4.data
    assert noexempt1.data == noexempt5.data
    assertCounters(noexempt1.runtime_profile, num_hits=0, num_halted=0, num_skipped=0)
    assertCounters(noexempt2.runtime_profile, num_hits=0, num_halted=0, num_skipped=0)
    assertCounters(noexempt3.runtime_profile, num_hits=1, num_halted=0, num_skipped=0)
    assertCounters(noexempt4.runtime_profile, num_hits=1, num_halted=0, num_skipped=0)
    assertCounters(noexempt5.runtime_profile, num_hits=1, num_halted=0, num_skipped=0)

  def test_exempt_query_options(self, vector, unique_database):
    """Exempt query options share a cache entry"""
    fq_table = "{0}.query_options".format(unique_database)
    self.create_table(fq_table)
    query = "SELECT * from {0}".format(fq_table)
    codegen_false = dict(vector.get_value('exec_option'))
    codegen_false['disable_codegen'] = 'true'
    codegen_true = dict(vector.get_value('exec_option'))
    codegen_true['disable_codegen'] = 'false'
    exempt1 = self.execute_query(query, query_options=codegen_true)
    exempt2 = self.execute_query(query, query_options=codegen_false)
    exempt3 = self.execute_query(query, query_options=vector.get_value('exec_option'))
    assert exempt1.success
    assert exempt2.success
    assert exempt1.data == exempt2.data
    assert exempt1.data == exempt3.data
    assertCounters(exempt1.runtime_profile, num_hits=0, num_halted=0, num_skipped=0)
    assertCounters(exempt2.runtime_profile, num_hits=1, num_halted=0, num_skipped=0)
    assertCounters(exempt3.runtime_profile, num_hits=1, num_halted=0, num_skipped=0)

  def test_aggregate(self, vector, unique_database):
    """Simple aggregation can be cached"""
    self.client.set_configuration(vector.get_value('exec_option'))
    fq_table = "{0}.agg".format(unique_database)
    self.create_table(fq_table)
    result1 = self.execute_query("SELECT sum(age) FROM {0}".format(fq_table))
    result2 = self.execute_query("SELECT sum(age) FROM {0}".format(fq_table))
    assert result1.success
    assert result2.success
    assert result1.data == result2.data
    assertCounters(result1.runtime_profile, 0, 0, 0, num_matches=2)
    # The aggregate should hit, and the scan node below it will miss.
    assertCounterOrder(result2.runtime_profile, NUM_HITS, [1, 0])
    assertCounter(result2.runtime_profile, NUM_HALTED, 0, num_matches=2)
    assertCounter(result2.runtime_profile, NUM_SKIPPED, 0, num_matches=2)
    # Verify that the bytes written by the first profile are the same as the bytes
    # read by the second profile.
    bytes_written = getCounterValues(result1.runtime_profile, "TupleCacheBytesWritten")
    bytes_read = getCounterValues(result2.runtime_profile, "TupleCacheBytesRead")
    assert len(bytes_written) == 2
    assert len(bytes_read) == 1
    assert bytes_written[0] == bytes_read[0]

  def test_aggregate_reuse(self, vector):
    """Cached aggregation can be re-used"""
    self.client.set_configuration(vector.get_value('exec_option'))
    result = self.execute_query("SELECT sum(int_col) FROM functional.alltypes")
    assert result.success
    assertCounters(result.runtime_profile, 0, 0, 0, num_matches=2)
    result_scan = self.execute_query("SELECT avg(int_col) FROM functional.alltypes")
    assert result_scan.success
    assertCounterOrder(result_scan.runtime_profile, NUM_HITS, [0, 1])
    result_agg = self.execute_query(
        "SELECT avg(a) FROM (SELECT sum(int_col) as a FROM functional.alltypes) b")
    assert result_agg.success
    assertCounterOrder(result_agg.runtime_profile, NUM_HITS, [1, 0])

  def test_parquet_resolution_by_name(self, vector, unique_database):
    """Verify that parquet_fallback_schema_resolution=NAME works with tuple caching"""
    self.run_test_case('QueryTest/parquet-resolution-by-name', vector,
        use_db=unique_database)

  def test_partition_information(self, vector):
    """Verify that partition information is incorporated into the runtime cache key"""
    self.client.set_configuration(vector.get_value('exec_option'))
    # scale_db.num_partitions_1234_blocks_per_partition_1 is an exotic table where all
    # the partitions point to the same filesystem location. A single file is read many
    # times for different partitions. It is not possible to tell the partitions apart
    # by the file path, so this verifies that the partition information is being
    # included properly.
    query_template = \
        "select i, j from scale_db.num_partitions_1234_blocks_per_partition_1 where j={0}"
    # Run against the j=1 partition
    result1 = self.execute_query(query_template.format(1))
    assert result1.success
    assertCounters(result1.runtime_profile, 0, 0, 0)
    assert len(result1.data) == 1
    assert result1.data[0].split("\t") == ["1", "1"]
    # Run against the j=2 partition. There should not be a cache hit, because the two
    # queries run against different partitions. This only works if the runtime key
    # incorporates the partition information.
    result2 = self.execute_query(query_template.format(2))
    assert result2.success
    assertCounters(result2.runtime_profile, 0, 0, 0)
    assert len(result2.data) == 1
    assert result2.data[0].split("\t") == ["1", "2"]

  def test_json_binary_format(self, vector, unique_database):
    """This is identical to test_scanners.py's TestBinaryType::test_json_binary_format.
    That test modifies a table's serde properties to change the json binary format.
    The tuple cache detects that by including the partition's storage descriptor
    information. This test fails if that doesn't happen."""
    test_tbl = unique_database + '.binary_tbl'
    self.clone_table('functional_json.binary_tbl', test_tbl, False, vector)
    self.run_test_case('QueryTest/json-binary-format', vector, unique_database)


@CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS)
class TestTupleCacheCluster(TestTupleCacheBase):
  """Tests Impala with 3 executors and mt_dop=1."""

  @classmethod
  def add_test_dimensions(cls):
    super(TestTupleCacheCluster, cls).add_test_dimensions()
    add_mandatory_exec_option(cls, 'mt_dop', 1)

  def test_runtime_filters(self, vector, unique_database):
    """
    This tests that adding files to a table results in different runtime filter keys.
    """
    self.client.set_configuration(vector.get_value('exec_option'))
    fq_table = "{0}.runtime_filters".format(unique_database)
    # A query containing multiple runtime filters
    # - scan of A receives runtime filters from B and C, so it depends on contents of B/C
    # - scan of B receives runtime filter from C, so it depends on contents of C
    query = "select straight_join a.id from functional.alltypes a, functional.alltypes" \
        " b, {0} c where a.id = b.id and a.id = c.age order by a.id".format(fq_table)
    query_a_id = 10
    query_b_id = 11
    query_c_id = 12
    # Create an empty table
    self.create_table(fq_table, scale=0)
    # Establish a baseline
    empty_result = self.execute_query(query)
    empty_cache_keys = get_cache_keys(empty_result.runtime_profile)
    # Tables a and b have multiple files, so they are distributed across all 3 nodes.
    # Table c has one file, so it has a single entry.
    assert len(empty_cache_keys) == 3
    assert len(empty_cache_keys[query_c_id]) == 1
    empty_c_compile_key, empty_c_finst_key = empty_cache_keys[query_c_id][0].split("_")
    assert empty_c_finst_key == "0"
    assert len(empty_result.data) == 0
    # Insert a row, which creates a file / scan range
    self.execute_query("INSERT INTO {0} VALUES ({1})".format(fq_table, table_value(0)))
    # Now, there is a scan range, so the fragment instance key should be non-zero.
    one_file_result = self.execute_query(query)
    one_cache_keys = get_cache_keys(one_file_result.runtime_profile)
    assert len(one_cache_keys) == 3
    assert len(one_cache_keys[query_c_id]) == 1
    one_c_compile_key, one_c_finst_key = one_cache_keys[query_c_id][0].split("_")
    assert one_c_finst_key != "0"
    # This should be a cache miss
    assertCounters(one_file_result.runtime_profile, 0, 0, 0, 7)
    assert len(one_file_result.data) == 1
    # The new scan range did not change the compile-time key, but did change the runtime
    # filter keys.
    for id in [query_a_id, query_b_id]:
      assert len(empty_cache_keys[id]) == len(one_cache_keys[id])
      for empty, one in zip(empty_cache_keys[id], one_cache_keys[id]):
        assert empty != one
    assert empty_c_compile_key == one_c_compile_key
    # Insert another row, which creates a file / scan range
    self.execute_query("INSERT INTO {0} VALUES ({1})".format(fq_table, table_value(1)))
    # There is a second scan range, so the fragment instance key should change again
    two_files_result = self.execute_query(query)
    two_cache_keys = get_cache_keys(two_files_result.runtime_profile)
    assert len(two_cache_keys) == 3
    assert len(two_cache_keys[query_c_id]) == 2
    two_c1_compile_key, two_c1_finst_key = two_cache_keys[query_c_id][0].split("_")
    two_c2_compile_key, two_c2_finst_key = two_cache_keys[query_c_id][1].split("_")
    assert two_c1_finst_key != "0"
    assert two_c2_finst_key != "0"
    # There may be a cache hit for the prior "c" scan range (if scheduled to the same
    # instance), and the rest cache misses.
    assertCounter(two_files_result.runtime_profile, NUM_HITS, 0, num_matches=[7, 8])
    assertCounter(two_files_result.runtime_profile, NUM_HITS, 1, num_matches=[0, 1])
    assertCounter(two_files_result.runtime_profile, NUM_HALTED, 0, num_matches=8)
    assertCounter(two_files_result.runtime_profile, NUM_SKIPPED, 0, num_matches=8)
    assert len(two_files_result.data) == 2
    # Ordering can vary by environment. Ensure one matches and one differs.
    assert one_c_finst_key == two_c1_finst_key or one_c_finst_key == two_c2_finst_key
    assert one_c_finst_key != two_c1_finst_key or one_c_finst_key != two_c2_finst_key
    overlapping_rows = set(one_file_result.data).intersection(set(two_files_result.data))
    assert len(overlapping_rows) == 1
    # The new scan range did not change the compile-time key, but did change the runtime
    # filter keys.
    for id in [query_a_id, query_b_id]:
      assert len(one_cache_keys[id]) == len(two_cache_keys[id])
      for one, two in zip(one_cache_keys[id], two_cache_keys[id]):
        assert one != two
    assert one_c_compile_key == two_c1_compile_key
    assert one_c_compile_key == two_c2_compile_key
    # Invalidate metadata and rerun the last query. The keys should stay the same.
    self.execute_query("invalidate metadata")
    rerun_two_files_result = self.execute_query(query)
    # Verify that this is a cache hit
    assertCounters(rerun_two_files_result.runtime_profile, 1, 0, 0, num_matches=8)
    rerun_cache_keys = get_cache_keys(rerun_two_files_result.runtime_profile)
    assert rerun_cache_keys == two_cache_keys
    assert rerun_two_files_result.data == two_files_result.data

  def test_runtime_filter_reload(self, vector, unique_database):
    """
    This tests that reloading files to a table results in matching runtime filter keys.
    """
    self.client.set_configuration(vector.get_value('exec_option'))
    fq_table = "{0}.runtime_filter_genspec".format(unique_database)
    # Query where fq_table generates a runtime filter.
    query = "select straight_join a.id from functional.alltypes a, {0} b " \
        "where a.id = b.age order by a.id".format(fq_table)
    # Create a partitioned table with 3 partitions
    self.execute_query("CREATE EXTERNAL TABLE {0} (name STRING) "
        "PARTITIONED BY (age INT)".format(fq_table))
    self.execute_query(
        "INSERT INTO {0} PARTITION(age=4) VALUES (\"Vanessa\")".format(fq_table))
    self.execute_query(
        "INSERT INTO {0} PARTITION(age=5) VALUES (\"Carl\")".format(fq_table))
    self.execute_query(
        "INSERT INTO {0} PARTITION(age=6) VALUES (\"Cleopatra\")".format(fq_table))
    # Prime the cache
    base_result = self.execute_query(query)
    base_cache_keys = get_cache_keys(base_result.runtime_profile)
    assert len(base_cache_keys) == 3
    # Drop and reload the table
    self.execute_query("DROP TABLE {0}".format(fq_table))
    self.execute_query("CREATE EXTERNAL TABLE {0} (name STRING, address STRING) "
        "PARTITIONED BY (age INT)".format(fq_table))
    self.execute_query("ALTER TABLE {0} RECOVER PARTITIONS".format(fq_table))
    # Verify we reuse the cache
    reload_result = self.execute_query(query)
    reload_cache_keys = get_cache_keys(reload_result.runtime_profile)
    assert base_result.data == reload_result.data
    assert base_cache_keys == reload_cache_keys
    # Skips verifying cache hits as fragments may not be assigned to the same nodes.

  def test_join_modifications(self, vector, unique_database):
    """
    This tests caching above a join without runtime filters and verifies that changes
    to the build side table result in a different cache key.
    """
fq_table = "{0}.join_modifications".format(unique_database)
query = "select straight_join probe.id from functional.alltypes probe join " \
"/* +broadcast */ {0} build on (probe.id = build.age) ".format(fq_table) + \
"order by probe.id"
# Create an empty table
self.create_table(fq_table, scale=0)
probe_id = 6
build_id = 7
above_join_id = 8
# Run without runtime filters to verify the regular path works
no_runtime_filters = dict(vector.get_value('exec_option'))
no_runtime_filters['runtime_filter_mode'] = 'off'
# Establish a baseline
empty_result = self.execute_query(query, no_runtime_filters)
empty_cache_keys = get_cache_keys(empty_result.runtime_profile)
# The build side is on one node. The probe side is on three nodes.
assert len(empty_cache_keys) == 3
assert len(empty_cache_keys[probe_id]) == 3
assert len(empty_cache_keys[build_id]) == 1
assert len(empty_cache_keys[above_join_id]) == 3
empty_build_key = empty_cache_keys[build_id][0]
empty_build_compile_key, empty_build_finst_key = empty_build_key.split("_")
assert empty_build_finst_key == "0"
assert len(empty_result.data) == 0
empty_join_compile_key = empty_cache_keys[above_join_id][0].split("_")[0]
# Insert a row, which creates a file / scan range
self.execute_query("INSERT INTO {0} VALUES ({1})".format(fq_table, table_value(0)))
# There is a build-side scan range, so the fragment instance key should be non-zero.
one_file_result = self.execute_query(query, no_runtime_filters)
assert len(one_file_result.data) == 1
one_cache_keys = get_cache_keys(one_file_result.runtime_profile)
assert len(one_cache_keys) == 3
assert len(one_cache_keys[probe_id]) == 3
assert len(one_cache_keys[build_id]) == 1
assert len(one_cache_keys[above_join_id]) == 3
one_build_key = one_cache_keys[build_id][0]
one_build_compile_key, one_build_finst_key = one_build_key.split("_")
assert one_build_finst_key != "0"
assert one_build_compile_key == empty_build_compile_key
# This should be a cache miss for the build side and above the join, but a cache
# hit for the probe side (3 instances).
assertCounter(one_file_result.runtime_profile, NUM_HITS, 1, 3)
assertCounter(one_file_result.runtime_profile, NUM_HALTED, 0, 7)
assertCounter(one_file_result.runtime_profile, NUM_SKIPPED, 0, 7)
# The above join compile time key should have changed, because it incorporates the
# build side scan ranges.
one_join_compile_key = one_cache_keys[above_join_id][0].split("_")[0]
assert one_join_compile_key != empty_join_compile_key
def test_join_timing(self, vector):
"""
This verifies that a very short query with a cache hit above a join can complete
below a certain threshold. This should be sensitive to issues with synchronization
with the shared join builder.
"""
query = "select straight_join probe.id from functional.alltypes probe join " \
"/* +broadcast */ functional.alltypes build on (probe.id = build.id) " \
"order by probe.id"
# To avoid interaction with cache entries from previous tests, set an unrelated
# query option to keep the key different.
custom_options = dict(vector.get_value('exec_option'))
custom_options['batch_size'] = '1234'
first_run_result = self.execute_query(query, custom_options)
assert len(first_run_result.data) == 7300
assertCounter(first_run_result.runtime_profile, NUM_HITS, 0, 9)
assertCounter(first_run_result.runtime_profile, NUM_HALTED, 0, 9)
assertCounter(first_run_result.runtime_profile, NUM_SKIPPED, 0, 9)
start_time = time.time()
second_run_result = self.execute_query(query, custom_options)
end_time = time.time()
# The location above the join hits and the location on the build side hits,
# but the probe location is below the join and doesn't hit.
assertCounter(second_run_result.runtime_profile, NUM_HITS, 1, 6)
assertCounter(second_run_result.runtime_profile, NUM_HALTED, 0, 9)
assertCounter(second_run_result.runtime_profile, NUM_SKIPPED, 0, 9)
# As a sanity check for the synchronization pieces, verify that this runs in less
# than 750 milliseconds.
assert end_time - start_time < 0.75


@CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS, cluster_size=1)
class TestTupleCacheRuntimeKeysBasic(TestTupleCacheBase):
  """Simpler tests that run on a single node with mt_dop=0 or mt_dop=1."""

  @classmethod
  def add_test_dimensions(cls):
    super(TestTupleCacheRuntimeKeysBasic, cls).add_test_dimensions()
    add_exec_option_dimension(cls, 'mt_dop', [0, 1])

  def test_scan_range_basics(self, vector, unique_database):
    """
    This tests that adding/removing files to a table results in different keys.
    """
    self.client.set_configuration(vector.get_value('exec_option'))
    fq_table = "{0}.scan_range_basics".format(unique_database)
    query = "SELECT * from {0}".format(fq_table)
    # Create an empty table
    self.create_table(fq_table, scale=0)
    # When there are no scan ranges, the fragment instance key is 0. This is
    # somewhat of a toy case, and we probably want to avoid caching in this
    # case. Nonetheless, it is a good sanity check.
    empty_result = self.execute_query(query)
    cache_keys = get_cache_keys(empty_result.runtime_profile)
    assert len(cache_keys) == 1
    empty_table_compile_key, empty_table_finst_key = cache_keys[0].split("_")
    assert empty_table_finst_key == "0"
    assert len(empty_result.data) == 0
    assert_deterministic_scan(vector, empty_result.runtime_profile)
    # Insert a row, which creates a file / scan range
    self.execute_query("INSERT INTO {0} VALUES ({1})".format(
        fq_table, table_value(0)))
    # Now, there is a scan range, so the fragment instance key should be non-zero.
    one_file_result = self.execute_query(query)
    cache_keys = get_cache_keys(one_file_result.runtime_profile)
    assert len(cache_keys) == 1
    one_file_compile_key, one_file_finst_key = cache_keys[0].split("_")
    assert one_file_finst_key != "0"
    # This should be a cache miss
    assertCounters(one_file_result.runtime_profile, 0, 0, 0)
    assert len(one_file_result.data) == 1
    assert_deterministic_scan(vector, one_file_result.runtime_profile)
    # The new scan range did not change the compile-time key
    assert empty_table_compile_key == one_file_compile_key
    # Insert another row, which creates a file / scan range
    self.execute_query("INSERT INTO {0} VALUES ({1})".format(
        fq_table, table_value(1)))
    # There is a second scan range, so the fragment instance key should change again
    two_files_result = self.execute_query(query)
    cache_keys = get_cache_keys(two_files_result.runtime_profile)
    assert len(cache_keys) == 1
    two_files_compile_key, two_files_finst_key = cache_keys[0].split("_")
    assert two_files_finst_key != "0"
    assertCounters(two_files_result.runtime_profile, 0, 0, 0)
    assert len(two_files_result.data) == 2
    assert one_file_finst_key != two_files_finst_key
    overlapping_rows = set(one_file_result.data).intersection(set(two_files_result.data))
    assert len(overlapping_rows) == 1
    assert_deterministic_scan(vector, two_files_result.runtime_profile)
    # The new scan range did not change the compile-time key
    assert one_file_compile_key == two_files_compile_key
    # Invalidate metadata and rerun the last query. The keys should stay the same.
    self.execute_query("invalidate metadata")
    rerun_two_files_result = self.execute_query(query)
    # Verify that this is a cache hit
    assertCounters(rerun_two_files_result.runtime_profile, 1, 0, 0)
    cache_keys = get_cache_keys(rerun_two_files_result.runtime_profile)
    assert len(cache_keys) == 1
    rerun_two_files_compile_key, rerun_two_files_finst_key = cache_keys[0].split("_")
    assert rerun_two_files_finst_key == two_files_finst_key
    assert rerun_two_files_compile_key == two_files_compile_key
    assert rerun_two_files_result.data == two_files_result.data

  def test_scan_range_partitioned(self, vector):
    """
    This tests a basic partitioned case where the query is identical except that
    it operates on different partitions (and thus different scan ranges).
    """
    self.client.set_configuration(vector.get_value('exec_option'))
    year2009_result = self.execute_query(
        "select * from functional.alltypes where year=2009")
    cache_keys = get_cache_keys(year2009_result.runtime_profile)
    assert len(cache_keys) == 1
    year2009_compile_key, year2009_finst_key = cache_keys[0].split("_")
    year2010_result = self.execute_query(
        "select * from functional.alltypes where year=2010")
    cache_keys = get_cache_keys(year2010_result.runtime_profile)
    assert len(cache_keys) == 1
    year2010_compile_key, year2010_finst_key = cache_keys[0].split("_")
    # This should be a cache miss
    assertCounters(year2010_result.runtime_profile, 0, 0, 0)
    # The year=X predicate is on a partition column, so it is enforced by pruning
    # partitions and doesn't carry through to execution. The compile keys for
    # the two queries are the same, but the fragment instance keys are different due
    # to the different scan ranges from different partitions.
    assert year2009_compile_key == year2010_compile_key
    assert year2009_finst_key != year2010_finst_key
    # Verify that the results are completely different
    year2009_result_set = set(year2009_result.data)
    year2010_result_set = set(year2010_result.data)
    overlapping_rows = year2009_result_set.intersection(year2010_result_set)
    assert len(overlapping_rows) == 0
    assert year2009_result.data[0].find("2009") != -1
    assert year2009_result.data[0].find("2010") == -1
    assert year2010_result.data[0].find("2010") != -1
    assert year2010_result.data[0].find("2009") == -1


@CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS)
class TestTupleCacheFullCluster(TestTupleCacheBase):
  """Test with 3 executors and a range of mt_dop values."""

  @classmethod
  def add_test_dimensions(cls):
    super(TestTupleCacheFullCluster, cls).add_test_dimensions()
    add_exec_option_dimension(cls, 'mt_dop', [0, 1, 2])

  def test_scan_range_distributed(self, vector, unique_database):
    """
    This tests the distributed case where there are multiple fragment instances
    processing different scan ranges. Each fragment instance should have a
    distinct cache key. When adding a scan range, at least one fragment instance
    cache key should change.
    """
    self.client.set_configuration(vector.get_value('exec_option'))
    mt_dop = vector.get_value('exec_option')['mt_dop']
    fq_table = "{0}.scan_range_distributed".format(unique_database)
    query = "SELECT * from {0}".format(fq_table)
    entries_baseline = {
        impalad: self.get_tuple_cache_metric(impalad.service, "entries-in-use")
        for impalad in self.cluster.impalads}
    # Create a table with several files so that we always have enough work for multiple
    # fragment instances
    self.create_table(fq_table, scale=20)
    # We run a simple select. This is running with multiple impalads, so there are
    # always multiple fragment instances
    before_result = self.execute_query(query)
    cache_keys = get_cache_keys(before_result.runtime_profile)
    expected_num_keys = 3 * max(mt_dop, 1)
    assert len(cache_keys) == expected_num_keys
    # Every cache key should be distinct, as the fragment instances are processing
    # different data
    unique_cache_keys = set(cache_keys)
    assert len(unique_cache_keys) == expected_num_keys
    # Every cache key has the same compile key
    unique_compile_keys = set([key.split("_")[0] for key in unique_cache_keys])
    assert len(unique_compile_keys) == 1
    # Verify the cache metrics for each impalad. Determine the number of new cache
    # entries, which should be the same as the number of cache keys.
    for impalad in self.cluster.impalads:
      entries_in_use = self.get_tuple_cache_metric(impalad.service, "entries-in-use")
      assert entries_in_use - entries_baseline[impalad] == max(mt_dop, 1)
    assert_deterministic_scan(vector, before_result.runtime_profile)
    # Insert another row, which creates a file / scan range.
    # This uses a very large seed for table_value() to get a unique row that isn't
    # already in the table.
    self.execute_query("INSERT INTO {0} VALUES ({1})".format(
        fq_table, table_value(1000000)))
    # Rerun the query with the extra scan range
    after_insert_result = self.execute_query(query)
    cache_keys = get_cache_keys(after_insert_result.runtime_profile)
    expected_num_keys = 3 * max(mt_dop, 1)
    assert len(cache_keys) == expected_num_keys
    # Every cache key should be distinct, as the fragment instances are processing
    # different data
    after_insert_unique_cache_keys = set(cache_keys)
    assert len(after_insert_unique_cache_keys) == expected_num_keys
    # Every cache key has the same compile key
    unique_compile_keys = \
        set([key.split("_")[0] for key in after_insert_unique_cache_keys])
    assert len(unique_compile_keys) == 1
    # Verify the cache metrics. We can do a more exact bound by looking at the total
    # across all impalads. The lower bound for this is the number of unique cache
    # keys across both queries we ran. The upper bound for the number of entries is
    # double the expected number from the first run of the query.
    #
    # This is not the exact number, because cache key X could have run on executor 1
    # for the first query and on executor 2 for the second query. Even though it would
    # appear as a single unique cache key, it is two different cache entries in
    # different executors.
    all_cache_keys = unique_cache_keys.union(after_insert_unique_cache_keys)
    total_entries_in_use = 0
    for impalad in self.cluster.impalads:
      new_entries_in_use = self.get_tuple_cache_metric(impalad.service, "entries-in-use")
      new_entries_in_use -= entries_baseline[impalad]
      assert new_entries_in_use >= max(mt_dop, 1)
      assert new_entries_in_use <= (2 * max(mt_dop, 1))
      total_entries_in_use += new_entries_in_use
    assert total_entries_in_use >= len(all_cache_keys)
    assert_deterministic_scan(vector, after_insert_result.runtime_profile)
    # The extra scan range means that at least one fragment instance key changed.
    # Since scheduling can change completely with the addition of a single scan range,
    # we can't assert that only one cache key changes.
    changed_cache_keys = unique_cache_keys.symmetric_difference(
        after_insert_unique_cache_keys)
    assert len(changed_cache_keys) != 0
    # Each row is distinct, so it is easy to verify that the results overlap,
    # except that the second result contains one more row than the first result.
    before_result_set = set(before_result.data)
    after_insert_result_set = set(after_insert_result.data)
    assert len(before_result_set) == 70 * 20
    assert len(before_result_set) + 1 == len(after_insert_result_set)
    different_rows = before_result_set.symmetric_difference(after_insert_result_set)
    assert len(different_rows) == 1

  @SkipIfDockerizedCluster.internal_hostname
  @SkipIf.hardcoded_uris
  def test_iceberg_deletes(self, vector):  # noqa: U100
    """
    Test basic Iceberg v2 deletes, which rely on the directed distribution mode and
    on looking past TupleCacheNodes to find the scan nodes.
    """
    # This query tests both equality deletes and positional deletes.
    query = "select * from functional_parquet.iceberg_v2_delete_both_eq_and_pos " + \
        "order by i"
    result1 = self.execute_query(query)
    result2 = self.execute_query(query)
    assert result1.success and result2.success
    assert result1.data == result2.data
    assert result1.data[0].split("\t") == ["2", "str2_updated", "2023-12-13"]
    assert result1.data[1].split("\t") == ["3", "str3", "2023-12-23"]


@CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS, cluster_size=1)
class TestTupleCacheMtdop(TestTupleCacheBase):
  """Test with a single executor and mt_dop=0 or 2."""

  @classmethod
  def add_test_dimensions(cls):
    super(TestTupleCacheMtdop, cls).add_test_dimensions()
    add_exec_option_dimension(cls, 'mt_dop', [0, 2])

  def test_tuple_cache_count_star(self, vector, unique_database):
    """
    This is a regression test for IMPALA-13411, verifying that a count(*) query
    does not hit the DCHECK.
    """
    self.client.set_configuration(vector.get_value('exec_option'))
    fq_table = "{0}.tuple_cache_count_star".format(unique_database)
    # Create a table.
    self.create_table(fq_table, scale=1)
    # Run twice and verify that it does not hit the DCHECK.
    query = "select count(*) from {0}".format(fq_table)
    result1 = self.execute_query(query)
    result2 = self.execute_query(query)
    assert result1.success and result2.success

  def test_tuple_cache_key_with_stats(self, vector, unique_database):
    """
    This verifies that computing stats changes the memory estimates but not the
    tuple cache key.
    """
    self.client.set_configuration(vector.get_value('exec_option'))
    fq_table = "{0}.tuple_cache_stats_test".format(unique_database)
    # Create a table.
    self.create_table(fq_table, scale=1)
    # Get the explain text for a simple query.
    query = "explain select * from {0}".format(fq_table)
    result1 = self.execute_query(query)
    # Insert rows to make the stats different.
    for i in range(10):
      self.execute_query("INSERT INTO {0} VALUES ({1})".format(
          fq_table, table_value(i)))
    # Run compute stats and get the explain text again for the same query.
    self.client.execute("COMPUTE STATS {0}".format(fq_table))
    result2 = self.execute_query(query)
    # Verify the memory estimates are different, while the cache keys are identical.
    assert result1.success and result2.success
    mem_limit1, units1 = match_memory_estimate(result1.data)
    mem_limit1 = parse_mem_to_mb(mem_limit1, units1)
    mem_limit2, units2 = match_memory_estimate(result2.data)
    mem_limit2 = parse_mem_to_mb(mem_limit2, units2)
    assert mem_limit1 != mem_limit2
    cache_key1 = match_cache_key(result1.data)
    cache_key2 = match_cache_key(result2.data)
    assert cache_key1 is not None and cache_key1 == cache_key2