Nested Types: Plan generation for correlated and child table refs with Subplans.

The plan generation is heuristic. A SubplanNode is placed as low as possible in the
plan tree - as soon as its required parent tuple ids are materialized.
This approach is simple to understand and implement, but not always optimal. For
example, it may be better to place a Subplan after a selective join, but today we
will place it below the join if it is correct to do so.

For such scenarios, the straight_join hint can be used to manually tune the join
and Subplan order. If straight_join is used, correlated and child table refs are placed
into the same SubplanNode if they are adjacent in the FROM clause.

Change-Id: I53e4623eb58f8b7ad3d02be15ad8726769f6f8c9
Reviewed-on: http://gerrit.cloudera.org:8080/401
Reviewed-by: Alex Behm <alex.behm@cloudera.com>
Tested-by: Internal Jenkins
This commit is contained in:
Alex Behm
2015-05-08 22:19:50 -07:00
committed by Internal Jenkins
parent 7906ed44ac
commit 2db16efda8
23 changed files with 1964 additions and 201 deletions

View File

@@ -35,9 +35,6 @@ public class CollectionTableRef extends TableRef {
// parent scan's tuple. Result of analysis. Fully resolved against base tables.
private Expr collectionExpr_;
// True if this TableRef directly references a TableRef from an outer query block.
private boolean isCorrelated_;
// END: Members that need to be reset()
/////////////////////////////////////////
@@ -61,20 +58,19 @@ public class CollectionTableRef extends TableRef {
super(other);
collectionExpr_ =
(other.collectionExpr_ != null) ? other.collectionExpr_.clone() : null;
isCorrelated_ = other.isCorrelated_;
}
/**
* Registers this table ref with the given analyzer and add a slot descriptor for
* the materialized collection to be populated by parent scan. Also determines
* whether this table ref is correlated or not.
* Registers this collection table ref with the given analyzer and adds a slot
* descriptor for the materialized collection to be populated by parent scan.
* Also determines whether this collection table ref is correlated or not.
*/
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
if (isAnalyzed_) return;
Preconditions.checkNotNull(getPrivilegeRequirement());
desc_ = analyzer.registerTableRef(this);
if (isRelativeRef()) {
if (isRelative()) {
SlotDescriptor parentSlotDesc = analyzer.registerSlotRef(resolvedPath_);
parentSlotDesc.setItemTupleDesc(desc_);
collectionExpr_ = new SlotRef(parentSlotDesc);
@@ -83,20 +79,25 @@ public class CollectionTableRef extends TableRef {
Analyzer parentAnalyzer =
analyzer.findAnalyzer(resolvedPath_.getRootDesc().getId());
Preconditions.checkNotNull(parentAnalyzer);
isCorrelated_ = parentAnalyzer != analyzer;
if (parentAnalyzer != analyzer) {
TableRef parentRef =
parentAnalyzer.getTableRef(resolvedPath_.getRootDesc().getId());
Preconditions.checkNotNull(parentRef);
// InlineViews are currently not supported as a parent ref.
Preconditions.checkState(!(parentRef instanceof InlineViewRef));
correlatedTupleIds_.add(parentRef.getId());
}
}
isAnalyzed_ = true;
analyzeJoin(analyzer);
}
@Override
public boolean isRelativeRef() {
public boolean isRelative() {
Preconditions.checkNotNull(resolvedPath_);
return resolvedPath_.getRootDesc() != null;
}
@Override
public boolean isCorrelated() { return isCorrelated_; }
public Expr getCollectionExpr() { return collectionExpr_; }
@Override
@@ -106,6 +107,5 @@ public class CollectionTableRef extends TableRef {
public void reset() {
super.reset();
collectionExpr_ = null;
isCorrelated_ = false;
}
}

View File

@@ -154,7 +154,7 @@ public class InlineViewRef extends TableRef {
inlineViewAnalyzer_.setUseHiveColLabels(
isCatalogView ? true : analyzer.useHiveColLabels());
queryStmt_.analyze(inlineViewAnalyzer_);
queryStmt_.checkCorrelatedTableRefs(inlineViewAnalyzer_);
correlatedTupleIds_.addAll(queryStmt_.getCorrelatedTupleIds(inlineViewAnalyzer_));
if (explicitColLabels_ != null) {
Preconditions.checkState(
explicitColLabels_.size() == queryStmt_.getColLabels().size());

View File

@@ -21,7 +21,6 @@ import java.util.Set;
import com.cloudera.impala.catalog.Type;
import com.cloudera.impala.common.AnalysisException;
import com.cloudera.impala.common.InternalException;
import com.cloudera.impala.common.TreeNode;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
@@ -100,6 +99,10 @@ public abstract class QueryStmt extends StatementBase {
}
/**
* Returns a list containing all the materialized tuple ids that this stmt is
* correlated with (i.e., those tuple ids from outer query blocks that TableRefs
* inside this stmt are rooted at).
*
* Throws if this stmt contains an illegal mix of un/correlated table refs.
* A statement is illegal if it contains a TableRef correlated with a parent query
* block as well as a table ref with an absolute path (e.g. a BaseTableRef). Such a
@@ -112,19 +115,22 @@ public abstract class QueryStmt extends StatementBase {
* (3) a mix of correlated table refs and table refs rooted at those refs
* (the statement is 'self-contained' with respect to correlation)
*/
public void checkCorrelatedTableRefs(Analyzer analyzer) throws AnalysisException {
public List<TupleId> getCorrelatedTupleIds(Analyzer analyzer)
throws AnalysisException {
// Correlated tuple ids of this stmt.
List<TupleId> correlatedTupleIds = Lists.newArrayList();
// First correlated and absolute table refs. Used for error detection/reporting.
// We pick the first ones for simplicity. Choosing arbitrary ones is equally valid.
TableRef correlatedRef = null;
TableRef absoluteRef = null;
// Materialized tuple ids of the table refs checked so far.
Set<TupleId> tblRefIds = Sets.newHashSet();
List<TableRef> tblRefs = Lists.newArrayList();
collectTableRefs(tblRefs);
// First table ref that is correlated with a parent query block.
TableRef correlatedRef = null;
// First absolute table ref.
TableRef absoluteRef = null;
// Tuple ids of the tblRefs checked so far.
Set<TupleId> tblRefIds = Sets.newHashSet();
for (TableRef tblRef: tblRefs) {
if (absoluteRef == null && !tblRef.isRelativeRef()) absoluteRef = tblRef;
if (correlatedRef == null && tblRef.isCorrelated()) {
if (absoluteRef == null && !tblRef.isRelative()) absoluteRef = tblRef;
if (tblRef.isCorrelated()) {
// Check if the correlated table ref is rooted at a tuple descriptor from within
// this query stmt. If so, the correlation is contained within this stmt
// and the table ref does not conflict with absolute refs.
@@ -132,7 +138,8 @@ public abstract class QueryStmt extends StatementBase {
Preconditions.checkNotNull(t.getResolvedPath().getRootDesc());
// This check relies on tblRefs being in depth-first order.
if (!tblRefIds.contains(t.getResolvedPath().getRootDesc().getId())) {
correlatedRef = tblRef;
if (correlatedRef == null) correlatedRef = tblRef;
correlatedTupleIds.add(t.getResolvedPath().getRootDesc().getId());
}
}
if (correlatedRef != null && absoluteRef != null) {
@@ -143,6 +150,7 @@ public abstract class QueryStmt extends StatementBase {
}
tblRefIds.add(tblRef.getId());
}
return correlatedTupleIds;
}
private void analyzeLimit(Analyzer analyzer) throws AnalysisException {
@@ -365,8 +373,7 @@ public abstract class QueryStmt extends StatementBase {
* This is called prior to plan tree generation and allows tuple-materializing
* PlanNodes to compute their tuple's mem layout.
*/
public abstract void materializeRequiredSlots(Analyzer analyzer)
throws InternalException;
public abstract void materializeRequiredSlots(Analyzer analyzer);
/**
* Mark slots referenced in exprs as materialized.

View File

@@ -0,0 +1,61 @@
// Copyright (c) 2015 Cloudera, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.cloudera.impala.analysis;
import java.util.List;
import com.cloudera.impala.planner.PlanNode;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
 * Placeholder table ref used only during plan generation. It allows the planner to
 * add a SingularRowSrcNode inside the plan tree of a SubplanNode (its second child).
 * The ids it reports are taken from the SubplanNode's input plan, or from its left
 * table ref once setLeftTblRef() has been called.
 */
public class SingularRowSrcTableRef extends TableRef {
  // Ids returned by getAllTupleIds(). The last entry doubles as this ref's own id.
  private final List<TupleId> tblRefIds_;

  // Ids returned by getAllMaterializedTupleIds().
  private final List<TupleId> tupleIds_;

  /**
   * Creates a ref whose id lists are copies of the table-ref ids and tuple ids of the
   * given subplan input. The ref has no tuple descriptor and is marked as analyzed.
   */
  public SingularRowSrcTableRef(PlanNode subplanInput) {
    super(null, "singular-row-src-tblref");
    Preconditions.checkNotNull(subplanInput);
    tblRefIds_ = Lists.newArrayList(subplanInput.getTblRefIds());
    tupleIds_ = Lists.newArrayList(subplanInput.getTupleIds());
    isAnalyzed_ = true;
    desc_ = null;
  }

  /**
   * Needed to support join inversion where the original lhs is a
   * SingularRowSrcTableRef: after the left table ref changes, both id lists are
   * rebuilt from that left table ref.
   */
  @Override
  public void setLeftTblRef(TableRef leftTblRef) {
    super.setLeftTblRef(leftTblRef);

    // Drop the previous ids before repopulating from the new left table ref.
    tblRefIds_.clear();
    tupleIds_.clear();

    tblRefIds_.addAll(leftTblRef_.getAllTupleIds());
    tupleIds_.addAll(leftTblRef_.getAllMaterializedTupleIds());
  }

  /**
   * Returns the last of the table-ref ids held by this ref.
   */
  @Override
  public TupleId getId() {
    int lastIdx = tblRefIds_.size() - 1;
    return tblRefIds_.get(lastIdx);
  }

  @Override
  public List<TupleId> getAllTupleIds() {
    return tblRefIds_;
  }

  @Override
  public List<TupleId> getAllMaterializedTupleIds() {
    return tupleIds_;
  }
}

View File

@@ -79,7 +79,8 @@ public class Subquery extends Expr {
analyzer_ = new Analyzer(analyzer);
analyzer_.setIsSubquery();
stmt_.analyze(analyzer_);
stmt_.checkCorrelatedTableRefs(analyzer_);
// Check whether the stmt_ contains an illegal mix of un/correlated table refs.
stmt_.getCorrelatedTupleIds(analyzer_);
// Set the subquery type based on the types of the exprs in the
// result list of the associated SelectStmt.

View File

@@ -104,6 +104,12 @@ public class TableRef implements ParseNode {
// all (logical) TupleIds referenced in the On clause
protected List<TupleId> onClauseTupleIds_ = Lists.newArrayList();
// All physical tuple ids that this table ref is correlated with:
// Tuple ids of root descriptors from outer query blocks that this table ref
// (if a CollectionTableRef) or contained CollectionTableRefs (if an InlineViewRef)
// are rooted at. Populated during analysis.
protected List<TupleId> correlatedTupleIds_ = Lists.newArrayList();
// analysis output
protected TupleDescriptor desc_;
@@ -142,6 +148,7 @@ public class TableRef implements ParseNode {
leftTblRef_ = null;
isAnalyzed_ = other.isAnalyzed_;
onClauseTupleIds_ = Lists.newArrayList(other.onClauseTupleIds_);
correlatedTupleIds_ = Lists.newArrayList(other.correlatedTupleIds_);
desc_ = other.desc_;
}
@@ -230,7 +237,13 @@ public class TableRef implements ParseNode {
* Returns true if this table ref has a resolved path that is rooted at a registered
* tuple descriptor, false otherwise.
*/
public boolean isRelativeRef() { return false; }
public boolean isRelative() { return false; }
/**
* Indicates if this TableRef directly or indirectly references another TableRef from
* an outer query block.
*/
public boolean isCorrelated() { return !correlatedTupleIds_.isEmpty(); }
public List<String> getPath() { return rawPath_; }
public Path getResolvedPath() { return resolvedPath_; }
@@ -281,13 +294,10 @@ public class TableRef implements ParseNode {
}
public DistributionMode getDistributionMode() { return distrMode_; }
public List<TupleId> getOnClauseTupleIds() { return onClauseTupleIds_; }
public List<TupleId> getCorrelatedTupleIds() { return correlatedTupleIds_; }
public boolean isAnalyzed() { return isAnalyzed_; }
public boolean isResolved() { return !getClass().equals(TableRef.class); }
/**
* Indicates if this TableRef references another TableRef from an outer query block.
*/
public boolean isCorrelated() { return false; }
/**
* This method should only be called after the TableRef has been analyzed.
*/
@@ -304,7 +314,7 @@ public class TableRef implements ParseNode {
public TupleId getId() {
Preconditions.checkState(isAnalyzed_);
// after analyze(), desc should be set.
Preconditions.checkState(desc_ != null);
Preconditions.checkNotNull(desc_);
return desc_.getId();
}
@@ -476,7 +486,7 @@ public class TableRef implements ParseNode {
onClauseTupleIds.addAll(tupleIds);
}
onClauseTupleIds_.addAll(onClauseTupleIds);
} else if (!isRelativeRef()
} else if (!isRelative()
&& (getJoinOp().isOuterJoin() || getJoinOp().isSemiJoin())) {
throw new AnalysisException(
joinOp_.toString() + " requires an ON or USING clause.");
@@ -587,5 +597,6 @@ public class TableRef implements ParseNode {
leftTblRef_ = null;
onClauseTupleIds_.clear();
desc_ = null;
correlatedTupleIds_.clear();
}
}

View File

@@ -22,7 +22,6 @@ import org.slf4j.LoggerFactory;
import com.cloudera.impala.catalog.ColumnStats;
import com.cloudera.impala.common.AnalysisException;
import com.cloudera.impala.common.InternalException;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -274,7 +273,7 @@ public class UnionStmt extends QueryStmt {
* Calls materializeRequiredSlots() on the operands themselves.
*/
@Override
public void materializeRequiredSlots(Analyzer analyzer) throws InternalException {
public void materializeRequiredSlots(Analyzer analyzer) {
TupleDescriptor tupleDesc = analyzer.getDescTbl().getTupleDesc(tupleId_);
if (!distinctOperands_.isEmpty()) {
// to keep things simple we materialize all grouping exprs = output slots,

View File

@@ -26,7 +26,6 @@ import com.cloudera.impala.analysis.ExprSubstitutionMap;
import com.cloudera.impala.analysis.OrderByElement;
import com.cloudera.impala.analysis.TupleDescriptor;
import com.cloudera.impala.analysis.TupleId;
import com.cloudera.impala.common.InternalException;
import com.cloudera.impala.thrift.TAnalyticNode;
import com.cloudera.impala.thrift.TExplainLevel;
import com.cloudera.impala.thrift.TPlanNode;
@@ -108,7 +107,7 @@ public class AnalyticEvalNode extends PlanNode {
public List<OrderByElement> getOrderByElements() { return orderByElements_; }
@Override
public void init(Analyzer analyzer) throws InternalException {
public void init(Analyzer analyzer) {
computeMemLayout(analyzer);
intermediateTupleDesc_.computeMemLayout();

View File

@@ -101,6 +101,8 @@ public class DistributedPlanner {
// (the result set with the limit constraint needs to be computed centrally);
// merge later if needed
boolean childIsPartitioned = !child.hasLimit();
// Do not fragment the subplan of a SubplanNode since it is executed locally.
if (root instanceof SubplanNode && child == root.getChild(1)) continue;
childFragments.add(
createPlanFragments(
child, childIsPartitioned, perNodeMemLimit, fragments));
@@ -120,6 +122,9 @@ public class DistributedPlanner {
result = createNestedLoopJoinFragment(
(NestedLoopJoinNode) root, childFragments.get(1), childFragments.get(0),
perNodeMemLimit, fragments);
} else if (root instanceof SubplanNode) {
Preconditions.checkState(childFragments.size() == 1);
result = createSubplanNodeFragment((SubplanNode) root, childFragments.get(0));
} else if (root instanceof SelectNode) {
result = createSelectNodeFragment((SelectNode) root, childFragments);
} else if (root instanceof UnionNode) {
@@ -266,6 +271,17 @@ public class DistributedPlanner {
return new PlanFragment(ctx_.getNextFragmentId(), node, DataPartition.RANDOM);
}
/**
 * Makes the given SubplanNode the new plan root of the child fragment: the fragment's
 * current plan root becomes the node's first child. Returns that same child fragment.
 */
private PlanFragment createSubplanNodeFragment(SubplanNode node,
    PlanFragment childFragment) {
  // The existing fragment is reused; no new fragment is created for the SubplanNode.
  final PlanFragment result = childFragment;
  node.setChild(0, result.getPlanRoot());
  result.setPlanRoot(node);
  return result;
}
/**
* Modifies the leftChildFragment to execute a cross join. The right child input is
* provided by an ExchangeNode, which is the destination of the rightChildFragment's

View File

@@ -18,7 +18,6 @@ import java.util.ArrayList;
import com.cloudera.impala.analysis.Analyzer;
import com.cloudera.impala.analysis.TupleId;
import com.cloudera.impala.common.InternalException;
import com.cloudera.impala.thrift.TExplainLevel;
import com.cloudera.impala.thrift.TPlanNode;
import com.cloudera.impala.thrift.TPlanNodeType;
@@ -41,7 +40,7 @@ public class EmptySetNode extends PlanNode {
}
@Override
public void init(Analyzer analyzer) throws InternalException {
public void init(Analyzer analyzer) {
// If the physical output tuple produced by an AnalyticEvalNode wasn't created
// the logical output tuple is returned by getMaterializedTupleIds(). It needs
// to be set as materialized (even though it isn't) to avoid failing precondition

View File

@@ -43,6 +43,7 @@ import com.cloudera.impala.catalog.HdfsFileFormat;
import com.cloudera.impala.catalog.HdfsPartition;
import com.cloudera.impala.catalog.HdfsPartition.FileBlock;
import com.cloudera.impala.catalog.HdfsTable;
import com.cloudera.impala.catalog.Type;
import com.cloudera.impala.common.AnalysisException;
import com.cloudera.impala.common.InternalException;
import com.cloudera.impala.common.PrintUtils;
@@ -602,6 +603,12 @@ public class HdfsScanNode extends ScanNode {
cardinality_ = tbl_.getNumRows();
}
}
// Adjust cardinality for all collections referenced along the tuple's path.
if (cardinality_ != -1) {
for (Type t: desc_.getPath().getMatchedTypes()) {
if (t.isCollectionType()) cardinality_ *= PlannerContext.AVG_COLLECTION_SIZE;
}
}
inputCardinality_ = cardinality_;
Preconditions.checkState(cardinality_ >= 0 || cardinality_ == -1,
"Internal error: invalid scan node cardinality: " + cardinality_);

View File

@@ -96,6 +96,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
// to avoid having to pass assigned conjuncts back and forth
// (the planner uses this to save and reset the global state in between join tree
// alternatives)
// TODO for 2.3: Save this state in the PlannerContext instead.
protected Set<ExprId> assignedConjuncts_;
// estimate of the output cardinality of this node; set in computeStats();
@@ -386,8 +387,8 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
* and computes the mem layout of all materialized tuples (with the assumption that
* slots that are needed by ancestor PlanNodes have already been marked).
* Also performs final expr substitution with childrens' smaps and computes internal
* state required for toThrift().
* This is called directly after construction.
* state required for toThrift(). This is called directly after construction.
* Throws if an expr substitution or evaluation fails.
*/
public void init(Analyzer analyzer) throws InternalException {
assignConjuncts(analyzer);

View File

@@ -42,6 +42,15 @@ public class Planner {
/**
* Returns a list of plan fragments for executing an analyzed parse tree.
* May return a single-node or distributed executable plan.
*
* Plan generation may fail and throw for the following reasons:
* 1. Expr evaluation failed, e.g., during partition pruning.
* 2. A certain feature is not yet implemented, e.g., physical join implementation for
* outer/semi joins without equi conjuncts.
* 3. Expr substitution failed, e.g., because an expr was substituted with a type that
* renders the containing expr semantically invalid. Analysis should have ensured
* that such an expr substitution during plan generation never fails. If it does,
* that typically means there is a bug in analysis, or a broken/missing smap.
*/
public ArrayList<PlanFragment> createPlan() throws ImpalaException {
SingleNodePlanner singleNodePlanner = new SingleNodePlanner(ctx_);

View File

@@ -14,12 +14,15 @@
package com.cloudera.impala.planner;
import java.util.LinkedList;
import com.cloudera.impala.analysis.AnalysisContext;
import com.cloudera.impala.analysis.Analyzer;
import com.cloudera.impala.analysis.QueryStmt;
import com.cloudera.impala.common.IdGenerator;
import com.cloudera.impala.thrift.TQueryCtx;
import com.cloudera.impala.thrift.TQueryOptions;
import com.google.common.collect.Lists;
/**
* Contains the analysis result of a query as well as planning-specific
@@ -33,10 +36,25 @@ public class PlannerContext {
// The maximum fraction of remaining memory that a sort node can use during execution.
public final static double SORT_MEM_MAX_FRACTION = 0.80;
// Assumed average number of items in a nested collection, since we currently have no
// statistics on nested fields. The motivation for this constant is to avoid
// pathological plan choices that could result from a SubplanNode having an unknown
// cardinality (due to UnnestNodes not knowing their cardinality), or from a ScanNode
// significantly underestimating its output cardinality because intermediate collections
// are not accounted for at all. For example, we will place a table ref plan with a
// SubplanNode on the build side of a join due to an unknown cardinality if the other
// input is a base table scan with stats.
// The constant value was chosen arbitrarily to not be "too high" or "too low".
// TODO: Compute stats for nested types and pick them up here.
public static final long AVG_COLLECTION_SIZE = 10;
private final IdGenerator<PlanNodeId> nodeIdGenerator_ = PlanNodeId.createGenerator();
private final IdGenerator<PlanFragmentId> fragmentIdGenerator_ =
PlanFragmentId.createGenerator();
// Keeps track of subplan nesting. Maintained with push/popSubplan().
private final LinkedList<SubplanNode> subplans_ = Lists.newLinkedList();
private final TQueryCtx queryCtx_;
private final AnalysisContext.AnalysisResult analysisResult_;
private final QueryStmt queryStmt_;
@@ -65,4 +83,9 @@ public class PlannerContext {
// Returns true if the analysis result reports an INSERT statement or a
// CREATE TABLE AS SELECT statement.
public boolean isInsertOrCtas() {
return analysisResult_.isInsertStmt() || analysisResult_.isCreateTableAsSelectStmt();
}
// Returns true if at least one SubplanNode is currently on the subplan stack.
public boolean hasSubplan() { return !subplans_.isEmpty(); }
// Returns the most recently pushed SubplanNode without removing it.
// LinkedList.getFirst() throws NoSuchElementException if the stack is empty, so
// callers should check hasSubplan() first.
public SubplanNode getSubplan() { return subplans_.getFirst(); }
// Pushes 'n' onto the front of the subplan stack and returns the result of
// LinkedList.offerFirst().
public boolean pushSubplan(SubplanNode n) { return subplans_.offerFirst(n); }
// Removes the most recently pushed SubplanNode.
// LinkedList.removeFirst() throws NoSuchElementException if the stack is empty.
public void popSubplan() { subplans_.removeFirst(); }
}

View File

@@ -21,7 +21,6 @@ import org.slf4j.LoggerFactory;
import com.cloudera.impala.analysis.Analyzer;
import com.cloudera.impala.analysis.Expr;
import com.cloudera.impala.common.InternalException;
import com.cloudera.impala.thrift.TExplainLevel;
import com.cloudera.impala.thrift.TPlanNode;
import com.cloudera.impala.thrift.TPlanNodeType;
@@ -47,7 +46,7 @@ public class SelectNode extends PlanNode {
}
@Override
public void init(Analyzer analyzer) throws InternalException {
public void init(Analyzer analyzer) {
analyzer.markConjunctsAssigned(conjuncts_);
computeStats(analyzer);
createDefaultSmap(analyzer);

View File

@@ -20,6 +20,7 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
@@ -38,6 +39,7 @@ import com.cloudera.impala.analysis.InlineViewRef;
import com.cloudera.impala.analysis.JoinOperator;
import com.cloudera.impala.analysis.QueryStmt;
import com.cloudera.impala.analysis.SelectStmt;
import com.cloudera.impala.analysis.SingularRowSrcTableRef;
import com.cloudera.impala.analysis.SlotDescriptor;
import com.cloudera.impala.analysis.SlotId;
import com.cloudera.impala.analysis.SlotRef;
@@ -60,6 +62,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
@@ -82,16 +85,30 @@ public class SingleNodePlanner {
* in the planner context. The planning process recursively walks the parse tree and
* performs the following actions.
* In the top-down phase over query statements:
* - materialize the slots required for evaluating expressions of that statement
* - migrate conjuncts from parent blocks into inline views and union operands
* - Materialize the slots required for evaluating expressions of that statement.
* - Migrate conjuncts from parent blocks into inline views and union operands.
* In the bottom-up phase generate the plan tree for every query statement:
* - perform join-order optimization when generating the plan of the FROM
* clause of a select statement; requires that all materialized slots are known
* for an accurate estimate of row sizes needed for cost-based join ordering
* - assign conjuncts that can be evaluated at that node and compute the stats
* of that node (cardinality, etc.)
* - apply combined expression substitution map of child plan nodes; if a plan node
* re-maps its input, set a substitution map to be applied by parents
* - Generate the plan for the FROM-clause of a select statement: The plan trees of
* absolute and uncorrelated table refs are connected via JoinNodes. The relative
* and correlated table refs are associated with one or more SubplanNodes.
* - A new SubplanNode is placed on top of an existing plan node whenever the tuples
* materialized by that plan node enable evaluation of one or more relative or
* correlated table refs, i.e., SubplanNodes are placed at the lowest possible point
* in the plan, often right after a ScanNode materializing the (single) parent tuple.
* - The right-hand side of each SubplanNode is a plan tree generated by joining a
* SingularRowSrcTableRef with those applicable relative and correlated refs.
* A SingularRowSrcTableRef represents the current row being processed by the
* SubplanNode from its input (first child).
* - Connecting table ref plans via JoinNodes is done in a cost-based fashion
* (join-order optimization). All materialized slots, including those of tuples
* materialized inside a SubplanNode, must be known for an accurate estimate of row
* sizes needed for cost-based join ordering.
* - The remaining aggregate/analytic/orderby portions of a select statement are added
* on top of the FROM-clause plan.
* - Whenever a new node is added to the plan tree, assign conjuncts that can be
* evaluated at that node and compute the stats of that node (cardinality, etc.).
* - Apply combined expression substitution map of child plan nodes; if a plan node
* re-maps its input, set a substitution map to be applied by parents.
*/
public PlanNode createSingleNodePlan() throws ImpalaException {
QueryStmt queryStmt = ctx_.getQueryStmt();
@@ -121,14 +138,12 @@ public class SingleNodePlanner {
return singleNodePlan;
}
/*
/**
* Validates a single-node plan by checking that it does not contain right or
* full outer joins with no equi-join conjuncts. Throws an InternalException
* if plan validation fails.
* full outer joins with no equi-join conjuncts that are not inside the right child
* of a SubplanNode. Throws an InternalException if plan validation fails.
*
* TODO for 2.3: Temporary solution; the planner should avoid generating invalid plans.
* We will revisit this check and address this TODO in the context of subplan
* generation.
*/
public void validatePlan(PlanNode planNode) throws NotImplementedException {
if (planNode instanceof NestedLoopJoinNode) {
@@ -144,14 +159,21 @@ public class SingleNodePlanner {
}
}
for (PlanNode child: planNode.getChildren()) validatePlan(child);
if (planNode instanceof SubplanNode) {
// Right and full outer joins with no equi-join conjuncts are ok in the right
// child of a SubplanNode.
validatePlan(planNode.getChild(0));
} else {
for (PlanNode child: planNode.getChildren()) {
validatePlan(child);
}
}
}
/**
* Creates an EmptyNode that 'materializes' the tuples of the given stmt.
*/
private PlanNode createEmptyNode(QueryStmt stmt, Analyzer analyzer)
throws InternalException {
private PlanNode createEmptyNode(QueryStmt stmt, Analyzer analyzer) {
ArrayList<TupleId> tupleIds = Lists.newArrayList();
stmt.getMaterializedTupleIds(tupleIds);
EmptySetNode node = new EmptySetNode(ctx_.getNextNodeId(), tupleIds);
@@ -236,8 +258,7 @@ public class SingleNodePlanner {
* semantically correct
*/
private PlanNode addUnassignedConjuncts(
Analyzer analyzer, List<TupleId> tupleIds, PlanNode root)
throws InternalException {
Analyzer analyzer, List<TupleId> tupleIds, PlanNode root) {
// No point in adding SelectNode on top of an EmptyNode.
if (root instanceof EmptySetNode) return root;
Preconditions.checkNotNull(root);
@@ -257,28 +278,32 @@ public class SingleNodePlanner {
}
/**
* Return the cheapest plan that materializes the joins of all TblRefs in refPlans.
* Assumes that refPlans are in the order as they originally appeared in the query.
* Return the cheapest plan that materializes the joins of all TableRefs in
* parentRefPlans and the subplans of all applicable TableRefs in subplanRefs.
* Assumes that parentRefPlans are in the order as they originally appeared in
* the query.
* For this plan:
* - the plan is executable, ie, all non-cross joins have equi-join predicates
* - the leftmost scan is over the largest of the inputs for which we can still
* construct an executable plan
* - all rhs's are in decreasing order of selectiveness (percentage of rows they
* eliminate)
* - from bottom to top, all rhs's are in increasing order of selectivity (percentage
* of surviving rows)
* - outer/cross/semi joins: rhs serialized size is < lhs serialized size;
* enforced via join inversion, if necessary
* - SubplanNodes are placed as low as possible in the plan tree - as soon as the
* required tuple ids of one or more TableRefs in subplanRefs are materialized
* Returns null if we can't create an executable plan.
*/
private PlanNode createCheapestJoinPlan(
Analyzer analyzer, List<Pair<TableRef, PlanNode>> refPlans)
private PlanNode createCheapestJoinPlan(Analyzer analyzer,
List<Pair<TableRef, PlanNode>> parentRefPlans, List<SubplanRef> subplanRefs)
throws ImpalaException {
LOG.trace("createCheapestJoinPlan");
if (refPlans.size() == 1) return refPlans.get(0).second;
if (parentRefPlans.size() == 1) return parentRefPlans.get(0).second;
// collect eligible candidates for the leftmost input; list contains
// (plan, materialized size)
ArrayList<Pair<TableRef, Long>> candidates = Lists.newArrayList();
for (Pair<TableRef, PlanNode> entry: refPlans) {
for (Pair<TableRef, PlanNode> entry: parentRefPlans) {
TableRef ref = entry.first;
JoinOperator joinOp = ref.getJoinOp();
@@ -291,8 +316,7 @@ public class SingleNodePlanner {
// TODO: Allow the rhs of any cross join as the leftmost table. This needs careful
// consideration of the joinOps that result from such a re-ordering (IMPALA-1281).
if (((joinOp.isOuterJoin() || joinOp.isSemiJoin() || joinOp.isCrossJoin()) &&
ref != refPlans.get(1).first) || joinOp.isNullAwareLeftAntiJoin()) {
// ref cannot appear as the leftmost input
ref != parentRefPlans.get(1).first) || joinOp.isNullAwareLeftAntiJoin()) {
continue;
}
@@ -304,11 +328,12 @@ public class SingleNodePlanner {
LOG.trace("candidate " + ref.getUniqueAlias() + ": 0");
continue;
}
Preconditions.checkNotNull(ref.getDesc());
Preconditions.checkState(ref.isAnalyzed());
long materializedSize =
(long) Math.ceil(plan.getAvgRowSize() * (double) plan.getCardinality());
candidates.add(new Pair(ref, new Long(materializedSize)));
LOG.trace("candidate " + ref.getUniqueAlias() + ": " + Long.toString(materializedSize));
LOG.trace(
"candidate " + ref.getUniqueAlias() + ": " + Long.toString(materializedSize));
}
if (candidates.isEmpty()) return null;
@@ -323,7 +348,7 @@ public class SingleNodePlanner {
});
for (Pair<TableRef, Long> candidate: candidates) {
PlanNode result = createJoinPlan(analyzer, candidate.first, refPlans);
PlanNode result = createJoinPlan(analyzer, candidate.first, parentRefPlans, subplanRefs);
if (result != null) return result;
}
return null;
@@ -332,10 +357,12 @@ public class SingleNodePlanner {
/**
* Returns a plan with leftmostRef's plan as its leftmost input; the joins
* are in decreasing order of selectiveness (percentage of rows they eliminate).
* Creates and adds subplan nodes as soon as the tuple ids required by at least one
* subplan ref are materialized by a join node added during plan generation.
* The leftmostRef's join will be inverted if it is an outer/semi/cross join.
*/
private PlanNode createJoinPlan(
Analyzer analyzer, TableRef leftmostRef, List<Pair<TableRef, PlanNode>> refPlans)
private PlanNode createJoinPlan(Analyzer analyzer, TableRef leftmostRef,
List<Pair<TableRef, PlanNode>> refPlans, List<SubplanRef> subplanRefs)
throws ImpalaException {
LOG.trace("createJoinPlan: " + leftmostRef.getUniqueAlias());
@@ -350,10 +377,6 @@ public class SingleNodePlanner {
}
}
Preconditions.checkNotNull(root);
// refs that have been joined. The union of joinedRefs and the refs in remainingRefs
// are the set of all table refs.
Set<TableRef> joinedRefs = Sets.newHashSet();
joinedRefs.add(leftmostRef);
// If the leftmostTblRef is an outer/semi/cross join, we must invert it.
boolean planHasInvertedJoin = false;
@@ -366,48 +389,52 @@ public class SingleNodePlanner {
// not change.
leftmostRef.invertJoin(refPlans, analyzer);
planHasInvertedJoin = true;
// Avoid swapping the refPlan elements in-place.
refPlans = Lists.newArrayList(refPlans);
Collections.swap(refPlans, 0, 1);
}
// Maps from a TableRef in refPlans with an outer/semi join op to the set of
// TableRefs that precede it in refPlans (i.e., in FROM-clause order).
// The map is used to place outer/semi joins at a fixed position in the plan tree
// (IMPALA-860), s.t. all the tables appearing to the left/right of an outer/semi
// join in the original query still remain to the left/right after join ordering.
// This prevents join re-ordering across outer/semi joins which is generally wrong.
Map<TableRef, Set<TableRef>> precedingRefs = Maps.newHashMap();
List<TableRef> tmpTblRefs = Lists.newArrayList();
for (Pair<TableRef, PlanNode> entry: refPlans) {
TableRef tblRef = entry.first;
if (tblRef.getJoinOp().isOuterJoin() || tblRef.getJoinOp().isSemiJoin()) {
precedingRefs.put(tblRef, Sets.newHashSet(tmpTblRefs));
}
tmpTblRefs.add(tblRef);
}
// Refs that have been joined. The union of joinedRefs and the refs in remainingRefs
// are the set of all table refs.
Set<TableRef> joinedRefs = Sets.newHashSet(leftmostRef);
long numOps = 0;
int i = 0;
while (!remainingRefs.isEmpty()) {
// we minimize the resulting cardinality at each step in the join chain,
// which minimizes the total number of hash table lookups
// We minimize the resulting cardinality at each step in the join chain,
// which minimizes the total number of hash table lookups.
PlanNode newRoot = null;
Pair<TableRef, PlanNode> minEntry = null;
for (Pair<TableRef, PlanNode> entry: remainingRefs) {
TableRef ref = entry.first;
LOG.trace(Integer.toString(i) + " considering ref " + ref.getUniqueAlias());
// Determine whether we can or must consider this join at this point in the plan.
// Place outer/semi joins at a fixed position in the plan tree (IMPALA-860),
// s.t. all the tables appearing to the left/right of an outer/semi join in
// the original query still remain to the left/right after join ordering. This
// prevents join re-ordering across outer/semi joins which is generally wrong.
// The checks below rely on remainingRefs being in the order as they originally
// appeared in the query.
JoinOperator joinOp = ref.getJoinOp();
if (joinOp.isOuterJoin() || joinOp.isSemiJoin()) {
List<TupleId> currentTids = Lists.newArrayList(root.getTblRefIds());
currentTids.add(ref.getId());
// Place outer/semi joins at a fixed position in the plan tree. We know that
// the join resulting from 'ref' must become the new root if the current
// root materializes exactly those tuple ids corresponding to TableRefs
// appearing to the left of 'ref' in the original query.
List<TupleId> tableRefTupleIds = ref.getAllTupleIds();
if (!currentTids.containsAll(tableRefTupleIds) ||
!tableRefTupleIds.containsAll(currentTids)) {
// Do not consider the remaining table refs to prevent incorrect re-ordering
// of tables across outer/semi/anti joins.
break;
}
} else if (ref.getJoinOp().isCrossJoin()) {
if (!joinedRefs.contains(ref.getLeftTblRef())) continue;
// Place outer/semi joins at a fixed position in the plan tree.
Set<TableRef> requiredRefs = precedingRefs.get(ref);
if (requiredRefs != null) {
Preconditions.checkState(joinOp.isOuterJoin() || joinOp.isSemiJoin());
// If the required table refs have not been placed yet, do not even consider
// the remaining table refs to prevent incorrect re-ordering of tables across
// outer/semi joins.
if (!requiredRefs.equals(joinedRefs)) break;
}
PlanNode rhsPlan = entry.second;
analyzer.setAssignedConjuncts(root.getAssignedConjuncts());
boolean invertJoin = false;
if (joinOp.isOuterJoin() || joinOp.isSemiJoin() || joinOp.isCrossJoin()) {
// Invert the join if doing so reduces the size of build-side hash table
@@ -423,6 +450,8 @@ public class SingleNodePlanner {
invertJoin = true;
}
}
analyzer.setAssignedConjuncts(root.getAssignedConjuncts());
PlanNode candidate = null;
if (invertJoin) {
ref.setJoinOp(ref.getJoinOp().invert());
@@ -477,8 +506,15 @@ public class SingleNodePlanner {
remainingRefs.remove(minEntry);
joinedRefs.add(minEntry.first);
root = newRoot;
// assign id_ after running through the possible choices in order to end up
// Create a Subplan on top of the new root for all the subplan refs that can be
// evaluated at this point.
// TODO: Once we have stats on nested collections, we should consider the join
// order in conjunction with the placement of SubplanNodes, i.e., move the creation
// of SubplanNodes into the join-ordering loop above.
root = createSubplan(root, subplanRefs, false, false, analyzer);
// assign node ids after running through the possible choices in order to end up
// with a dense sequence of node ids
if (root instanceof SubplanNode) root.getChild(0).setId(ctx_.getNextNodeId());
root.setId(ctx_.getNextNodeId());
analyzer.setAssignedConjuncts(root.getAssignedConjuncts());
++i;
@@ -488,18 +524,21 @@ public class SingleNodePlanner {
}
/**
* Return a plan with joins in the order of refPlans (= FROM clause order).
* Return a plan with joins in the order of parentRefPlans (= FROM clause order).
* Adds coalesced SubplanNodes based on the FROM-clause order of subplanRefs.
*/
private PlanNode createFromClauseJoinPlan(
Analyzer analyzer, List<Pair<TableRef, PlanNode>> refPlans)
private PlanNode createFromClauseJoinPlan(Analyzer analyzer,
List<Pair<TableRef, PlanNode>> parentRefPlans, List<SubplanRef> subplanRefs)
throws ImpalaException {
// create left-deep sequence of binary hash joins; assign node ids as we go along
Preconditions.checkState(!refPlans.isEmpty());
PlanNode root = refPlans.get(0).second;
for (int i = 1; i < refPlans.size(); ++i) {
TableRef innerRef = refPlans.get(i).first;
PlanNode innerPlan = refPlans.get(i).second;
Preconditions.checkState(!parentRefPlans.isEmpty());
PlanNode root = parentRefPlans.get(0).second;
for (int i = 1; i < parentRefPlans.size(); ++i) {
TableRef innerRef = parentRefPlans.get(i).first;
PlanNode innerPlan = parentRefPlans.get(i).second;
root = createJoinNode(analyzer, root, innerPlan, null, innerRef);
if (root != null) root = createSubplan(root, subplanRefs, true, false, analyzer);
if (root instanceof SubplanNode) root.getChild(0).setId(ctx_.getNextNodeId());
root.setId(ctx_.getNextNodeId());
}
return root;
@@ -547,32 +586,15 @@ public class SingleNodePlanner {
return createAggregationPlan(selectStmt, analyzer, emptySetNode);
}
// create plans for our table refs; use a list here instead of a map to
// maintain a deterministic order of traversing the TableRefs during join
// plan generation (helps with tests)
List<Pair<TableRef, PlanNode>> refPlans = Lists.newArrayList();
for (TableRef ref: selectStmt.getTableRefs()) {
PlanNode plan = createTableRefNode(analyzer, ref);
Preconditions.checkState(plan != null);
refPlans.add(new Pair(ref, plan));
}
// save state of conjunct assignment; needed for join plan generation
for (Pair<TableRef, PlanNode> entry: refPlans) {
entry.second.setAssignedConjuncts(analyzer.getAssignedConjuncts());
}
PlanNode root = null;
if (!selectStmt.getSelectList().isStraightJoin()) {
Set<ExprId> assignedConjuncts = analyzer.getAssignedConjuncts();
root = createCheapestJoinPlan(analyzer, refPlans);
if (root == null) analyzer.setAssignedConjuncts(assignedConjuncts);
}
if (selectStmt.getSelectList().isStraightJoin() || root == null) {
// we didn't have enough stats to do a cost-based join plan, or the STRAIGHT_JOIN
// keyword was in the select list: use the FROM clause order instead
root = createFromClauseJoinPlan(analyzer, refPlans);
Preconditions.checkNotNull(root);
}
// Separate table refs into parent refs (uncorrelated or absolute) and
// subplan refs (correlated or relative), and generate their plan.
boolean isStraightJoin = selectStmt.getSelectList().isStraightJoin();
List<TableRef> parentRefs = Lists.newArrayList();
List<SubplanRef> subplanRefs = Lists.newArrayList();
computeParentAndSubplanRefs(
selectStmt.getTableRefs(), isStraightJoin, parentRefs, subplanRefs);
PlanNode root = createTableRefsPlan(
parentRefs, subplanRefs, isStraightJoin, analyzer);
// add aggregation, if any
if (selectStmt.getAggInfo() != null) {
@@ -585,6 +607,249 @@ public class SingleNodePlanner {
return root;
}
/**
 * Pairs a table ref that has to be evaluated inside a subplan (a relative or
 * correlated ref) with the prerequisites that decide where the SubplanNode
 * evaluating it may be placed.
 *
 * requiredTids (materialized tuple ids):
 * The SubplanNode evaluating the table ref may only be placed once all root tuples
 * needed by the table ref, or by relative refs contained in it, are materialized.
 *
 * requiredTblRefIds (table ref ids):
 * The SubplanNode evaluating the table ref must be placed correctly with respect to
 * join ordering; in particular, it must not be ordered across semi/outer joins.
 */
private static class SubplanRef {
  // The relative or correlated table ref to be evaluated inside a SubplanNode.
  public final TableRef tblRef;

  // Tuple ids that have to be materialized before 'tblRef' can be correctly
  // evaluated inside a SubplanNode.
  public final List<TupleId> requiredTids;

  // Table ref ids that a plan tree has to contain before 'tblRef' can be
  // correctly evaluated inside a SubplanNode.
  public final List<TupleId> requiredTblRefIds;

  /**
   * Stores the given references as-is (the lists are not copied).
   * Fails a state check if 'tblRef' is neither relative nor correlated.
   */
  public SubplanRef(TableRef tblRef, List<TupleId> requiredTids,
      List<TupleId> requiredTblRefIds) {
    boolean belongsInSubplan = tblRef.isRelative() || tblRef.isCorrelated();
    Preconditions.checkState(belongsInSubplan);
    this.requiredTblRefIds = requiredTblRefIds;
    this.requiredTids = requiredTids;
    this.tblRef = tblRef;
  }
}
/**
 * Partitions tblRefs into the two output lists parentRefs and subplanRefs. Every
 * table ref ends up in exactly one of them.
 *
 * parentRefs:
 * Uncorrelated and non-relative table refs, i.e., the 'regular' table refs whose
 * plans are connected by join nodes and are not placed inside a Subplan. The list is
 * self-contained with respect to TableRef linking: each ref appended after the first
 * gets its left TableRef link set to the ref preceding it in parentRefs.
 *
 * subplanRefs:
 * Correlated and relative table refs whose plan must be put inside a Subplan (see
 * SubplanRef). The left TableRef link of these refs is set to null.
 * If isStraightJoin is true, the required tuple ids and table ref ids of such a ref
 * are taken from the parent ref most recently added to parentRefs, which pins the
 * ref to its FROM-clause position relative to the parent refs.
 *
 * If this function is called while generating the right-hand side of a SubplanNode,
 * correlated and relative table refs that only require tuples and table refs
 * produced by the SubplanNode's input are placed into parentRefs.
 */
private void computeParentAndSubplanRefs(List<TableRef> tblRefs,
    boolean isStraightJoin, List<TableRef> parentRefs, List<SubplanRef> subplanRefs) {
  // Ids provided by the input of the enclosing SubplanNode. Both lists stay empty
  // when we are not in a Subplan context.
  List<TupleId> subplanTids = Collections.emptyList();
  List<TupleId> subplanTblRefIds = Collections.emptyList();
  if (ctx_.hasSubplan()) {
    subplanTblRefIds = ctx_.getSubplan().getChild(0).getTblRefIds();
    subplanTids = ctx_.getSubplan().getChild(0).getTupleIds();
  }

  // Table ref ids of all outer/semi joined table refs visited so far.
  List<TupleId> outerOrSemiJoinedTblRefIds = Lists.newArrayList();
  for (TableRef ref: tblRefs) {
    boolean placedInSubplan = false;
    if (ref.isRelative() || ref.isCorrelated()) {
      List<TupleId> requiredTids = Lists.newArrayList();
      List<TupleId> requiredTblRefIds = Lists.newArrayList();
      if (!isStraightJoin) {
        // TODO: Think about when we can allow re-ordering across semi/outer joins
        // in subplans.
        if (ref.isCorrelated()) {
          requiredTids.addAll(ref.getCorrelatedTupleIds());
        } else {
          // A relative ref needs the root tuple of its resolved path.
          CollectionTableRef collectionRef = (CollectionTableRef) ref;
          requiredTids.add(collectionRef.getResolvedPath().getRootDesc().getId());
        }
        requiredTblRefIds.addAll(outerOrSemiJoinedTblRefIds);
      } else {
        // Place the SubplanNode at the table ref's position in FROM-clause order
        // with respect to the parent table refs.
        // NOTE(review): assumes at least one parent ref has been collected before
        // 'ref'; get() throws on an empty parentRefs - confirm this cannot happen.
        TableRef precedingParentRef = parentRefs.get(parentRefs.size() - 1);
        requiredTids.addAll(precedingParentRef.getAllMaterializedTupleIds());
        requiredTblRefIds.add(precedingParentRef.getId());
      }

      boolean satisfiedBySubplanInput = subplanTids.containsAll(requiredTids)
          && subplanTblRefIds.containsAll(requiredTblRefIds);
      if (!satisfiedBySubplanInput) {
        placedInSubplan = true;
        ref.setLeftTblRef(null);
        // For outer and semi joins, the On-clause conjuncts must be evaluable as
        // well, so also require the table ref ids they reference, excluding the id
        // of ref itself.
        JoinOperator joinOp = ref.getJoinOp();
        if (joinOp.isOuterJoin() || joinOp.isSemiJoin()) {
          List<TupleId> onClauseTblRefIds =
              Lists.newArrayList(ref.getOnClauseTupleIds());
          onClauseTblRefIds.remove(ref.getId());
          requiredTblRefIds.addAll(onClauseTblRefIds);
        }
        subplanRefs.add(new SubplanRef(ref, requiredTids, requiredTblRefIds));
      }
    }

    if (!placedInSubplan) {
      // Re-link the chain of parent table refs before appending.
      if (!parentRefs.isEmpty()) {
        ref.setLeftTblRef(parentRefs.get(parentRefs.size() - 1));
      }
      parentRefs.add(ref);
    }

    if (ref.getJoinOp().isOuterJoin() || ref.getJoinOp().isSemiJoin()) {
      outerOrSemiJoinedTblRefIds.add(ref.getId());
    }
  }
  Preconditions.checkState(parentRefs.size() + subplanRefs.size() == tblRefs.size());
}
/**
 * Builds and returns the plan tree that evaluates the given parentRefs and
 * subplanRefs.
 *
 * A plan is first created for every parent ref, each wrapped in a SubplanNode if
 * createSubplan() finds applicable subplan refs. The per-ref plans are then combined
 * by createCheapestJoinPlan(), unless isStraightJoin is set. If that is skipped or
 * returns null, createFromClauseJoinPlan() combines them in FROM-clause order.
 */
private PlanNode createTableRefsPlan(List<TableRef> parentRefs,
    List<SubplanRef> subplanRefs, boolean isStraightJoin, Analyzer analyzer)
    throws ImpalaException {
  // Plans for our table refs. A list is used instead of a map to keep a
  // deterministic traversal order of the TableRefs during join plan generation
  // (helps with tests).
  List<Pair<TableRef, PlanNode>> parentRefPlans = Lists.newArrayList();
  for (TableRef parentRef: parentRefs) {
    PlanNode refPlan = createTableRefNode(parentRef, isStraightJoin, analyzer);
    Preconditions.checkNotNull(refPlan);
    refPlan = createSubplan(refPlan, subplanRefs, isStraightJoin, true, analyzer);
    parentRefPlans.add(new Pair<TableRef, PlanNode>(parentRef, refPlan));
  }

  // Save the state of conjunct assignment; needed for join plan generation.
  for (Pair<TableRef, PlanNode> refPlan: parentRefPlans) {
    refPlan.second.setAssignedConjuncts(analyzer.getAssignedConjuncts());
  }

  if (!isStraightJoin) {
    Set<ExprId> savedAssignedConjuncts = analyzer.getAssignedConjuncts();
    PlanNode costBasedPlan =
        createCheapestJoinPlan(analyzer, parentRefPlans, subplanRefs);
    if (costBasedPlan != null) return costBasedPlan;
    // createCheapestJoinPlan() failed to produce an executable plan. Restore the
    // original state of conjunct assignment so that the straight-join plan does
    // not incorrectly miss conjuncts.
    analyzer.setAssignedConjuncts(savedAssignedConjuncts);
  }

  // Either there were not enough stats for a cost-based join plan, or the
  // STRAIGHT_JOIN keyword was in the select list: use the FROM clause order.
  PlanNode fromClausePlan =
      createFromClauseJoinPlan(analyzer, parentRefPlans, subplanRefs);
  Preconditions.checkNotNull(fromClausePlan);
  return fromClausePlan;
}
/**
 * Places a SubplanNode on top of 'root' that evaluates all the subplan refs that can
 * be correctly evaluated from 'root's materialized tuple ids. Returns 'root' if there
 * are no applicable subplan refs.
 * Assigns the returned SubplanNode a new node id unless assignId is false.
 * The applicable subplan refs are removed from 'subplanRefs' (see
 * extractApplicableRefs()).
 *
 * If applicable, the SubplanNode is created as follows:
 * - 'root' is the input of the SubplanNode (first child)
 * - the second child is the plan tree generated from these table refs:
 *   1. a SingularRowSrcTableRef that represents the current row being processed
 *      by the SubplanNode to be joined with
 *   2. all applicable subplan refs
 * - the second child plan tree is generated as usual with createTableRefsPlan()
 * - the plans of the applicable subplan refs are generated as usual, without a
 *   SingularRowSrcTableRef
 * - nested SubplanNodes are generated recursively inside createTableRefsPlan() by
 *   passing in the remaining subplanRefs that are not applicable after 'root'; some
 *   of those subplanRefs may become applicable inside the second child plan tree of
 *   the SubplanNode generated here
 *
 * Throws an ImpalaException if generating or initializing the subplan tree fails. The
 * SubplanNode pushed onto the planner context is popped again in that case, so the
 * context's subplan stack stays balanced.
 */
private PlanNode createSubplan(PlanNode root, List<SubplanRef> subplanRefs,
    boolean isStraightJoin, boolean assignId, Analyzer analyzer)
    throws ImpalaException {
  Preconditions.checkNotNull(root);
  List<TableRef> applicableRefs = extractApplicableRefs(root, subplanRefs);
  if (applicableRefs.isEmpty()) return root;

  // Prepend a SingularRowSrcTableRef representing the current row being processed
  // by the SubplanNode from its input (first child).
  Preconditions.checkState(applicableRefs.get(0).getLeftTblRef() == null);
  applicableRefs.add(0, new SingularRowSrcTableRef(root));
  applicableRefs.get(1).setLeftTblRef(applicableRefs.get(0));

  // Construct an incomplete SubplanNode that only knows its input so we can push it
  // into the planner context. The subplan is set after the subplan tree has been
  // constructed.
  SubplanNode subplanNode = new SubplanNode(root);
  if (assignId) subplanNode.setId(ctx_.getNextNodeId());

  // Push the SubplanNode such that UnnestNodes and SingularRowSrcNodes can pick up
  // their containing SubplanNode. Also, further plan generation relies on knowing
  // whether we are in a subplan context or not (see computeParentAndSubplanRefs()).
  ctx_.pushSubplan(subplanNode);
  PlanNode subplan;
  try {
    subplan =
        createTableRefsPlan(applicableRefs, subplanRefs, isStraightJoin, analyzer);
  } finally {
    // Always undo the push, even if plan generation throws, so that a failure here
    // does not leave a stale SubplanNode in the planner context.
    ctx_.popSubplan();
  }
  subplanNode.setSubplan(subplan);
  subplanNode.init(analyzer);
  return subplanNode;
}
/**
 * Collects, in list order, all table refs from subplanRefs that can be correctly
 * evaluated inside a SubplanNode placed after the given plan root, and returns them
 * in a new list.
 * Each returned table ref has its left-table link set to the ref returned before it
 * (null for the first one), and the corresponding SubplanRefs are removed from
 * subplanRefs.
 */
private List<TableRef> extractApplicableRefs(PlanNode root,
    List<SubplanRef> subplanRefs) {
  // Table ref ids in 'root' plus the ids of all table refs extracted so far.
  List<TupleId> availableTblRefIds = Lists.newArrayList(root.getTblRefIds());
  List<TableRef> applicableRefs = Lists.newArrayList();
  TableRef previousRef = null;
  for (Iterator<SubplanRef> it = subplanRefs.iterator(); it.hasNext();) {
    SubplanRef candidate = it.next();
    // 'root' must materialize all tuples required by the candidate.
    boolean tuplesMaterialized =
        root.getTupleIds().containsAll(candidate.requiredTids);
    if (!tuplesMaterialized) continue;
    // Correct join ordering must be obeyed as well.
    boolean joinOrderObeyed =
        availableTblRefIds.containsAll(candidate.requiredTblRefIds);
    if (!joinOrderObeyed) continue;

    TableRef applicableRef = candidate.tblRef;
    applicableRef.setLeftTblRef(previousRef);
    applicableRefs.add(applicableRef);
    previousRef = applicableRef;
    it.remove();
    // Record the table ref id so that later subplan refs which can be evaluated
    // inside the same SubplanNode, but only after this ref, are returned as well.
    availableTblRefIds.add(applicableRef.getId());
  }
  return applicableRefs;
}
/**
* Returns a new AggregationNode that materializes the aggregation of the given stmt.
* Assigns conjuncts from the Having clause to the returned node.
@@ -880,9 +1145,11 @@ public class SingleNodePlanner {
/**
* Create node for scanning all data files of a particular table.
* Throws if a PlanNode.init() failed or if planning of the given
* table ref is not implemented.
*/
private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef)
throws InternalException {
throws ImpalaException {
ScanNode scanNode = null;
if (tblRef.getTable() instanceof HdfsTable) {
scanNode = new HdfsScanNode(ctx_.getNextNodeId(), tblRef.getDesc(),
@@ -897,7 +1164,8 @@ public class SingleNodePlanner {
// HBase table
scanNode = new HBaseScanNode(ctx_.getNextNodeId(), tblRef.getDesc());
} else {
throw new InternalException("Invalid table ref class: " + tblRef.getClass());
throw new NotImplementedException(
"Planning not implemented for table ref class: " + tblRef.getClass());
}
// TODO: move this to HBaseScanNode.init();
Preconditions.checkState(scanNode instanceof HBaseScanNode);
@@ -1004,6 +1272,8 @@ public class SingleNodePlanner {
* Create a node to join outer with inner. Either the outer or the inner may be a plan
* created from a table ref (but not both), and the corresponding outer/innerRef
* should be non-null.
* Throws if the JoinNode.init() failed, or the required physical join implementation
* is missing, e.g., an outer/semi join without equi conjuncts.
*/
private PlanNode createJoinNode(
Analyzer analyzer, PlanNode outer, PlanNode inner, TableRef outerRef,
@@ -1011,6 +1281,16 @@ public class SingleNodePlanner {
Preconditions.checkState(innerRef != null ^ outerRef != null);
TableRef tblRef = (innerRef != null) ? innerRef : outerRef;
// A singular row source always has a cardinality of 1. In that case, a cross join
// is certainly cheaper than a hash join.
if (innerRef instanceof SingularRowSrcTableRef ||
outerRef instanceof SingularRowSrcTableRef) {
NestedLoopJoinNode result = new NestedLoopJoinNode(outer, inner,
tblRef.getDistributionMode(), tblRef.getJoinOp(), Collections.<Expr>emptyList());
result.init(analyzer);
return result;
}
// get eq join predicates for the TableRefs' ids (not the PlanNodes' ids, which
// are materialized)
List<BinaryPredicate> eqJoinConjuncts = Collections.emptyList();
@@ -1053,8 +1333,9 @@ public class SingleNodePlanner {
// conjuncts to produce correct results.
// TODO This doesn't handle predicates specified in the On clause which are not
// bound by any tuple id (e.g. ON (true))
otherJoinConjuncts =
analyzer.getUnassignedConjuncts(tblRef.getAllTupleIds(), false);
List<TupleId> tblRefIds = Lists.newArrayList(outer.getTblRefIds());
tblRefIds.addAll(inner.getTblRefIds());
otherJoinConjuncts = analyzer.getUnassignedConjuncts(tblRefIds, false);
if (tblRef.getJoinOp().isNullAwareLeftAntiJoin()) {
boolean hasNullMatchingEqOperator = false;
// Keep only the null-matching eq conjunct in the eqJoinConjuncts and move
@@ -1092,16 +1373,34 @@ public class SingleNodePlanner {
/**
* Create a tree of PlanNodes for the given tblRef, which can be a BaseTableRef,
* CollectionTableRef or an InlineViewRef.
* Throws if a PlanNode.init() failed or if planning of the given
* table ref is not implemented.
*/
private PlanNode createTableRefNode(Analyzer analyzer, TableRef tblRef)
throws ImpalaException {
if (tblRef instanceof BaseTableRef || tblRef instanceof CollectionTableRef) {
return createScanNode(analyzer, tblRef);
private PlanNode createTableRefNode(TableRef tblRef, boolean isStraightJoin,
Analyzer analyzer) throws ImpalaException {
PlanNode result = null;
if (tblRef instanceof BaseTableRef) {
result = createScanNode(analyzer, tblRef);
} else if (tblRef instanceof CollectionTableRef) {
if (tblRef.isRelative()) {
Preconditions.checkState(ctx_.hasSubplan());
result = new UnnestNode(ctx_.getNextNodeId(), ctx_.getSubplan(),
(CollectionTableRef) tblRef);
result.init(analyzer);
} else {
result = createScanNode(analyzer, tblRef);
}
} else if (tblRef instanceof InlineViewRef) {
return createInlineViewPlan(analyzer, (InlineViewRef) tblRef);
result = createInlineViewPlan(analyzer, (InlineViewRef) tblRef);
} else if (tblRef instanceof SingularRowSrcTableRef) {
Preconditions.checkState(ctx_.hasSubplan());
result = new SingularRowSrcNode(ctx_.getNextNodeId(), ctx_.getSubplan());
result.init(analyzer);
} else {
throw new NotImplementedException(
"Planning not implemented for table ref class: " + tblRef.getClass());
}
throw new InternalException(
"Unknown TableRef node: " + tblRef.getClass().getSimpleName());
return result;
}
/**

View File

@@ -0,0 +1,71 @@
// Copyright (c) 2015 Cloudera, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.cloudera.impala.planner;
import com.cloudera.impala.analysis.Analyzer;
import com.cloudera.impala.common.InternalException;
import com.cloudera.impala.thrift.TExplainLevel;
import com.cloudera.impala.thrift.TPlanNode;
import com.cloudera.impala.thrift.TPlanNodeType;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
 * Plan node that returns the row currently being processed by its containing
 * SubplanNode. It can only appear in the plan tree of a SubplanNode. Its output smap
 * is taken from the input (first child) of the containing SubplanNode such that
 * substitutions are appropriately applied within the SubplanNode's second child.
 */
public class SingularRowSrcNode extends PlanNode {
  // The SubplanNode whose current input row this node returns.
  private final SubplanNode containingSubplanNode_;

  /**
   * Creates a node that exposes the same tuple ids and table ref ids as the first
   * child of 'containingSubplanNode'. The id lists are copied.
   */
  protected SingularRowSrcNode(PlanNodeId id, SubplanNode containingSubplanNode) {
    super(id, "SINGULAR ROW SRC");
    containingSubplanNode_ = containingSubplanNode;
    tblRefIds_ = Lists.newArrayList(containingSubplanNode.getChild(0).tblRefIds_);
    tupleIds_ = Lists.newArrayList(containingSubplanNode.getChild(0).tupleIds_);
  }

  /**
   * Runs the base-class initialization, adopts the output smap of the containing
   * SubplanNode's first child, and checks that this node holds no conjuncts.
   */
  @Override
  public void init(Analyzer analyzer) throws InternalException {
    super.init(analyzer);
    outputSmap_ = containingSubplanNode_.getChild(0).getOutputSmap();
    Preconditions.checkState(conjuncts_.isEmpty());
  }

  /**
   * Sets the cardinality to 1 (a single row is returned) and takes the number of
   * nodes from the containing SubplanNode.
   */
  @Override
  public void computeStats(Analyzer analyzer) {
    super.computeStats(analyzer);
    numNodes_ = containingSubplanNode_.getNumNodes();
    cardinality_ = 1;
  }

  /**
   * Returns the display label line, followed by the id of the containing SubplanNode
   * at explain level EXTENDED or above.
   */
  @Override
  protected String getNodeExplainString(String prefix, String detailPrefix,
      TExplainLevel detailLevel) {
    String labelLine = String.format("%s%s\n", prefix, getDisplayLabel());
    if (detailLevel.ordinal() < TExplainLevel.EXTENDED.ordinal()) return labelLine;
    String parentLine = String.format(
        "%sparent-subplan=%s\n", detailPrefix, containingSubplanNode_.getId());
    return labelLine + parentLine;
  }

  /**
   * Marks the thrift plan node as a SINGULAR_ROW_SRC_NODE.
   */
  @Override
  protected void toThrift(TPlanNode msg) {
    msg.node_type = TPlanNodeType.SINGULAR_ROW_SRC_NODE;
  }
}

View File

@@ -0,0 +1,95 @@
// Copyright (c) 2015 Cloudera, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.cloudera.impala.planner;
import com.cloudera.impala.analysis.Analyzer;
import com.cloudera.impala.common.InternalException;
import com.cloudera.impala.thrift.TExplainLevel;
import com.cloudera.impala.thrift.TPlanNode;
import com.cloudera.impala.thrift.TPlanNodeType;
import com.google.common.base.Preconditions;
/**
 * A SubplanNode evaluates its right child plan tree for every row from its left child
 * and returns the rows produced by the right child. The left child is called the
 * 'input' and the right child the 'subplan tree'.
 *
 * A SubplanNode resembles a join, with two differences. First, it does no real work
 * itself: it only returns rows produced by the right child plan tree, which typically
 * depends on the current input row (see SingularRowSrcNode and UnnestNode). Second,
 * no join predicates are required, and a SubplanNode does not evaluate any conjuncts.
 */
public class SubplanNode extends PlanNode {

  /**
   * Creates a SubplanNode that only knows its input (first child). The subplan tree
   * has to be supplied later through setSubplan().
   */
  public SubplanNode(PlanNode input) {
    super("SUBPLAN");
    children_.add(input);
  }

  /**
   * Sets the subplan (second child) of this SubplanNode and adds the subplan's table
   * ref ids, tuple ids and nullable tuple ids to those of this node.
   * Dependent plan nodes such as UnnestNodes and SingularRowSrcNodes need to know
   * their SubplanNode parent. Setting the subplan is therefore deferred until the
   * subplan tree has been constructed, which in turn requires this SubplanNode to
   * already exist. May only be called while this node has exactly one child.
   */
  public void setSubplan(PlanNode subplan) {
    Preconditions.checkState(children_.size() == 1);
    children_.add(subplan);
    nullableTupleIds_.addAll(subplan.getNullableTupleIds());
    tupleIds_.addAll(subplan.getTupleIds());
    tblRefIds_.addAll(subplan.getTblRefIds());
  }

  /**
   * Initializes this node. Requires that the subplan has been set. Verifies that no
   * conjuncts get assigned to this node, computes stats, adopts the output smap of
   * the subplan tree, and records the analyzer's assigned conjuncts.
   */
  @Override
  public void init(Analyzer analyzer) throws InternalException {
    // The subplan root must have been set.
    Preconditions.checkState(children_.size() == 2);

    // There must be no unassigned conjuncts that can be evaluated by this node. All
    // such conjuncts should have already been assigned in the right child.
    assignConjuncts(analyzer);
    Preconditions.checkState(conjuncts_.isEmpty());

    computeStats(analyzer);
    outputSmap_ = getChild(1).getOutputSmap();

    // Save the state of assigned conjuncts for join-ordering attempts.
    assignedConjuncts_ = analyzer.getAssignedConjuncts();
  }

  /**
   * Sets the cardinality to the product of both children's cardinalities, or to -1
   * if either child's cardinality is -1.
   */
  @Override
  protected void computeStats(Analyzer analyzer) {
    super.computeStats(analyzer);
    if (getChild(0).cardinality_ == -1 || getChild(1).cardinality_ == -1) {
      cardinality_ = -1;
    } else {
      cardinality_ =
          multiplyCardinalities(getChild(0).cardinality_, getChild(1).cardinality_);
    }
  }

  /**
   * Returns the display label line, followed by a predicates line if this node has
   * conjuncts and the explain level is STANDARD or above.
   */
  @Override
  protected String getNodeExplainString(String prefix, String detailPrefix,
      TExplainLevel detailLevel) {
    String labelLine = String.format("%s%s\n", prefix, getDisplayLabel());
    boolean showPredicates =
        detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()
        && !conjuncts_.isEmpty();
    if (!showPredicates) return labelLine;
    return labelLine + detailPrefix + "predicates: " +
        getExplainString(conjuncts_) + "\n";
  }

  /**
   * Marks the thrift plan node as a SUBPLAN_NODE.
   */
  @Override
  protected void toThrift(TPlanNode msg) {
    msg.node_type = TPlanNodeType.SUBPLAN_NODE;
  }
}

View File

@@ -26,7 +26,6 @@ import com.cloudera.impala.analysis.Analyzer;
import com.cloudera.impala.analysis.Expr;
import com.cloudera.impala.analysis.SlotDescriptor;
import com.cloudera.impala.analysis.TupleId;
import com.cloudera.impala.common.InternalException;
import com.cloudera.impala.common.Pair;
import com.cloudera.impala.thrift.TExplainLevel;
import com.cloudera.impala.thrift.TExpr;
@@ -157,7 +156,7 @@ public class UnionNode extends PlanNode {
* been evaluated during registration to set analyzer.hasEmptyResultSet_.
*/
@Override
public void init(Analyzer analyzer) throws InternalException {
public void init(Analyzer analyzer) {
computeMemLayout(analyzer);
computeStats(analyzer);

View File

@@ -0,0 +1,101 @@
// Copyright (c) 2015 Cloudera, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.cloudera.impala.planner;
import java.util.ArrayList;
import com.cloudera.impala.analysis.Analyzer;
import com.cloudera.impala.analysis.CollectionTableRef;
import com.cloudera.impala.analysis.Expr;
import com.cloudera.impala.common.InternalException;
import com.cloudera.impala.thrift.TExplainLevel;
import com.cloudera.impala.thrift.TPlanNode;
import com.cloudera.impala.thrift.TPlanNodeType;
import com.cloudera.impala.thrift.TUnnestNode;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
 * Plan node that iterates over a collection materialized in memory and produces
 * one output row for every item of that collection.
 * An UnnestNode is only valid inside the plan tree of a SubplanNode.
 */
public class UnnestNode extends PlanNode {
  // SubplanNode given at construction; used for the node count and explain output.
  private final SubplanNode parent_;

  // Collection table ref whose tuple this node produces.
  private final CollectionTableRef tblRef_;

  // Collection expr of tblRef_; serialized into the thrift unnest node.
  private final Expr collectionExpr_;

  public UnnestNode(PlanNodeId id, SubplanNode parent, CollectionTableRef tblRef) {
    super(id, Lists.newArrayList(tblRef.getDesc().getId()), "UNNEST");
    parent_ = parent;
    tblRef_ = tblRef;
    collectionExpr_ = tblRef.getCollectionExpr();
    // Analysis is assumed to have fully resolved the collection expr, so it must
    // be bound by the tuple ids of the parent's first child.
    boolean boundByParentInput =
        collectionExpr_.isBoundByTupleIds(parent_.getChild(0).tupleIds_);
    Preconditions.checkState(boundByParentInput);
  }

  /**
   * Adds the predicates bound by this node's tuple id to the conjuncts, runs the
   * superclass initialization, marks the slots of the conjuncts as materialized
   * and computes the mem layout.
   */
  @Override
  public void init(Analyzer analyzer) throws InternalException {
    ArrayList<Expr> boundPreds = analyzer.getBoundPredicates(tupleIds_.get(0));
    conjuncts_.addAll(boundPreds);
    super.init(analyzer);

    // Like a scan, an unnest has to materialize the slots its conjuncts reference.
    markSlotsMaterialized(analyzer, conjuncts_);
    computeMemLayout(analyzer);
  }

  /**
   * Sets the cardinality to PlannerContext.AVG_COLLECTION_SIZE and takes the
   * number of nodes from the parent SubplanNode.
   */
  @Override
  public void computeStats(Analyzer analyzer) {
    super.computeStats(analyzer);
    numNodes_ = parent_.getNumNodes();
    cardinality_ = PlannerContext.AVG_COLLECTION_SIZE;
  }

  /**
   * Renders this node for explain output.
   * The first line holds the display label with its detail in brackets. At EXTENDED
   * level and above the id of the parent subplan is added; at STANDARD level and
   * above the conjuncts are listed if there are any.
   */
  @Override
  protected String getNodeExplainString(String prefix, String detailPrefix,
      TExplainLevel detailLevel) {
    StringBuilder explain = new StringBuilder(String.format(
        "%s%s [%s]\n", prefix, getDisplayLabel(), getDisplayLabelDetail()));
    int level = detailLevel.ordinal();
    if (level >= TExplainLevel.EXTENDED.ordinal()) {
      explain.append(
          String.format("%sparent-subplan=%s\n", detailPrefix, parent_.getId()));
    }
    if (level >= TExplainLevel.STANDARD.ordinal() && !conjuncts_.isEmpty()) {
      explain.append(detailPrefix)
          .append("predicates: ")
          .append(getExplainString(conjuncts_))
          .append("\n");
    }
    return explain.toString();
  }

  /**
   * Returns the dot-separated path of the table ref, followed by a space and the
   * explicit alias if the table ref has one.
   */
  @Override
  protected String getDisplayLabelDetail() {
    String path = Joiner.on(".").join(tblRef_.getPath());
    if (!tblRef_.hasExplicitAlias()) return path;
    return path + " " + tblRef_.getExplicitAlias();
  }

  /**
   * Tags the thrift plan node as an UNNEST_NODE and attaches the thrift form of
   * the collection expr.
   */
  @Override
  protected void toThrift(TPlanNode msg) {
    msg.node_type = TPlanNodeType.UNNEST_NODE;
    TUnnestNode unnestNode = new TUnnestNode(collectionExpr_.treeToThrift());
    msg.setUnnest_node(unnestNode);
  }
}

View File

@@ -3263,7 +3263,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
testNumberOfMembers(ValuesStmt.class, 0);
// Also check TableRefs.
testNumberOfMembers(TableRef.class, 13);
testNumberOfMembers(TableRef.class, 14);
testNumberOfMembers(BaseTableRef.class, 0);
testNumberOfMembers(InlineViewRef.class, 8);
}

View File

@@ -1142,6 +1142,11 @@ WHERE `$a$2`.`$c$1` > t4.id
08:NESTED LOOP JOIN [INNER JOIN]
| predicates: sum(t1.int_col) > t4.id
|
|--00:SCAN HDFS [functional.alltypestiny t4]
| partitions=4/4 files=4 size=460B
|
07:NESTED LOOP JOIN [CROSS JOIN]
|
|--05:AGGREGATE [FINALIZE]
| | output: sum(t1.int_col)
| | limit: 1
@@ -1149,11 +1154,6 @@ WHERE `$a$2`.`$c$1` > t4.id
| 04:SCAN HDFS [functional.alltypesagg t1]
| partitions=11/11 files=11 size=814.73KB
|
07:NESTED LOOP JOIN [CROSS JOIN]
|
|--00:SCAN HDFS [functional.alltypestiny t4]
| partitions=4/4 files=4 size=460B
|
03:HASH JOIN [INNER JOIN]
| hash predicates: t1.bigint_col = t2.smallint_col
| limit: 1