From 7767d300a3f018c8c8b32fa72abe5c126900a2be Mon Sep 17 00:00:00 2001 From: Skye Wanderman-Milne Date: Thu, 12 May 2016 17:03:12 -0700 Subject: [PATCH] IMPALA-3311: fix string data coming out of aggs in subplans The problem: varlen data (e.g. strings) produced by aggregations is freed by FreeLocalAllocations() after passing up the output batch. This works for streaming operators or blocking operators that copy their input, but results in memory corruption when the output reaches non-copying blocking operators, e.g. SubplanNode and NestedLoopJoinNode. The fix: this patch makes the PartitionedAggregationNode copy out produced string data if the node is in a subplan. Otherwise it calls MarkNeedToReturn() on the output batch. Marking the batch would work in the subplan case as well, but would likely be less efficient since it would result in many small batches coming out of the subplan. The patch includes a test case. However, this test only exposes the problem with an ASAN build and the --disable_mem_pools flag, which we don't currently have automated testing for. 
Change-Id: Iada891504c261ba54f4eb8c9d7e4e5223668d7b9 Reviewed-on: http://gerrit.cloudera.org:8080/2929 Reviewed-by: Dan Hecht Tested-by: Internal Jenkins --- be/src/exec/partitioned-aggregation-node.cc | 55 +++++++++++++++++++ be/src/exec/partitioned-aggregation-node.h | 14 +++++ be/src/exprs/agg-fn-evaluator.h | 1 + .../QueryTest/nested-types-runtime.test | 16 ++++++ .../queries/subplan_aggregation.test | 11 ++++ 5 files changed, 97 insertions(+) create mode 100644 testdata/workloads/perf-regression/queries/subplan_aggregation.test diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc index 0bf51a904..b7dca6116 100644 --- a/be/src/exec/partitioned-aggregation-node.cc +++ b/be/src/exec/partitioned-aggregation-node.cc @@ -360,6 +360,61 @@ Status PartitionedAggregationNode::Open(RuntimeState* state) { Status PartitionedAggregationNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) { + int first_row_idx = row_batch->num_rows(); + RETURN_IF_ERROR(GetNextInternal(state, row_batch, eos)); + RETURN_IF_ERROR(HandleOutputStrings(row_batch, first_row_idx)); + return Status::OK(); +} + +Status PartitionedAggregationNode::HandleOutputStrings(RowBatch* row_batch, + int first_row_idx) { + if (!needs_finalize_ && !needs_serialize_) return Status::OK(); + // String data returned by Serialize() or Finalize() is from local expr allocations in + // the agg function contexts, and will be freed on the next GetNext() call by + // FreeLocalAllocations(). The data either needs to be copied out or sent up the plan + // tree via MarkNeedToReturn(). (See IMPALA-3311) + for (int i = 0; i < aggregate_evaluators_.size(); ++i) { + const SlotDescriptor* slot_desc = aggregate_evaluators_[i]->output_slot_desc(); + DCHECK(!slot_desc->type().IsCollectionType()) << "producing collections NYI"; + if (!slot_desc->type().IsVarLenStringType()) continue; + if (IsInSubplan()) { + // Copy string data to the row batch's pool. 
This is more efficient than + // MarkNeedToReturn() in a subplan since we are likely producing many small batches. + RETURN_IF_ERROR(CopyStringData(slot_desc, row_batch, first_row_idx, + row_batch->tuple_data_pool())); + } else { + row_batch->MarkNeedToReturn(); + break; + } + } + return Status::OK(); +} + +Status PartitionedAggregationNode::CopyStringData(const SlotDescriptor* slot_desc, + RowBatch* row_batch, int first_row_idx, MemPool* pool) { + DCHECK(slot_desc->type().IsVarLenStringType()); + DCHECK_EQ(row_batch->row_desc().tuple_descriptors().size(), 1); + FOREACH_ROW(row_batch, first_row_idx, batch_iter) { + Tuple* tuple = batch_iter.Get()->GetTuple(0); + StringValue* sv = reinterpret_cast<StringValue*>( + tuple->GetSlot(slot_desc->tuple_offset())); + if (sv == NULL || sv->len == 0) continue; + char* new_ptr = reinterpret_cast<char*>(pool->TryAllocate(sv->len)); + if (new_ptr == NULL) { + Status s = Status::MemLimitExceeded(); + s.AddDetail(Substitute("Cannot perform aggregation at node with id $0." + " Failed to allocate $1 output bytes.", id_, sv->len)); + state_->SetMemLimitExceeded(); + return s; + } + memcpy(new_ptr, sv->ptr, sv->len); + sv->ptr = new_ptr; + } + return Status::OK(); +} + +Status PartitionedAggregationNode::GetNextInternal(RuntimeState* state, + RowBatch* row_batch, bool* eos) { SCOPED_TIMER(runtime_profile_->total_time_counter()); RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state)); RETURN_IF_CANCELLED(state); diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h index 0b9451193..ab560c5ea 100644 --- a/be/src/exec/partitioned-aggregation-node.h +++ b/be/src/exec/partitioned-aggregation-node.h @@ -390,6 +390,20 @@ class PartitionedAggregationNode : public ExecNode { /// a temporary buffer. boost::scoped_ptr serialize_stream_; + /// Materializes 'row_batch' in either grouping or non-grouping case. 
+ Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos); + + /// Helper function called by GetNext() to ensure that string data referenced in + /// 'row_batch' will live as long as 'row_batch's tuples. 'first_row_idx' indexes the + /// first row that should be processed in 'row_batch'. + Status HandleOutputStrings(RowBatch* row_batch, int first_row_idx); + + /// Copies string data from the specified slot into 'pool', and sets the StringValues' + /// ptrs to the copied data. Copies data from all tuples in 'row_batch' from + /// 'first_row_idx' onwards. 'slot_desc' must have a var-len string type. + Status CopyStringData(const SlotDescriptor* slot_desc, RowBatch* row_batch, + int first_row_idx, MemPool* pool); + /// Constructs singleton output tuple, allocating memory from pool. Tuple* ConstructSingletonOutputTuple( const std::vector& agg_fn_ctxs, MemPool* pool); diff --git a/be/src/exprs/agg-fn-evaluator.h b/be/src/exprs/agg-fn-evaluator.h index 98fb4a16a..e9598ea23 100644 --- a/be/src/exprs/agg-fn-evaluator.h +++ b/be/src/exprs/agg-fn-evaluator.h @@ -118,6 +118,7 @@ class AggFnEvaluator { const std::string& fn_name() const { return fn_.name.function_name; } const std::string& update_symbol() const { return fn_.aggregate_fn.update_fn_symbol; } const std::string& merge_symbol() const { return fn_.aggregate_fn.merge_fn_symbol; } + const SlotDescriptor* output_slot_desc() const { return output_slot_desc_; } static std::string DebugString(const std::vector& exprs); std::string DebugString() const; diff --git a/testdata/workloads/functional-query/queries/QueryTest/nested-types-runtime.test b/testdata/workloads/functional-query/queries/QueryTest/nested-types-runtime.test index 2e38d1deb..35e27c58f 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/nested-types-runtime.test +++ b/testdata/workloads/functional-query/queries/QueryTest/nested-types-runtime.test @@ -450,3 +450,19 @@ inner join t2.int_array a ---- TYPES bigint ==== 
+---- QUERY +# IMPALA-3311: test string data coming out of an agg in a subplan +select id, m from complextypestbl t, +(select min(cast(item as string)) m from t.int_array) v +---- RESULTS +1,'1' +2,'1' +3,'NULL' +4,'NULL' +5,'NULL' +6,'NULL' +7,'NULL' +8,'-1' +---- TYPES +BIGINT,STRING +==== diff --git a/testdata/workloads/perf-regression/queries/subplan_aggregation.test b/testdata/workloads/perf-regression/queries/subplan_aggregation.test new file mode 100644 index 000000000..b9ea89434 --- /dev/null +++ b/testdata/workloads/perf-regression/queries/subplan_aggregation.test @@ -0,0 +1,11 @@ +==== +---- QUERY: subplan_aggregation +-- Description: Agg in subplan produces string output that's fed to non-trivial parent +-- plan +-- Target test case: Regression test for IMPALA-3311 +select c_custkey, max(m) from customer c, +(select max(o_orderstatus) m from c.c_orders) v +group by c_custkey order by 1 limit 1 +---- RESULTS +---- TYPES +====