IMPALA-13964: Fix test_tuple_cache_tpc_queries.py flakiness

This makes two changes to deflake test_tuple_cache_tpc_queries.py.
First, it increases the runtime filter wait time from 60 seconds to
600 seconds. The correctness verification slows down the path
that produces the runtime filter. The slowdown is dependent on
the speed of storage, so this can get very slow on test machines.

Second, this skips correctness checking for locations that are just
after streaming aggregations. Streaming aggregations can produce
variable output that the correctness checking can't handle.
For example a grouping aggregation computing a sum might have
a preaggregation produce either (A: 3) or (A: 2), (A: 1) or
(A: 1), (A: 1), (A: 1). The finalization sees these as equivalent.
This marks the nodes as variable starting with the preaggregation
and clears the mark at the finalize stage.

When skipping correctness checking, the tuple cache node does not
hit the cache normally. This guarantees that its children will run
and go through correctness checking.

Testing:
 - Ran test_tuple_cache_tpc_queries.py locally
 - Added a frontend test for this specific case

Change-Id: If5e1be287bdb489a89aea3b2d7bec416220feb9a
Reviewed-on: http://gerrit.cloudera.org:8080/23010
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Tested-by: Michael Smith <michael.smith@cloudera.com>
This commit is contained in:
Joe McDonnell
2025-05-21 18:34:23 -07:00
committed by Michael Smith
parent 935a5e2b8d
commit 14597c7e2f
10 changed files with 136 additions and 29 deletions

View File

