diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java index 8a50bbf91..222ad9986 100644 --- a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java +++ b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java @@ -497,17 +497,6 @@ public class AnalysisContext { authzChecker.postAnalyze(authzCtx); ImpalaException analysisException = analysisResult_.getException(); - // A statement that returns at most one row does not need to spool query results. - // IMPALA-13902: returnsAtMostOneRow should be in planner interface so it is - // accessible by the Calcite planner. - if (analysisException == null && analysisResult_.getStmt() instanceof SelectStmt && - ((SelectStmt)analysisResult_.getStmt()).returnsAtMostOneRow()) { - clientRequest.query_options.setSpool_query_results(false); - if (LOG.isTraceEnabled()) { - LOG.trace("Result spooling is disabled due to the statement returning at most " - + "one row."); - } - } long durationMs = timeline_.markEvent("Analysis finished") / 1000000; LOG.info("Analysis took {} ms", durationMs); diff --git a/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java b/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java index b5a036975..29fbcac70 100644 --- a/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java @@ -20,7 +20,6 @@ package org.apache.impala.analysis; import java.util.ArrayList; import java.util.HashSet; import java.util.List; -import java.util.Optional; import java.util.Set; import com.google.common.collect.Sets; @@ -29,8 +28,6 @@ import org.apache.impala.catalog.Type; import org.apache.impala.catalog.View; import org.apache.impala.common.AnalysisException; import org.apache.impala.common.SqlCastException; -import org.apache.impala.planner.DataSink; -import org.apache.impala.planner.PlanRootSink; import com.google.common.base.Preconditions; import com.google.common.base.Predicates; @@ -503,9 +500,12 @@ public abstract class QueryStmt extends StatementBase { resultExprs_ = Expr.substituteList(resultExprs_, smap, analyzer, true); } - public DataSink createDataSink(List resultExprs) { - return new PlanRootSink(resultExprs); - } + /** + * Return True if this statement is eligible for result spooling. + * Planner can still decide to disable result spooling if there are other factors + * during analysis that prevents spooling from happening. + */ + public boolean canSpoolResult() { return true; } public List cloneOrderByElements() { if (orderByElements_ == null) return null; diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java index f96be1986..d152769c8 100644 --- a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java @@ -1915,4 +1915,9 @@ public class SelectStmt extends QueryStmt { (sortInfo_ != null && TreeNode.contains(sortInfo_.getSortExprs(), Expr.IS_AGGREGATE)); } + + @Override + public boolean canSpoolResult() { + return !returnsAtMostOneRow(); + } } diff --git a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java index df4784935..27535cb7d 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java +++ b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java @@ -19,6 +19,7 @@ package org.apache.impala.planner; import java.util.List; +import com.google.common.base.MoreObjects; import org.apache.impala.analysis.Expr; import org.apache.impala.service.BackendConfig; import org.apache.impala.thrift.TDataSink; @@ -45,11 +46,33 @@ public class PlanRootSink extends DataSink { // IMPALA-4268 for details on how this value was chosen. private static final long DEFAULT_RESULT_SPOOLING_ESTIMATED_MEMORY = 10 * 1024 * 1024; + /** + * Create a PlanRootSink. This method will sanitize spooling-related query options + * within given PlannerContext before creating the PlanRootSink. Once sanitized here, + * the spooling-related query options is safe to use directly and should not be modified + * anywhere else. + * @param ctx the planner context. + * @param outputExprs the output expressions for the PlanRootSink. + * @param requireSpooling hard override to disable result spooling that can be supplied + * by caller. + * @return the created PlanRootSink. + */ + public static PlanRootSink create( + PlannerContext ctx, List outputExprs, boolean requireSpooling) { + sanitizeSpoolingOptions(ctx.getQueryOptions(), requireSpooling); + return new PlanRootSink(outputExprs); + } + // One expression per result column for the query. private final List outputExprs_; + /** + * DEPRECATED. + * Use {@link PlanRootSink#create(PlannerContext, List, boolean)} instead. + * This constructor will be made private. It is only temporarily preserved to prevent + * compilation error. + */ public PlanRootSink(List outputExprs) { - Preconditions.checkState(outputExprs != null); outputExprs_ = outputExprs; } @@ -70,8 +93,10 @@ public class PlanRootSink extends DataSink { @Override public void computeProcessingCost(TQueryOptions queryOptions) { - if (queryOptions.isSpool_query_results() && queryOptions.getScratch_limit() != 0 - && !BackendConfig.INSTANCE.getScratchDirs().isEmpty()) { + // TODO: this sanitization should have already happened in PlanRootSink.create(). + // This can be removed once PlanRootSink constructor is made private. + sanitizeSpoolingOptions(queryOptions, true); + if (queryOptions.isSpool_query_results()) { // The processing cost to buffer these many rows in root. long outputCardinality = Math.max(0, fragment_.getPlanRoot().getCardinality()); processingCost_ = ProcessingCost.basicCost( @@ -98,115 +123,136 @@ public class PlanRootSink extends DataSink { * the maximum between default spillable buffer size and MAX_ROW_SIZE (rounded up to * nearest power of 2) to account for the read and write pages in the * BufferedTupleStream used by the backend plan-root-sink. The maximum reservation is - * set to the query-level config MAX_PINNED_RESULT_SPOOLING_MEMORY. - * - * If SPOOL_QUERY_RESULTS is true but spill is disabled either due to SCRATCH_LIMIT = 0 - * or SCRATCH_DIRS is empty, SPOOL_QUERY_RESULTS will be set to false. A ResourceProfile - * is returned with no reservation or buffer sizes, and the estimated memory consumption - * is 0. + * set to the query-level config MAX_SPILLED_RESULT_SPOOLING_MEM. */ @Override public void computeResourceProfile(TQueryOptions queryOptions) { - if (queryOptions.isSpool_query_results()) { - // Check if we need to disable result spooling because we can not spill. - long scratchLimit = queryOptions.getScratch_limit(); - String scratchDirs = BackendConfig.INSTANCE.getScratchDirs(); - if (scratchLimit == 0 || scratchDirs.isEmpty()) { - if (LOG.isTraceEnabled()) { - LOG.trace("Result spooling is disabled due to unavailability of scratch " - + "space."); - } - queryOptions.setSpool_query_results(false); - resourceProfile_ = ResourceProfile.noReservation(0); - return; - } - - long maxSpoolingMem = queryOptions.getMax_result_spooling_mem(); - if (maxSpoolingMem <= 0) { - // max_result_spooling_mem = 0 means unbounded. But instead of setting unlimited - // memory reservation, we fallback to the default max_result_spooling_mem. - TQueryOptions defaults = new TQueryOptions(); - maxSpoolingMem = defaults.getMax_result_spooling_mem(); - queryOptions.setMax_result_spooling_mem(maxSpoolingMem); - } - - long bufferSize = queryOptions.getDefault_spillable_buffer_size(); - long maxRowBufferSize = PlanNode.computeMaxSpillableBufferSize( - bufferSize, queryOptions.getMax_row_size()); - long minMemReservationBytes = 2 * maxRowBufferSize; - long maxMemReservationBytes = Math.max(maxSpoolingMem, minMemReservationBytes); - - // User might set query option scratch_limit that is lower than either of - // minMemReservationBytes, maxMemReservationBytes, or - // max_spilled_result_spooling_mem. We define: - // - // maxAllowedScratchLimit = scratchLimit - maxRowBufferSize - // - // If maxAllowedScratchLimit < minMemReservationBytes, we fall back to use - // BlockingPlanRootSink in the backend by silently disabling result spooling. - // If maxAllowedScratchLimit < maxMemReservationBytes, we silently lower - // maxMemReservationBytes, max_result_spooling_mem, and - // max_spilled_result_spooling_mem accordingly to fit maxAllowedScratchLimit. - // Otherwise, do nothing. - // - // BufferedPlanRootSink may slightly exceed its maxMemReservationBytes when it - // decides to spill. maxRowBufferSize bytes is subtracted in maxAllowedScratchLimit - // to give extra space, ensuring spill does not exceed scratch_limit. - if (scratchLimit > -1) { - long maxAllowedScratchLimit = scratchLimit - maxRowBufferSize; - if (maxAllowedScratchLimit < minMemReservationBytes) { - if (LOG.isTraceEnabled()) { - LOG.trace("Result spooling is disabled due to low scratch_limit (" - + scratchLimit + "). Try increasing scratch_limit to >= " - + (minMemReservationBytes + maxRowBufferSize) - + " to enable result spooling."); - } - queryOptions.setSpool_query_results(false); - resourceProfile_ = ResourceProfile.noReservation(0); - return; - } else if (maxAllowedScratchLimit < maxMemReservationBytes) { - maxMemReservationBytes = maxAllowedScratchLimit; - queryOptions.setMax_result_spooling_mem(maxAllowedScratchLimit); - if (LOG.isTraceEnabled()) { - LOG.trace("max_result_spooling_mem is lowered to " + maxMemReservationBytes - + " to fit scratch_limit (" + scratchLimit + ")."); - } - } - - // If we got here, it means we can use BufferedPlanRootSink with at least - // minMemReservationBytes in memory. But the amount of memory we can spill to disk - // may still be limited by scratch_limit. Thus, we need to lower - // max_spilled_result_spooling_mem as necessary. - if (maxAllowedScratchLimit < queryOptions.getMax_spilled_result_spooling_mem()) { - queryOptions.setMax_spilled_result_spooling_mem(maxAllowedScratchLimit); - if (LOG.isTraceEnabled()) { - LOG.trace("max_spilled_result_spooling_mem is lowered to " - + maxAllowedScratchLimit + " to fit scratch_limit (" - + scratchLimit + ")."); - } - } - } - - PlanNode inputNode = fragment_.getPlanRoot(); - - long memEstimateBytes; - if (inputNode.getCardinality() == -1 || inputNode.getAvgRowSize() == -1) { - memEstimateBytes = DEFAULT_RESULT_SPOOLING_ESTIMATED_MEMORY; - } else { - long inputCardinality = Math.max(1L, inputNode.getCardinality()); - memEstimateBytes = (long) Math.ceil(inputCardinality * inputNode.getAvgRowSize()); - } - memEstimateBytes = Math.min(memEstimateBytes, maxMemReservationBytes); + // TODO: this sanitization should have already happened in PlanRootSink.create(). + // This can be removed once PlanRootSink constructor is made private. + sanitizeSpoolingOptions(queryOptions, true); + if (!queryOptions.isSpool_query_results()) { + resourceProfile_ = ResourceProfile.noReservation(0); + } else { + SpoolingMemoryBound memBound = new SpoolingMemoryBound(queryOptions); + long memEstimateBytes = getMemEstimateBytes(memBound.maxMemReservationBytes_); resourceProfile_ = new ResourceProfileBuilder() .setMemEstimateBytes(memEstimateBytes) - .setMinMemReservationBytes(minMemReservationBytes) - .setMaxMemReservationBytes(maxMemReservationBytes) - .setMaxRowBufferBytes(maxRowBufferSize) - .setSpillableBufferBytes(bufferSize) + .setMinMemReservationBytes(memBound.minMemReservationBytes_) + .setMaxMemReservationBytes(memBound.maxMemReservationBytes_) + .setMaxRowBufferBytes(memBound.maxRowBufferSize_) + .setSpillableBufferBytes(memBound.bufferSize_) .build(); + } + } + + private long getMemEstimateBytes(long maxMemReservationBytes) { + PlanNode inputNode = fragment_.getPlanRoot(); + + long memEstimateBytes; + if (inputNode.getCardinality() == -1 || inputNode.getAvgRowSize() == -1) { + memEstimateBytes = DEFAULT_RESULT_SPOOLING_ESTIMATED_MEMORY; } else { - resourceProfile_ = ResourceProfile.noReservation(0); + long inputCardinality = Math.max(1L, inputNode.getCardinality()); + memEstimateBytes = (long) Math.ceil(inputCardinality * inputNode.getAvgRowSize()); + } + memEstimateBytes = Math.min(memEstimateBytes, maxMemReservationBytes); + return memEstimateBytes; + } + + /** + * Helper method to disable SPOOL_QUERY_RESULTS options if it is not possible to do + * so. This method will check various limits around scratch space and disable spooling + * option if such limit will be exceeded. Once disabled, SPOOL_QUERY_RESULTS option + * should not be enabled anywhere else. + * + * If SPOOL_QUERY_RESULTS is true but spill is disabled either due to SCRATCH_LIMIT = 0 + * or SCRATCH_DIRS is empty, SPOOL_QUERY_RESULTS will be set to false. + * + * If this method does not disable spooling option, sanitize MAX_RESULT_SPOOLING_MEM + * and MAX_SPILLED_RESULT_SPOOLING_MEM option into a valid value. + * + * @param queryOptions query options to potentially modify. + * @param requireSpooling hard override to disable result spooling that can be supplied + * by caller. + */ + private static void sanitizeSpoolingOptions( + TQueryOptions queryOptions, boolean requireSpooling) { + if (!requireSpooling || !queryOptions.isSpool_query_results()) { + queryOptions.setSpool_query_results(false); + return; + } + + // Check if we need to disable result spooling because we can not spill. + long scratchLimit = queryOptions.getScratch_limit(); + String scratchDirs = BackendConfig.INSTANCE.getScratchDirs(); + if (scratchLimit == 0 || scratchDirs.isEmpty()) { + LOG.info("Result spooling is disabled due to unavailability of scratch space."); + queryOptions.setSpool_query_results(false); + return; + } + + long maxSpoolingMem = queryOptions.getMax_result_spooling_mem(); + if (maxSpoolingMem <= 0) { + // max_result_spooling_mem = 0 means unbounded. But instead of setting unlimited + // memory reservation, we fallback to the default max_result_spooling_mem. + TQueryOptions defaults = new TQueryOptions(); + maxSpoolingMem = defaults.getMax_result_spooling_mem(); + queryOptions.setMax_result_spooling_mem(maxSpoolingMem); + } + + SpoolingMemoryBound memBound = new SpoolingMemoryBound(queryOptions); + if (!memBound.hasSufficientScratchLimit()) { + LOG.info("Result spooling is disabled due to low scratch_limit (" + + memBound.scratchLimit_ + "). Try increasing scratch_limit to >= " + + (memBound.minRequiredScratchLimit()) + " to enable result spooling."); + queryOptions.setSpool_query_results(false); + return; + } + + // User might set query option scratch_limit that is lower than either of + // minMemReservationBytes, maxMemReservationBytes, or + // max_spilled_result_spooling_mem. We define: + // + // maxAllowedScratchLimit = scratchLimit - maxRowBufferSize + // + // If maxAllowedScratchLimit < minMemReservationBytes, we fall back to use + // BlockingPlanRootSink in the backend by silently disabling result spooling. + // If maxAllowedScratchLimit < maxMemReservationBytes, we silently lower + // max_result_spooling_mem and max_spilled_result_spooling_mem accordingly to fit + // maxAllowedScratchLimit. Otherwise, do nothing. + // + // BufferedPlanRootSink may slightly exceed its maxMemReservationBytes when it + // decides to spill. maxRowBufferSize bytes is subtracted in maxAllowedScratchLimit + // to give extra space, ensuring spill does not exceed scratch_limit. + if (memBound.hasBoundedScratchLimit()) { + Preconditions.checkState(memBound.scratchLimit_ > 0); + if (memBound.maxAllowedScratchLimit_ < memBound.minMemReservationBytes_) { + LOG.info("Result spooling is disabled due to low scratch_limit (" + scratchLimit + + "). Try increasing scratch_limit to >= " + + memBound.minRequiredScratchLimit() + " to enable result spooling."); + queryOptions.setSpool_query_results(false); + return; + } + + if (memBound.maxMemReservationBytes_ < queryOptions.getMax_result_spooling_mem()) { + queryOptions.setMax_result_spooling_mem(memBound.maxMemReservationBytes_); + LOG.info("max_result_spooling_mem is lowered to " + + memBound.maxMemReservationBytes_ + " to fit scratch_limit (" + + memBound.scratchLimit_ + ")."); + } + + // If we got here, it means we can use BufferedPlanRootSink with at least + // minMemReservationBytes in memory. But the amount of memory we can spill to disk + // may still be limited by scratch_limit. Thus, we need to lower + // max_spilled_result_spooling_mem as necessary. + if (memBound.maxAllowedScratchLimit_ + < queryOptions.getMax_spilled_result_spooling_mem()) { + queryOptions.setMax_spilled_result_spooling_mem(memBound.maxAllowedScratchLimit_); + LOG.info("max_spilled_result_spooling_mem is lowered to " + + memBound.maxAllowedScratchLimit_ + " to fit scratch_limit (" + + memBound.scratchLimit_ + ")."); + } } } @@ -232,4 +278,70 @@ public class PlanRootSink extends DataSink { super.computeRowConsumptionAndProductionToCost(); fragment_.setFixedInstanceCount(fragment_.getNumInstances()); } + + private static class SpoolingMemoryBound { + // Values taken as is from query options. + public final long scratchLimit_; + public final long bufferSize_; + public final long maxRowSize_; + public final long maxSpoolingMem_; + + // Derived values from query options. + public final long maxRowBufferSize_; + public final long minMemReservationBytes_; + public final long maxMemReservationBytes_; + + // Only valid if scratch limit is bounded. + public final long maxAllowedScratchLimit_; + + public SpoolingMemoryBound(TQueryOptions queryOptions) { + Preconditions.checkArgument(queryOptions.isSpool_query_results()); + Preconditions.checkArgument(queryOptions.getMax_result_spooling_mem() > 0); + + scratchLimit_ = queryOptions.getScratch_limit(); + bufferSize_ = queryOptions.getDefault_spillable_buffer_size(); + maxRowSize_ = queryOptions.getMax_row_size(); + maxSpoolingMem_ = queryOptions.getMax_result_spooling_mem(); + + maxRowBufferSize_ = + PlanNode.computeMaxSpillableBufferSize(bufferSize_, maxRowSize_); + minMemReservationBytes_ = 2 * maxRowBufferSize_; + + long maxAllowedScratchLimit = queryOptions.getMax_spilled_result_spooling_mem(); + long maxMemReservationBytes = Math.max(maxSpoolingMem_, minMemReservationBytes_); + if (hasBoundedScratchLimit()) { + maxAllowedScratchLimit = scratchLimit_ - maxRowBufferSize_; + if (maxAllowedScratchLimit < maxMemReservationBytes) { + maxMemReservationBytes = maxAllowedScratchLimit; + } + } + maxAllowedScratchLimit_ = maxAllowedScratchLimit; + maxMemReservationBytes_ = maxMemReservationBytes; + } + + public boolean hasBoundedScratchLimit() { return scratchLimit_ > -1; } + + public long minRequiredScratchLimit() { + return minMemReservationBytes_ + maxRowBufferSize_; + } + + public boolean hasSufficientScratchLimit() { + return !hasBoundedScratchLimit() + || maxAllowedScratchLimit_ >= minMemReservationBytes_; + } + + @Override + public String toString() { + MoreObjects.ToStringHelper toStrHelper = MoreObjects.toStringHelper(this); + toStrHelper.add("scratchLimit", scratchLimit_); + toStrHelper.add("bufferSize", bufferSize_); + toStrHelper.add("maxRowSize", maxRowSize_); + toStrHelper.add("maxSpoolingMem", maxSpoolingMem_); + toStrHelper.add("maxRowBufferSize", maxRowBufferSize_); + toStrHelper.add("minMemReservationBytes", minMemReservationBytes_); + toStrHelper.add("maxMemReservationBytes", maxMemReservationBytes_); + toStrHelper.add("maxAllowedScratchLimit", maxAllowedScratchLimit_); + return toStrHelper.toString(); + } + } } diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java index 2f8eff98d..a6e83b62c 100644 --- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java @@ -2380,7 +2380,7 @@ public class SingleNodePlanner implements SingleNodePlannerIntf { QueryStmt queryStmt = ctx_.getQueryStmt(); queryStmt.substituteResultExprs(rootNodeSmap, ctx_.getRootAnalyzer()); List resultExprs = queryStmt.getResultExprs(); - return ctx_.getAnalysisResult().getQueryStmt().createDataSink(resultExprs); + return PlanRootSink.create(ctx_, resultExprs, queryStmt.canSpoolResult()); } @Override diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/operators/ImpalaOperatorTable.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/operators/ImpalaOperatorTable.java index cb198582f..dddd62486 100644 --- a/java/calcite-planner/src/main/java/org/apache/impala/calcite/operators/ImpalaOperatorTable.java +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/operators/ImpalaOperatorTable.java @@ -84,6 +84,7 @@ public class ImpalaOperatorTable extends ReflectiveSqlOperatorTable { .add("regr_count") .add("localtime") .add("translate") + .add("sleep") .build(); private static ImpalaOperatorTable INSTANCE; diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaPlanRel.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaPlanRel.java index 3ba9e8087..0d49af089 100644 --- a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaPlanRel.java +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaPlanRel.java @@ -17,6 +17,8 @@ package org.apache.impala.calcite.rel.node; +import com.google.common.base.Preconditions; + import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.Filter; @@ -31,8 +33,7 @@ import org.apache.impala.common.ImpalaException; /** * ImpalaPlanRel. Interface used for all Impala intermediary RelNodes */ -public interface ImpalaPlanRel { - +public interface ImpalaPlanRel extends RelNode { /** * Enum representing the type of class used in the RelNode * Using an enum here so that Impala Plan RelNodes can be used in @@ -99,8 +100,8 @@ public interface ImpalaPlanRel { * mentioned below can be in between the Aggregate RelNode and the Table Scan * RelNode. */ - public static boolean canPassThroughParentAggregate(ImpalaPlanRel planRel) { - switch (getRelNodeType((RelNode) planRel)) { + public static boolean canPassThroughParentAggregate(RelNode relNode) { + switch (getRelNodeType(relNode)) { case FILTER: case PROJECT: return true; diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteSingleNodePlanner.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteSingleNodePlanner.java index 6dbc6aa7d..de74ffd54 100644 --- a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteSingleNodePlanner.java +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteSingleNodePlanner.java @@ -17,28 +17,26 @@ package org.apache.impala.calcite.service; - -import java.util.ArrayList; -import java.util.List; - import org.apache.calcite.rel.RelNode; -import org.apache.impala.analysis.AnalysisDriver; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.Values; import org.apache.impala.analysis.ExprSubstitutionMap; import org.apache.impala.analysis.ParsedStatement; import org.apache.impala.calcite.rel.node.ImpalaPlanRel; import org.apache.impala.calcite.rel.node.NodeWithExprs; import org.apache.impala.common.ImpalaException; import org.apache.impala.planner.DataSink; -import org.apache.impala.planner.PlannerContext; import org.apache.impala.planner.PlanNode; import org.apache.impala.planner.PlanRootSink; +import org.apache.impala.planner.PlannerContext; import org.apache.impala.planner.SingleNodePlannerIntf; import org.apache.impala.thrift.TColumn; import org.apache.impala.thrift.TResultSetMetadata; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; /** * Implementation of the SingleNodePlannerIntf which returns a PlanNode @@ -53,6 +51,7 @@ public class CalciteSingleNodePlanner implements SingleNodePlannerIntf { private final CalciteAnalysisResult analysisResult_; private NodeWithExprs rootNode_; private List fieldNames_; + private boolean returnsMoreThanOneRow_; public CalciteSingleNodePlanner(PlannerContext ctx) { ctx_ = ctx; @@ -71,6 +70,8 @@ public class CalciteSingleNodePlanner implements SingleNodePlannerIntf { new CalciteOptimizer(analysisResult_, ctx_.getTimeline()); ImpalaPlanRel optimizedPlan = optimizer.optimize(logicalPlan); + returnsMoreThanOneRow_ = returnsMoreThanOneRow(optimizedPlan); + // Create Physical Impala PlanNodes CalcitePhysPlanCreator physPlanCreator = new CalcitePhysPlanCreator(analysisResult_.getAnalyzer(), ctx_); @@ -86,7 +87,47 @@ public class CalciteSingleNodePlanner implements SingleNodePlannerIntf { */ @Override public DataSink createDataSink(ExprSubstitutionMap rootNodeSmap) { - return new PlanRootSink(rootNode_.outputExprs_); + return PlanRootSink.create(ctx_, rootNode_.outputExprs_, returnsMoreThanOneRow_); + } + + private boolean returnsMoreThanOneRow(RelNode logicalPlan) { + return !isSingleRowValues(logicalPlan) && !hasOneRowAgg(logicalPlan); + } + + private boolean isSingleRowValues(RelNode relNode) { + // A project will keep the row count the same. Theoretically, a filter + // can reduce the row count, but there should not be any filter + // over a values clause because the optimization rules should take + // care of this situation. + while (relNode instanceof Project) { + relNode = relNode.getInput(0); + } + + if (!(relNode instanceof Values)) { + return false; + } + + Values values = (Values) relNode; + return Values.isEmpty(values) || Values.isSingleValue(values); + } + + /** + * Checks if there is an aggregation at the root that guarantees there + * will be at most one row. Avoid aggs that have groups via a group keyword + * or a distinct keyword. + */ + private boolean hasOneRowAgg(RelNode relNode) { + while (ImpalaPlanRel.canPassThroughParentAggregate(relNode)) { + relNode = relNode.getInput(0); + } + if (!(relNode instanceof Aggregate)) { + return false; + } + Aggregate agg = (Aggregate) relNode; + if (agg.getGroupCount() > 0 || agg.containsDistinctCall()) { + return false; + } + return true; } @Override diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/ExecRequestCreator.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/ExecRequestCreator.java index 3e5b90913..a60d0eb7a 100644 --- a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/ExecRequestCreator.java +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/ExecRequestCreator.java @@ -250,7 +250,7 @@ public class ExecRequestCreator implements CompilerStep { rootFragment.verifyTree(); List resultExprs = outputExprs; - rootFragment.setSink(new PlanRootSink(resultExprs)); + rootFragment.setSink(PlanRootSink.create(ctx, resultExprs, true)); Planner.checkForDisableCodegen(rootFragment.getPlanRoot(), ctx); // finalize exchanges: this ensures that for hash partitioned joins, the partitioning diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/calcite_tpcds/tpcds-q13.test b/testdata/workloads/functional-planner/queries/PlannerTest/calcite_tpcds/tpcds-q13.test index 0b912d56c..e5c36984e 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/calcite_tpcds/tpcds-q13.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/calcite_tpcds/tpcds-q13.test @@ -55,10 +55,10 @@ Max Per-Host Resource Reservation: Memory=72.52MB Threads=1 Per-Host Resource Estimates: Memory=269MB F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 | Per-Instance Resources: mem-estimate=268.97MB mem-reservation=72.52MB thread-reservation=1 runtime-filters-memory=6.00MB -| max-parallelism=1 segment-costs=[1440342244, 4] +| max-parallelism=1 segment-costs=[1440342244, 0] PLAN-ROOT SINK | output exprs: avg(tpcds_partitioned_parquet_snap.store_sales.ss_quantity), avg(tpcds_partitioned_parquet_snap.store_sales.ss_ext_sales_price), avg(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost), sum(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost) -| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 cost=4 +| mem-estimate=0B mem-reservation=0B thread-reservation=0 cost=0 | 11:AGGREGATE [FINALIZE] | output: avg(CAST(tpcds_partitioned_parquet_snap.store_sales.ss_quantity AS BIGINT)), avg(tpcds_partitioned_parquet_snap.store_sales.ss_ext_sales_price), avg(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost), sum(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost) @@ -184,14 +184,14 @@ PLAN-ROOT SINK tuple-ids=0 row-size=40B cardinality=176.68M(filtered from 863.99M) cost=1216284982 in pipelines: 00(GETNEXT) ---- DISTRIBUTEDPLAN -Max Per-Host Resource Reservation: Memory=404.45MB Threads=24 -Per-Host Resource Estimates: Memory=677MB +Max Per-Host Resource Reservation: Memory=400.45MB Threads=24 +Per-Host Resource Estimates: Memory=673MB F07:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1 -| max-parallelism=1 segment-costs=[68, 4] cpu-comparison-result=130 [max(1 (self) vs 130 (sum children))] +| Per-Instance Resources: mem-estimate=68.03KB mem-reservation=0B thread-reservation=1 +| max-parallelism=1 segment-costs=[68, 0] cpu-comparison-result=130 [max(1 (self) vs 130 (sum children))] PLAN-ROOT SINK | output exprs: avg(tpcds_partitioned_parquet_snap.store_sales.ss_quantity), avg(tpcds_partitioned_parquet_snap.store_sales.ss_ext_sales_price), avg(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost), sum(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost) -| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 cost=4 +| mem-estimate=0B mem-reservation=0B thread-reservation=0 cost=0 | 19:AGGREGATE [FINALIZE] | output: avg:merge(tpcds_partitioned_parquet_snap.store_sales.ss_quantity), avg:merge(tpcds_partitioned_parquet_snap.store_sales.ss_ext_sales_price), avg:merge(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost), sum:merge(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost) @@ -425,14 +425,14 @@ max-parallelism=150 segment-costs=[1456086510] tuple-ids=0 row-size=40B cardinality=176.68M(filtered from 863.99M) cost=1216284982 in pipelines: 00(GETNEXT) ---- PARALLELPLANS -Max Per-Host Resource Reservation: Memory=404.45MB Threads=24 -Per-Host Resource Estimates: Memory=677MB +Max Per-Host Resource Reservation: Memory=400.45MB Threads=24 +Per-Host Resource Estimates: Memory=673MB F07:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1 -| max-parallelism=1 segment-costs=[68, 4] cpu-comparison-result=130 [max(1 (self) vs 130 (sum children))] +| Per-Instance Resources: mem-estimate=68.03KB mem-reservation=0B thread-reservation=1 +| max-parallelism=1 segment-costs=[68, 0] cpu-comparison-result=130 [max(1 (self) vs 130 (sum children))] PLAN-ROOT SINK | output exprs: avg(tpcds_partitioned_parquet_snap.store_sales.ss_quantity), avg(tpcds_partitioned_parquet_snap.store_sales.ss_ext_sales_price), avg(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost), sum(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost) -| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 cost=4 +| mem-estimate=0B mem-reservation=0B thread-reservation=0 cost=0 | 19:AGGREGATE [FINALIZE] | output: avg:merge(tpcds_partitioned_parquet_snap.store_sales.ss_quantity), avg:merge(tpcds_partitioned_parquet_snap.store_sales.ss_ext_sales_price), avg:merge(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost), sum:merge(tpcds_partitioned_parquet_snap.store_sales.ss_ext_wholesale_cost) diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/calcite_tpcds/tpcds-q48.test b/testdata/workloads/functional-planner/queries/PlannerTest/calcite_tpcds/tpcds-q48.test index b4a998a19..0a9dfbaaa 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/calcite_tpcds/tpcds-q48.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/calcite_tpcds/tpcds-q48.test @@ -71,10 +71,10 @@ Max Per-Host Resource Reservation: Memory=69.52MB Threads=1 Per-Host Resource Estimates: Memory=250MB F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 | Per-Instance Resources: mem-estimate=250.04MB mem-reservation=69.52MB thread-reservation=1 runtime-filters-memory=5.00MB -| max-parallelism=1 segment-costs=[1126444413, 1] +| max-parallelism=1 segment-costs=[1126444413, 0] PLAN-ROOT SINK | output exprs: sum(tpcds_partitioned_parquet_snap.store_sales.ss_quantity) -| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 cost=1 +| mem-estimate=0B mem-reservation=0B thread-reservation=0 cost=0 | 09:AGGREGATE [FINALIZE] | output: sum(CAST(tpcds_partitioned_parquet_snap.store_sales.ss_quantity AS BIGINT)) @@ -179,14 +179,14 @@ PLAN-ROOT SINK tuple-ids=0 row-size=28B cardinality=176.68M(filtered from 863.99M) cost=910976956 in pipelines: 00(GETNEXT) ---- DISTRIBUTEDPLAN -Max Per-Host Resource Reservation: Memory=379.14MB Threads=22 -Per-Host Resource Estimates: Memory=622MB +Max Per-Host Resource Reservation: Memory=375.14MB Threads=22 +Per-Host Resource Estimates: Memory=618MB F06:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1 -| max-parallelism=1 segment-costs=[25, 1] cpu-comparison-result=130 [max(1 (self) vs 130 (sum children))] +| Per-Instance Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1 +| max-parallelism=1 segment-costs=[25, 0] cpu-comparison-result=130 [max(1 (self) vs 130 (sum children))] PLAN-ROOT SINK | output exprs: sum(tpcds_partitioned_parquet_snap.store_sales.ss_quantity) -| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 cost=1 +| mem-estimate=0B mem-reservation=0B thread-reservation=0 cost=0 | 16:AGGREGATE [FINALIZE] | output: sum:merge(tpcds_partitioned_parquet_snap.store_sales.ss_quantity) @@ -382,14 +382,14 @@ max-parallelism=130 segment-costs=[1221102531] tuple-ids=0 row-size=28B cardinality=176.68M(filtered from 863.99M) cost=910976956 in pipelines: 00(GETNEXT) ---- PARALLELPLANS -Max Per-Host Resource Reservation: Memory=379.14MB Threads=22 -Per-Host Resource Estimates: Memory=622MB +Max Per-Host Resource Reservation: Memory=375.14MB Threads=22 +Per-Host Resource Estimates: Memory=618MB F06:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1 -| max-parallelism=1 segment-costs=[25, 1] cpu-comparison-result=130 [max(1 (self) vs 130 (sum children))] +| Per-Instance Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1 +| max-parallelism=1 segment-costs=[25, 0] cpu-comparison-result=130 [max(1 (self) vs 130 (sum children))] PLAN-ROOT SINK | output exprs: sum(tpcds_partitioned_parquet_snap.store_sales.ss_quantity) -| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 cost=1 +| mem-estimate=0B mem-reservation=0B thread-reservation=0 cost=0 | 16:AGGREGATE [FINALIZE] | output: sum:merge(tpcds_partitioned_parquet_snap.store_sales.ss_quantity) diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/calcite_tpcds/tpcds-q87.test b/testdata/workloads/functional-planner/queries/PlannerTest/calcite_tpcds/tpcds-q87.test index 997ac1342..b0286a601 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/calcite_tpcds/tpcds-q87.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/calcite_tpcds/tpcds-q87.test @@ -27,10 +27,10 @@ Max Per-Host Resource Reservation: Memory=192.94MB Threads=1 Per-Host Resource Estimates: Memory=52.18GB F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 | Per-Instance Resources: mem-estimate=52.18GB mem-reservation=192.94MB thread-reservation=1 runtime-filters-memory=51.00MB -| max-parallelism=1 segment-costs=[54918503750, 27478414018, 72455093642, 13789590829, 19351054919, 22241751, 1] +| max-parallelism=1 segment-costs=[54918503750, 27478414018, 72455093642, 13789590829, 19351054919, 22241751, 0] PLAN-ROOT SINK | output exprs: count() -| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 cost=1 +| mem-estimate=0B mem-reservation=0B thread-reservation=0 cost=0 | 22:AGGREGATE [FINALIZE] | output: count() @@ -238,11 +238,11 @@ PLAN-ROOT SINK Max Per-Host Resource Reservation: Memory=4.62GB Threads=85 Per-Host Resource Estimates: Memory=59.85GB F16:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1 -| max-parallelism=1 segment-costs=[25, 1] cpu-comparison-result=360 [max(1 (self) vs 360 (sum children))] +| Per-Instance Resources: mem-estimate=184.84KB mem-reservation=0B thread-reservation=1 +| max-parallelism=1 segment-costs=[25, 0] cpu-comparison-result=360 [max(1 (self) vs 360 (sum children))] PLAN-ROOT SINK | output exprs: count() -| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 cost=1 +| mem-estimate=0B mem-reservation=0B thread-reservation=0 cost=0 | 40:AGGREGATE [FINALIZE] | output: count:merge() @@ -641,11 +641,11 @@ max-parallelism=1824 segment-costs=[54881581784, 24500370827] cpu-comparison-res Max Per-Host Resource Reservation: Memory=4.62GB Threads=85 Per-Host Resource Estimates: Memory=59.85GB F16:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 -| Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1 -| max-parallelism=1 segment-costs=[25, 1] cpu-comparison-result=360 [max(1 (self) vs 360 (sum children))] +| Per-Instance Resources: mem-estimate=184.84KB mem-reservation=0B thread-reservation=1 +| max-parallelism=1 segment-costs=[25, 0] cpu-comparison-result=360 [max(1 (self) vs 360 (sum children))] PLAN-ROOT SINK | output exprs: count() -| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 cost=1 +| mem-estimate=0B mem-reservation=0B thread-reservation=0 cost=0 | 40:AGGREGATE [FINALIZE] | output: count:merge() diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/result-spooling.test b/testdata/workloads/functional-planner/queries/PlannerTest/result-spooling.test index df28b3fc0..9fb016d01 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/result-spooling.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/result-spooling.test @@ -71,7 +71,7 @@ Per-Instance Resources: mem-estimate=16.00MB mem-reservation=32.00KB thread-rese in pipelines: 00(GETNEXT) ==== # Validate that the maximum memory reservation for PLAN-ROOT SINK is bounded by -# MAX_PINNED_RESULT_SPOOLING_MEMORY. +# MAX_SPILLED_RESULT_SPOOLING_MEM. select * from tpch.lineitem order by l_orderkey ---- DISTRIBUTEDPLAN Max Per-Host Resource Reservation: Memory=24.00MB Threads=3 diff --git a/testdata/workloads/functional-query/queries/QueryTest/calcite.test b/testdata/workloads/functional-query/queries/QueryTest/calcite.test index db57ca70c..5cccf98b8 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/calcite.test +++ b/testdata/workloads/functional-query/queries/QueryTest/calcite.test @@ -1128,3 +1128,13 @@ NaN,Inf ---- TYPES DOUBLE, FLOAT ==== +---- QUERY +select count(*) from functional.alltypestiny; +---- RUNTIME_PROFILE +row_regex: .*SPOOL_QUERY_RESULTS=0.* +==== +---- QUERY +select * from (values(0)); +---- RUNTIME_PROFILE +row_regex: .*SPOOL_QUERY_RESULTS=0.* +====