diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc index fb7ddb441..a78b43c00 100644 --- a/be/src/exec/analytic-eval-node.cc +++ b/be/src/exec/analytic-eval-node.cc @@ -36,8 +36,8 @@ AnalyticEvalNode::AnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode, result_tuple_desc_( descs.GetTupleDescriptor(tnode.analytic_node.output_tuple_id)), buffered_tuple_desc_(NULL), - partition_by_expr_ctx_(NULL), - order_by_expr_ctx_(NULL), + partition_by_eq_expr_ctx_(NULL), + order_by_eq_expr_ctx_(NULL), rows_start_offset_(0), rows_end_offset_(0), last_result_idx_(-1), @@ -108,12 +108,12 @@ Status AnalyticEvalNode::Init(const TPlanNode& tnode) { if (analytic_node.__isset.partition_by_eq) { DCHECK(analytic_node.__isset.buffered_tuple_id); RETURN_IF_ERROR(Expr::CreateExprTree(pool_, analytic_node.partition_by_eq, - &partition_by_expr_ctx_)); + &partition_by_eq_expr_ctx_)); } - if (analytic_node.__isset.order_by_lt) { + if (analytic_node.__isset.order_by_eq) { DCHECK(analytic_node.__isset.buffered_tuple_id); - RETURN_IF_ERROR(Expr::CreateExprTree(pool_, analytic_node.order_by_lt, - &order_by_expr_ctx_)); + RETURN_IF_ERROR(Expr::CreateExprTree(pool_, analytic_node.order_by_eq, + &order_by_eq_expr_ctx_)); } return Status::OK; } @@ -137,16 +137,16 @@ Status AnalyticEvalNode::Prepare(RuntimeState* state) { state->obj_pool()->Add(fn_ctxs_[i]); } if (buffered_tuple_desc_ != NULL) { - DCHECK(partition_by_expr_ctx_ != NULL || order_by_expr_ctx_ != NULL); + DCHECK(partition_by_eq_expr_ctx_ != NULL || order_by_eq_expr_ctx_ != NULL); vector tuple_ids; tuple_ids.push_back(child(0)->row_desc().tuple_descriptors()[0]->id()); tuple_ids.push_back(buffered_tuple_desc_->id()); RowDescriptor cmp_row_desc(state->desc_tbl(), tuple_ids, vector(2, false)); - if (partition_by_expr_ctx_ != NULL) { - RETURN_IF_ERROR(partition_by_expr_ctx_->Prepare(state, cmp_row_desc)); + if (partition_by_eq_expr_ctx_ != NULL) { + RETURN_IF_ERROR(partition_by_eq_expr_ctx_->Prepare(state, 
cmp_row_desc)); } - if (order_by_expr_ctx_ != NULL) { - RETURN_IF_ERROR(order_by_expr_ctx_->Prepare(state, cmp_row_desc)); + if (order_by_eq_expr_ctx_ != NULL) { + RETURN_IF_ERROR(order_by_eq_expr_ctx_->Prepare(state, cmp_row_desc)); } } child_tuple_cmp_row_ = reinterpret_cast( @@ -171,11 +171,11 @@ Status AnalyticEvalNode::Open(RuntimeState* state) { DCHECK(!evaluators_[i]->is_merge()); } - if (partition_by_expr_ctx_ != NULL) { - RETURN_IF_ERROR(partition_by_expr_ctx_->Open(state)); + if (partition_by_eq_expr_ctx_ != NULL) { + RETURN_IF_ERROR(partition_by_eq_expr_ctx_->Open(state)); } - if (order_by_expr_ctx_ != NULL) { - RETURN_IF_ERROR(order_by_expr_ctx_->Open(state)); + if (order_by_eq_expr_ctx_ != NULL) { + RETURN_IF_ERROR(order_by_eq_expr_ctx_->Open(state)); } // An intermediate tuple is only allocated once and is reused. @@ -305,7 +305,7 @@ inline void AnalyticEvalNode::TryAddResultTupleForPrevRow(bool next_partition, << " idx=" << stream_idx; if (fn_scope_ == ROWS) return; if (next_partition || (fn_scope_ == RANGE && window_.__isset.window_end && - PrevRowCompare(order_by_expr_ctx_))) { + !PrevRowCompare(order_by_eq_expr_ctx_))) { AddResultTuple(stream_idx - 1); } } @@ -489,9 +489,9 @@ Status AnalyticEvalNode::ProcessChildBatch(RuntimeState* state) { // copied from curr_tuple_ because the original is used for one or more previous // row(s) but the incremental state still applies to the current row. 
bool next_partition = false; - if (partition_by_expr_ctx_ != NULL) { - // partition_by_expr_ctx_ checks equality over the predicate exprs - next_partition = !PrevRowCompare(partition_by_expr_ctx_); + if (partition_by_eq_expr_ctx_ != NULL) { + // partition_by_eq_expr_ctx_ checks equality over the predicate exprs + next_partition = !PrevRowCompare(partition_by_eq_expr_ctx_); } TryAddResultTupleForPrevRow(next_partition, stream_idx, row); if (next_partition) InitNextPartition(stream_idx); @@ -654,8 +654,8 @@ void AnalyticEvalNode::Close(RuntimeState* state) { evaluators_[i]->Close(state); fn_ctxs_[i]->impl()->Close(); } - if (partition_by_expr_ctx_ != NULL) partition_by_expr_ctx_->Close(state); - if (order_by_expr_ctx_ != NULL) order_by_expr_ctx_->Close(state); + if (partition_by_eq_expr_ctx_ != NULL) partition_by_eq_expr_ctx_->Close(state); + if (order_by_eq_expr_ctx_ != NULL) order_by_eq_expr_ctx_->Close(state); if (prev_child_batch_.get() != NULL) prev_child_batch_.reset(); if (curr_child_batch_.get() != NULL) curr_child_batch_.reset(); if (curr_tuple_pool_.get() != NULL) curr_tuple_pool_->FreeAll(); @@ -668,11 +668,11 @@ void AnalyticEvalNode::DebugString(int indentation_level, stringstream* out) con *out << string(indentation_level * 2, ' '); *out << "AnalyticEvalNode(" << " window=" << DebugWindowString(); - if (partition_by_expr_ctx_ != NULL) { - *out << " partition_exprs=" << partition_by_expr_ctx_->root()->DebugString(); + if (partition_by_eq_expr_ctx_ != NULL) { + *out << " partition_exprs=" << partition_by_eq_expr_ctx_->root()->DebugString(); } - if (order_by_expr_ctx_ != NULL) { - *out << " order_by_exprs=" << order_by_expr_ctx_->root()->DebugString(); + if (order_by_eq_expr_ctx_ != NULL) { + *out << " order_by_exprs=" << order_by_eq_expr_ctx_->root()->DebugString(); } *out << AggFnEvaluator::DebugString(evaluators_); ExecNode::DebugString(indentation_level, out); diff --git a/be/src/exec/analytic-eval-node.h b/be/src/exec/analytic-eval-node.h index 
adcede984..49b6c0e6f 100644 --- a/be/src/exec/analytic-eval-node.h +++ b/be/src/exec/analytic-eval-node.h @@ -187,17 +187,17 @@ class AnalyticEvalNode : public ExecNode { TupleDescriptor* buffered_tuple_desc_; // TupleRow* composed of the first child tuple and the buffered tuple, used by - // partition_by_expr_ctx_ and order_by_expr_ctx_. Set in Prepare() if + // partition_by_eq_expr_ctx_ and order_by_eq_expr_ctx_. Set in Prepare() if // buffered_tuple_desc_ is not NULL, allocated from mem_pool_. TupleRow* child_tuple_cmp_row_; - // Expr context for a predicate that checks if child tuple '<' buffered tuple for - // partitioning exprs. - ExprContext* partition_by_expr_ctx_; + // Expr context for a predicate that checks if the partitioning exprs of the child + // tuple and the buffered tuple are equal. + ExprContext* partition_by_eq_expr_ctx_; - // Expr context for a predicate that checks if child tuple '<' buffered tuple for - // order by exprs. - ExprContext* order_by_expr_ctx_; + // Expr context for a predicate that checks if the order by exprs of the child tuple + // and the buffered tuple are equal or both NULL. + ExprContext* order_by_eq_expr_ctx_; // The scope over which analytic functions are evaluated. // TODO: Consider adding additional state to capture whether different kinds of window diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift index 8e8a1232f..9570485ba 100644 --- a/common/thrift/PlanNodes.thrift +++ b/common/thrift/PlanNodes.thrift @@ -303,10 +303,11 @@ struct TAnalyticNode { // child tuple and the buffered tuple 8: optional Exprs.TExpr partition_by_eq - // predicate that checks: child tuple '<' buffered tuple for order_by_exprs; - // only set if buffered_tuple_id is set; should be evaluated over a row that - // is composed of the child tuple and the buffered tuple - 9: optional Exprs.TExpr order_by_lt + // predicate that checks: the order_by_exprs are equal or both NULL when evaluated + // over the child tuple and the buffered tuple.
only set if buffered_tuple_id is set; + // should be evaluated over a row that is composed of the child tuple and the buffered + // tuple + 9: optional Exprs.TExpr order_by_eq } struct TUnionNode { diff --git a/fe/src/main/java/com/cloudera/impala/planner/AnalyticEvalNode.java b/fe/src/main/java/com/cloudera/impala/planner/AnalyticEvalNode.java index dae005d2a..4d372041c 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/AnalyticEvalNode.java +++ b/fe/src/main/java/com/cloudera/impala/planner/AnalyticEvalNode.java @@ -14,7 +14,6 @@ package com.cloudera.impala.planner; -import java.util.ArrayList; import java.util.List; import org.slf4j.Logger; @@ -73,7 +72,7 @@ public class AnalyticEvalNode extends PlanNode { // predicates constructed from partitionExprs_/orderingExprs_ to // compare input to buffered tuples private final Expr partitionByEq_; - private final Expr orderByLessThan_; + private final Expr orderByEq_; private final TupleDescriptor bufferedTupleDesc_; public AnalyticEvalNode( @@ -82,8 +81,7 @@ public class AnalyticEvalNode extends PlanNode { List orderByElements, AnalyticWindow analyticWindow, TupleDescriptor logicalTupleDesc, TupleDescriptor intermediateTupleDesc, TupleDescriptor outputTupleDesc, ExprSubstitutionMap logicalToPhysicalSmap, - Expr partitionByEq, Expr orderByLessThan, - TupleDescriptor bufferedTupleDesc) { + Expr partitionByEq, Expr orderByEq, TupleDescriptor bufferedTupleDesc) { super(id, input.getTupleIds(), "ANALYTIC"); Preconditions.checkState(!tupleIds_.contains(outputTupleDesc.getId())); // we're materializing the input row augmented with the analytic output tuple @@ -98,7 +96,7 @@ public class AnalyticEvalNode extends PlanNode { outputTupleDesc_ = outputTupleDesc; logicalToPhysicalSmap_ = logicalToPhysicalSmap; partitionByEq_ = partitionByEq; - orderByLessThan_ = orderByLessThan; + orderByEq_ = orderByEq; bufferedTupleDesc_ = bufferedTupleDesc; children_.add(input); nullableTupleIds_.addAll(input.getNullableTupleIds()); @@ 
-158,8 +156,8 @@ public class AnalyticEvalNode extends PlanNode { .add("outputTid", outputTupleDesc_.getId()) .add("partitionByEq", partitionByEq_ != null ? partitionByEq_.debugString() : "null") - .add("orderByLt", - orderByLessThan_ != null ? orderByLessThan_.debugString() : "null") + .add("orderByEq", + orderByEq_ != null ? orderByEq_.debugString() : "null") .addValue(super.debugString()) .toString(); } @@ -185,8 +183,8 @@ public class AnalyticEvalNode extends PlanNode { if (partitionByEq_ != null) { msg.analytic_node.setPartition_by_eq(partitionByEq_.treeToThrift()); } - if (orderByLessThan_ != null) { - msg.analytic_node.setOrder_by_lt(orderByLessThan_.treeToThrift()); + if (orderByEq_ != null) { + msg.analytic_node.setOrder_by_eq(orderByEq_.treeToThrift()); } if (bufferedTupleDesc_ != null) { msg.analytic_node.setBuffered_tuple_id(bufferedTupleDesc_.getId().asInt()); diff --git a/fe/src/main/java/com/cloudera/impala/planner/AnalyticPlanner.java b/fe/src/main/java/com/cloudera/impala/planner/AnalyticPlanner.java index 00b6d8bcc..20ee03d05 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/AnalyticPlanner.java +++ b/fe/src/main/java/com/cloudera/impala/planner/AnalyticPlanner.java @@ -384,12 +384,13 @@ public class AnalyticPlanner { sortTupleId, bufferedSmap); LOG.trace("partitionByEq: " + partitionByEq.debugString()); } - Expr orderByLessThan = null; + Expr orderByEq = null; if (!windowGroup.orderByElements.isEmpty()) { - orderByLessThan = createLessThan( - OrderByElement.substitute(windowGroup.orderByElements, sortSmap, analyzer_), + orderByEq = createNullMatchingEquals( + OrderByElement.getOrderByExprs(OrderByElement.substitute( + windowGroup.orderByElements, sortSmap, analyzer_)), sortTupleId, bufferedSmap); - LOG.trace("orderByLt: " + orderByLessThan.debugString()); + LOG.trace("orderByEq: " + orderByEq.debugString()); } root = new AnalyticEvalNode(idGenerator_.getNextId(), root, stmtTupleIds_, @@ -398,66 +399,12 @@ public class AnalyticPlanner { 
windowGroup.window, analyticInfo_.getOutputTupleDesc(), windowGroup.physicalIntermediateTuple, windowGroup.physicalOutputTuple, windowGroup.logicalToPhysicalSmap, - partitionByEq, orderByLessThan, bufferedTupleDesc); + partitionByEq, orderByEq, bufferedTupleDesc); root.init(analyzer_); } return root; } - /** - * Create ' < ' predicate. - */ - private Expr createLessThan(List elements, TupleId inputTid, - ExprSubstitutionMap bufferedSmap) { - Preconditions.checkState(!elements.isEmpty()); - Expr result = createLessThanAux(elements, 0, inputTid, bufferedSmap); - try { - result.analyze(analyzer_); - } catch (AnalysisException e) { - throw new IllegalStateException(e); - } - return result; - } - - /** - * Create an unanalyzed '<' predicate for elements >= i. - * - * With asc/nulls first, the predicate has the form - * rhs[i] is not null && ( - * lhs[i] is null - * || lhs[i] < rhs[i] - * || (lhs[i] = rhs[i] && ) - * ) - * - * TODO: this is extremely contorted; we need to introduce a builtin - * compare(lhs, rhs, is_asc, nulls_first) - */ - private Expr createLessThanAux(List elements, int i, TupleId inputTid, - ExprSubstitutionMap bufferedSmap) { - if (i > elements.size() - 1) return new BoolLiteral(false); - - // compare elements[i] - Expr lhs = elements.get(i).getExpr(); - Preconditions.checkState(lhs.isBound(inputTid)); - Expr rhs = lhs.substitute(bufferedSmap, analyzer_); - Expr rhsIsNotFirst = new IsNullPredicate( - elements.get(i).nullsFirst() ? rhs : lhs, true); - Expr lhsIsFirst = new IsNullPredicate( - elements.get(i).nullsFirst() ? lhs : rhs, false); - BinaryPredicate.Operator comparison = elements.get(i).isAsc() - ? 
BinaryPredicate.Operator.LT - : BinaryPredicate.Operator.GT; - Expr lhsPrecedesRhs = new BinaryPredicate(comparison, lhs, rhs); - Expr lhsEqRhs = new BinaryPredicate(BinaryPredicate.Operator.EQ, lhs, rhs); - Expr remainder = createLessThanAux(elements, i + 1, inputTid, bufferedSmap); - Expr result = new CompoundPredicate(CompoundPredicate.Operator.AND, - rhsIsNotFirst, - new CompoundPredicate(CompoundPredicate.Operator.OR, - new CompoundPredicate(CompoundPredicate.Operator.OR, lhsIsFirst, lhsPrecedesRhs), - new CompoundPredicate(CompoundPredicate.Operator.AND, lhsEqRhs, remainder))); - return result; - } - /** * Create a predicate that checks if all exprs are equal or both sides are null. */ diff --git a/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test b/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test index f89c29d3b..db4ebf075 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test +++ b/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test @@ -824,4 +824,44 @@ select count(distinct t1.c1) from 11 ---- TYPES BIGINT -==== \ No newline at end of file +==== +---- QUERY +# IMPALA-1292: Incorrect result in analytic SUM when ORDER BY column is null +select tinyint_col, id, +SUM(id) OVER (ORDER BY tinyint_col ASC, id ASC) +FROM alltypesagg +where (tinyint_col is NULL or tinyint_col < 2) and id < 100 order by 1, 2 +---- RESULTS +1,1,1 +1,11,12 +1,21,33 +1,31,64 +1,41,105 +1,51,156 +1,61,217 +1,71,288 +1,81,369 +1,91,460 +NULL,0,460 +NULL,0,460 +NULL,10,480 +NULL,10,480 +NULL,20,520 +NULL,20,520 +NULL,30,580 +NULL,30,580 +NULL,40,660 +NULL,40,660 +NULL,50,760 +NULL,50,760 +NULL,60,880 +NULL,60,880 +NULL,70,1020 +NULL,70,1020 +NULL,80,1180 +NULL,80,1180 +NULL,90,1360 +NULL,90,1360 +---- TYPES +TINYINT, INT, BIGINT +====