From 28fc8ddf6053e5d5b3b2bc0dd461597724ecbaf0 Mon Sep 17 00:00:00 2001 From: Matthew Jacobs Date: Mon, 22 Sep 2014 16:45:40 -0700 Subject: [PATCH] IMPALA-1292: Incorrect result in analytic SUM when ORDER BY column is null The 'less than' predicate created by AnalyticPlanner used to check if the previous row was less than the current row is not exactly what we want to determine when rows in RANGE windows (the default window in this case) share the same result values. Rows get the same results when the order by exprs evaluate equally or both null, so it's easiest (and more efficient) to use a predicate that simply checks equality or both null. We already create such predicates for checking for partition boundaries, so this is a trivial change. When we support arbitrary RANGE window offsets we will likely want to add similar predicates that compare two tuples plus/minus the offset, but those will be simpler because there can be only one order by expr when specifying RANGE offsets with PRECEDING/FOLLOWING. Change-Id: I52ff6203686832852430e498eca6ad2cc2daee98 Reviewed-on: http://gerrit.sjc.cloudera.com:8080/4474 Tested-by: jenkins Reviewed-by: Matthew Jacobs Tested-by: Matthew Jacobs --- be/src/exec/analytic-eval-node.cc | 50 +++++++------- be/src/exec/analytic-eval-node.h | 6 +- common/thrift/PlanNodes.thrift | 9 +-- .../impala/planner/AnalyticEvalNode.java | 16 ++--- .../impala/planner/AnalyticPlanner.java | 65 ++----------------- .../queries/QueryTest/analytic-fns.test | 42 +++++++++++- 6 files changed, 87 insertions(+), 101 deletions(-) diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc index fb7ddb441..a78b43c00 100644 --- a/be/src/exec/analytic-eval-node.cc +++ b/be/src/exec/analytic-eval-node.cc @@ -36,8 +36,8 @@ AnalyticEvalNode::AnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode, result_tuple_desc_( descs.GetTupleDescriptor(tnode.analytic_node.output_tuple_id)), buffered_tuple_desc_(NULL), - partition_by_expr_ctx_(NULL), - order_by_expr_ctx_(NULL), + partition_by_eq_expr_ctx_(NULL), + order_by_eq_expr_ctx_(NULL), rows_start_offset_(0), rows_end_offset_(0), last_result_idx_(-1), @@ -108,12 +108,12 @@ Status AnalyticEvalNode::Init(const TPlanNode& tnode) { if (analytic_node.__isset.partition_by_eq) { DCHECK(analytic_node.__isset.buffered_tuple_id); RETURN_IF_ERROR(Expr::CreateExprTree(pool_, analytic_node.partition_by_eq, - &partition_by_expr_ctx_)); + &partition_by_eq_expr_ctx_)); } - if (analytic_node.__isset.order_by_lt) { + if (analytic_node.__isset.order_by_eq) { DCHECK(analytic_node.__isset.buffered_tuple_id); - RETURN_IF_ERROR(Expr::CreateExprTree(pool_, analytic_node.order_by_lt, - &order_by_expr_ctx_)); + RETURN_IF_ERROR(Expr::CreateExprTree(pool_, analytic_node.order_by_eq, + &order_by_eq_expr_ctx_)); } return Status::OK; } @@ -137,16 +137,16 @@ Status AnalyticEvalNode::Prepare(RuntimeState* state) { state->obj_pool()->Add(fn_ctxs_[i]); } if (buffered_tuple_desc_ != NULL) { - DCHECK(partition_by_expr_ctx_ != NULL || order_by_expr_ctx_ != NULL); + DCHECK(partition_by_eq_expr_ctx_ != NULL || order_by_eq_expr_ctx_ != NULL); vector tuple_ids; tuple_ids.push_back(child(0)->row_desc().tuple_descriptors()[0]->id()); tuple_ids.push_back(buffered_tuple_desc_->id()); RowDescriptor cmp_row_desc(state->desc_tbl(), tuple_ids, vector(2, false)); - if (partition_by_expr_ctx_ != NULL) { - RETURN_IF_ERROR(partition_by_expr_ctx_->Prepare(state, cmp_row_desc)); + if (partition_by_eq_expr_ctx_ != NULL) { + RETURN_IF_ERROR(partition_by_eq_expr_ctx_->Prepare(state, cmp_row_desc)); } - if (order_by_expr_ctx_ != NULL) { - RETURN_IF_ERROR(order_by_expr_ctx_->Prepare(state, cmp_row_desc)); + if (order_by_eq_expr_ctx_ != NULL) { + RETURN_IF_ERROR(order_by_eq_expr_ctx_->Prepare(state, cmp_row_desc)); } } child_tuple_cmp_row_ = reinterpret_cast( @@ -171,11 +171,11 @@ Status AnalyticEvalNode::Open(RuntimeState* state) { DCHECK(!evaluators_[i]->is_merge()); } - if (partition_by_expr_ctx_ != NULL) { - RETURN_IF_ERROR(partition_by_expr_ctx_->Open(state)); + if (partition_by_eq_expr_ctx_ != NULL) { + RETURN_IF_ERROR(partition_by_eq_expr_ctx_->Open(state)); } - if (order_by_expr_ctx_ != NULL) { - RETURN_IF_ERROR(order_by_expr_ctx_->Open(state)); + if (order_by_eq_expr_ctx_ != NULL) { + RETURN_IF_ERROR(order_by_eq_expr_ctx_->Open(state)); } // An intermediate tuple is only allocated once and is reused. @@ -305,7 +305,7 @@ inline void AnalyticEvalNode::TryAddResultTupleForPrevRow(bool next_partition, << " idx=" << stream_idx; if (fn_scope_ == ROWS) return; if (next_partition || (fn_scope_ == RANGE && window_.__isset.window_end && - PrevRowCompare(order_by_expr_ctx_))) { + !PrevRowCompare(order_by_eq_expr_ctx_))) { AddResultTuple(stream_idx - 1); } } @@ -489,9 +489,9 @@ Status AnalyticEvalNode::ProcessChildBatch(RuntimeState* state) { // copied from curr_tuple_ because the original is used for one or more previous // row(s) but the incremental state still applies to the current row. bool next_partition = false; - if (partition_by_expr_ctx_ != NULL) { - // partition_by_expr_ctx_ checks equality over the predicate exprs - next_partition = !PrevRowCompare(partition_by_expr_ctx_); + if (partition_by_eq_expr_ctx_ != NULL) { + // partition_by_eq_expr_ctx_ checks equality over the predicate exprs + next_partition = !PrevRowCompare(partition_by_eq_expr_ctx_); } TryAddResultTupleForPrevRow(next_partition, stream_idx, row); if (next_partition) InitNextPartition(stream_idx); @@ -654,8 +654,8 @@ void AnalyticEvalNode::Close(RuntimeState* state) { evaluators_[i]->Close(state); fn_ctxs_[i]->impl()->Close(); } - if (partition_by_expr_ctx_ != NULL) partition_by_expr_ctx_->Close(state); - if (order_by_expr_ctx_ != NULL) order_by_expr_ctx_->Close(state); + if (partition_by_eq_expr_ctx_ != NULL) partition_by_eq_expr_ctx_->Close(state); + if (order_by_eq_expr_ctx_ != NULL) order_by_eq_expr_ctx_->Close(state); if (prev_child_batch_.get() != NULL) prev_child_batch_.reset(); if (curr_child_batch_.get() != NULL) curr_child_batch_.reset(); if (curr_tuple_pool_.get() != NULL) curr_tuple_pool_->FreeAll(); @@ -668,11 +668,11 @@ void AnalyticEvalNode::DebugString(int indentation_level, stringstream* out) con *out << string(indentation_level * 2, ' '); *out << "AnalyticEvalNode(" << " window=" << DebugWindowString(); - if (partition_by_expr_ctx_ != NULL) { - *out << " partition_exprs=" << partition_by_expr_ctx_->root()->DebugString(); + if (partition_by_eq_expr_ctx_ != NULL) { + *out << " partition_exprs=" << partition_by_eq_expr_ctx_->root()->DebugString(); } - if (order_by_expr_ctx_ != NULL) { - *out << " order_by_exprs=" << order_by_expr_ctx_->root()->DebugString(); + if (order_by_eq_expr_ctx_ != NULL) { + *out << " order_by_exprs=" << order_by_eq_expr_ctx_->root()->DebugString(); } *out << AggFnEvaluator::DebugString(evaluators_); ExecNode::DebugString(indentation_level, out); diff --git a/be/src/exec/analytic-eval-node.h b/be/src/exec/analytic-eval-node.h index adcede984..49b6c0e6f 100644 --- a/be/src/exec/analytic-eval-node.h +++ b/be/src/exec/analytic-eval-node.h @@ -187,17 +187,17 @@ class AnalyticEvalNode : public ExecNode { TupleDescriptor* buffered_tuple_desc_; // TupleRow* composed of the first child tuple and the buffered tuple, used by - // partition_by_expr_ctx_ and order_by_expr_ctx_. Set in Prepare() if + // partition_by_eq_expr_ctx_ and order_by_eq_expr_ctx_. Set in Prepare() if // buffered_tuple_desc_ is not NULL, allocated from mem_pool_. TupleRow* child_tuple_cmp_row_; // Expr context for a predicate that checks if child tuple '<' buffered tuple for // partitioning exprs. - ExprContext* partition_by_expr_ctx_; + ExprContext* partition_by_eq_expr_ctx_; // Expr context for a predicate that checks if child tuple '<' buffered tuple for // order by exprs. - ExprContext* order_by_expr_ctx_; + ExprContext* order_by_eq_expr_ctx_; // The scope over which analytic functions are evaluated. // TODO: Consider adding additional state to capture whether different kinds of window diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift index 8e8a1232f..9570485ba 100644 --- a/common/thrift/PlanNodes.thrift +++ b/common/thrift/PlanNodes.thrift @@ -303,10 +303,11 @@ struct TAnalyticNode { // child tuple and the buffered tuple 8: optional Exprs.TExpr partition_by_eq - // predicate that checks: child tuple '<' buffered tuple for order_by_exprs; - // only set if buffered_tuple_id is set; should be evaluated over a row that - // is composed of the child tuple and the buffered tuple - 9: optional Exprs.TExpr order_by_lt + // predicate that checks: the order_by_exprs are equal or both NULL when evaluated + // over the child tuple and the buffered tuple. only set if buffered_tuple_id is set; + // should be evaluated over a row that is composed of the child tuple and the buffered + // tuple + 9: optional Exprs.TExpr order_by_eq } struct TUnionNode { diff --git a/fe/src/main/java/com/cloudera/impala/planner/AnalyticEvalNode.java b/fe/src/main/java/com/cloudera/impala/planner/AnalyticEvalNode.java index dae005d2a..4d372041c 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/AnalyticEvalNode.java +++ b/fe/src/main/java/com/cloudera/impala/planner/AnalyticEvalNode.java @@ -14,7 +14,6 @@ package com.cloudera.impala.planner; -import java.util.ArrayList; import java.util.List; import org.slf4j.Logger; @@ -73,7 +72,7 @@ public class AnalyticEvalNode extends PlanNode { // predicates constructed from partitionExprs_/orderingExprs_ to // compare input to buffered tuples private final Expr partitionByEq_; - private final Expr orderByLessThan_; + private final Expr orderByEq_; private final TupleDescriptor bufferedTupleDesc_; public AnalyticEvalNode( @@ -82,8 +81,7 @@ public class AnalyticEvalNode extends PlanNode { List orderByElements, AnalyticWindow analyticWindow, TupleDescriptor logicalTupleDesc, TupleDescriptor intermediateTupleDesc, TupleDescriptor outputTupleDesc, ExprSubstitutionMap logicalToPhysicalSmap, - Expr partitionByEq, Expr orderByLessThan, - TupleDescriptor bufferedTupleDesc) { + Expr partitionByEq, Expr orderByEq, TupleDescriptor bufferedTupleDesc) { super(id, input.getTupleIds(), "ANALYTIC"); Preconditions.checkState(!tupleIds_.contains(outputTupleDesc.getId())); // we're materializing the input row augmented with the analytic output tuple @@ -98,7 +96,7 @@ public class AnalyticEvalNode extends PlanNode { outputTupleDesc_ = outputTupleDesc; logicalToPhysicalSmap_ = logicalToPhysicalSmap; partitionByEq_ = partitionByEq; - orderByLessThan_ = orderByLessThan; + orderByEq_ = orderByEq; bufferedTupleDesc_ = bufferedTupleDesc; children_.add(input); nullableTupleIds_.addAll(input.getNullableTupleIds()); @@ -158,8 +156,8 @@ public class AnalyticEvalNode extends PlanNode { .add("outputTid", outputTupleDesc_.getId()) .add("partitionByEq", partitionByEq_ != null ? partitionByEq_.debugString() : "null") - .add("orderByLt", - orderByLessThan_ != null ? orderByLessThan_.debugString() : "null") + .add("orderByEq", + orderByEq_ != null ? orderByEq_.debugString() : "null") .addValue(super.debugString()) .toString(); } @@ -185,8 +183,8 @@ public class AnalyticEvalNode extends PlanNode { if (partitionByEq_ != null) { msg.analytic_node.setPartition_by_eq(partitionByEq_.treeToThrift()); } - if (orderByLessThan_ != null) { - msg.analytic_node.setOrder_by_lt(orderByLessThan_.treeToThrift()); + if (orderByEq_ != null) { + msg.analytic_node.setOrder_by_eq(orderByEq_.treeToThrift()); } if (bufferedTupleDesc_ != null) { msg.analytic_node.setBuffered_tuple_id(bufferedTupleDesc_.getId().asInt()); diff --git a/fe/src/main/java/com/cloudera/impala/planner/AnalyticPlanner.java b/fe/src/main/java/com/cloudera/impala/planner/AnalyticPlanner.java index 00b6d8bcc..20ee03d05 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/AnalyticPlanner.java +++ b/fe/src/main/java/com/cloudera/impala/planner/AnalyticPlanner.java @@ -384,12 +384,13 @@ public class AnalyticPlanner { sortTupleId, bufferedSmap); LOG.trace("partitionByEq: " + partitionByEq.debugString()); } - Expr orderByLessThan = null; + Expr orderByEq = null; if (!windowGroup.orderByElements.isEmpty()) { - orderByLessThan = createLessThan( - OrderByElement.substitute(windowGroup.orderByElements, sortSmap, analyzer_), + orderByEq = createNullMatchingEquals( + OrderByElement.getOrderByExprs(OrderByElement.substitute( + windowGroup.orderByElements, sortSmap, analyzer_)), sortTupleId, bufferedSmap); - LOG.trace("orderByLt: " + orderByLessThan.debugString()); + LOG.trace("orderByEq: " + orderByEq.debugString()); } root = new AnalyticEvalNode(idGenerator_.getNextId(), root, stmtTupleIds_, @@ -398,66 +399,12 @@ public class AnalyticPlanner { windowGroup.window, analyticInfo_.getOutputTupleDesc(), windowGroup.physicalIntermediateTuple, windowGroup.physicalOutputTuple, windowGroup.logicalToPhysicalSmap, - partitionByEq, orderByLessThan, bufferedTupleDesc); + partitionByEq, orderByEq, bufferedTupleDesc); root.init(analyzer_); } return root; } - /** - * Create ' < ' predicate. - */ - private Expr createLessThan(List elements, TupleId inputTid, - ExprSubstitutionMap bufferedSmap) { - Preconditions.checkState(!elements.isEmpty()); - Expr result = createLessThanAux(elements, 0, inputTid, bufferedSmap); - try { - result.analyze(analyzer_); - } catch (AnalysisException e) { - throw new IllegalStateException(e); - } - return result; - } - - /** - * Create an unanalyzed '<' predicate for elements >= i. - * - * With asc/nulls first, the predicate has the form - * rhs[i] is not null && ( - * lhs[i] is null - * || lhs[i] < rhs[i] - * || (lhs[i] = rhs[i] && ) - * ) - * - * TODO: this is extremely contorted; we need to introduce a builtin - * compare(lhs, rhs, is_asc, nulls_first) - */ - private Expr createLessThanAux(List elements, int i, TupleId inputTid, - ExprSubstitutionMap bufferedSmap) { - if (i > elements.size() - 1) return new BoolLiteral(false); - - // compare elements[i] - Expr lhs = elements.get(i).getExpr(); - Preconditions.checkState(lhs.isBound(inputTid)); - Expr rhs = lhs.substitute(bufferedSmap, analyzer_); - Expr rhsIsNotFirst = new IsNullPredicate( - elements.get(i).nullsFirst() ? rhs : lhs, true); - Expr lhsIsFirst = new IsNullPredicate( - elements.get(i).nullsFirst() ? lhs : rhs, false); - BinaryPredicate.Operator comparison = elements.get(i).isAsc() - ? BinaryPredicate.Operator.LT - : BinaryPredicate.Operator.GT; - Expr lhsPrecedesRhs = new BinaryPredicate(comparison, lhs, rhs); - Expr lhsEqRhs = new BinaryPredicate(BinaryPredicate.Operator.EQ, lhs, rhs); - Expr remainder = createLessThanAux(elements, i + 1, inputTid, bufferedSmap); - Expr result = new CompoundPredicate(CompoundPredicate.Operator.AND, - rhsIsNotFirst, - new CompoundPredicate(CompoundPredicate.Operator.OR, - new CompoundPredicate(CompoundPredicate.Operator.OR, lhsIsFirst, lhsPrecedesRhs), - new CompoundPredicate(CompoundPredicate.Operator.AND, lhsEqRhs, remainder))); - return result; - } - /** * Create a predicate that checks if all exprs are equal or both sides are null. */ diff --git a/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test b/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test index f89c29d3b..db4ebf075 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test +++ b/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test @@ -824,4 +824,44 @@ select count(distinct t1.c1) from 11 ---- TYPES BIGINT -==== \ No newline at end of file +==== +---- QUERY +# IMPALA-1292: Incorrect result in analytic SUM when ORDER BY column is null +select tinyint_col, id, +SUM(id) OVER (ORDER BY tinyint_col ASC, id ASC) +FROM alltypesagg +where (tinyint_col is NULL or tinyint_col < 2) and id < 100 order by 1, 2 +---- RESULTS +1,1,1 +1,11,12 +1,21,33 +1,31,64 +1,41,105 +1,51,156 +1,61,217 +1,71,288 +1,81,369 +1,91,460 +NULL,0,460 +NULL,0,460 +NULL,10,480 +NULL,10,480 +NULL,20,520 +NULL,20,520 +NULL,30,580 +NULL,30,580 +NULL,40,660 +NULL,40,660 +NULL,50,760 +NULL,50,760 +NULL,60,880 +NULL,60,880 +NULL,70,1020 +NULL,70,1020 +NULL,80,1180 +NULL,80,1180 +NULL,90,1360 +NULL,90,1360 +---- TYPES +TINYINT, INT, BIGINT +====