IMPALA-13902: Calcite planner: Implement is_spool_query_results

The is_spool_query_results query option is now supported in Calcite. The
returnAtMostOneRow method is now implemented to support this.
PlanRootSink is refactored to extract sanitizing query options (a new
method sanitizeSpoolingOptions()) out of
PlanRootSink.computeResourceProfile(). The bulk of memory bounding
calculation is also extracted out to a new class SpoolingMemoryBound.

Added "sleep" in ImpalaOperatorTable.java since some EE tests related to
result spooling calls sleep() function. Changed ImpalaPlanRel to extends
RelNode interface.

A sanity test has been added to calcite.test, but the bulk of the
testing will be done through the Impala test framework when it is
enabled.

Testing:
- Pass FE tests PlannerTest#testResultSpooling, TpcdsCpuCostPlannerTest,
  and all java tests under calcite-planner project.
- Pass query_test/test_result_spooling.py and
  custom_cluster/test_result_spooling.py.

Co-authored-by: Riza Suminto

Change-Id: I5b9bf49e2874ee12de212b892bd898c296774c6f
Reviewed-on: http://gerrit.cloudera.org:8080/23562
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Steve Carlin
2025-08-29 08:18:02 -07:00
committed by Impala Public Jenkins
parent 898e03e9d5
commit bc99705252
14 changed files with 327 additions and 168 deletions

View File

