diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index 0bf51a904..b7dca6116 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -360,6 +360,61 @@ Status PartitionedAggregationNode::Open(RuntimeState* state) {
 
 Status PartitionedAggregationNode::GetNext(RuntimeState* state, RowBatch* row_batch,
     bool* eos) {
+  int first_row_idx = row_batch->num_rows();
+  RETURN_IF_ERROR(GetNextInternal(state, row_batch, eos));
+  RETURN_IF_ERROR(HandleOutputStrings(row_batch, first_row_idx));
+  return Status::OK();
+}
+
+Status PartitionedAggregationNode::HandleOutputStrings(RowBatch* row_batch,
+    int first_row_idx) {
+  if (!needs_finalize_ && !needs_serialize_) return Status::OK();
+  // String data returned by Serialize() or Finalize() is from local expr allocations in
+  // the agg function contexts, and will be freed on the next GetNext() call by
+  // FreeLocalAllocations(). The data either needs to be copied out or sent up the plan
+  // tree via MarkNeedToReturn(). (See IMPALA-3311)
+  for (int i = 0; i < aggregate_evaluators_.size(); ++i) {
+    const SlotDescriptor* slot_desc = aggregate_evaluators_[i]->output_slot_desc();
+    DCHECK(!slot_desc->type().IsCollectionType()) << "producing collections NYI";
+    if (!slot_desc->type().IsVarLenStringType()) continue;
+    if (IsInSubplan()) {
+      // Copy string data to the row batch's pool. This is more efficient than
+      // MarkNeedToReturn() in a subplan since we are likely producing many small batches.
+      RETURN_IF_ERROR(CopyStringData(slot_desc, row_batch, first_row_idx,
+          row_batch->tuple_data_pool()));
+    } else {
+      row_batch->MarkNeedToReturn();
+      break;
+    }
+  }
+  return Status::OK();
+}
+
+Status PartitionedAggregationNode::CopyStringData(const SlotDescriptor* slot_desc,
+    RowBatch* row_batch, int first_row_idx, MemPool* pool) {
+  DCHECK(slot_desc->type().IsVarLenStringType());
+  DCHECK_EQ(row_batch->row_desc().tuple_descriptors().size(), 1);
+  FOREACH_ROW(row_batch, first_row_idx, batch_iter) {
+    Tuple* tuple = batch_iter.Get()->GetTuple(0);
+    StringValue* sv = reinterpret_cast<StringValue*>(
+        tuple->GetSlot(slot_desc->tuple_offset()));
+    if (sv == NULL || sv->len == 0) continue;
+    char* new_ptr = reinterpret_cast<char*>(pool->TryAllocate(sv->len));
+    if (new_ptr == NULL) {
+      Status s = Status::MemLimitExceeded();
+      s.AddDetail(Substitute("Cannot perform aggregation at node with id $0."
+          " Failed to allocate $1 output bytes.", id_, sv->len));
+      state_->SetMemLimitExceeded();
+      return s;
+    }
+    memcpy(new_ptr, sv->ptr, sv->len);
+    sv->ptr = new_ptr;
+  }
+  return Status::OK();
+}
+
+Status PartitionedAggregationNode::GetNextInternal(RuntimeState* state,
+    RowBatch* row_batch, bool* eos) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
   RETURN_IF_CANCELLED(state);
diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h
index 0b9451193..ab560c5ea 100644
--- a/be/src/exec/partitioned-aggregation-node.h
+++ b/be/src/exec/partitioned-aggregation-node.h
@@ -390,6 +390,20 @@ class PartitionedAggregationNode : public ExecNode {
   /// a temporary buffer.
   boost::scoped_ptr<BufferedTupleStream> serialize_stream_;
 
+  /// Materializes 'row_batch' in either grouping or non-grouping case.
+  Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos);
+
+  /// Helper function called by GetNextInternal() to ensure that string data referenced in
+  /// 'row_batch' will live as long as 'row_batch's tuples. 'first_row_idx' indexes the
+  /// first row that should be processed in 'row_batch'.
+  Status HandleOutputStrings(RowBatch* row_batch, int first_row_idx);
+
+  /// Copies string data from the specified slot into 'pool', and sets the StringValues'
+  /// ptrs to the copied data. Copies data from all tuples in 'row_batch' from
+  /// 'first_row_idx' onwards. 'slot_desc' must have a var-len string type.
+  Status CopyStringData(const SlotDescriptor* slot_desc, RowBatch* row_batch,
+      int first_row_idx, MemPool* pool);
+
   /// Constructs singleton output tuple, allocating memory from pool.
   Tuple* ConstructSingletonOutputTuple(
       const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs, MemPool* pool);
diff --git a/be/src/exprs/agg-fn-evaluator.h b/be/src/exprs/agg-fn-evaluator.h
index 98fb4a16a..e9598ea23 100644
--- a/be/src/exprs/agg-fn-evaluator.h
+++ b/be/src/exprs/agg-fn-evaluator.h
@@ -118,6 +118,7 @@ class AggFnEvaluator {
   const std::string& fn_name() const { return fn_.name.function_name; }
   const std::string& update_symbol() const { return fn_.aggregate_fn.update_fn_symbol; }
   const std::string& merge_symbol() const { return fn_.aggregate_fn.merge_fn_symbol; }
+  const SlotDescriptor* output_slot_desc() const { return output_slot_desc_; }
 
   static std::string DebugString(const std::vector<AggFnEvaluator*>& exprs);
   std::string DebugString() const;
diff --git a/testdata/workloads/functional-query/queries/QueryTest/nested-types-runtime.test b/testdata/workloads/functional-query/queries/QueryTest/nested-types-runtime.test
index 2e38d1deb..35e27c58f 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/nested-types-runtime.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/nested-types-runtime.test
@@ -450,3 +450,19 @@ inner join t2.int_array a
 ---- TYPES
 bigint
 ====
+---- QUERY
+# IMPALA-3311: test string data coming out of an agg in a subplan
+select id, m from complextypestbl t,
+(select min(cast(item as string)) m from t.int_array) v
+---- RESULTS
+1,'1'
+2,'1'
+3,'NULL'
+4,'NULL'
+5,'NULL'
+6,'NULL'
+7,'NULL'
+8,'-1'
+---- TYPES
+BIGINT,STRING
+====
diff --git a/testdata/workloads/perf-regression/queries/subplan_aggregation.test b/testdata/workloads/perf-regression/queries/subplan_aggregation.test
new file mode 100644
index 000000000..b9ea89434
--- /dev/null
+++ b/testdata/workloads/perf-regression/queries/subplan_aggregation.test
@@ -0,0 +1,11 @@
+====
+---- QUERY: subplan_aggregation
+-- Description: Agg in subplan produces string output that's fed to non-trivial parent
+-- plan
+-- Target test case: Regression test for IMPALA-3311
+select c_custkey, max(m) from customer c,
+(select max(o_orderstatus) m from c.c_orders) v
+group by c_custkey order by 1 limit 1
+---- RESULTS
+---- TYPES
+====
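
Reviewer note (commentary, not part of the patch): the core of the change is that Serialize()/Finalize() produce var-len string results backed by local expr allocations that are freed on the next GetNext() call, so HandleOutputStrings() either copies the bytes into the row batch's pool (inside a subplan) or calls MarkNeedToReturn() so the batch is sent up the plan tree first. The following is a minimal standalone C++ sketch of the copy-out pattern only; MiniPool, StrSlot and CopyOut are hypothetical stand-ins for MemPool, StringValue and CopyStringData(), not Impala APIs.

// Minimal sketch (hypothetical names, not Impala code): var-len slot data is copied
// into a pool whose lifetime matches the row batch, so the string stays valid after
// the per-call local allocation is freed or reused.
#include <cstddef>
#include <cstring>
#include <iostream>
#include <string>
#include <vector>

// Arena whose allocations live until the arena is destroyed; similar in spirit to the
// pool returned by row_batch->tuple_data_pool().
struct MiniPool {
  std::vector<std::vector<char>> chunks;
  char* Allocate(std::size_t len) {
    chunks.push_back(std::vector<char>(len));
    return chunks.back().data();
  }
};

// Pointer + length slot, analogous to StringValue.
struct StrSlot {
  char* ptr;
  int len;
};

// Copy the slot's bytes into 'pool' and repoint the slot at the copy; this mirrors the
// per-value work done by CopyStringData().
void CopyOut(StrSlot* slot, MiniPool* pool) {
  if (slot->ptr == NULL || slot->len == 0) return;
  char* copy = pool->Allocate(slot->len);
  std::memcpy(copy, slot->ptr, slot->len);
  slot->ptr = copy;
}

int main() {
  MiniPool batch_pool;                  // stands in for the row batch's pool
  std::string scratch = "agg result";   // stands in for a local expr allocation
  StrSlot slot = { &scratch[0], static_cast<int>(scratch.size()) };
  CopyOut(&slot, &batch_pool);
  scratch.assign(scratch.size(), 'x');  // simulate the local allocation being reused
  std::cout << std::string(slot.ptr, slot.len) << std::endl;  // still "agg result"
  return 0;
}

The sketch compiles on its own; the real CopyStringData() additionally walks every tuple of the batch from 'first_row_idx' onwards and returns a MemLimitExceeded status when TryAllocate() cannot supply the buffer.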