From a615ebc549e7d4824272c96bfa0cba2d20e8413a Mon Sep 17 00:00:00 2001 From: Alex Behm Date: Sun, 23 Feb 2014 23:39:33 -0800 Subject: [PATCH] IMPALA-822,IMP-1271: Binding predicates on an aggregation now properly trigger slot materialization. The bug was that the number of materialized agg-tuple slots did not correspond to the number of materialized agg functions, due to binding predicates against an AggNode causing slot materialization after SelectStmt.materializeRequiredSlots(). This patch fixes the issue by taking binding predicates (bound to a slot in an agg tuple) into consideration in SelectStmt.materializeRequiredSlots(). A new sanity check that I added in AggregationNode.toThrift() surfaced another issue with slot materialization that is also fixed in this patch. The ordering exprs must be marked before the agg exprs in SelectStmt.materializeRequiredSlots() because the ordering exprs may contain agg exprs that are only referenced inside the ORDER BY clause. Change-Id: I1bdc0466f583907bed625ce6608938e59faee83f Reviewed-on: http://gerrit.ent.cloudera.com:8080/1639 Reviewed-by: Marcel Kornacker Tested-by: jenkins Reviewed-on: http://gerrit.ent.cloudera.com:8080/1818 Reviewed-by: Alex Behm --- .../impala/analysis/AggregateInfo.java | 13 +++++++ .../cloudera/impala/analysis/Analyzer.java | 7 ++-- .../cloudera/impala/analysis/SelectStmt.java | 36 +++++++++++++------ .../impala/planner/AggregationNode.java | 6 ++-- .../cloudera/impala/planner/HdfsScanNode.java | 2 +- .../queries/PlannerTest/aggregation.test | 17 +++++++++ .../queries/QueryTest/aggregation.test | 13 +++++++ 7 files changed, 76 insertions(+), 18 deletions(-) diff --git a/fe/src/main/java/com/cloudera/impala/analysis/AggregateInfo.java b/fe/src/main/java/com/cloudera/impala/analysis/AggregateInfo.java index ae591b56c..a56a171ef 100644 --- a/fe/src/main/java/com/cloudera/impala/analysis/AggregateInfo.java +++ b/fe/src/main/java/com/cloudera/impala/analysis/AggregateInfo.java @@ -595,6 +595,19 @@ public 
class AggregateInfo { } } + /** + * Sanity check that the number of materialized slots of the agg tuple corresponds to + * the number of materialized aggregate functions plus the number of grouping exprs. + */ + public void checkConsistency() { + int numMaterializedSlots = 0; + for (SlotDescriptor slotDesc: aggTupleDesc_.getSlots()) { + if (slotDesc.isMaterialized()) ++numMaterializedSlots; + } + Preconditions.checkState(numMaterializedSlots == + materializedAggregateSlots_.size() + groupingExprs_.size()); + } + /** * Returns DataPartition derived from grouping exprs. * Returns unpartitioned spec if no grouping. diff --git a/fe/src/main/java/com/cloudera/impala/analysis/Analyzer.java b/fe/src/main/java/com/cloudera/impala/analysis/Analyzer.java index 6bc6e4f28..449ce6068 100644 --- a/fe/src/main/java/com/cloudera/impala/analysis/Analyzer.java +++ b/fe/src/main/java/com/cloudera/impala/analysis/Analyzer.java @@ -749,9 +749,8 @@ public class Analyzer { } /** - * Returns pairs : the predicate is fully bound by slotId and can - * be evaluated by 'node'; the bool indicates whether the application of the - * predicate to slotId implies an + * Returns pairs : the predicate is fully bound by slotId; + * the bool indicates whether the application of the predicate to slotId implies an * assignment of that predicate (ie, it doesn't need to be applied to the slot * from which it originated). * Predicates are derived from binding predicates of slots in the same equivalence @@ -761,7 +760,7 @@ public class Analyzer { * TODO: exclude UDFs from predicate propagation? 
their overloaded variants could * have very different semantics */ - public ArrayList> getBoundPredicates(SlotId slotId, PlanNode node) { + public ArrayList> getBoundPredicates(SlotId slotId) { LOG.trace("getBoundPredicates(" + slotId.toString() + ")"); TupleId tid = getTupleId(slotId); ArrayList> result = Lists.newArrayList(); diff --git a/fe/src/main/java/com/cloudera/impala/analysis/SelectStmt.java b/fe/src/main/java/com/cloudera/impala/analysis/SelectStmt.java index 5cac62d7a..930f57d39 100644 --- a/fe/src/main/java/com/cloudera/impala/analysis/SelectStmt.java +++ b/fe/src/main/java/com/cloudera/impala/analysis/SelectStmt.java @@ -27,6 +27,7 @@ import com.cloudera.impala.catalog.ColumnType; import com.cloudera.impala.catalog.PrimitiveType; import com.cloudera.impala.common.AnalysisException; import com.cloudera.impala.common.InternalException; +import com.cloudera.impala.common.Pair; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -209,22 +210,35 @@ public class SelectStmt extends QueryStmt { Expr.cloneList(unassignedJoinConjuncts, baseTblSmap_); materializeSlots(analyzer, baseTblJoinConjuncts); - if (aggInfo_ != null) { - // mark all agg exprs needed for HAVING pred as materialized before calling - // AggregateInfo.materializeRequiredSlots(), otherwise they won't show up in - // AggregateInfo.getMaterializedAggregateExprs() - if (havingPred_ != null) { - materializeSlots(analyzer, Lists.newArrayList(havingPred_)); - } - aggInfo_.materializeRequiredSlots(analyzer, baseTblSmap_); - } - if (sortInfo_ != null) { - // mark ordering exprs + // mark ordering exprs before marking agg exprs because the ordering exprs + // may contain agg exprs that are not referenced anywhere but the ORDER BY clause List resolvedExprs = Expr.cloneList(sortInfo_.getOrderingExprs(), baseTblSmap_); materializeSlots(analyzer, resolvedExprs); } + + if (aggInfo_ != null) { + // mark all agg exprs needed for HAVING pred and binding predicates as 
materialized + // before calling AggregateInfo.materializeRequiredSlots(), otherwise they won't + // show up in AggregateInfo.getMaterializedAggregateExprs() + ArrayList havingConjuncts = Lists.newArrayList(); + if (havingPred_ != null) havingConjuncts.add(havingPred_); + TupleDescriptor aggTupleDesc = analyzer.getTupleDesc(aggInfo_.getAggTupleId()); + for (SlotDescriptor slotDesc: aggTupleDesc.getSlots()) { + ArrayList> bindingPredicates = + analyzer.getBoundPredicates(slotDesc.getId()); + for (Pair p: bindingPredicates) { + if (!analyzer.isConjunctAssigned(p.first)) { + havingConjuncts.add(p.first); + } + } + } + havingConjuncts.addAll( + analyzer.getUnassignedConjuncts(aggInfo_.getAggTupleId().asList(), false)); + materializeSlots(analyzer, havingConjuncts); + aggInfo_.materializeRequiredSlots(analyzer, baseTblSmap_); + } } /** diff --git a/fe/src/main/java/com/cloudera/impala/planner/AggregationNode.java b/fe/src/main/java/com/cloudera/impala/planner/AggregationNode.java index cf0320daf..18828a36f 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/AggregationNode.java +++ b/fe/src/main/java/com/cloudera/impala/planner/AggregationNode.java @@ -89,7 +89,7 @@ public class AggregationNode extends PlanNode { // logic to predicates over multiple slots for (SlotDescriptor slotDesc: analyzer.getTupleDesc(tupleIds_.get(0)).getSlots()) { ArrayList> bindingPredicates = - analyzer.getBoundPredicates(slotDesc.getId(), this); + analyzer.getBoundPredicates(slotDesc.getId()); for (Pair p: bindingPredicates) { if (!analyzer.isConjunctAssigned(p.first)) { conjuncts_.add(p.first); @@ -100,7 +100,6 @@ public class AggregationNode extends PlanNode { // also add remaining unassigned conjuncts_ assignConjuncts(analyzer); - markSlotsMaterialized(analyzer, conjuncts_); computeMemLayout(analyzer); // do this at the end so it can take all conjuncts into account computeStats(analyzer); @@ -111,6 +110,8 @@ public class AggregationNode extends PlanNode { Expr.SubstitutionMap 
combinedChildSmap = getCombinedChildSmap(); aggInfo_.substitute(combinedChildSmap); baseTblSmap_ = aggInfo_.getSMap(); + // assert consistent aggregate expr and slot materialization + aggInfo_.checkConsistency(); } @Override @@ -165,6 +166,7 @@ public class AggregationNode extends PlanNode { for (FunctionCallExpr e: aggInfo_.getMaterializedAggregateExprs()) { aggregateFunctions.add(e.treeToThrift()); } + aggInfo_.checkConsistency(); msg.agg_node = new TAggregationNode( aggregateFunctions, aggInfo_.getAggTupleId().asInt(), needsFinalize_); diff --git a/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java b/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java index 53a77a6e0..e13d7f822 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java @@ -112,7 +112,7 @@ public class HdfsScanNode extends ScanNode { // loop over all materialized slots and add predicates to conjuncts_ for (SlotDescriptor slotDesc: analyzer.getTupleDesc(tupleIds_.get(0)).getSlots()) { ArrayList> bindingPredicates = - analyzer.getBoundPredicates(slotDesc.getId(), this); + analyzer.getBoundPredicates(slotDesc.getId()); for (Pair p: bindingPredicates) { if (p.second) analyzer.markConjunctAssigned(p.first); conjuncts_.add(p.first); diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test index b80ae298d..7a63773f5 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test @@ -142,3 +142,20 @@ having count(t2.int_col) = count(t1.bigint_col) 00:SCAN HDFS [functional.alltypes t1] partitions=24/24 size=478.45KB ==== +# Tests proper slot materialization of agg-tuple slots for avg (IMP-1271). 
+# 't.x > 10' is picked up as an unassigned conjunct, and not as a binding +# predicate because avg gets rewritten into an expr against two slots +# (and getBoundPredicates() cannot handle multi-slot predicates). +select 1 from + (select int_col, avg(bigint_col) x from functional.alltypes + group by int_col) t +where t.x > 10 +---- PLAN +01:AGGREGATE [FINALIZE] +| output: sum(bigint_col), count(bigint_col) +| group by: int_col +| having: sum(bigint_col) / count(bigint_col) > 10.0 +| +00:SCAN HDFS [functional.alltypes] + partitions=24/24 size=478.45KB +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/aggregation.test b/testdata/workloads/functional-query/queries/QueryTest/aggregation.test index 1eb0b9fc2..27dc90e2a 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/aggregation.test +++ b/testdata/workloads/functional-query/queries/QueryTest/aggregation.test @@ -879,3 +879,16 @@ bigint,bigint 1,2 4,4 ==== +---- QUERY +# Test that binding predicates on an aggregation properly trigger materialization of +# slots in the agg tuple and the slots needed for evaluating the corresponding agg funcs +# (IMPALA-822). +select 1 from + (select count(bigint_col) c from functional.alltypesagg + having min(int_col) is not null) as t +where c is not null +---- TYPES +bigint +---- +1 +====