IMPALA-13437 (part 1): Compute processing cost before TupleCachePlanner

This is a preparatory change for cost-based placement of
TupleCacheNodes. It reorders planning so that the processing cost and
filtered cardinality are calculated before the TupleCachePlanner runs.
As a result, the processing cost is now computed when
enable_tuple_cache=true, and the cost information is displayed in the
explain plan output in that case. This does not affect the adjustment
of fragment parallelism, which continues to be controlled by the
compute_processing_cost option.
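
For reference, a condensed sketch of the reordered flow in
Planner.createPlans() (a simplification of the Planner.java hunk
below, not the verbatim implementation; the parallel-plan branch and
timeline events are omitted):

  public List<PlanFragment> createPlans(TQueryExecRequest result)
      throws ImpalaException {
    List<PlanFragment> distrPlan = createPlanFragments();
    // The processing cost must be computed first, because the
    // TupleCachePlanner consumes the cost and fragment parallelism.
    computeProcessingCost(distrPlan, result, ctx_);
    // The TupleCachePlanner runs last, after runtime filters and any
    // other tree modifications that can change cache eligibility.
    if (useTupleCache(ctx_.getQueryOptions())) {
      distrPlan = new TupleCachePlanner(ctx_).createPlans(distrPlan);
    }
    return distrPlan;
  }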

The processing cost is used to calculate a cumulative processing cost
in the TupleCacheInfo: the total processing cost of everything below
this point in the plan, including other fragments. This is an
indicator of how much processing a cache hit could avoid. The cost is
not accumulated when merging a TupleCacheInfo due to a runtime filter,
because that cost is not actually avoided by a cache hit. This also
computes the estimated serialized size for the TupleCacheNode as the
average row size multiplied by the filtered cardinality.
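
Concretely, for each cache-eligible node the accumulation amounts to
the following (a simplified sketch of calculateCostInformation() from
the TupleCacheInfo.java hunk below):

  long cost = node.getProcessingCost().getTotalCost();
  for (PlanNode child : node.getChildren()) {
    // Children are processed first, so their totals are available.
    cost += child.getTupleCacheInfo().getCumulativeProcessingCost();
    // A child in another fragment (e.g. the build side of a join)
    // also contributes the cost of that fragment's sink.
    if (child.getFragment() != node.getFragment()) {
      cost += child.getFragment().getSink().getProcessingCost()
          .getTotalCost();
    }
  }
  // Estimated serialized size, when stats exist (cardinality != -1):
  long size = Math.round(
      ExchangeNode.getAvgSerializedRowSize(node)
          * node.getFilteredCardinality());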

Testing:
 - Ran a core job

Change-Id: If78f5d002b0e079eef1eece612f0d4fefde545c7
Reviewed-on: http://gerrit.cloudera.org:8080/23164
Reviewed-by: Yida Wu <wydbaggio000@gmail.com>
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Tested-by: Michael Smith <michael.smith@cloudera.com>
Author: Joe McDonnell
Date:   2025-03-14 09:24:14 -07:00
Parent: dc46aa48d9
Commit: 3181fe1800

7 changed files with 122 additions and 23 deletions

File: DataSink.java

@@ -60,7 +60,7 @@ public abstract class DataSink {
     if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
       output.append(detailPrefix);
       output.append(resourceProfile_.getExplainString());
-      if (queryOptions.isCompute_processing_cost()) {
+      if (Planner.isProcessingCostAvailable(queryOptions)) {
         // Show processing cost total.
         output.append(" cost=");
         if (processingCost_.isValid()) {

File: PlanFragment.java

@@ -744,14 +744,14 @@ public class PlanFragment extends TreeNode<PlanFragment> {
    */
   public String getFragmentHeaderString(String firstLinePrefix, String detailPrefix,
       TQueryOptions queryOptions, TExplainLevel explainLevel) {
-    boolean isComputeCost = queryOptions.isCompute_processing_cost();
+    boolean adjustsInstanceCount = queryOptions.isCompute_processing_cost();
     boolean useMTFragment = Planner.useMTFragment(queryOptions);
     StringBuilder builder = new StringBuilder();
     builder.append(String.format("%s%s:PLAN FRAGMENT [%s]", firstLinePrefix,
         fragmentId_.toString(), dataPartition_.getExplainString()));
     builder.append(PrintUtils.printNumHosts(" ", getNumNodes()));
     builder.append(PrintUtils.printNumInstances(" ", getNumInstances()));
-    if (isComputeCost && originalInstanceCount_ != getNumInstances()) {
+    if (adjustsInstanceCount && originalInstanceCount_ != getNumInstances()) {
       builder.append(" (adjusted from " + originalInstanceCount_ + ")");
     }
     builder.append("\n");
@@ -807,7 +807,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
       builder.append(perInstanceExplainString);
       builder.append("\n");
     }
-    if (isComputeCost && rootSegment_ != null
+    if (Planner.isProcessingCostAvailable(queryOptions) && rootSegment_ != null
         && explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
       // Print processing cost.
       builder.append(detailPrefix);

File: PlanNode.java

@@ -388,7 +388,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
       expBuilder.append(nodeResourceProfile_.getExplainString());
       expBuilder.append("\n");
-      if (queryOptions.isCompute_processing_cost() && processingCost_.isValid()
+      if (Planner.isProcessingCostAvailable(queryOptions) && processingCost_.isValid()
           && detailLevel.ordinal() >= TExplainLevel.VERBOSE.ordinal()) {
         // Print processing cost.
         expBuilder.append(processingCost_.getExplainString(detailPrefix, false));
@@ -421,7 +421,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
       } else {
         expBuilder.append(PrintUtils.printEstCardinality(cardinality_));
       }
-      if (queryOptions.isCompute_processing_cost()) {
+      if (Planner.isProcessingCostAvailable(queryOptions)) {
         // Show processing cost total.
         expBuilder.append(" cost=");
         if (processingCost_.isValid()) {
@@ -1412,6 +1412,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
       // Leaf node, add query options hash.
       tupleCacheInfo_.hashThrift("Query options hash", queryOptsHash);
     }
+    tupleCacheInfo_.calculateCostInformation(this);
     tupleCacheInfo_.finalizeHash();
     LOG.trace("Hash for {}:", this);
     for (HashTraceElement elem : tupleCacheInfo_.getHashTraces()) {

File: Planner.java

@@ -166,6 +166,7 @@ public class Planner {
       ctx_.getTimeline().markEvent("Runtime filters computed");
       checkAndOverrideMinmaxFilterThresholdAndLevel(ctx_.getQueryOptions());
+      reduceCardinalityByRuntimeFilter(rootFragment, ctx_);
     }
     rootFragment.verifyTree();
@@ -308,7 +309,8 @@ public class Planner {
    * return a single-node, distributed, or parallel plan depending on the query and
    * configuration.
    */
-  public List<PlanFragment> createPlans() throws ImpalaException {
+  public List<PlanFragment> createPlans(TQueryExecRequest result)
+      throws ImpalaException {
     List<PlanFragment> distrPlan = createPlanFragments();
     Preconditions.checkNotNull(distrPlan);
     if (useParallelPlan(ctx_)) {
@@ -318,11 +320,17 @@ public class Planner {
     } else {
       distrPlan = Collections.singletonList(distrPlan.get(0));
     }
+    // Compute the processing cost and adjust fragment parallelism
+    // The TupleCachePlanner uses the processing cost and fragment parallelism, so
+    // this needs to happen first.
+    computeProcessingCost(distrPlan, result, ctx_);
     // TupleCachePlanner comes last, because it needs to compute the eligibility of
     // various locations in the PlanNode tree. Runtime filters and other modifications
     // to the tree can change this, so this comes after all those modifications are
     // complete.
-    if (useTupleCache(ctx_)) {
+    if (useTupleCache(ctx_.getQueryOptions())) {
       TupleCachePlanner cachePlanner = new TupleCachePlanner(ctx_);
       distrPlan = cachePlanner.createPlans(distrPlan);
       ctx_.getTimeline().markEvent("Tuple caching plan created");
@@ -345,8 +353,15 @@ public class Planner {
   }

   // Return true if ENABLE_TUPLE_CACHE=true
-  public static boolean useTupleCache(PlannerContext planCtx) {
-    return planCtx.getQueryOptions().isEnable_tuple_cache();
+  public static boolean useTupleCache(TQueryOptions queryOptions) {
+    return queryOptions.isEnable_tuple_cache();
   }

+  // Return true if the processing cost has been computed. This is true for
+  // COMPUTE_PROCESSING_COST=true and ENABLE_TUPLE_CACHE=true
+  public static boolean isProcessingCostAvailable(TQueryOptions queryOptions) {
+    return queryOptions.isCompute_processing_cost() ||
+        useTupleCache(queryOptions);
+  }

   /**
@@ -548,12 +563,11 @@ public class Planner {
    * computation.
    */
   public static void reduceCardinalityByRuntimeFilter(
-      List<PlanFragment> planRoots, PlannerContext planCtx) {
+      PlanFragment rootFragment, PlannerContext planCtx) {
     double reductionScale = planCtx.getRootAnalyzer()
         .getQueryOptions()
         .getRuntime_filter_cardinality_reduction_scale();
     if (reductionScale <= 0) return;
-    PlanFragment rootFragment = planRoots.get(0);
     Stack<PlanNode> nodeStack = new Stack<>();
     rootFragment.getPlanRoot().reduceCardinalityByRuntimeFilter(
         nodeStack, reductionScale);
@@ -572,7 +586,7 @@ public class Planner {
     List<PlanFragment> postOrderFragments = new ArrayList<>();
     boolean testCostCalculation = queryOptions.isEnable_replan()
         && (RuntimeEnv.INSTANCE.isTestEnv() || queryOptions.isTest_replan());
-    if (queryOptions.isCompute_processing_cost() || testCostCalculation) {
+    if (isProcessingCostAvailable(queryOptions) || testCostCalculation) {
       postOrderFragments = rootFragment.getNodesPostOrder();
       for (PlanFragment fragment : postOrderFragments) {
         fragment.computeCostingSegment(queryOptions);

File: TupleCacheInfo.java

@@ -34,7 +34,9 @@
 import org.apache.impala.analysis.TupleId;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.FeView;
 import org.apache.impala.common.IdGenerator;
+import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.ThriftSerializationCtx;
+import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TFileSplitGeneratorSpec;
 import org.apache.impala.thrift.TScanRange;
 import org.apache.impala.thrift.TScanRangeLocationList;
@@ -166,6 +168,14 @@ public class TupleCacheInfo {
   private boolean finalized_ = false;
   private String finalizedHashString_ = null;

+  // Cumulative processing cost from all nodes that feed into this node, including nodes
+  // from other fragments (e.g. the build side of a join).
+  private long cumulativeProcessingCost_ = 0;
+
+  // Estimated size of the result at this location. This is the row size multiplied by
+  // the filtered cardinality.
+  private long estimatedSerializedSize_ = -1;
+
   public TupleCacheInfo(DescriptorTable descTbl) {
     ineligibilityReasons_ = EnumSet.noneOf(IneligibilityReason.class);
     descriptorTable_ = descTbl;
@@ -221,6 +231,58 @@ public class TupleCacheInfo {
     finalized_ = true;
   }

+  public long getCumulativeProcessingCost() {
+    Preconditions.checkState(isEligible(),
+        "TupleCacheInfo only has cost information if it is cache eligible.");
+    Preconditions.checkState(finalized_, "TupleCacheInfo not finalized");
+    return cumulativeProcessingCost_;
+  }
+
+  public long getEstimatedSerializedSize() {
+    Preconditions.checkState(isEligible(),
+        "TupleCacheInfo only has cost information if it is cache eligible.");
+    Preconditions.checkState(finalized_, "TupleCacheInfo not finalized");
+    return estimatedSerializedSize_;
+  }
+
+  /**
+   * Calculate the tuple cache cost information for this plan node. This must be called
+   * with the matching PlanNode for this TupleCacheInfo. This pulls in any information
+   * from the PlanNode or from any children recursively. This cost information is used
+   * for planning decisions. It is also displayed in the explain plan output for
+   * debugging.
+   */
+  public void calculateCostInformation(PlanNode thisPlanNode) {
+    Preconditions.checkState(!finalized_,
+        "TupleCacheInfo is finalized and can't be modified");
+    Preconditions.checkState(isEligible(),
+        "TupleCacheInfo only calculates cost information if it is cache eligible.");
+    Preconditions.checkState(thisPlanNode.getTupleCacheInfo() == this,
+        "calculateCostInformation() must be called with its enclosing PlanNode");
+    // This was already called on our children, which are known to be eligible.
+    // Pull in the information from our children.
+    for (PlanNode child : thisPlanNode.getChildren()) {
+      cumulativeProcessingCost_ +=
+          child.getTupleCacheInfo().getCumulativeProcessingCost();
+      // If the child is from a different fragment (e.g. the build side of a hash join),
+      // incorporate the cost of the sink
+      if (child.getFragment() != thisPlanNode.getFragment()) {
+        cumulativeProcessingCost_ +=
+            child.getFragment().getSink().getProcessingCost().getTotalCost();
+      }
+    }
+    cumulativeProcessingCost_ += thisPlanNode.getProcessingCost().getTotalCost();
+    // If there are stats, compute the estimated serialized size. If there are no stats
+    // (i.e. cardinality == -1), then there is nothing to do.
+    if (thisPlanNode.getFilteredCardinality() > -1) {
+      long cardinality = thisPlanNode.getFilteredCardinality();
+      estimatedSerializedSize_ = (long) Math.round(
+          ExchangeNode.getAvgSerializedRowSize(thisPlanNode) * cardinality);
+    }
+  }
+
   /**
    * Pull in a child's TupleCacheInfo into this TupleCacheInfo. If the child is
    * ineligible, then this is marked ineligible and there is no need to calculate
@@ -499,6 +561,24 @@ public class TupleCacheInfo {
     return builder.toString();
   }

+  /**
+   * Produce explain output describing the cost information for this tuple cache location
+   */
+  public String getCostExplainString(String detailPrefix) {
+    StringBuilder output = new StringBuilder();
+    output.append(detailPrefix + "estimated serialized size: ");
+    if (estimatedSerializedSize_ > -1) {
+      output.append(PrintUtils.printBytes(estimatedSerializedSize_));
+    } else {
+      output.append("unavailable");
+    }
+    output.append("\n");
+    output.append(detailPrefix + "cumulative processing cost: ");
+    output.append(getCumulativeProcessingCost());
+    output.append("\n");
+    return output.toString();
+  }
+
   /**
    * Construct a comma separated list of the ineligibility reasons.
    */

File: TupleCacheNode.java

@@ -45,32 +45,37 @@ public class TupleCacheNode extends PlanNode {
   protected boolean displayCorrectnessCheckingInfo_;
   protected boolean skipCorrectnessVerification_;
   protected final List<Integer> inputScanNodeIds_ = new ArrayList<Integer>();
+  protected final TupleCacheInfo childTupleCacheInfo_;

   public TupleCacheNode(PlanNodeId id, PlanNode child,
       boolean displayCorrectnessCheckingInfo) {
     super(id, "TUPLE CACHE");
     addChild(child);
+    cardinality_ = child.getCardinality();
+    if (child.getFilteredCardinality() != cardinality_) {
+      setFilteredCardinality(child.getFilteredCardinality());
+    }
     limit_ = child.limit_;
-    TupleCacheInfo childCacheInfo = child.getTupleCacheInfo();
-    Preconditions.checkState(childCacheInfo.isEligible());
-    compileTimeKey_ = childCacheInfo.getHashString();
+    childTupleCacheInfo_ = child.getTupleCacheInfo();
+    Preconditions.checkState(childTupleCacheInfo_.isEligible());
+    compileTimeKey_ = childTupleCacheInfo_.getHashString();
     // If there is variability due to a streaming agg, skip the correctness verification
     // for this location.
-    skipCorrectnessVerification_ = childCacheInfo.getStreamingAggVariability();
+    skipCorrectnessVerification_ = childTupleCacheInfo_.getStreamingAggVariability();
     displayCorrectnessCheckingInfo_ = displayCorrectnessCheckingInfo;
-    for (HdfsScanNode scanNode : childCacheInfo.getInputScanNodes()) {
+    for (HdfsScanNode scanNode : childTupleCacheInfo_.getInputScanNodes()) {
       // Inputs into the tuple cache need to use deterministic scan range assignment
       scanNode.setDeterministicScanRangeAssignment(true);
       inputScanNodeIds_.add(scanNode.getId().asInt());
     }
+    computeTupleIds();
   }

   @Override
   public void init(Analyzer analyzer) throws ImpalaException {
     super.init(analyzer);
-    computeTupleIds();
     Preconditions.checkState(conjuncts_.isEmpty());
   }

   @Override
@@ -128,6 +133,7 @@ public class TupleCacheNode extends PlanNode {
       inputScanNodeIds_.stream().map(Object::toString).collect(Collectors.toList());
     output.append(detailPrefix + "input scan node ids: " +
         String.join(",", input_scan_node_ids_strs) + "\n");
+    output.append(childTupleCacheInfo_.getCostExplainString(detailPrefix));
     return output.toString();
   }

File: Frontend.java

@@ -2031,16 +2031,14 @@ public class Frontend {
    */
   private TQueryExecRequest createExecRequest(
       Planner planner, PlanCtx planCtx) throws ImpalaException {
+    TQueryExecRequest result = new TQueryExecRequest();
     TQueryCtx queryCtx = planner.getQueryCtx();
-    List<PlanFragment> planRoots = planner.createPlans();
+    List<PlanFragment> planRoots = planner.createPlans(result);
     if (planCtx.planCaptureRequested()) {
       planCtx.plan_ = planRoots;
     }
     // Compute resource requirements of the final plans.
-    TQueryExecRequest result = new TQueryExecRequest();
-    Planner.reduceCardinalityByRuntimeFilter(planRoots, planner.getPlannerCtx());
-    Planner.computeProcessingCost(planRoots, result, planner.getPlannerCtx());
     Planner.computeResourceReqs(planRoots, queryCtx, result,
         planner.getPlannerCtx(), planner.getAnalysisResult().isQueryStmt());