@@ -70,6 +70,8 @@ Status TupleCacheNode::Prepare(RuntimeState* state) {
ComputeFragmentInstanceKey(state);
combined_key_ = plan_node().tnode_->tuple_cache_node.compile_time_key + "_" +
std::to_string(fragment_instance_key_);
skip_correctness_verification_ =
plan_node().tnode_->tuple_cache_node.skip_correctness_verification;
runtime_profile()->AddInfoString("Combined Key", combined_key_);
return Status::OK();
@@ -90,33 +92,38 @@ Status TupleCacheNode::Open(RuntimeState* state) {
handle_ = tuple_cache_mgr->Lookup(combined_key_, true);
if (tuple_cache_mgr->IsAvailableForRead(handle_)) {
if (tuple_cache_mgr->DebugDumpEnabled() && TupleCacheVerificationEnabled(state)) {
// We need the original fragment id to construct the path for the reference debug
// cache file. If it's missing from the metadata, we return an error status
// immediately.
string org_fragment_id = tuple_cache_mgr->GetFragmentIdForTupleCache(combined_key_);
if (org_fragment_id.empty()) {
return Status(TErrorCode::TUPLE_CACHE_INCONSISTENCY,
Substitute("Metadata of tuple cache '$0' is missing for correctness check",
combined_key_));
// If the node is marked to skip correctness verification, we don't want to read
// from the cache as that would prevent its children from executing.
if (!skip_correctness_verification_) {
// We need the original fragment id to construct the path for the reference debug
// cache file. If it's missing from the metadata, we return an error status
// immediately.
string org_fragment_id =
tuple_cache_mgr->GetFragmentIdForTupleCache(combined_key_);
if (org_fragment_id.empty()) {
return Status(TErrorCode::TUPLE_CACHE_INCONSISTENCY,
Substitute("Metadata of tuple cache '$0' is missing for correctness check",
combined_key_));
}
string ref_sub_dir;
string sub_dir;
string ref_file_path = GetDebugDumpPath(state, org_fragment_id, &ref_sub_dir);
string file_path = GetDebugDumpPath(state, string(), &sub_dir);
DCHECK_EQ(ref_sub_dir, sub_dir);
DCHECK(!ref_sub_dir.empty());
DCHECK(!ref_file_path.empty());
DCHECK(!file_path.empty());
// Create the subdirectory for the debug caches if needed.
RETURN_IF_ERROR(tuple_cache_mgr->CreateDebugDumpSubdir(ref_sub_dir));
// Open the writer for writing the tuple data from the cache entries to be
// the reference cache data.
debug_dump_text_writer_ref_ = make_unique<TupleTextFileWriter>(ref_file_path);
RETURN_IF_ERROR(debug_dump_text_writer_ref_->Open());
// Open the writer for writing the tuple data from children in GetNext() to
// compare with the reference debug cache file.
debug_dump_text_writer_ = make_unique<TupleTextFileWriter>(file_path);
RETURN_IF_ERROR(debug_dump_text_writer_->Open());
}
string ref_sub_dir;
string sub_dir;
string ref_file_path = GetDebugDumpPath(state, org_fragment_id, &ref_sub_dir);
string file_path = GetDebugDumpPath(state, string(), &sub_dir);
DCHECK_EQ(ref_sub_dir, sub_dir);
DCHECK(!ref_sub_dir.empty());
DCHECK(!ref_file_path.empty());
DCHECK(!file_path.empty());
// Create the subdirectory for the debug caches if needed.
RETURN_IF_ERROR(tuple_cache_mgr->CreateDebugDumpSubdir(ref_sub_dir));
// Open the writer for writing the tuple data from the cache entries to be
// the reference cache data.
debug_dump_text_writer_ref_ = make_unique<TupleTextFileWriter>(ref_file_path);
RETURN_IF_ERROR(debug_dump_text_writer_ref_->Open());
// Open the writer for writing the tuple data from children in GetNext() to
// compare with the reference debug cache file.
debug_dump_text_writer_ = make_unique<TupleTextFileWriter>(file_path);
RETURN_IF_ERROR(debug_dump_text_writer_->Open());
} else {
reader_ = make_unique<TupleFileReader>(
tuple_cache_mgr->GetPath(handle_), mem_tracker(), runtime_profile());

View File

@@ -62,6 +62,12 @@ private:
// This combination is unique for a given fragment instance.
std::string combined_key_;
// This caching location should skip correctness verification. This can be true when a
// location has variability in its results that is tolerated by nodes higher in the
// plan (e.g. streaming aggregations can produce variable results that do not change
// the result out of the finalization phase).
bool skip_correctness_verification_;
/// Number of results that were found in the tuple cache
RuntimeProfile::Counter* num_hits_counter_ = nullptr;
/// Number of results that were too large for the cache

View File

@@ -725,6 +725,9 @@ struct TTupleCacheNode {
// into this node. The TupleCacheNode will hash the scan ranges for its fragment
// at runtime.
2: required list<i32> input_scan_node_ids;
// Skip correctness verification at this node. This can be true if the result at this
// location is variable in a way that does not impact correctness.
3: required bool skip_correctness_verification;
}
enum TMergeCaseType {

View File

@@ -140,4 +140,23 @@ public class ThriftSerializationCtx {
tupleCacheInfo_.incorporateScans();
}
}
/**
* Mark this location as variable due a streaming aggregation.
*/
public void setStreamingAggVariability() {
if (isTupleCache()) {
tupleCacheInfo_.setStreamingAggVariability();
}
}
/**
* Clear any mark indicating variability due to a streaming aggregation.
* It is safe to call this even if there is no existing variability.
*/
public void clearStreamingAggVariability() {
if (isTupleCache()) {
tupleCacheInfo_.clearStreamingAggVariability();
}
}
}

View File

@@ -821,6 +821,16 @@ public class AggregationNode extends PlanNode implements SpillableOperator {
}
msg.agg_node.addToAggregators(taggregator);
}
// Streaming aggregations can have variable output that is handled and undone
// by the finalize stage. For example, a grouping aggregation doing a sum could
// return (a, 3) or (a, 2), (a, 1) or (a, 1), (a, 1), (a, 1). Mark the node as
// variable, which disables automated correctness checking. Clear the mark at the
// finalize stage, as the finalize stage undoes the variability.
if (useStreamingPreagg_) {
serialCtx.setStreamingAggVariability();
} else if (needsFinalize_) {
serialCtx.clearStreamingAggVariability();
}
}
@Override

View File

@@ -130,6 +130,13 @@ public class TupleCacheInfo {
// to hash the scan ranges of input scan nodes to generate the key.
private final List<HdfsScanNode> inputScanNodes_ = new ArrayList<HdfsScanNode>();
// This node has variable output due to a streaming aggregation. For example, a
// grouping aggregation doing a sum could return (a, 3) or (a, 2), (a, 1) or
// (a, 1), (a, 1), (a, 1). These all mean the same thing to the finalize stage.
// We need to know about it to disable automated correctness checking for locations
// with this variability.
private boolean streamingAggVariability_ = false;
// These fields accumulate partial results until finalizeHash() is called.
private Hasher hasher_ = Hashing.murmur3_128().newHasher();
@@ -157,6 +164,19 @@ public class TupleCacheInfo {
return ineligibilityReasons_.isEmpty();
}
public void setStreamingAggVariability() {
Preconditions.checkState(!streamingAggVariability_);
streamingAggVariability_ = true;
}
public void clearStreamingAggVariability() {
streamingAggVariability_ = false;
}
public boolean getStreamingAggVariability() {
return streamingAggVariability_;
}
public String getHashString() {
Preconditions.checkState(isEligible(),
"TupleCacheInfo only has a hash if it is cache eligible");
@@ -289,6 +309,11 @@ public class TupleCacheInfo {
// id translation maps.
registerTupleHelper(id, false);
}
// The variability transmits up the tree until the aggregation is finalized.
if (child.streamingAggVariability_) {
streamingAggVariability_ = true;
}
return true;
}
}

View File

@@ -43,9 +43,12 @@ public class TupleCacheNode extends PlanNode {
protected String compileTimeKey_;
protected String hashTrace_;
protected boolean displayCorrectnessCheckingInfo_;
protected boolean skipCorrectnessVerification_;
protected final List<Integer> inputScanNodeIds_ = new ArrayList<Integer>();
public TupleCacheNode(PlanNodeId id, PlanNode child) {
public TupleCacheNode(PlanNodeId id, PlanNode child,
boolean displayCorrectnessCheckingInfo) {
super(id, "TUPLE CACHE");
addChild(child);
cardinality_ = child.getCardinality();
@@ -55,6 +58,10 @@ public class TupleCacheNode extends PlanNode {
Preconditions.checkState(childCacheInfo.isEligible());
compileTimeKey_ = childCacheInfo.getHashString();
hashTrace_ = childCacheInfo.getHashTrace();
// If there is variability due to a streaming agg, skip the correctness verification
// for this location.
skipCorrectnessVerification_ = childCacheInfo.getStreamingAggVariability();
displayCorrectnessCheckingInfo_ = displayCorrectnessCheckingInfo;
for (HdfsScanNode scanNode : childCacheInfo.getInputScanNodes()) {
// Inputs into the tuple cache need to use deterministic scan range assignment
scanNode.setDeterministicScanRangeAssignment(true);
@@ -87,6 +94,7 @@ public class TupleCacheNode extends PlanNode {
TTupleCacheNode tupleCacheNode = new TTupleCacheNode();
tupleCacheNode.setCompile_time_key(compileTimeKey_);
tupleCacheNode.setInput_scan_node_ids(inputScanNodeIds_);
tupleCacheNode.setSkip_correctness_verification(skipCorrectnessVerification_);
msg.setTuple_cache_node(tupleCacheNode);
}
@@ -113,6 +121,11 @@ public class TupleCacheNode extends PlanNode {
StringBuilder output = new StringBuilder();
output.append(String.format("%s%s:%s\n", prefix, id_.toString(), displayName_));
output.append(detailPrefix + "cache key: " + compileTimeKey_ + "\n");
// Only display information about correctness verification if it is enabled
if (displayCorrectnessCheckingInfo_) {
output.append(detailPrefix + "skip correctness verification: " +
skipCorrectnessVerification_ + "\n");
}
// For debuggability, always print the hash trace until the cache key calculation
// matures. Print trace in chunks to avoid excessive wrapping and padding in

View File

@@ -20,6 +20,7 @@ package org.apache.impala.planner;
import java.util.List;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.thrift.TQueryOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,8 +94,12 @@ public class TupleCachePlanner {
if (LOG.isTraceEnabled()) {
LOG.trace("Adding TupleCacheNode above node " + node.getId().toString());
}
// Get current query options
TQueryOptions queryOptions =
ctx_.getRootAnalyzer().getQueryCtx().client_request.getQuery_options();
// Allocate TupleCacheNode
TupleCacheNode tupleCacheNode = new TupleCacheNode(ctx_.getNextNodeId(), node);
TupleCacheNode tupleCacheNode = new TupleCacheNode(ctx_.getNextNodeId(), node,
queryOptions.isEnable_tuple_cache_verification());
tupleCacheNode.init(ctx_.getRootAnalyzer());
PlanFragment curFragment = node.getFragment();
if (node == curFragment.getPlanRoot()) {

View File

@@ -420,6 +420,25 @@ public class TupleCacheTest extends PlannerTestBase {
}
}
@Test
public void testSkipCorrectnessChecking() {
// Locations after a streaming aggregation before the finalize can have variability
// that correctness checking can't handle.
List<PlanNode> cacheEligibleNodes =
getCacheEligibleNodes(
"select int_col, count(*) from functional.alltypes group by int_col",
/* isDistributedPlan */ true);
// In this plan, there is a streaming aggregation that feeds into a partitioned
// exchange. The finalize phase is past that partitioned exchange. That means that
// the only eligible node marked with streaming agg variability is the initial
// streaming AggregationNode.
for (PlanNode node : cacheEligibleNodes) {
if (node instanceof AggregationNode) {
assertTrue(node.getTupleCacheInfo().getStreamingAggVariability());
}
}
}
protected List<PlanNode> getCacheEligibleNodes(String query,
boolean isDistributedPlan) {
List<PlanFragment> plan = getPlan(query, isDistributedPlan);

View File

@@ -33,7 +33,7 @@ def run_tuple_cache_test(self, vector, query, mtdop):
# Use a long runtime filter wait time (1 minute) to ensure filters arrive before
# generating the tuple cache for correctness check.
if IS_TUPLE_CACHE_CORRECT_CHECK:
vector.get_value('exec_option')['runtime_filter_wait_time_ms'] = 60000
vector.get_value('exec_option')['runtime_filter_wait_time_ms'] = 600000
vector.get_value('exec_option')['enable_tuple_cache_verification'] = True
vector.get_value('exec_option')['mt_dop'] = mtdop
# Run twice to test write and read the tuple cache.