IMPALA-13152: Avoid NaN, infinite, and negative ProcessingCost

The TOP-N cost turns into NaN when inputCardinality is 0, because
Math.log(0) evaluates to -Infinity and 0 * -Infinity is NaN. This patch
fixes the issue by avoiding Math.log(0) and using 0 instead.
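
For reference, a minimal standalone sketch (plain Java, not Impala code)
of how the estimate degenerates and how the guard avoids it:

  public class NanCostDemo {
    public static void main(String[] args) {
      long inputCardinality = 0;

      // Before: Math.log(0) is -Infinity, and 0 * -Infinity is NaN.
      double before =
          inputCardinality * (Math.log(inputCardinality) / Math.log(2));

      // After: guard the logarithm so an empty input costs 0.
      double log2 = inputCardinality <= 0
          ? 0.0 : Math.log(inputCardinality) / Math.log(2);
      double after = inputCardinality * log2;

      System.out.println(before); // prints NaN
      System.out.println(after);  // prints 0.0
    }
  }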

After this patch, instantiating BaseProcessingCost with a NaN, infinite,
or negative totalCost throws IllegalArgumentException. In
BaseProcessingCost.getDetails(), "total-cost" is renamed to "raw-cost"
to avoid confusion with "cost-total" in ProcessingCost.getDetails().
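
A hedged sketch of the new fail-fast behavior, assuming the
ProcessingCost.basicCost(String, double) factory from the diff below (the
"SORT" label is just an illustrative placeholder):

  import org.apache.impala.planner.ProcessingCost;

  public class CostGuardDemo {
    public static void main(String[] args) {
      ProcessingCost.basicCost("SORT", 42.0); // accepted: finite, non-negative

      try {
        ProcessingCost.basicCost("SORT", Double.NaN);
      } catch (IllegalArgumentException e) {
        // Prints "Invalid totalCost supplied for SORT"; the cause carries
        // "totalCost must not be a NaN!".
        System.out.println(e.getMessage());
      }
    }
  }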

Testing:
- Add a test case that runs a TOP-N query over an empty table.
- Compute ProcessingCost in most FE and EE tests even when the
  COMPUTE_PROCESSING_COST option is not enabled, by checking whether
  RuntimeEnv.INSTANCE.isTestEnv() is true or the TEST_REPLAN option is
  enabled; see the sketch after this list.
- Pass core tests.
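
A simplified sketch of that gating, condensed from the
Planner.computeProcessingCost() hunk further down (not the verbatim
method):

  boolean testCostCalculation = queryOptions.isEnable_replan()
      && (RuntimeEnv.INSTANCE.isTestEnv() || queryOptions.isTest_replan());
  if (queryOptions.isCompute_processing_cost() || testCostCalculation) {
    // Cost every fragment, even without COMPUTE_PROCESSING_COST.
  }
  if (!queryOptions.isCompute_processing_cost()) {
    request.setCores_required(-1); // parallelism adjustment stays gated
    return;
  }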

Change-Id: Ib49c7ae397dadcb2cb69fde1850d442d33cdf177
Reviewed-on: http://gerrit.cloudera.org:8080/21504
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author: Riza Suminto
Date: 2024-06-11 09:50:15 -07:00
Committed-by: Impala Public Jenkins
Commit: 5d1bd80623 (parent: 3a2f5f28c9)
30 changed files with 218 additions and 76 deletions


@@ -736,7 +736,9 @@ enum TImpalaQueryOptions {
// If true, replanning is enabled.
ENABLE_REPLAN = 143;
// If true, test replan by imposing artificial two executor groups in FE.
// If true, test replan by imposing artificial two executor groups in FE and always
// compute ProcessingCost. The degree of parallelism adjustment, however, still require
// COMPUTE_PROCESSING_COST option set to true.
TEST_REPLAN = 144;
// Maximum wait time on HMS ACID lock in seconds.


@@ -734,6 +734,10 @@ public class AggregateInfo extends AggregateInfoBase {
public ProcessingCost computeProcessingCost(
String label, long inputCardinality, long intermediateOutputCardinality) {
Preconditions.checkArgument(
inputCardinality >= 0, "inputCardinality should not be negative!");
Preconditions.checkArgument(intermediateOutputCardinality >= 0,
"intermediateOutputCardinality should not be negative!");
// Benchmarking suggests we can estimate the processing cost as a linear function
// based the probe input cardinality, the "intermediate" output cardinality, and
// an incremental cost per input row for each additional aggregate function.


@@ -575,13 +575,15 @@ public class AggregationNode extends PlanNode {
// duplicate keys across fragments. Calculate an overall "intermediate" output
// cardinality that attempts to account for the dups. Cap it at the input
// cardinality because an aggregation cannot increase the cardinality.
long inputCardinality = getAggClassNumGroup(prevAgg, aggInfo);
long inputCardinality = Math.max(0, getAggClassNumGroup(prevAgg, aggInfo));
long perInstanceNdv = fragment_.getPerInstanceNdvForCpuCosting(
inputCardinality, aggInfo.getGroupingExprs());
long intermediateOutputCardinality = Math.min(
inputCardinality, perInstanceNdv * fragment_.getNumInstancesForCosting());
long intermediateOutputCardinality = Math.max(0,
Math.min(
inputCardinality, perInstanceNdv * fragment_.getNumInstancesForCosting()));
long aggClassNumGroup = Math.max(0, getAggClassNumGroup(prevAgg, aggInfo));
ProcessingCost aggCost = aggInfo.computeProcessingCost(getDisplayLabel(),
getAggClassNumGroup(prevAgg, aggInfo), intermediateOutputCardinality);
aggClassNumGroup, intermediateOutputCardinality);
processingCost_ = ProcessingCost.sumCost(processingCost_, aggCost);
}
}


@@ -17,6 +17,8 @@
package org.apache.impala.planner;
import com.google.common.base.Preconditions;
/**
* A basic implementation of {@link ProcessingCost} that takes account expression cost
* and average row size as per-row costing weight.
@@ -42,6 +44,10 @@ public class BaseProcessingCost extends ProcessingCost {
}
public BaseProcessingCost(double totalCost) {
Preconditions.checkArgument(!Double.isNaN(totalCost), "totalCost must not be a NaN!");
Preconditions.checkArgument(
Double.isFinite(totalCost), "totalCost must be a finite double!");
Preconditions.checkArgument(totalCost >= 0, "totalCost must not be a negative!");
cardinality_ = 0L;
exprsCost_ = 0.0F;
materializationCost_ = 0.0F;
@@ -74,7 +80,7 @@ public class BaseProcessingCost extends ProcessingCost {
public String getDetails() {
StringBuilder output = new StringBuilder();
output.append(super.getDetails());
output.append(" total-cost=").append(totalCost_);
output.append(" raw-cost=").append(totalCost_);
if (cardinality_ != 0L) { output.append(" cardinality=").append(cardinality_); }
return output.toString();
}


@@ -36,7 +36,7 @@ public class BroadcastProcessingCost extends ProcessingCost {
protected BroadcastProcessingCost(
ProcessingCost cost, Supplier<Integer> countSupplier) {
Preconditions.checkArgument(
cost.isValid(), "BroadcastProcessingCost: cost is invalid!");
cost.isValid(), "BroadcastProcessingCost: cost is invalid! %s", cost);
childProcessingCost_ = cost;
setNumInstanceExpected(countSupplier);
}


@@ -120,7 +120,8 @@ public class CostingSegment extends TreeNode<CostingSegment> {
}
private void appendCost(ProcessingCost additionalCost) {
Preconditions.checkArgument(additionalCost.isValid());
Preconditions.checkArgument(
additionalCost.isValid(), "additionalCost is invalid! %s", additionalCost);
ProcessingCost newTotalCost = ProcessingCost.sumCost(additionalCost, cost_);
newTotalCost.setNumRowToConsume(cost_.getNumRowToConsume());
newTotalCost.setNumRowToProduce(additionalCost.getNumRowToConsume());


@@ -138,9 +138,9 @@ public abstract class DataSink {
*/
public void computeRowConsumptionAndProductionToCost() {
Preconditions.checkState(processingCost_.isValid(),
"Processing cost of DataSink " + fragment_.getId() + ":" + getLabel()
+ " is invalid!");
long inputOutputCardinality = fragment_.getPlanRoot().getCardinality();
"Processing cost of DataSink %s:%s is invalid! %s", fragment_.getId(), getLabel(),
processingCost_);
long inputOutputCardinality = Math.max(0, fragment_.getPlanRoot().getCardinality());
processingCost_.setNumRowToConsume(inputOutputCardinality);
processingCost_.setNumRowToProduce(inputOutputCardinality);
}


@@ -100,7 +100,7 @@ public class DataStreamSink extends DataSink {
public void computeProcessingCost(TQueryOptions queryOptions) {
// The sending part of the processing cost for the exchange node.
long outputCardinality = exchNode_.getFilteredCardinality();
long outputCardinality = Math.max(0, exchNode_.getFilteredCardinality());
long outputSize = (long) (exchNode_.getAvgDeserializedRowSize() * outputCardinality);
double totalCost = 0.0;
String exchType;


@@ -20,7 +20,6 @@ package org.apache.impala.planner;
import java.util.ArrayList;
import java.util.List;
import org.apache.impala.analysis.AnalysisContext;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.BinaryPredicate;
import org.apache.impala.analysis.DmlStatementBase;


@@ -253,7 +253,7 @@ public class ExchangeNode extends PlanNode {
// bottom sending fragment;
// 2. The receiving processing cost in the top receiving fragment which is computed
// here.
long inputCardinality = getChild(0).getFilteredCardinality();
long inputCardinality = Math.max(0, getChild(0).getFilteredCardinality());
// It's not obvious whether the per-byte CPU costs are more accurately estimated
// using the serialized or deserialized sizes, but the coefficients were determined


@@ -339,12 +339,13 @@ public class HashJoinNode extends JoinNode {
// Compute the processing cost for lhs. Benchmarking suggests we can estimate the
// probe cost as a linear function combining the probe input cardinality and the
// estimated output cardinality.
long outputCardinality = Math.max(0, getCardinality());
double totalProbeCost =
(getProbeCardinalityForCosting() * COST_COEFFICIENT_PROBE_INPUT)
+ (getCardinality() * COST_COEFFICIENT_HASH_JOIN_OUTPUT);
+ (outputCardinality * COST_COEFFICIENT_HASH_JOIN_OUTPUT);
if (LOG.isTraceEnabled()) {
LOG.trace("Probe CPU cost estimate: " + totalProbeCost + ", Input Card: "
+ getProbeCardinalityForCosting() + ", Output Card: " + getCardinality());
+ getProbeCardinalityForCosting() + ", Output Card: " + outputCardinality);
}
ProcessingCost probeProcessingCost =
ProcessingCost.basicCost(getDisplayLabel(), totalProbeCost);
@@ -357,8 +358,8 @@ public class HashJoinNode extends JoinNode {
// build fragment count is fixed for broadcast(at num hosts) regardless of the cost
// computed here. But we should clean up the costing here to avoid any future
// confusion.
double totalBuildCost =
getChild(1).getFilteredCardinality() * COST_COEFFICIENT_BUILD_INPUT;
long buildCardinality = Math.max(0, getChild(1).getFilteredCardinality());
double totalBuildCost = buildCardinality * COST_COEFFICIENT_BUILD_INPUT;
ProcessingCost buildProcessingCost =
ProcessingCost.basicCost(getDisplayLabel() + " Build side", totalBuildCost);
return Pair.create(probeProcessingCost, buildProcessingCost);


@@ -2228,8 +2228,6 @@ public class HdfsScanNode extends ScanNode {
*/
@Override
protected ProcessingCost computeScanProcessingCost(TQueryOptions queryOptions) {
Preconditions.checkArgument(queryOptions.isCompute_processing_cost());
long inputCardinality = getFilteredInputCardinality();
long estBytes = 0L;
double bytesCostCoefficient = 0.0;
@@ -2255,6 +2253,7 @@ public class HdfsScanNode extends ScanNode {
}
return ProcessingCost.basicCost(getDisplayLabel(), totalCost);
} else {
// Input cardinality is 0 or unknown. Fallback to superclass.
return super.computeScanProcessingCost(queryOptions);
}
}


@@ -158,7 +158,7 @@ public class HdfsTableSink extends TableSink {
@Override
public void computeProcessingCost(TQueryOptions queryOptions) {
PlanNode inputNode = fragment_.getPlanRoot();
long cardinality = inputNode.getCardinality();
long cardinality = Math.max(0, inputNode.getCardinality());
float avgRowDataSize = inputNode.getAvgRowSizeWithoutPad();
long estBytesInserted = (long) Math.ceil(avgRowDataSize * (double) cardinality);
double totalCost = 0.0F;


@@ -228,9 +228,9 @@ public class IcebergDeleteNode extends JoinNode {
getProbeCardinalityForCosting(), eqJoinPredicateEvalCost);
// Compute the processing cost for rhs.
ProcessingCost buildProcessingCost =
ProcessingCost.basicCost(getDisplayLabel() + " Build side",
getChild(1).getCardinality(), eqJoinPredicateEvalCost);
long buildCardinality = Math.max(0, getChild(1).getCardinality());
ProcessingCost buildProcessingCost = ProcessingCost.basicCost(
getDisplayLabel() + " Build side", buildCardinality, eqJoinPredicateEvalCost);
return Pair.create(probeProcessingCost, buildProcessingCost);
}
}


@@ -1004,8 +1004,12 @@ public abstract class JoinNode extends PlanNode {
*/
public abstract Pair<ProcessingCost, ProcessingCost> computeJoinProcessingCost();
/**
* Get filtered cardinality of probe hand of join node.
* Sanitized unknown cardinality (-1) into 0.
*/
protected long getProbeCardinalityForCosting() {
return getChild(0).getFilteredCardinality();
return Math.max(0, getChild(0).getFilteredCardinality());
}
@Override


@@ -111,7 +111,7 @@ public class NestedLoopJoinNode extends JoinNode {
// different costs here based on that RHS threshold.
// We return the full cost in the first element of the Pair.
long probeCardinality = getProbeCardinalityForCosting();
long buildCardinality = getChild(1).getCardinality();
long buildCardinality = Math.max(0, getChild(1).getCardinality());
long cardProduct = checkedMultiply(probeCardinality, buildCardinality);
long perInstanceBuildCardinality =
(long) Math.ceil(buildCardinality / fragment_.getNumInstancesForCosting());


@@ -1101,7 +1101,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
List<CostingSegment> costingSegments = rootSegment_.getNodesPreOrder();
for (CostingSegment costingSegment : costingSegments) {
ProcessingCost cost = costingSegment.getProcessingCost();
Preconditions.checkState(cost.isValid());
Preconditions.checkState(cost.isValid(), "Segment cost is invalid! %s", cost);
Preconditions.checkState(
cost.getNumInstancesExpected() == getAdjustedInstanceCount());
}


@@ -1045,7 +1045,8 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
*/
public void computeRowConsumptionAndProductionToCost() {
Preconditions.checkState(processingCost_.isValid(),
"Processing cost of PlanNode " + getDisplayLabel() + " is invalid!");
"Processing cost of PlanNode %s is invalid! %s", getDisplayLabel(),
processingCost_);
processingCost_.setNumRowToConsume(getInputCardinality());
processingCost_.setNumRowToProduce(getCardinality());
}
@@ -1279,6 +1280,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
filteredCardinality_ = newCardinality;
}
// May return -1.
// TODO: merge this with getCardinality().
protected long getFilteredCardinality() {
return filteredCardinality_ > -1 ? filteredCardinality_ : getCardinality();


@@ -73,9 +73,9 @@ public class PlanRootSink extends DataSink {
if (queryOptions.isSpool_query_results() && queryOptions.getScratch_limit() != 0
&& !BackendConfig.INSTANCE.getScratchDirs().isEmpty()) {
// The processing cost to buffer these many rows in root.
processingCost_ =
ProcessingCost.basicCost(getLabel(), fragment_.getPlanRoot().getCardinality(),
ExprUtil.computeExprsTotalCost(outputExprs_));
long outputCardinality = Math.max(0, fragment_.getPlanRoot().getCardinality());
processingCost_ = ProcessingCost.basicCost(
getLabel(), outputCardinality, ExprUtil.computeExprsTotalCost(outputExprs_));
} else {
processingCost_ = ProcessingCost.zero();
}


@@ -550,15 +550,23 @@ public class Planner {
public static void computeProcessingCost(
List<PlanFragment> planRoots, TQueryExecRequest request, PlannerContext planCtx) {
Analyzer rootAnalyzer = planCtx.getRootAnalyzer();
if (!rootAnalyzer.getQueryOptions().isCompute_processing_cost()) {
request.setCores_required(-1);
return;
}
TQueryOptions queryOptions = rootAnalyzer.getQueryOptions();
PlanFragment rootFragment = planRoots.get(0);
List<PlanFragment> postOrderFragments = rootFragment.getNodesPostOrder();
for (PlanFragment fragment : postOrderFragments) {
fragment.computeCostingSegment(rootAnalyzer.getQueryOptions());
List<PlanFragment> postOrderFragments = new ArrayList<>();
boolean testCostCalculation = queryOptions.isEnable_replan()
&& (RuntimeEnv.INSTANCE.isTestEnv() || queryOptions.isTest_replan());
if (queryOptions.isCompute_processing_cost() || testCostCalculation) {
postOrderFragments = rootFragment.getNodesPostOrder();
for (PlanFragment fragment : postOrderFragments) {
fragment.computeCostingSegment(queryOptions);
}
}
// Only do parallelism adjustment if COMPUTE_PROCESSING_COST is enabled.
if (!queryOptions.isCompute_processing_cost()) {
request.setCores_required(-1);
return;
}
if (LOG.isTraceEnabled()) {
@@ -571,7 +579,7 @@ public class Planner {
computeEffectiveParallelism(postOrderFragments,
rootAnalyzer.getMinParallelismPerNode(), rootAnalyzer.getMaxParallelismPerNode(),
rootAnalyzer.getQueryOptions());
queryOptions);
// Count bounded core count. This is taken from final instance count from previous
// step.


@@ -80,10 +80,6 @@ public abstract class ProcessingCost implements Cloneable {
Math.max(0, cardinality), exprsCost, materializationCost);
}
private static ProcessingCost computeValidBaseCost(double totalCost) {
return new BaseProcessingCost(totalCost);
}
public static ProcessingCost basicCost(
String label, long cardinality, float exprsCost, float materializationCost) {
ProcessingCost processingCost =
@@ -99,10 +95,20 @@ public abstract class ProcessingCost implements Cloneable {
return processingCost;
}
/**
* Create new ProcessingCost.
* 'totalCost' must not be a negative, NaN, or infinite.
*/
public static ProcessingCost basicCost(String label, double totalCost) {
ProcessingCost processingCost = computeValidBaseCost(totalCost);
processingCost.setLabel(label);
return processingCost;
try {
ProcessingCost processingCost = new BaseProcessingCost(totalCost);
processingCost.setLabel(label);
return processingCost;
} catch (IllegalArgumentException ex) {
// Rethrow with label mentioned in the exception message.
throw new IllegalArgumentException(
String.format("Invalid totalCost supplied for %s", label), ex);
}
}
/**


@@ -26,7 +26,8 @@ public class ScaledProcessingCost extends ProcessingCost {
private final long multiplier_;
protected ScaledProcessingCost(ProcessingCost cost, long multiplier) {
Preconditions.checkArgument(cost.isValid(), "ScaledProcessingCost: cost is invalid!");
Preconditions.checkArgument(
cost.isValid(), "ScaledProcessingCost: cost is invalid! %s", cost);
Preconditions.checkArgument(
multiplier >= 0, "ScaledProcessingCost: multiplier must be non-negative!");
cost_ = cost;


@@ -345,6 +345,7 @@ abstract public class ScanNode extends PlanNode {
return capInputCardinalityWithLimit(inputCardinality_);
}
// May return -1.
// TODO: merge this with getInputCardinality().
public long getFilteredInputCardinality() {
return capInputCardinalityWithLimit(
@@ -396,8 +397,6 @@ abstract public class ScanNode extends PlanNode {
* number of scan ranges and related query options.
*/
protected int computeMaxScannerThreadsForCPC(TQueryOptions queryOptions) {
Preconditions.checkArgument(queryOptions.isCompute_processing_cost());
// maxThread calculation below intentionally does not include core count from
// executor group config. This is to allow scan fragment parallelism to scale
// regardless of the core count limit.
@@ -417,8 +416,6 @@ abstract public class ScanNode extends PlanNode {
* the return value of this method.
*/
protected ProcessingCost computeScanProcessingCost(TQueryOptions queryOptions) {
Preconditions.checkArgument(queryOptions.isCompute_processing_cost());
int maxScannerThreads = computeMaxScannerThreadsForCPC(queryOptions);
long inputCardinality = getFilteredInputCardinality();


@@ -473,10 +473,12 @@ public class SortNode extends PlanNode {
// TODO: Benchmark partial sort cost separately.
// TODO: Improve this for larger spilling sorts.
double totalCost = 0.0F;
long inputCardinality = getChild(0).getFilteredCardinality();
long inputCardinality = Math.max(0, getChild(0).getFilteredCardinality());
double log2InputCardinality =
inputCardinality <= 0 ? 0.0 : (Math.log(inputCardinality) / Math.log(2));
if (type_ == TSortType.TOTAL || type_ == TSortType.PARTIAL) {
if (avgRowSize_ <= 10) {
totalCost = inputCardinality * (Math.log(inputCardinality) / Math.log(2))
totalCost = inputCardinality * log2InputCardinality
* COST_COEFFICIENT_SORT_TOTAL_SMALL_ROW;
} else {
double fullInputSize = inputCardinality * avgRowSize_;
@@ -487,8 +489,7 @@ public class SortNode extends PlanNode {
Preconditions.checkState(
type_ == TSortType.TOPN || type_ == TSortType.PARTITIONED_TOPN);
// Benchmarked TopN sort costs were ~ NlogN rows.
totalCost = inputCardinality * (Math.log(inputCardinality) / Math.log(2))
* COST_COEFFICIENT_SORT_TOPN;
totalCost = inputCardinality * log2InputCardinality * COST_COEFFICIENT_SORT_TOPN;
}
if (LOG.isTraceEnabled()) {
LOG.trace("Sort CPU cost estimate: " + totalCost + ", Type: " + type_


@@ -26,8 +26,10 @@ public class SumProcessingCost extends ProcessingCost {
private final ProcessingCost cost2_;
protected SumProcessingCost(ProcessingCost cost1, ProcessingCost cost2) {
Preconditions.checkArgument(cost1.isValid(), "SumProcessingCost: cost1 is invalid!");
Preconditions.checkArgument(cost2.isValid(), "SumProcessingCost: cost2 is invalid!");
Preconditions.checkArgument(
cost1.isValid(), "SumProcessingCost: cost1 is invalid! %s", cost1);
Preconditions.checkArgument(
cost2.isValid(), "SumProcessingCost: cost2 is invalid! %s", cost2);
cost1_ = cost1;
cost2_ = cost2;
}


@@ -164,7 +164,7 @@ public class UnionNode extends PlanNode {
PlanNode child = children_.get(i);
if (child.cardinality_ >= 0) {
totalMaterializedCardinality =
checkedAdd(totalMaterializedCardinality, child.cardinality_);
checkedAdd(totalMaterializedCardinality, Math.max(0, child.cardinality_));
}
}
long estBytesMaterialized =


@@ -20,13 +20,25 @@ package org.apache.impala.analysis;
import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
import org.apache.impala.common.FrontendTestBase;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.RuntimeEnv;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Tests computeNumDistinctValues() estimates for Exprs
*/
public class ExprNdvTest extends FrontendTestBase {
@BeforeClass
public static void setUpClass() throws Exception {
RuntimeEnv.INSTANCE.setTestEnv(true);
}
@AfterClass
public static void cleanUpClass() {
RuntimeEnv.INSTANCE.reset();
}
public void verifyNdv(String expr, long expectedNdv)
throws ImpalaException {


@@ -18,6 +18,7 @@
package org.apache.impala.planner;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.Arrays;
@@ -25,11 +26,14 @@ import java.util.List;
import java.util.Set;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.RuntimeEnv;
import org.apache.impala.service.Frontend.PlanCtx;
import org.apache.impala.testutil.TestUtils;
import org.apache.impala.thrift.QueryConstants;
import org.apache.impala.thrift.TQueryCtx;
import org.apache.impala.thrift.TQueryOptions;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.collect.ImmutableSet;
@@ -42,6 +46,16 @@ public class CardinalityTest extends PlannerTestBase {
private static double CARDINALITY_TOLERANCE = 0.05;
@BeforeClass
public static void setUpClass() throws Exception {
RuntimeEnv.INSTANCE.setTestEnv(true);
}
@AfterClass
public static void cleanUpClass() {
RuntimeEnv.INSTANCE.reset();
}
/**
* Test the happy path: table with stats, no all-null cols.
*/
@@ -1117,6 +1131,9 @@ public class CardinalityTest extends PlannerTestBase {
// the distributed plan).
PlanNode currentNode = plan.get(plan.size() - 1).getPlanRoot();
for (Integer currentChildIndex: path) {
assertTrue(currentNode.getDisplayLabel() + " does not have child index "
+ currentChildIndex,
currentNode.hasChild(currentChildIndex));
currentNode = currentNode.getChild(currentChildIndex);
}
assertEquals("PlanNode class not matched: ", cl.getName(),


@@ -200,3 +200,81 @@ max-parallelism=1 segment-costs=[28]
tuple-ids=0 row-size=12B cardinality=20 cost=3
in pipelines: 00(GETNEXT)
====
# IMPALA-13152: Regression test for TOP-N query over empty table.
select field, rk from (
select
field, f2,
row_number()
over (partition by field order by f2) rk
from functional.emptytable
) b
where rk = 1;
---- PARALLELPLANS
Max Per-Host Resource Reservation: Memory=48.00MB Threads=4
Per-Host Resource Estimates: Memory=48MB
F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
| Per-Instance Resources: mem-estimate=4.06MB mem-reservation=4.00MB thread-reservation=1
| max-parallelism=1 segment-costs=[0] cpu-comparison-result=2 [max(1 (self) vs 2 (sum children))]
PLAN-ROOT SINK
| output exprs: field, row_number()
| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 cost=0
|
06:EXCHANGE [UNPARTITIONED]
| mem-estimate=64.00KB mem-reservation=0B thread-reservation=0
| tuple-ids=4,3 row-size=24B cardinality=0 cost=0
| in pipelines: 05(GETNEXT)
|
F01:PLAN FRAGMENT [HASH(field)] hosts=1 instances=2 (adjusted from 1)
Per-Instance Resources: mem-estimate=16.12MB mem-reservation=16.00MB thread-reservation=1
max-parallelism=1 segment-costs=[0, 0] cpu-comparison-result=2 [max(2 (self) vs 1 (sum children))]
03:SELECT
| predicates: row_number() = CAST(1 AS BIGINT)
| mem-estimate=0B mem-reservation=0B thread-reservation=0
| tuple-ids=4,3 row-size=24B cardinality=0 cost=0
| in pipelines: 05(GETNEXT)
|
02:ANALYTIC
| functions: row_number()
| partition by: field
| order by: f2 ASC
| window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
| tuple-ids=4,3 row-size=24B cardinality=0 cost=0
| in pipelines: 05(GETNEXT)
|
05:TOP-N
| partition by: field
| order by: f2 ASC
| partition limit: 1
| mem-estimate=12.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0
| tuple-ids=4 row-size=16B cardinality=0 cost=0
| in pipelines: 05(GETNEXT), 01(OPEN)
|
04:EXCHANGE [HASH(field)]
| mem-estimate=20.00KB mem-reservation=0B thread-reservation=0
| tuple-ids=4 row-size=16B cardinality=0 cost=0
| in pipelines: 01(GETNEXT)
|
F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
Per-Instance Resources: mem-estimate=12.16MB mem-reservation=12.00MB thread-reservation=1
max-parallelism=1 segment-costs=[0, 0]
01:TOP-N
| partition by: field
| order by: f2 ASC
| partition limit: 1
| source expr: row_number() = CAST(1 AS BIGINT)
| mem-estimate=12.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0
| tuple-ids=4 row-size=16B cardinality=0 cost=0
| in pipelines: 01(GETNEXT), 00(OPEN)
|
00:SCAN HDFS [functional.emptytable, RANDOM]
partitions=0/0 files=0 size=0B
stored statistics:
table: rows=unavailable size=unavailable
partitions: 0/0 rows=0
columns missing stats: field
extrapolated-rows=disabled max-scan-range-rows=0
mem-estimate=0B mem-reservation=0B thread-reservation=0
tuple-ids=0 row-size=16B cardinality=0 cost=0
in pipelines: 00(GETNEXT)
====


@@ -30,29 +30,29 @@ SumCost: cost-total=956 max-instances=1 adj-instances=1 cost/inst=956 #cons:#pro
PLAN-ROOT SINK
| output exprs: s_store_name, s_store_id, sum(CASE WHEN (d_day_name = 'Sunday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Monday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Tuesday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Wednesday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Thursday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Friday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Saturday') THEN ss_sales_price ELSE NULL END)
| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 cost=900
| cost-total=900 max-instances=1 adj-instances=1 cost/inst=900 #cons:#prod=100:100 reduction=1.0 cost/cons=9.0 cost/prod=9.0 total-cost=900.0 cardinality=100
| cost-total=900 max-instances=1 adj-instances=1 cost/inst=900 #cons:#prod=100:100 reduction=1.0 cost/cons=9.0 cost/prod=9.0 raw-cost=900.0 cardinality=100
|
11:MERGING-EXCHANGE [UNPARTITIONED]
order by: s_store_name ASC, s_store_id ASC, sum(CASE WHEN (d_day_name = 'Sunday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name = 'Monday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name = 'Tuesday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name = 'Wednesday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name = 'Thursday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name = 'Friday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name = 'Saturday') THEN ss_sales_price ELSE NULL END) ASC
limit: 100
mem-estimate=157.71KB mem-reservation=0B thread-reservation=0
cost-total=56 max-instances=1 adj-instances=1 cost/inst=56 #cons:#prod=100:100 reduction=1.0 cost/cons=0.56 cost/prod=0.56 total-cost=56.476
cost-total=56 max-instances=1 adj-instances=1 cost/inst=56 #cons:#prod=100:100 reduction=1.0 cost/cons=0.56 cost/prod=0.56 raw-cost=56.476
tuple-ids=4 row-size=156B cardinality=100 cost=56
in pipelines: 06(GETNEXT)
F03:PLAN FRAGMENT [HASH(s_store_name,s_store_id)] hosts=10 instances=10 (adjusted from 120)
Per-Instance Resources: mem-estimate=28.84MB mem-reservation=1.94MB thread-reservation=1
max-parallelism=10 segment-costs=[48483, 48650, 439] cpu-comparison-result=120 [max(10 (self) vs 120 (sum children))]
cost-total=439 max-instances=1 adj-instances=10 cost/inst=44 #cons:#prod=100:100 reduction=1.0 cost/cons=4.39 cost/prod=4.39 total-cost=439.9666
cost-total=48650 max-instances=1 adj-instances=10 cost/inst=4865 #cons:#prod=6780:100 reduction=67.8 cost/cons=7.175516 cost/prod=486.5 total-cost=48650.03791799733
cost-total=439 max-instances=1 adj-instances=10 cost/inst=44 #cons:#prod=100:100 reduction=1.0 cost/cons=4.39 cost/prod=4.39 raw-cost=439.9666
cost-total=48650 max-instances=1 adj-instances=10 cost/inst=4865 #cons:#prod=6780:100 reduction=67.8 cost/cons=7.175516 cost/prod=486.5 raw-cost=48650.03791799733
SumCost: cost-total=48483 max-instances=1 adj-instances=10 cost/inst=4849 #cons:#prod=6780:6780 reduction=1.0 cost/cons=7.150885 cost/prod=7.150885
DATASTREAM SINK [FRAGMENT=F04, EXCHANGE=11, UNPARTITIONED]
| mem-estimate=639.76KB mem-reservation=0B thread-reservation=0 cost=439
| cost-total=439 max-instances=1 adj-instances=10 cost/inst=44 #cons:#prod=100:100 reduction=1.0 cost/cons=4.39 cost/prod=4.39 total-cost=439.9666
| cost-total=439 max-instances=1 adj-instances=10 cost/inst=44 #cons:#prod=100:100 reduction=1.0 cost/cons=4.39 cost/prod=4.39 raw-cost=439.9666
06:TOP-N [LIMIT=100]
| order by: s_store_name ASC, s_store_id ASC, sum(CASE WHEN (d_day_name = 'Sunday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name = 'Monday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name = 'Tuesday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name = 'Wednesday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name = 'Thursday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name = 'Friday') THEN ss_sales_price ELSE NULL END) ASC, sum(CASE WHEN (d_day_name = 'Saturday') THEN ss_sales_price ELSE NULL END) ASC
| mem-estimate=15.23KB mem-reservation=0B thread-reservation=0
| cost-total=48650 max-instances=1 adj-instances=10 cost/inst=4865 #cons:#prod=6780:100 reduction=67.8 cost/cons=7.175516 cost/prod=486.5 total-cost=48650.03791799733
| cost-total=48650 max-instances=1 adj-instances=10 cost/inst=4865 #cons:#prod=6780:100 reduction=67.8 cost/cons=7.175516 cost/prod=486.5 raw-cost=48650.03791799733
| tuple-ids=4 row-size=156B cardinality=100 cost=48650
| in pipelines: 06(GETNEXT), 10(OPEN)
|
@@ -66,7 +66,7 @@ SumCost: cost-total=48483 max-instances=1 adj-instances=10 cost/inst=4849 #cons:
|
09:EXCHANGE [HASH(s_store_name,s_store_id)]
mem-estimate=18.84MB mem-reservation=0B thread-reservation=0
cost-total=5616 max-instances=1 adj-instances=10 cost/inst=562 #cons:#prod=6780:6780 reduction=1.0 cost/cons=0.8283186 cost/prod=0.8283186 total-cost=5616.677
cost-total=5616 max-instances=1 adj-instances=10 cost/inst=562 #cons:#prod=6780:6780 reduction=1.0 cost/cons=0.8283186 cost/prod=0.8283186 raw-cost=5616.677
tuple-ids=3 row-size=156B cardinality=6.78K cost=5616
in pipelines: 01(GETNEXT)
@@ -74,11 +74,11 @@ F00:PLAN FRAGMENT [RANDOM] hosts=10 instances=120
Per-Host Shared Resources: mem-estimate=2.00MB mem-reservation=2.00MB thread-reservation=0 runtime-filters-memory=2.00MB
Per-Instance Resources: mem-estimate=32.25MB mem-reservation=11.00MB thread-reservation=1
max-parallelism=330 segment-costs=[3290070641, 71580] cpu-comparison-result=120 [max(120 (self) vs 22 (sum children))]
cost-total=71580 max-instances=1 adj-instances=120 cost/inst=597 #cons:#prod=6780:6780 reduction=1.0 cost/cons=10.557522 cost/prod=10.557522 total-cost=71580.922
cost-total=71580 max-instances=1 adj-instances=120 cost/inst=597 #cons:#prod=6780:6780 reduction=1.0 cost/cons=10.557522 cost/prod=10.557522 raw-cost=71580.922
SumCost: cost-total=3290070641 max-instances=330 adj-instances=120 cost/inst=27417254 #cons:#prod=8639935193:888242617 reduction=9.726999 cost/cons=0.38079804 cost/prod=3.7040224
DATASTREAM SINK [FRAGMENT=F03, EXCHANGE=09, HASH(s_store_name,s_store_id)]
| mem-estimate=6.25MB mem-reservation=0B thread-reservation=0 cost=71580
| cost-total=71580 max-instances=1 adj-instances=120 cost/inst=597 #cons:#prod=6780:6780 reduction=1.0 cost/cons=10.557522 cost/prod=10.557522 total-cost=71580.922
| cost-total=71580 max-instances=1 adj-instances=120 cost/inst=597 #cons:#prod=6780:6780 reduction=1.0 cost/cons=10.557522 cost/prod=10.557522 raw-cost=71580.922
05:AGGREGATE [STREAMING]
| output: sum(CASE WHEN (d_day_name = 'Sunday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Monday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Tuesday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Wednesday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Thursday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Friday') THEN ss_sales_price ELSE NULL END), sum(CASE WHEN (d_day_name = 'Saturday') THEN ss_sales_price ELSE NULL END)
| group by: s_store_name, s_store_id
@@ -92,7 +92,7 @@ SumCost: cost-total=3290070641 max-instances=330 adj-instances=120 cost/inst=274
| hash predicates: ss_store_sk = s_store_sk
| fk/pk conjuncts: ss_store_sk = s_store_sk
| mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB thread-reservation=0
| cost-total=614141162 max-instances=62 adj-instances=120 cost/inst=5117843 #cons:#prod=1766829189:888242617 reduction=1.9891291 cost/cons=0.3475951 cost/prod=0.6914115 total-cost=6.141411629949E8
| cost-total=614141162 max-instances=62 adj-instances=120 cost/inst=5117843 #cons:#prod=1766829189:888242617 reduction=1.9891291 cost/cons=0.3475951 cost/prod=0.6914115 raw-cost=6.141411629949E8
| tuple-ids=1,0,2 row-size=91B cardinality=888.24M cost=614141162
| in pipelines: 01(GETNEXT), 02(OPEN)
|
@@ -101,7 +101,7 @@ SumCost: cost-total=3290070641 max-instances=330 adj-instances=120 cost/inst=274
| hash predicates: ss_sold_date_sk = d_date_sk
| fk/pk conjuncts: ss_sold_date_sk = d_date_sk
| mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB thread-reservation=0
| cost-total=773340988 max-instances=78 adj-instances=120 cost/inst=6444509 #cons:#prod=8639935566:1766828853 reduction=4.890081 cost/cons=0.08950773 cost/prod=0.4377 total-cost=7.733409889581001E8
| cost-total=773340988 max-instances=78 adj-instances=120 cost/inst=6444509 #cons:#prod=8639935566:1766828853 reduction=4.890081 cost/cons=0.08950773 cost/prod=0.4377 raw-cost=7.733409889581001E8
| tuple-ids=1,0 row-size=39B cardinality=1.77G cost=773340988
| in pipelines: 01(GETNEXT), 00(OPEN)
|
@@ -115,7 +115,7 @@ SumCost: cost-total=3290070641 max-instances=330 adj-instances=120 cost/inst=274
extrapolated-rows=disabled max-scan-range-rows=390.22M est-scan-range=374(filtered from 1824)
file formats: [PARQUET]
mem-estimate=16.00MB mem-reservation=8.00MB thread-reservation=0
cost-total=305308025 max-instances=31 adj-instances=120 cost/inst=2544234 #cons:#prod=8639935193:8639935193 reduction=1.0 cost/cons=0.03533684 cost/prod=0.03533684 total-cost=3.053080257984E8
cost-total=305308025 max-instances=31 adj-instances=120 cost/inst=2544234 #cons:#prod=8639935193:8639935193 reduction=1.0 cost/cons=0.03533684 cost/prod=0.03533684 raw-cost=3.053080257984E8
tuple-ids=1 row-size=12B cardinality=1.77G(filtered from 8.64G) cost=305308025
in pipelines: 01(GETNEXT)
@@ -128,7 +128,7 @@ SumCost: cost-total=776 max-instances=1 adj-instances=10 cost/inst=78 #cons:#pro
| build expressions: s_store_sk
| runtime filters: RF000[bloom] <- s_store_sk, RF001[min_max] <- s_store_sk
| mem-estimate=23.25MB mem-reservation=23.25MB spill-buffer=64.00KB thread-reservation=0 cost=336
| cost-total=336 max-instances=1 adj-instances=10 cost/inst=34 #cons:#prod=336:336 reduction=1.0 cost/cons=1.0 cost/prod=1.0 total-cost=336.0
| cost-total=336 max-instances=1 adj-instances=10 cost/inst=34 #cons:#prod=336:336 reduction=1.0 cost/cons=1.0 cost/prod=1.0 raw-cost=336.0
|
08:EXCHANGE [BROADCAST]
mem-estimate=35.40KB mem-reservation=0B thread-reservation=0
@@ -142,7 +142,7 @@ max-parallelism=1 segment-costs=[635]
SumCost: cost-total=635 max-instances=1 adj-instances=1 cost/inst=635 #cons:#prod=1350:336 reduction=4.017857 cost/cons=0.47037038 cost/prod=1.8898809
DATASTREAM SINK [FRAGMENT=F05, EXCHANGE=08, BROADCAST]
| mem-estimate=223.76KB mem-reservation=0B thread-reservation=0 cost=54
| cost-total=54 max-instances=1 adj-instances=1 cost/inst=54 #cons:#prod=336:336 reduction=1.0 cost/cons=0.16071428 cost/prod=0.16071428 total-cost=54.3753
| cost-total=54 max-instances=1 adj-instances=1 cost/inst=54 #cons:#prod=336:336 reduction=1.0 cost/cons=0.16071428 cost/prod=0.16071428 raw-cost=54.3753
02:SCAN HDFS [tpcds_partitioned_parquet_snap.store, RANDOM]
HDFS partitions=1/1 files=1 size=9.81KB
predicates: s_gmt_offset = CAST(-6 AS DECIMAL(3,0))
@@ -154,7 +154,7 @@ SumCost: cost-total=635 max-instances=1 adj-instances=1 cost/inst=635 #cons:#pro
parquet dictionary predicates: s_gmt_offset = CAST(-6 AS DECIMAL(3,0))
file formats: [PARQUET]
mem-estimate=16.00MB mem-reservation=32.00KB thread-reservation=0
cost-total=581 max-instances=1 adj-instances=1 cost/inst=581 #cons:#prod=1350:336 reduction=4.017857 cost/cons=0.43037036 cost/prod=1.7291666 total-cost=581.0742
cost-total=581 max-instances=1 adj-instances=1 cost/inst=581 #cons:#prod=1350:336 reduction=4.017857 cost/cons=0.43037036 cost/prod=1.7291666 raw-cost=581.0742
tuple-ids=2 row-size=52B cardinality=336 cost=581
in pipelines: 02(GETNEXT)
@@ -167,7 +167,7 @@ SumCost: cost-total=863 max-instances=1 adj-instances=10 cost/inst=87 #cons:#pro
| build expressions: d_date_sk
| runtime filters: RF002[bloom] <- d_date_sk
| mem-estimate=23.25MB mem-reservation=23.25MB spill-buffer=64.00KB thread-reservation=0 cost=373
| cost-total=373 max-instances=1 adj-instances=10 cost/inst=38 #cons:#prod=373:373 reduction=1.0 cost/cons=1.0 cost/prod=1.0 total-cost=373.0
| cost-total=373 max-instances=1 adj-instances=10 cost/inst=38 #cons:#prod=373:373 reduction=1.0 cost/cons=1.0 cost/prod=1.0 raw-cost=373.0
|
07:EXCHANGE [BROADCAST]
mem-estimate=21.23KB mem-reservation=0B thread-reservation=0
@@ -181,7 +181,7 @@ max-parallelism=1 segment-costs=[18016]
SumCost: cost-total=18016 max-instances=1 adj-instances=1 cost/inst=18016 #cons:#prod=73049:373 reduction=195.84183 cost/cons=0.24662897 cost/prod=48.300266
DATASTREAM SINK [FRAGMENT=F06, EXCHANGE=07, BROADCAST]
| mem-estimate=124.57KB mem-reservation=0B thread-reservation=0 cost=35
| cost-total=35 max-instances=1 adj-instances=1 cost/inst=35 #cons:#prod=373:373 reduction=1.0 cost/cons=0.09383378 cost/prod=0.09383378 total-cost=35.391600000000004
| cost-total=35 max-instances=1 adj-instances=1 cost/inst=35 #cons:#prod=373:373 reduction=1.0 cost/cons=0.09383378 cost/prod=0.09383378 raw-cost=35.391600000000004
00:SCAN HDFS [tpcds_partitioned_parquet_snap.date_dim, RANDOM]
HDFS partitions=1/1 files=1 size=2.17MB
predicates: d_year = CAST(1999 AS INT)
@@ -193,7 +193,7 @@ SumCost: cost-total=18016 max-instances=1 adj-instances=1 cost/inst=18016 #cons:
parquet dictionary predicates: d_year = CAST(1999 AS INT)
file formats: [PARQUET]
mem-estimate=16.00MB mem-reservation=512.00KB thread-reservation=0
cost-total=17981 max-instances=1 adj-instances=1 cost/inst=17981 #cons:#prod=73049:373 reduction=195.84183 cost/cons=0.24614984 cost/prod=48.206436 total-cost=17981.5393
cost-total=17981 max-instances=1 adj-instances=1 cost/inst=17981 #cons:#prod=73049:373 reduction=195.84183 cost/cons=0.24614984 cost/prod=48.206436 raw-cost=17981.5393
tuple-ids=0 row-size=27B cardinality=373 cost=17981
in pipelines: 00(GETNEXT)
====