@@ -497,17 +497,6 @@ public class AnalysisContext {
authzChecker.postAnalyze(authzCtx);
ImpalaException analysisException = analysisResult_.getException();
// A statement that returns at most one row does not need to spool query results.
// IMPALA-13902: returnsAtMostOneRow should be in planner interface so it is
// accessible by the Calcite planner.
if (analysisException == null && analysisResult_.getStmt() instanceof SelectStmt &&
((SelectStmt)analysisResult_.getStmt()).returnsAtMostOneRow()) {
clientRequest.query_options.setSpool_query_results(false);
if (LOG.isTraceEnabled()) {
LOG.trace("Result spooling is disabled due to the statement returning at most "
+ "one row.");
}
}
long durationMs = timeline_.markEvent("Analysis finished") / 1000000;
LOG.info("Analysis took {} ms", durationMs);

View File

@@ -20,7 +20,6 @@ package org.apache.impala.analysis;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import com.google.common.collect.Sets;
@@ -29,8 +28,6 @@ import org.apache.impala.catalog.Type;
import org.apache.impala.catalog.View;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.SqlCastException;
import org.apache.impala.planner.DataSink;
import org.apache.impala.planner.PlanRootSink;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
@@ -503,9 +500,12 @@ public abstract class QueryStmt extends StatementBase {
resultExprs_ = Expr.substituteList(resultExprs_, smap, analyzer, true);
}
public DataSink createDataSink(List<Expr> resultExprs) {
return new PlanRootSink(resultExprs);
}
/**
* Return True if this statement is eligible for result spooling.
* Planner can still decide to disable result spooling if there are other factors
* during analysis that prevents spooling from happening.
*/
public boolean canSpoolResult() { return true; }
public List<OrderByElement> cloneOrderByElements() {
if (orderByElements_ == null) return null;

View File

@@ -1915,4 +1915,9 @@ public class SelectStmt extends QueryStmt {
(sortInfo_ != null &&
TreeNode.contains(sortInfo_.getSortExprs(), Expr.IS_AGGREGATE));
}
@Override
public boolean canSpoolResult() {
return !returnsAtMostOneRow();
}
}

View File

@@ -19,6 +19,7 @@ package org.apache.impala.planner;
import java.util.List;
import com.google.common.base.MoreObjects;
import org.apache.impala.analysis.Expr;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TDataSink;
@@ -45,11 +46,33 @@ public class PlanRootSink extends DataSink {
// IMPALA-4268 for details on how this value was chosen.
private static final long DEFAULT_RESULT_SPOOLING_ESTIMATED_MEMORY = 10 * 1024 * 1024;
/**
* Create a PlanRootSink. This method will sanitize spooling-related query options
* within given PlannerContext before creating the PlanRootSink. Once sanitized here,
* the spooling-related query options is safe to use directly and should not be modified
* anywhere else.
* @param ctx the planner context.
* @param outputExprs the output expressions for the PlanRootSink.
* @param requireSpooling hard override to disable result spooling that can be supplied
* by caller.
* @return the created PlanRootSink.
*/
public static PlanRootSink create(
PlannerContext ctx, List<Expr> outputExprs, boolean requireSpooling) {
sanitizeSpoolingOptions(ctx.getQueryOptions(), requireSpooling);
return new PlanRootSink(outputExprs);
}
// One expression per result column for the query.
private final List<Expr> outputExprs_;
/**
* DEPRECATED.
* Use {@link PlanRootSink#create(PlannerContext, List, boolean)} instead.
* This constructor will be made private. It is only temporarily preserved to prevent
* compilation error.
*/
public PlanRootSink(List<Expr> outputExprs) {
Preconditions.checkState(outputExprs != null);
outputExprs_ = outputExprs;
}
@@ -70,8 +93,10 @@ public class PlanRootSink extends DataSink {
@Override
public void computeProcessingCost(TQueryOptions queryOptions) {
if (queryOptions.isSpool_query_results() && queryOptions.getScratch_limit() != 0
&& !BackendConfig.INSTANCE.getScratchDirs().isEmpty()) {
// TODO: this sanitization should have already happened in PlanRootSink.create().
// This can be removed once PlanRootSink constructor is made private.
sanitizeSpoolingOptions(queryOptions, true);
if (queryOptions.isSpool_query_results()) {
// The processing cost to buffer these many rows in root.
long outputCardinality = Math.max(0, fragment_.getPlanRoot().getCardinality());
processingCost_ = ProcessingCost.basicCost(
@@ -98,115 +123,136 @@ public class PlanRootSink extends DataSink {
* the maximum between default spillable buffer size and MAX_ROW_SIZE (rounded up to
* nearest power of 2) to account for the read and write pages in the
* BufferedTupleStream used by the backend plan-root-sink. The maximum reservation is
* set to the query-level config MAX_PINNED_RESULT_SPOOLING_MEMORY.
*
* If SPOOL_QUERY_RESULTS is true but spill is disabled either due to SCRATCH_LIMIT = 0
* or SCRATCH_DIRS is empty, SPOOL_QUERY_RESULTS will be set to false. A ResourceProfile
* is returned with no reservation or buffer sizes, and the estimated memory consumption
* is 0.
* set to the query-level config MAX_SPILLED_RESULT_SPOOLING_MEM.
*/
@Override
public void computeResourceProfile(TQueryOptions queryOptions) {
if (queryOptions.isSpool_query_results()) {
// Check if we need to disable result spooling because we can not spill.
long scratchLimit = queryOptions.getScratch_limit();
String scratchDirs = BackendConfig.INSTANCE.getScratchDirs();
if (scratchLimit == 0 || scratchDirs.isEmpty()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Result spooling is disabled due to unavailability of scratch "
+ "space.");
}
queryOptions.setSpool_query_results(false);
resourceProfile_ = ResourceProfile.noReservation(0);
return;
}
long maxSpoolingMem = queryOptions.getMax_result_spooling_mem();
if (maxSpoolingMem <= 0) {
// max_result_spooling_mem = 0 means unbounded. But instead of setting unlimited
// memory reservation, we fallback to the default max_result_spooling_mem.
TQueryOptions defaults = new TQueryOptions();
maxSpoolingMem = defaults.getMax_result_spooling_mem();
queryOptions.setMax_result_spooling_mem(maxSpoolingMem);
}
long bufferSize = queryOptions.getDefault_spillable_buffer_size();
long maxRowBufferSize = PlanNode.computeMaxSpillableBufferSize(
bufferSize, queryOptions.getMax_row_size());
long minMemReservationBytes = 2 * maxRowBufferSize;
long maxMemReservationBytes = Math.max(maxSpoolingMem, minMemReservationBytes);
// User might set query option scratch_limit that is lower than either of
// minMemReservationBytes, maxMemReservationBytes, or
// max_spilled_result_spooling_mem. We define:
//
// maxAllowedScratchLimit = scratchLimit - maxRowBufferSize
//
// If maxAllowedScratchLimit < minMemReservationBytes, we fall back to use
// BlockingPlanRootSink in the backend by silently disabling result spooling.
// If maxAllowedScratchLimit < maxMemReservationBytes, we silently lower
// maxMemReservationBytes, max_result_spooling_mem, and
// max_spilled_result_spooling_mem accordingly to fit maxAllowedScratchLimit.
// Otherwise, do nothing.
//
// BufferedPlanRootSink may slightly exceed its maxMemReservationBytes when it
// decides to spill. maxRowBufferSize bytes is subtracted in maxAllowedScratchLimit
// to give extra space, ensuring spill does not exceed scratch_limit.
if (scratchLimit > -1) {
long maxAllowedScratchLimit = scratchLimit - maxRowBufferSize;
if (maxAllowedScratchLimit < minMemReservationBytes) {
if (LOG.isTraceEnabled()) {
LOG.trace("Result spooling is disabled due to low scratch_limit ("
+ scratchLimit + "). Try increasing scratch_limit to >= "
+ (minMemReservationBytes + maxRowBufferSize)
+ " to enable result spooling.");
}
queryOptions.setSpool_query_results(false);
resourceProfile_ = ResourceProfile.noReservation(0);
return;
} else if (maxAllowedScratchLimit < maxMemReservationBytes) {
maxMemReservationBytes = maxAllowedScratchLimit;
queryOptions.setMax_result_spooling_mem(maxAllowedScratchLimit);
if (LOG.isTraceEnabled()) {
LOG.trace("max_result_spooling_mem is lowered to " + maxMemReservationBytes
+ " to fit scratch_limit (" + scratchLimit + ").");
}
}
// If we got here, it means we can use BufferedPlanRootSink with at least
// minMemReservationBytes in memory. But the amount of memory we can spill to disk
// may still be limited by scratch_limit. Thus, we need to lower
// max_spilled_result_spooling_mem as necessary.
if (maxAllowedScratchLimit < queryOptions.getMax_spilled_result_spooling_mem()) {
queryOptions.setMax_spilled_result_spooling_mem(maxAllowedScratchLimit);
if (LOG.isTraceEnabled()) {
LOG.trace("max_spilled_result_spooling_mem is lowered to "
+ maxAllowedScratchLimit + " to fit scratch_limit ("
+ scratchLimit + ").");
}
}
}
PlanNode inputNode = fragment_.getPlanRoot();
long memEstimateBytes;
if (inputNode.getCardinality() == -1 || inputNode.getAvgRowSize() == -1) {
memEstimateBytes = DEFAULT_RESULT_SPOOLING_ESTIMATED_MEMORY;
} else {
long inputCardinality = Math.max(1L, inputNode.getCardinality());
memEstimateBytes = (long) Math.ceil(inputCardinality * inputNode.getAvgRowSize());
}
memEstimateBytes = Math.min(memEstimateBytes, maxMemReservationBytes);
// TODO: this sanitization should have already happened in PlanRootSink.create().
// This can be removed once PlanRootSink constructor is made private.
sanitizeSpoolingOptions(queryOptions, true);
if (!queryOptions.isSpool_query_results()) {
resourceProfile_ = ResourceProfile.noReservation(0);
} else {
SpoolingMemoryBound memBound = new SpoolingMemoryBound(queryOptions);
long memEstimateBytes = getMemEstimateBytes(memBound.maxMemReservationBytes_);
resourceProfile_ = new ResourceProfileBuilder()
.setMemEstimateBytes(memEstimateBytes)
.setMinMemReservationBytes(minMemReservationBytes)
.setMaxMemReservationBytes(maxMemReservationBytes)
.setMaxRowBufferBytes(maxRowBufferSize)
.setSpillableBufferBytes(bufferSize)
.setMinMemReservationBytes(memBound.minMemReservationBytes_)
.setMaxMemReservationBytes(memBound.maxMemReservationBytes_)
.setMaxRowBufferBytes(memBound.maxRowBufferSize_)
.setSpillableBufferBytes(memBound.bufferSize_)
.build();
}
}
private long getMemEstimateBytes(long maxMemReservationBytes) {
PlanNode inputNode = fragment_.getPlanRoot();
long memEstimateBytes;
if (inputNode.getCardinality() == -1 || inputNode.getAvgRowSize() == -1) {
memEstimateBytes = DEFAULT_RESULT_SPOOLING_ESTIMATED_MEMORY;
} else {
resourceProfile_ = ResourceProfile.noReservation(0);
long inputCardinality = Math.max(1L, inputNode.getCardinality());
memEstimateBytes = (long) Math.ceil(inputCardinality * inputNode.getAvgRowSize());
}
memEstimateBytes = Math.min(memEstimateBytes, maxMemReservationBytes);
return memEstimateBytes;
}
/**
* Helper method to disable SPOOL_QUERY_RESULTS options if it is not possible to do
* so. This method will check various limits around scratch space and disable spooling
* option if such limit will be exceeded. Once disabled, SPOOL_QUERY_RESULTS option
* should not be enabled anywhere else.
*
* If SPOOL_QUERY_RESULTS is true but spill is disabled either due to SCRATCH_LIMIT = 0
* or SCRATCH_DIRS is empty, SPOOL_QUERY_RESULTS will be set to false.
*
* If this method does not disable spooling option, sanitize MAX_RESULT_SPOOLING_MEM
* and MAX_SPILLED_RESULT_SPOOLING_MEM option into a valid value.
*
* @param queryOptions query options to potentially modify.
* @param requireSpooling hard override to disable result spooling that can be supplied
* by caller.
*/
private static void sanitizeSpoolingOptions(
TQueryOptions queryOptions, boolean requireSpooling) {
if (!requireSpooling || !queryOptions.isSpool_query_results()) {
queryOptions.setSpool_query_results(false);
return;
}
// Check if we need to disable result spooling because we can not spill.
long scratchLimit = queryOptions.getScratch_limit();
String scratchDirs = BackendConfig.INSTANCE.getScratchDirs();
if (scratchLimit == 0 || scratchDirs.isEmpty()) {
LOG.info("Result spooling is disabled due to unavailability of scratch space.");
queryOptions.setSpool_query_results(false);
return;
}
long maxSpoolingMem = queryOptions.getMax_result_spooling_mem();
if (maxSpoolingMem <= 0) {
// max_result_spooling_mem = 0 means unbounded. But instead of setting unlimited
// memory reservation, we fallback to the default max_result_spooling_mem.
TQueryOptions defaults = new TQueryOptions();
maxSpoolingMem = defaults.getMax_result_spooling_mem();
queryOptions.setMax_result_spooling_mem(maxSpoolingMem);
}
SpoolingMemoryBound memBound = new SpoolingMemoryBound(queryOptions);
if (!memBound.hasSufficientScratchLimit()) {
LOG.info("Result spooling is disabled due to low scratch_limit ("
+ memBound.scratchLimit_ + "). Try increasing scratch_limit to >= "
+ (memBound.minRequiredScratchLimit()) + " to enable result spooling.");
queryOptions.setSpool_query_results(false);
return;
}
// User might set query option scratch_limit that is lower than either of
// minMemReservationBytes, maxMemReservationBytes, or
// max_spilled_result_spooling_mem. We define:
//
// maxAllowedScratchLimit = scratchLimit - maxRowBufferSize
//
// If maxAllowedScratchLimit < minMemReservationBytes, we fall back to use
// BlockingPlanRootSink in the backend by silently disabling result spooling.
// If maxAllowedScratchLimit < maxMemReservationBytes, we silently lower
// max_result_spooling_mem and max_spilled_result_spooling_mem accordingly to fit
// maxAllowedScratchLimit. Otherwise, do nothing.
//
// BufferedPlanRootSink may slightly exceed its maxMemReservationBytes when it
// decides to spill. maxRowBufferSize bytes is subtracted in maxAllowedScratchLimit
// to give extra space, ensuring spill does not exceed scratch_limit.
if (memBound.hasBoundedScratchLimit()) {
Preconditions.checkState(memBound.scratchLimit_ > 0);
if (memBound.maxAllowedScratchLimit_ < memBound.minMemReservationBytes_) {
LOG.info("Result spooling is disabled due to low scratch_limit (" + scratchLimit
+ "). Try increasing scratch_limit to >= "
+ memBound.minRequiredScratchLimit() + " to enable result spooling.");
queryOptions.setSpool_query_results(false);
return;
}
if (memBound.maxMemReservationBytes_ < queryOptions.getMax_result_spooling_mem()) {
queryOptions.setMax_result_spooling_mem(memBound.maxMemReservationBytes_);
LOG.info("max_result_spooling_mem is lowered to "
+ memBound.maxMemReservationBytes_ + " to fit scratch_limit ("
+ memBound.scratchLimit_ + ").");
}
// If we got here, it means we can use BufferedPlanRootSink with at least
// minMemReservationBytes in memory. But the amount of memory we can spill to disk
// may still be limited by scratch_limit. Thus, we need to lower
// max_spilled_result_spooling_mem as necessary.
if (memBound.maxAllowedScratchLimit_
< queryOptions.getMax_spilled_result_spooling_mem()) {
queryOptions.setMax_spilled_result_spooling_mem(memBound.maxAllowedScratchLimit_);
LOG.info("max_spilled_result_spooling_mem is lowered to "
+ memBound.maxAllowedScratchLimit_ + " to fit scratch_limit ("
+ memBound.scratchLimit_ + ").");
}
}
}
@@ -232,4 +278,70 @@ public class PlanRootSink extends DataSink {
super.computeRowConsumptionAndProductionToCost();
fragment_.setFixedInstanceCount(fragment_.getNumInstances());
}
private static class SpoolingMemoryBound {
// Values taken as is from query options.
public final long scratchLimit_;
public final long bufferSize_;
public final long maxRowSize_;
public final long maxSpoolingMem_;
// Derived values from query options.
public final long maxRowBufferSize_;
public final long minMemReservationBytes_;
public final long maxMemReservationBytes_;
// Only valid if scratch limit is bounded.
public final long maxAllowedScratchLimit_;
public SpoolingMemoryBound(TQueryOptions queryOptions) {
Preconditions.checkArgument(queryOptions.isSpool_query_results());
Preconditions.checkArgument(queryOptions.getMax_result_spooling_mem() > 0);
scratchLimit_ = queryOptions.getScratch_limit();
bufferSize_ = queryOptions.getDefault_spillable_buffer_size();
maxRowSize_ = queryOptions.getMax_row_size();
maxSpoolingMem_ = queryOptions.getMax_result_spooling_mem();
maxRowBufferSize_ =
PlanNode.computeMaxSpillableBufferSize(bufferSize_, maxRowSize_);
minMemReservationBytes_ = 2 * maxRowBufferSize_;
long maxAllowedScratchLimit = queryOptions.getMax_spilled_result_spooling_mem();
long maxMemReservationBytes = Math.max(maxSpoolingMem_, minMemReservationBytes_);
if (hasBoundedScratchLimit()) {
maxAllowedScratchLimit = scratchLimit_ - maxRowBufferSize_;
if (maxAllowedScratchLimit < maxMemReservationBytes) {
maxMemReservationBytes = maxAllowedScratchLimit;
}
}
maxAllowedScratchLimit_ = maxAllowedScratchLimit;
maxMemReservationBytes_ = maxMemReservationBytes;
}
public boolean hasBoundedScratchLimit() { return scratchLimit_ > -1; }
public long minRequiredScratchLimit() {
return minMemReservationBytes_ + maxRowBufferSize_;
}
public boolean hasSufficientScratchLimit() {
return !hasBoundedScratchLimit()
|| maxAllowedScratchLimit_ >= minMemReservationBytes_;
}
@Override
public String toString() {
MoreObjects.ToStringHelper toStrHelper = MoreObjects.toStringHelper(this);
toStrHelper.add("scratchLimit", scratchLimit_);
toStrHelper.add("bufferSize", bufferSize_);
toStrHelper.add("maxRowSize", maxRowSize_);
toStrHelper.add("maxSpoolingMem", maxSpoolingMem_);
toStrHelper.add("maxRowBufferSize", maxRowBufferSize_);
toStrHelper.add("minMemReservationBytes", minMemReservationBytes_);
toStrHelper.add("maxMemReservationBytes", maxMemReservationBytes_);
toStrHelper.add("maxAllowedScratchLimit", maxAllowedScratchLimit_);
return toStrHelper.toString();
}
}
}

View File

@@ -2380,7 +2380,7 @@ public class SingleNodePlanner implements SingleNodePlannerIntf {
QueryStmt queryStmt = ctx_.getQueryStmt();
queryStmt.substituteResultExprs(rootNodeSmap, ctx_.getRootAnalyzer());
List<Expr> resultExprs = queryStmt.getResultExprs();
return ctx_.getAnalysisResult().getQueryStmt().createDataSink(resultExprs);
return PlanRootSink.create(ctx_, resultExprs, queryStmt.canSpoolResult());
}
@Override

View File

@@ -84,6 +84,7 @@ public class ImpalaOperatorTable extends ReflectiveSqlOperatorTable {
.add("regr_count")
.add("localtime")
.add("translate")
.add("sleep")
.build();
private static ImpalaOperatorTable INSTANCE;

View File

@@ -17,6 +17,8 @@
package org.apache.impala.calcite.rel.node;
import com.google.common.base.Preconditions;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.Filter;
@@ -31,8 +33,7 @@ import org.apache.impala.common.ImpalaException;
/**
* ImpalaPlanRel. Interface used for all Impala intermediary RelNodes
*/
public interface ImpalaPlanRel {
public interface ImpalaPlanRel extends RelNode {
/**
* Enum representing the type of class used in the RelNode
* Using an enum here so that Impala Plan RelNodes can be used in
@@ -99,8 +100,8 @@ public interface ImpalaPlanRel {
* mentioned below can be in between the Aggregate RelNode and the Table Scan
* RelNode.
*/
public static boolean canPassThroughParentAggregate(ImpalaPlanRel planRel) {
switch (getRelNodeType((RelNode) planRel)) {
public static boolean canPassThroughParentAggregate(RelNode relNode) {
switch (getRelNodeType(relNode)) {
case FILTER:
case PROJECT:
return true;

View File

@@ -17,28 +17,26 @@
package org.apache.impala.calcite.service;
import java.util.ArrayList;
import java.util.List;
import org.apache.calcite.rel.RelNode;
import org.apache.impala.analysis.AnalysisDriver;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.Values;
import org.apache.impala.analysis.ExprSubstitutionMap;
import org.apache.impala.analysis.ParsedStatement;
import org.apache.impala.calcite.rel.node.ImpalaPlanRel;
import org.apache.impala.calcite.rel.node.NodeWithExprs;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.planner.DataSink;
import org.apache.impala.planner.PlannerContext;
import org.apache.impala.planner.PlanNode;
import org.apache.impala.planner.PlanRootSink;
import org.apache.impala.planner.PlannerContext;
import org.apache.impala.planner.SingleNodePlannerIntf;
import org.apache.impala.thrift.TColumn;
import org.apache.impala.thrift.TResultSetMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* Implementation of the SingleNodePlannerIntf which returns a PlanNode
@@ -53,6 +51,7 @@ public class CalciteSingleNodePlanner implements SingleNodePlannerIntf {
private final CalciteAnalysisResult analysisResult_;
private NodeWithExprs rootNode_;
private List<String> fieldNames_;
private boolean returnsMoreThanOneRow_;
public CalciteSingleNodePlanner(PlannerContext ctx) {
ctx_ = ctx;
@@ -71,6 +70,8 @@ public class CalciteSingleNodePlanner implements SingleNodePlannerIntf {
new CalciteOptimizer(analysisResult_, ctx_.getTimeline());
ImpalaPlanRel optimizedPlan = optimizer.optimize(logicalPlan);
returnsMoreThanOneRow_ = returnsMoreThanOneRow(optimizedPlan);
// Create Physical Impala PlanNodes
CalcitePhysPlanCreator physPlanCreator =
new CalcitePhysPlanCreator(analysisResult_.getAnalyzer(), ctx_);
@@ -86,7 +87,47 @@ public class CalciteSingleNodePlanner implements SingleNodePlannerIntf {
*/
@Override
public DataSink createDataSink(ExprSubstitutionMap rootNodeSmap) {
return new PlanRootSink(rootNode_.outputExprs_);
return PlanRootSink.create(ctx_, rootNode_.outputExprs_, returnsMoreThanOneRow_);
}
private boolean returnsMoreThanOneRow(RelNode logicalPlan) {
return !isSingleRowValues(logicalPlan) && !hasOneRowAgg(logicalPlan);
}
private boolean isSingleRowValues(RelNode relNode) {
// A project will keep the row count the same. Theoretically, a filter
// can reduce the row count, but there should not be any filter
// over a values clause because the optimization rules should take
// care of this situation.
while (relNode instanceof Project) {
relNode = relNode.getInput(0);
}
if (!(relNode instanceof Values)) {
return false;
}
Values values = (Values) relNode;
return Values.isEmpty(values) || Values.isSingleValue(values);
}
/**
* Checks if there is an aggregation at the root that guarantees there
* will be at most one row. Avoid aggs that have groups via a group keyword
* or a distinct keyword.
*/
private boolean hasOneRowAgg(RelNode relNode) {
while (ImpalaPlanRel.canPassThroughParentAggregate(relNode)) {
relNode = relNode.getInput(0);
}
if (!(relNode instanceof Aggregate)) {
return false;
}
Aggregate agg = (Aggregate) relNode;
if (agg.getGroupCount() > 0 || agg.containsDistinctCall()) {
return false;
}
return true;
}
@Override

View File

@@ -250,7 +250,7 @@ public class ExecRequestCreator implements CompilerStep {
rootFragment.verifyTree();
List<Expr> resultExprs = outputExprs;
rootFragment.setSink(new PlanRootSink(resultExprs));
rootFragment.setSink(PlanRootSink.create(ctx, resultExprs, true));
Planner.checkForDisableCodegen(rootFragment.getPlanRoot(), ctx);
// finalize exchanges: this ensures that for hash partitioned joins, the partitioning

View File

@@ -55,10 +55,10 @@ Max Per-Host Resource Reservation: Memory=72.52MB Threads=1
Per-Host Resource Estimates: Memory=269MB
F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
| Per-Instance Resources: mem-estimate=268.97MB mem-reservation=72.52MB thread-reservation=1 runtime-filters-memory=6.00MB
| max-parallelism=1 segment-costs=[1440342244, 4]
| max-parallelism=1 segment-costs=[1440342244, 0]
PLAN-ROOT SINK
| output exprs: avg(tpcds_partitioned_parquet_snap.store_sales.ss_quantity), avg(tpcds_partitioned_parquet_snap.store_sales.ss_ext_sales_price), avg(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost), sum(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost)
| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 cost=4
| mem-estimate=0B mem-reservation=0B thread-reservation=0 cost=0
|
11:AGGREGATE [FINALIZE]
| output: avg(CAST(tpcds_partitioned_parquet_snap.store_sales.ss_quantity AS BIGINT)), avg(tpcds_partitioned_parquet_snap.store_sales.ss_ext_sales_price), avg(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost), sum(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost)
@@ -184,14 +184,14 @@ PLAN-ROOT SINK
tuple-ids=0 row-size=40B cardinality=176.68M(filtered from 863.99M) cost=1216284982
in pipelines: 00(GETNEXT)
---- DISTRIBUTEDPLAN
Max Per-Host Resource Reservation: Memory=404.45MB Threads=24
Per-Host Resource Estimates: Memory=677MB
Max Per-Host Resource Reservation: Memory=400.45MB Threads=24
Per-Host Resource Estimates: Memory=673MB
F07:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
| Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
| max-parallelism=1 segment-costs=[68, 4] cpu-comparison-result=130 [max(1 (self) vs 130 (sum children))]
| Per-Instance Resources: mem-estimate=68.03KB mem-reservation=0B thread-reservation=1
| max-parallelism=1 segment-costs=[68, 0] cpu-comparison-result=130 [max(1 (self) vs 130 (sum children))]
PLAN-ROOT SINK
| output exprs: avg(tpcds_partitioned_parquet_snap.store_sales.ss_quantity), avg(tpcds_partitioned_parquet_snap.store_sales.ss_ext_sales_price), avg(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost), sum(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost)
| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 cost=4
| mem-estimate=0B mem-reservation=0B thread-reservation=0 cost=0
|
19:AGGREGATE [FINALIZE]
| output: avg:merge(tpcds_partitioned_parquet_snap.store_sales.ss_quantity), avg:merge(tpcds_partitioned_parquet_snap.store_sales.ss_ext_sales_price), avg:merge(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost), sum:merge(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost)
@@ -425,14 +425,14 @@ max-parallelism=150 segment-costs=[1456086510]
tuple-ids=0 row-size=40B cardinality=176.68M(filtered from 863.99M) cost=1216284982
in pipelines: 00(GETNEXT)
---- PARALLELPLANS
Max Per-Host Resource Reservation: Memory=404.45MB Threads=24
Per-Host Resource Estimates: Memory=677MB
Max Per-Host Resource Reservation: Memory=400.45MB Threads=24
Per-Host Resource Estimates: Memory=673MB
F07:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
| Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
| max-parallelism=1 segment-costs=[68, 4] cpu-comparison-result=130 [max(1 (self) vs 130 (sum children))]
| Per-Instance Resources: mem-estimate=68.03KB mem-reservation=0B thread-reservation=1
| max-parallelism=1 segment-costs=[68, 0] cpu-comparison-result=130 [max(1 (self) vs 130 (sum children))]
PLAN-ROOT SINK
| output exprs: avg(tpcds_partitioned_parquet_snap.store_sales.ss_quantity), avg(tpcds_partitioned_parquet_snap.store_sales.ss_ext_sales_price), avg(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost), sum(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost)
| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 cost=4
| mem-estimate=0B mem-reservation=0B thread-reservation=0 cost=0
|
19:AGGREGATE [FINALIZE]
| output: avg:merge(tpcds_partitioned_parquet_snap.store_sales.ss_quantity), avg:merge(tpcds_partitioned_parquet_snap.store_sales.ss_ext_sales_price), avg:merge(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost), sum:merge(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost)

View File

@@ -71,10 +71,10 @@ Max Per-Host Resource Reservation: Memory=69.52MB Threads=1
Per-Host Resource Estimates: Memory=250MB
F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
| Per-Instance Resources: mem-estimate=250.04MB mem-reservation=69.52MB thread-reservation=1 runtime-filters-memory=5.00MB
| max-parallelism=1 segment-costs=[1126444413, 1]
| max-parallelism=1 segment-costs=[1126444413, 0]
PLAN-ROOT SINK
| output exprs: sum(tpcds_partitioned_parquet_snap.store_sales.ss_quantity)
| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 cost=1
| mem-estimate=0B mem-reservation=0B thread-reservation=0 cost=0
|
09:AGGREGATE [FINALIZE]
| output: sum(CAST(tpcds_partitioned_parquet_snap.store_sales.ss_quantity AS BIGINT))
@@ -179,14 +179,14 @@ PLAN-ROOT SINK
tuple-ids=0 row-size=28B cardinality=176.68M(filtered from 863.99M) cost=910976956
in pipelines: 00(GETNEXT)
---- DISTRIBUTEDPLAN
Max Per-Host Resource Reservation: Memory=379.14MB Threads=22
Per-Host Resource Estimates: Memory=622MB
Max Per-Host Resource Reservation: Memory=375.14MB Threads=22
Per-Host Resource Estimates: Memory=618MB
F06:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
| Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
| max-parallelism=1 segment-costs=[25, 1] cpu-comparison-result=130 [max(1 (self) vs 130 (sum children))]
| Per-Instance Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
| max-parallelism=1 segment-costs=[25, 0] cpu-comparison-result=130 [max(1 (self) vs 130 (sum children))]
PLAN-ROOT SINK
| output exprs: sum(tpcds_partitioned_parquet_snap.store_sales.ss_quantity)
| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 cost=1
| mem-estimate=0B mem-reservation=0B thread-reservation=0 cost=0
|
16:AGGREGATE [FINALIZE]
| output: sum:merge(tpcds_partitioned_parquet_snap.store_sales.ss_quantity)
@@ -382,14 +382,14 @@ max-parallelism=130 segment-costs=[1221102531]
tuple-ids=0 row-size=28B cardinality=176.68M(filtered from 863.99M) cost=910976956
in pipelines: 00(GETNEXT)
---- PARALLELPLANS
Max Per-Host Resource Reservation: Memory=379.14MB Threads=22
Per-Host Resource Estimates: Memory=622MB
Max Per-Host Resource Reservation: Memory=375.14MB Threads=22
Per-Host Resource Estimates: Memory=618MB
F06:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
| Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
| max-parallelism=1 segment-costs=[25, 1] cpu-comparison-result=130 [max(1 (self) vs 130 (sum children))]
| Per-Instance Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
| max-parallelism=1 segment-costs=[25, 0] cpu-comparison-result=130 [max(1 (self) vs 130 (sum children))]
PLAN-ROOT SINK
| output exprs: sum(tpcds_partitioned_parquet_snap.store_sales.ss_quantity)
| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 cost=1
| mem-estimate=0B mem-reservation=0B thread-reservation=0 cost=0
|
16:AGGREGATE [FINALIZE]
| output: sum:merge(tpcds_partitioned_parquet_snap.store_sales.ss_quantity)

View File

@@ -27,10 +27,10 @@ Max Per-Host Resource Reservation: Memory=192.94MB Threads=1
Per-Host Resource Estimates: Memory=52.18GB
F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
| Per-Instance Resources: mem-estimate=52.18GB mem-reservation=192.94MB thread-reservation=1 runtime-filters-memory=51.00MB
| max-parallelism=1 segment-costs=[54918503750, 27478414018, 72455093642, 13789590829, 19351054919, 22241751, 1]
| max-parallelism=1 segment-costs=[54918503750, 27478414018, 72455093642, 13789590829, 19351054919, 22241751, 0]
PLAN-ROOT SINK
| output exprs: count()
| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 cost=1
| mem-estimate=0B mem-reservation=0B thread-reservation=0 cost=0
|
22:AGGREGATE [FINALIZE]
| output: count()
@@ -238,11 +238,11 @@ PLAN-ROOT SINK
Max Per-Host Resource Reservation: Memory=4.62GB Threads=85
Per-Host Resource Estimates: Memory=59.85GB
F16:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
| Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
| max-parallelism=1 segment-costs=[25, 1] cpu-comparison-result=360 [max(1 (self) vs 360 (sum children))]
| Per-Instance Resources: mem-estimate=184.84KB mem-reservation=0B thread-reservation=1
| max-parallelism=1 segment-costs=[25, 0] cpu-comparison-result=360 [max(1 (self) vs 360 (sum children))]
PLAN-ROOT SINK
| output exprs: count()
| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 cost=1
| mem-estimate=0B mem-reservation=0B thread-reservation=0 cost=0
|
40:AGGREGATE [FINALIZE]
| output: count:merge()
@@ -641,11 +641,11 @@ max-parallelism=1824 segment-costs=[54881581784, 24500370827] cpu-comparison-res
Max Per-Host Resource Reservation: Memory=4.62GB Threads=85
Per-Host Resource Estimates: Memory=59.85GB
F16:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
| Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
| max-parallelism=1 segment-costs=[25, 1] cpu-comparison-result=360 [max(1 (self) vs 360 (sum children))]
| Per-Instance Resources: mem-estimate=184.84KB mem-reservation=0B thread-reservation=1
| max-parallelism=1 segment-costs=[25, 0] cpu-comparison-result=360 [max(1 (self) vs 360 (sum children))]
PLAN-ROOT SINK
| output exprs: count()
| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 cost=1
| mem-estimate=0B mem-reservation=0B thread-reservation=0 cost=0
|
40:AGGREGATE [FINALIZE]
| output: count:merge()

View File

@@ -71,7 +71,7 @@ Per-Instance Resources: mem-estimate=16.00MB mem-reservation=32.00KB thread-rese
in pipelines: 00(GETNEXT)
====
# Validate that the maximum memory reservation for PLAN-ROOT SINK is bounded by
# MAX_PINNED_RESULT_SPOOLING_MEMORY.
# MAX_SPILLED_RESULT_SPOOLING_MEM.
select * from tpch.lineitem order by l_orderkey
---- DISTRIBUTEDPLAN
Max Per-Host Resource Reservation: Memory=24.00MB Threads=3

View File

@@ -1128,3 +1128,13 @@ NaN,Inf
---- TYPES
DOUBLE, FLOAT
====
---- QUERY
select count(*) from functional.alltypestiny;
---- RUNTIME_PROFILE
row_regex: .*SPOOL_QUERY_RESULTS=0.*
====
---- QUERY
select * from (values(0));
---- RUNTIME_PROFILE
row_regex: .*SPOOL_QUERY_RESULTS=0.*
====