From 44125729dc7d1e12e7ee2fad83254eebcb435ac5 Mon Sep 17 00:00:00 2001 From: Skye Wanderman-Milne Date: Mon, 17 Mar 2014 15:04:20 -0700 Subject: [PATCH] UDF/UDA memory management improvements * AggFnEvaluator now uses the UDF mem pool (I'm planning to change this to per-exec node pools in the expr refactoring) * FunctionContext::TrackAllocation()/Free() actually use the UDF's mem tracker * Added FunctionContextImpl::Close() which sets warnings for leaked allocations Change-Id: I792ffd49102a92b57e34df18d8ff5f5d0fd27370 Reviewed-on: http://gerrit.ent.cloudera.com:8080/1792 Reviewed-by: Skye Wanderman-Milne Tested-by: Skye Wanderman-Milne (cherry picked from commit 41a5f7cfa718789fa3b2de3a31f085411fb5000c) Reviewed-on: http://gerrit.ent.cloudera.com:8080/1954 Tested-by: jenkins --- be/src/exec/aggregation-node.cc | 35 ++-- be/src/exec/aggregation-node.h | 4 +- be/src/exprs/agg-fn-evaluator.cc | 17 +- be/src/exprs/agg-fn-evaluator.h | 7 +- be/src/exprs/aggregate-functions.cc | 15 +- be/src/exprs/aggregate-functions.h | 4 + be/src/exprs/native-udf-expr.cc | 14 ++ be/src/exprs/native-udf-expr.h | 13 +- be/src/runtime/free-pool.h | 2 + be/src/runtime/mem-tracker.h | 14 +- be/src/testutil/CMakeLists.txt | 4 +- be/src/testutil/test-udas.cc | 37 ++++ be/src/testutil/test-udas.h | 28 +++ be/src/testutil/test-udfs.cc | 47 +++-- be/src/testutil/test-udfs.h | 28 +++ be/src/udf/CMakeLists.txt | 1 + be/src/udf/uda-test-harness-impl.h | 176 +++++++++--------- be/src/udf/uda-test-harness.h | 23 ++- be/src/udf/uda-test.cc | 25 +++ be/src/udf/udf-internal.h | 14 +- be/src/udf/udf-test-harness.cc | 4 + be/src/udf/udf-test-harness.h | 25 ++- be/src/udf/udf-test.cc | 22 ++- be/src/udf/udf.cc | 92 +++++++-- be/src/udf/udf.h | 21 ++- be/src/udf_samples/hyperloglog-uda.cc | 9 + .../com/cloudera/impala/catalog/Catalog.java | 18 +- .../queries/QueryTest/uda-mem-limit.test | 12 ++ .../queries/QueryTest/udf-mem-limit.test | 14 ++ tests/query_test/test_udfs.py | 23 +++ 30 files changed, 560 insertions(+), 188 deletions(-) create mode 100644 be/src/testutil/test-udas.h create mode 100644 be/src/testutil/test-udfs.h create mode 100644 testdata/workloads/functional-query/queries/QueryTest/uda-mem-limit.test create mode 100644 testdata/workloads/functional-query/queries/QueryTest/udf-mem-limit.test diff --git a/be/src/exec/aggregation-node.cc b/be/src/exec/aggregation-node.cc index 577a22170..4e3be0ef6 100644 --- a/be/src/exec/aggregation-node.cc +++ b/be/src/exec/aggregation-node.cc @@ -112,8 +112,7 @@ Status AggregationNode::Prepare(RuntimeState* state) { ++j; } SlotDescriptor* desc = agg_tuple_desc_->slots()[j]; - RETURN_IF_ERROR(aggregate_evaluators_[i]->Prepare(state, child(0)->row_desc(), - tuple_pool_.get(), desc)); + RETURN_IF_ERROR(aggregate_evaluators_[i]->Prepare(state, child(0)->row_desc(), desc)); } // TODO: how many buckets? @@ -124,6 +123,8 @@ Status AggregationNode::Prepare(RuntimeState* state) { // create single output tuple now; we need to output something // even if our input is empty singleton_output_tuple_ = ConstructAggTuple(); + hash_tbl_->Insert(reinterpret_cast(&singleton_output_tuple_)); + output_iterator_ = hash_tbl_->Begin(); } if (state->codegen_enabled()) { @@ -159,7 +160,6 @@ Status AggregationNode::Open(RuntimeState* state) { RowBatch batch(children_[0]->row_desc(), state->batch_size(), mem_tracker()); int64_t num_input_rows = 0; - int64_t num_agg_rows = 0; while (true) { bool eos; RETURN_IF_CANCELLED(state); @@ -173,18 +173,19 @@ Status AggregationNode::Open(RuntimeState* state) { VLOG_ROW << "input row: " << PrintRow(row, children_[0]->row_desc()); } } - int64_t agg_rows_before = hash_tbl_->size(); if (process_row_batch_fn_ != NULL) { process_row_batch_fn_(this, &batch); - } else if (singleton_output_tuple_ != NULL) { + } else if (probe_exprs_.empty()) { ProcessRowBatchNoGrouping(&batch); } else { ProcessRowBatchWithGrouping(&batch); } COUNTER_SET(hash_table_buckets_counter_, hash_tbl_->num_buckets()); COUNTER_SET(hash_table_load_factor_counter_, hash_tbl_->load_factor()); - num_agg_rows += (hash_tbl_->size() - agg_rows_before); num_input_rows += batch.num_rows(); + // We must set output_iterator_ here, rather than outside the loop, because + // output_iterator_ must be set if the function returns within the loop + output_iterator_ = hash_tbl_->Begin(); batch.Reset(); RETURN_IF_ERROR(state->CheckQueryState()); @@ -194,13 +195,8 @@ Status AggregationNode::Open(RuntimeState* state) { // We have consumed all of the input from the child and transfered ownership of the // resources we need, so the child can be closed safely to release its resources. child(0)->Close(state); - if (singleton_output_tuple_ != NULL) { - hash_tbl_->Insert(reinterpret_cast(&singleton_output_tuple_)); - ++num_agg_rows; - } VLOG_FILE << "aggregated " << num_input_rows << " input rows into " - << num_agg_rows << " output rows"; - output_iterator_ = hash_tbl_->Begin(); + << hash_tbl_->size() << " output rows"; return Status::OK; } @@ -223,6 +219,7 @@ Status AggregationNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* TupleRow* row = row_batch->GetRow(row_idx); Tuple* agg_tuple = output_iterator_.GetRow()->GetTuple(0); FinalizeAggTuple(agg_tuple); + output_iterator_.Next(); row->SetTuple(0, agg_tuple); if (ExecNode::EvalConjuncts(conjuncts, num_conjuncts, row)) { VLOG_ROW << "output row: " << PrintRow(row, row_desc()); @@ -230,7 +227,6 @@ Status AggregationNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* ++num_rows_returned_; if (ReachedLimit()) break; } - output_iterator_.Next(); } *eos = output_iterator_.AtEnd() || ReachedLimit(); COUNTER_SET(rows_returned_counter_, num_rows_returned_); @@ -239,6 +235,15 @@ Status AggregationNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* void AggregationNode::Close(RuntimeState* state) { if (is_closed()) return; + + // Iterate through the remaining rows in the hash table and call Serialize/Finalize on + // them in order to free any memory allocated by UDAs + while (!output_iterator_.AtEnd()) { + Tuple* agg_tuple = output_iterator_.GetRow()->GetTuple(0); + FinalizeAggTuple(agg_tuple); + output_iterator_.Next(); + } + if (tuple_pool_.get() != NULL) tuple_pool_->FreeAll(); if (hash_tbl_.get() != NULL) hash_tbl_->Close(); for (int i = 0; i < aggregate_evaluators_.size(); ++i) { @@ -583,7 +588,7 @@ Function* AggregationNode::CodegenProcessRowBatch( DCHECK(update_tuple_fn != NULL); // Get the cross compiled update row batch function - IRFunction::Type ir_fn = (singleton_output_tuple_ == NULL ? + IRFunction::Type ir_fn = (!probe_exprs_.empty() ? IRFunction::AGG_NODE_PROCESS_ROW_BATCH_WITH_GROUPING : IRFunction::AGG_NODE_PROCESS_ROW_BATCH_NO_GROUPING); Function* process_batch_fn = codegen->GetFunction(ir_fn); @@ -594,7 +599,7 @@ Function* AggregationNode::CodegenProcessRowBatch( } int replaced = 0; - if (singleton_output_tuple_ == NULL) { + if (!probe_exprs_.empty()) { // Aggregation w/o grouping does not use a hash table. // Codegen for hash diff --git a/be/src/exec/aggregation-node.h b/be/src/exec/aggregation-node.h index ed000ee8d..71658fbf5 100644 --- a/be/src/exec/aggregation-node.h +++ b/be/src/exec/aggregation-node.h @@ -75,7 +75,9 @@ class AggregationNode : public ExecNode { std::vector build_exprs_; TupleId agg_tuple_id_; TupleDescriptor* agg_tuple_desc_; - Tuple* singleton_output_tuple_; // result of aggregation w/o GROUP BY + // Result of aggregation w/o GROUP BY. + // Note: can be NULL even if there is no grouping if the result tuple is 0 width + Tuple* singleton_output_tuple_; boost::scoped_ptr tuple_pool_; diff --git a/be/src/exprs/agg-fn-evaluator.cc b/be/src/exprs/agg-fn-evaluator.cc index 1f2900a8f..b45dedf5e 100644 --- a/be/src/exprs/agg-fn-evaluator.cc +++ b/be/src/exprs/agg-fn-evaluator.cc @@ -99,8 +99,7 @@ AggFnEvaluator::~AggFnEvaluator() { } Status AggFnEvaluator::Prepare(RuntimeState* state, const RowDescriptor& desc, - MemPool* pool, const SlotDescriptor* output_slot_desc) { - DCHECK(pool != NULL); + const SlotDescriptor* output_slot_desc) { DCHECK(output_slot_desc != NULL); DCHECK(output_slot_desc_ == NULL); output_slot_desc_ = output_slot_desc; @@ -154,7 +153,7 @@ Status AggFnEvaluator::Prepare(RuntimeState* state, const RowDescriptor& desc, for (int i = 0; i < input_exprs_.size(); ++i) { AnyValUtil::ColumnTypeToTypeDesc(input_exprs_[i]->type(), &arg_types[i]); } - ctx_.reset(FunctionContextImpl::CreateContext(state, pool, arg_types)); + ctx_.reset(FunctionContextImpl::CreateContext(state, state->udf_pool(), arg_types)); return Status::OK; } @@ -174,6 +173,18 @@ Status AggFnEvaluator::Open(RuntimeState* state) { void AggFnEvaluator::Close(RuntimeState* state) { Expr::Close(input_exprs_, state); + + if (ctx_.get() != NULL) { + bool previous_error = ctx_->has_error(); + ctx_->impl()->Close(); + if (!previous_error && ctx_->has_error()) { + // TODO: revisit this (see comment in NativeUdfExpr) + stringstream ss; + ss << "UDA ERROR: " << ctx_->error_msg(); + state->LogError(ss.str()); + } + } + if (cache_entry_ != NULL) { LibCache::instance()->DecrementUseCount(cache_entry_); cache_entry_ = NULL; diff --git a/be/src/exprs/agg-fn-evaluator.h b/be/src/exprs/agg-fn-evaluator.h index 4db55d82e..65f264c00 100644 --- a/be/src/exprs/agg-fn-evaluator.h +++ b/be/src/exprs/agg-fn-evaluator.h @@ -33,6 +33,7 @@ namespace impala { class AggregationNode; class Expr; class MemPool; +class MemTracker; class ObjectPool; class RowDescriptor; class RuntimeState; @@ -70,12 +71,8 @@ class AggFnEvaluator { // Initializes the agg expr. 'desc' must be the row descriptor for the input TupleRow. // It is used to get the input values in the Update() and Merge() functions. // 'output_slot_desc' is the slot that this evaluator should write to. - // The underlying aggregate function allocates memory from the 'pool'. This is - // either string data for intermediate results or whatever memory the UDA might - // need. - // TODO: should we give them their own pool? Status Prepare(RuntimeState* state, const RowDescriptor& desc, - MemPool* pool, const SlotDescriptor* output_slot_desc); + const SlotDescriptor* output_slot_desc); ~AggFnEvaluator(); diff --git a/be/src/exprs/aggregate-functions.cc b/be/src/exprs/aggregate-functions.cc index 6a69f5847..bc6ffd218 100644 --- a/be/src/exprs/aggregate-functions.cc +++ b/be/src/exprs/aggregate-functions.cc @@ -46,6 +46,15 @@ void AggregateFunctions::InitZero(FunctionContext*, T* dst) { dst->val = 0; } +StringVal AggregateFunctions::StringValSerializeOrFinalize( + FunctionContext* ctx, const StringVal& src) { + if (src.is_null) return src; + StringVal result(ctx, src.len); + memcpy(result.ptr, src.ptr, src.len); + ctx->Free(src.ptr); + return result; +} + void AggregateFunctions::CountUpdate( FunctionContext*, const AnyVal& src, BigIntVal* dst) { DCHECK(!dst->is_null); @@ -330,9 +339,9 @@ StringVal AggregateFunctions::PcFinalize(FunctionContext* c, const StringVal& sr stringstream ss; ss << result; string str = ss.str(); - StringVal dst = src; + StringVal dst(c, str.length()); memcpy(dst.ptr, str.c_str(), str.length()); - dst.len = str.length(); + c->Free(src.ptr); return dst; } @@ -347,6 +356,7 @@ StringVal AggregateFunctions::PcsaFinalize(FunctionContext* c, const StringVal& StringVal dst = src; memcpy(dst.ptr, str.c_str(), str.length()); dst.len = str.length(); + c->Free(src.ptr); return dst; } @@ -422,6 +432,7 @@ StringVal AggregateFunctions::HllFinalize(FunctionContext* ctx, const StringVal& string out_str = out.str(); StringVal result_str(ctx, out_str.size()); memcpy(result_str.ptr, out_str.c_str(), result_str.len); + ctx->Free(src.ptr); return result_str; } diff --git a/be/src/exprs/aggregate-functions.h b/be/src/exprs/aggregate-functions.h index 8b9e814d4..bcfaa6ace 100644 --- a/be/src/exprs/aggregate-functions.h +++ b/be/src/exprs/aggregate-functions.h @@ -38,6 +38,10 @@ class AggregateFunctions { template static void InitZero(FunctionContext*, T* dst); + // StringVal Serialize/Finalize function that copies and frees src + static StringVal StringValSerializeOrFinalize( + FunctionContext* ctx, const StringVal& src); + // Implementation of Count and Count(*) static void CountUpdate(FunctionContext*, const AnyVal& src, BigIntVal* dst); static void CountStarUpdate(FunctionContext*, BigIntVal* dst); diff --git a/be/src/exprs/native-udf-expr.cc b/be/src/exprs/native-udf-expr.cc index 8d0887911..6ea5f196e 100644 --- a/be/src/exprs/native-udf-expr.cc +++ b/be/src/exprs/native-udf-expr.cc @@ -285,6 +285,20 @@ void NativeUdfExpr::Close(RuntimeState* state) { if (close_fn_ != NULL && opened_) { close_fn_(udf_context_.get(), FunctionContext::THREAD_LOCAL); } + + if (udf_context_.get() != NULL) { + bool previous_error = udf_context_->has_error(); + udf_context_->impl()->Close(); + if (!previous_error && udf_context_->has_error()) { + // TODO: revisit this. Errors logged in close will likely not be displayed to the + // shell, and we may want to automatically log bad query statuses set in close + // rather than manually doing it here and in AggFnEvaluator. + stringstream ss; + ss << "UDF ERROR: " << udf_context_->error_msg(); + state->LogError(ss.str()); + } + } + Expr::Close(state); } diff --git a/be/src/exprs/native-udf-expr.h b/be/src/exprs/native-udf-expr.h index 86e6b7daf..b4fbebc83 100644 --- a/be/src/exprs/native-udf-expr.h +++ b/be/src/exprs/native-udf-expr.h @@ -45,9 +45,16 @@ class TExprNode; // path. // // TODO: -// - convert other Exprs to UDFs or override GetIrComputeFn() -// - ExprContext -// - remove current Codegen/ComputeFn API +// - Refactoring +// - convert other Exprs to UDFs or override GetIrComputeFn() +// - ExprContext +// - remove current Codegen/ComputeFn API +// - Fix error reporting, e.g. reporting leaks +// - Testing +// - Test cancellation +// - Type descs in UDA test harness +// - Allow more functions to be NULL in UDA test harness +// class NativeUdfExpr: public Expr { public: ~NativeUdfExpr(); diff --git a/be/src/runtime/free-pool.h b/be/src/runtime/free-pool.h index b4cba2ec8..d8ad9c768 100644 --- a/be/src/runtime/free-pool.h +++ b/be/src/runtime/free-pool.h @@ -109,6 +109,8 @@ class FreePool { return new_ptr; } + MemTracker* mem_tracker() { return mem_pool_->mem_tracker(); } + private: static const int NUM_LISTS = 64; diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h index 9379d09b0..3242d11fe 100644 --- a/be/src/runtime/mem-tracker.h +++ b/be/src/runtime/mem-tracker.h @@ -113,7 +113,9 @@ class MemTracker { for (std::vector::iterator tracker = all_trackers_.begin(); tracker != all_trackers_.end(); ++tracker) { (*tracker)->consumption_->Update(bytes); - DCHECK_GE((*tracker)->consumption_->current_value(), 0); + if ((*tracker)->consumption_metric_ == NULL) { + DCHECK_GE((*tracker)->consumption_->current_value(), 0); + } } } @@ -180,7 +182,15 @@ class MemTracker { for (std::vector::iterator tracker = all_trackers_.begin(); tracker != all_trackers_.end(); ++tracker) { (*tracker)->consumption_->Update(-bytes); - DCHECK_GE((*tracker)->consumption_->current_value(), 0); + // If a UDF calls FunctionContext::TrackAllocation() but allocates less than the + // reported amount, the subsequent call to FunctionContext::Free() may cause the + // process mem tracker to go negative until it is synced back to the tcmalloc + // metric. Don't blow up in this case. (Note that this doesn't affect non-process + // trackers since we can enforce that the reported memory usage is internally + // consistent.) + if ((*tracker)->consumption_metric_ == NULL) { + DCHECK_GE((*tracker)->consumption_->current_value(), 0); + } } // TODO: Release brokered memory? diff --git a/be/src/testutil/CMakeLists.txt b/be/src/testutil/CMakeLists.txt index 1d3863a73..e9a7ab868 100644 --- a/be/src/testutil/CMakeLists.txt +++ b/be/src/testutil/CMakeLists.txt @@ -22,9 +22,11 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/testutil") set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/testutil") add_library(TestUtil STATIC - impalad-query-executor.cc + impalad-query-executor.cc in-process-servers.cc desc-tbl-builder.cc + test-udas.cc + test-udfs.cc ) add_library(TestUdfs SHARED test-udfs.cc) diff --git a/be/src/testutil/test-udas.cc b/be/src/testutil/test-udas.cc index 9ccb8391e..518d0c278 100644 --- a/be/src/testutil/test-udas.cc +++ b/be/src/testutil/test-udas.cc @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "testutil/test-udas.h" + #include using namespace impala_udf; @@ -54,3 +56,38 @@ void AggMerge(FunctionContext*, const BufferVal&, BufferVal*) {} BigIntVal AggFinalize(FunctionContext*, const BufferVal&) { return BigIntVal::null(); } + +// Defines MemTest(bigint) return bigint +// "Allocates" the specified number of bytes in the update function and frees them in the +// serialize function. Useful for testing mem limits. +void MemTestInit(FunctionContext*, BigIntVal* total) { + *total = BigIntVal(0); +} + +void MemTestUpdate(FunctionContext* context, const BigIntVal& bytes, BigIntVal* total) { + if (bytes.is_null) return; + context->TrackAllocation(bytes.val); // freed by serialize() + total->val += bytes.val; +} + +void MemTestMerge(FunctionContext* context, const BigIntVal& src, BigIntVal* dst) { + if (src.is_null) return; + context->TrackAllocation(src.val); // freed by finalize() + if (dst->is_null) { + *dst = src; + return; + } + dst->val += src.val; +} + +const BigIntVal MemTestSerialize(FunctionContext* context, const BigIntVal& total) { + if (total.is_null) return BigIntVal(0); + context->Free(total.val); + return total; +} + +BigIntVal MemTestFinalize(FunctionContext* context, const BigIntVal& total) { + if (total.is_null) return BigIntVal(0); + context->Free(total.val); + return total; +} diff --git a/be/src/testutil/test-udas.h b/be/src/testutil/test-udas.h new file mode 100644 index 000000000..aeed56cd8 --- /dev/null +++ b/be/src/testutil/test-udas.h @@ -0,0 +1,28 @@ +// Copyright 2012 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef IMPALA_UDF_TEST_UDAS_H +#define IMPALA_UDF_TEST_UDAS_H + +#include "udf/udf.h" + +using namespace impala_udf; + +void MemTestInit(FunctionContext*, BigIntVal* total); +void MemTestUpdate(FunctionContext* context, const BigIntVal& bytes, BigIntVal* total); +void MemTestMerge(FunctionContext* context, const BigIntVal& src, BigIntVal* dst); +const BigIntVal MemTestSerialize(FunctionContext* context, const BigIntVal& total); +BigIntVal MemTestFinalize(FunctionContext* context, const BigIntVal& total); + +#endif diff --git a/be/src/testutil/test-udfs.cc b/be/src/testutil/test-udfs.cc index 4440e1aae..919c7b091 100644 --- a/be/src/testutil/test-udfs.cc +++ b/be/src/testutil/test-udfs.cc @@ -178,13 +178,9 @@ void ConstantArgPrepare( FunctionContext* context, FunctionContext::FunctionStateScope scope) { if (scope == FunctionContext::THREAD_LOCAL) { IntVal* arg = reinterpret_cast(context->GetConstantArg(0)); - if (arg == NULL) { - IntVal* state = reinterpret_cast(context->Allocate(sizeof(IntVal))); - *state = IntVal::null(); - context->SetFunctionState(scope, state); - } else { - context->SetFunctionState(scope, arg); - } + IntVal* state = reinterpret_cast(context->Allocate(sizeof(IntVal))); + *state = (arg != NULL) ? *arg : IntVal::null(); + context->SetFunctionState(scope, state); } } @@ -227,8 +223,37 @@ void ValidateOpenClose( } } -namespace foo { - void bar( - FunctionContext* context, FunctionContext::FunctionStateScope scope) { } - +// MemTest UDF: "Allocates" the specified number of bytes per call. +void MemTestPrepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) { + if (scope == FunctionContext::THREAD_LOCAL) { + int64_t* total = + reinterpret_cast(context->Allocate(sizeof(int64_t))); + *total = 0; + context->SetFunctionState(scope, total); + } +} + +BigIntVal MemTest(FunctionContext* context, const BigIntVal& bytes) { + int64_t* total = reinterpret_cast( + context->GetFunctionState(FunctionContext::THREAD_LOCAL)); + context->TrackAllocation(bytes.val); + *total += bytes.val; + return bytes; +} + +void MemTestClose(FunctionContext* context, FunctionContext::FunctionStateScope scope) { + if (scope == FunctionContext::THREAD_LOCAL) { + int64_t* total = reinterpret_cast( + context->GetFunctionState(FunctionContext::THREAD_LOCAL)); + context->Free(*total); + context->Free(reinterpret_cast(total)); + context->SetFunctionState(scope, NULL); + } +} + +BigIntVal DoubleFreeTest(FunctionContext* context, BigIntVal bytes) { + context->TrackAllocation(bytes.val); + context->Free(bytes.val); + context->Free(bytes.val); + return bytes; } diff --git a/be/src/testutil/test-udfs.h b/be/src/testutil/test-udfs.h new file mode 100644 index 000000000..ee68f7a62 --- /dev/null +++ b/be/src/testutil/test-udfs.h @@ -0,0 +1,28 @@ +// Copyright 2012 Cloudera Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef IMPALA_UDF_TEST_UDFS_H +#define IMPALA_UDF_TEST_UDFS_H + +#include "udf/udf.h" + +using namespace impala_udf; + +void MemTestPrepare(FunctionContext* context, FunctionContext::FunctionStateScope scope); +BigIntVal MemTest(FunctionContext* context, const BigIntVal& bytes); +void MemTestClose(FunctionContext* context, FunctionContext::FunctionStateScope scope); + +BigIntVal DoubleFreeTest(FunctionContext* context, BigIntVal bytes); + +#endif diff --git a/be/src/udf/CMakeLists.txt b/be/src/udf/CMakeLists.txt index d73f96b90..38ee68e50 100644 --- a/be/src/udf/CMakeLists.txt +++ b/be/src/udf/CMakeLists.txt @@ -34,6 +34,7 @@ set (UDF_TEST_LINK_LIBS GlobalFlags ImpalaUdf Runtime + TestUtil Util -Wl,--end-group # Below are all external dependencies. They should some after the impala libs. diff --git a/be/src/udf/uda-test-harness-impl.h b/be/src/udf/uda-test-harness-impl.h index 03e9ddacc..73045deb9 100644 --- a/be/src/udf/uda-test-harness-impl.h +++ b/be/src/udf/uda-test-harness-impl.h @@ -20,7 +20,6 @@ #include #include -#include #include namespace impala_udf { @@ -97,14 +96,18 @@ bool UdaTestHarnessBase::Execute( error_msg_ = ""; RESULT result; + std::vector types; // TODO if (mode == ALL || mode == SINGLE_NODE) { - result = ExecuteSingleNode(); - if (error_msg_.empty() && !CheckResult(result, expected)) { - std::stringstream ss; - ss << "UDA failed running in single node execution." << std::endl - << "Expected: " << DebugString(expected) - << " Actual: " << DebugString(result); - error_msg_ = ss.str(); + { + ScopedFunctionContext context(UdfTestHarness::CreateTestContext(types), this); + result = ExecuteSingleNode(&context); + if (error_msg_.empty() && !CheckResult(result, expected)) { + std::stringstream ss; + ss << "UDA failed running in single node execution." << std::endl + << "Expected: " << DebugString(expected) + << " Actual: " << DebugString(result); + error_msg_ = ss.str(); + } } if (!error_msg_.empty()) return false; } @@ -112,7 +115,8 @@ bool UdaTestHarnessBase::Execute( const int num_nodes[] = { 1, 2, 10, 20, 100 }; if (mode == ALL || mode == ONE_LEVEL) { for (int i = 0; i < sizeof(num_nodes) / sizeof(int); ++i) { - result = ExecuteOneLevel(num_nodes[i]); + ScopedFunctionContext context(UdfTestHarness::CreateTestContext(types), this); + result = ExecuteOneLevel(num_nodes[i], &context); if (error_msg_.empty() && !CheckResult(result, expected)) { std::stringstream ss; ss << "UDA failed running in one level distributed mode with " @@ -129,7 +133,8 @@ bool UdaTestHarnessBase::Execute( if (mode == ALL || mode == TWO_LEVEL) { for (int i = 0; i < sizeof(num_nodes) / sizeof(int); ++i) { for (int j = 0; j <= i; ++j) { - result = ExecuteTwoLevel(num_nodes[i], num_nodes[j]); + ScopedFunctionContext context(UdfTestHarness::CreateTestContext(types), this); + result = ExecuteTwoLevel(num_nodes[i], num_nodes[j], &context); if (error_msg_.empty() && !CheckResult(result, expected)) { std::stringstream ss; ss << "UDA failed running in two level distributed mode with " @@ -148,31 +153,31 @@ bool UdaTestHarnessBase::Execute( } template -RESULT UdaTestHarnessBase::ExecuteSingleNode() { - std::vector types; // TODO - boost::scoped_ptr context(UdfTestHarness::CreateTestContext(types)); +RESULT UdaTestHarnessBase::ExecuteSingleNode( + ScopedFunctionContext* context) { INTERMEDIATE intermediate = UdaTestHarnessUtil::CreateIntermediate( - context.get(), fixed_buffer_byte_size_); + context->get(), fixed_buffer_byte_size_); - init_fn_(context.get(), &intermediate); - if (!CheckContext(context.get())) return RESULT::null(); + init_fn_(context->get(), &intermediate); + if (!CheckContext(context->get())) return RESULT::null(); for (int i = 0; i < num_input_values_; ++i) { - Update(i, context.get(), &intermediate); + Update(i, context->get(), &intermediate); } - if (!CheckContext(context.get())) return RESULT::null(); + if (!CheckContext(context->get())) return RESULT::null(); // Single node doesn't need merge or serialize - RESULT result = finalize_fn_(context.get(), intermediate); - UdaTestHarnessUtil::FreeIntermediate(context.get(), intermediate); - if (!CheckContext(context.get())) return RESULT::null(); + RESULT result = finalize_fn_(context->get(), intermediate); + UdaTestHarnessUtil::FreeIntermediate(context->get(), intermediate); + if (!CheckContext(context->get())) return RESULT::null(); return result; } template -RESULT UdaTestHarnessBase::ExecuteOneLevel(int num_nodes) { - std::vector > contexts; +RESULT UdaTestHarnessBase::ExecuteOneLevel(int num_nodes, + ScopedFunctionContext* result_context) { + std::vector > contexts; std::vector intermediates; contexts.resize(num_nodes); @@ -180,58 +185,54 @@ RESULT UdaTestHarnessBase::ExecuteOneLevel(int num_nodes) for (int i = 0; i < num_nodes; ++i) { FunctionContext* cxt = UdfTestHarness::CreateTestContext(types); - contexts[i].reset(cxt); + contexts[i].reset(new ScopedFunctionContext(cxt, this)); intermediates.push_back(UdaTestHarnessUtil::CreateIntermediate( cxt, fixed_buffer_byte_size_)); init_fn_(cxt, &intermediates[i]); if (!CheckContext(cxt)) return RESULT::null(); } - boost::scoped_ptr merge_context( - UdfTestHarness::CreateTestContext(types)); INTERMEDIATE merged = UdaTestHarnessUtil::CreateIntermediate( - merge_context.get(), fixed_buffer_byte_size_); - init_fn_(merge_context.get(), &merged); - if (!CheckContext(merge_context.get())) return RESULT::null(); + result_context->get(), fixed_buffer_byte_size_); + init_fn_(result_context->get(), &merged); + if (!CheckContext(result_context->get())) return RESULT::null(); // Process all the values in the single level num_nodes contexts for (int i = 0; i < num_input_values_; ++i) { int target = i % num_nodes; - Update(i, contexts[target].get(), &intermediates[target]); + Update(i, contexts[target].get()->get(), &intermediates[target]); } // Merge them all into the final for (int i = 0; i < num_nodes; ++i) { - if (!CheckContext(contexts[i].get())) return RESULT::null(); + if (!CheckContext(contexts[i].get()->get())) return RESULT::null(); + INTERMEDIATE serialized = intermediates[i]; if (serialize_fn_ != NULL) { - INTERMEDIATE serialized = serialize_fn_(contexts[i].get(), intermediates[i]); - INTERMEDIATE copy = - UdaTestHarnessUtil::CopyIntermediate( - merge_context.get(), fixed_buffer_byte_size_, serialized); - UdaTestHarnessUtil::FreeIntermediate( - contexts[i].get(), intermediates[i]); - merge_fn_(merge_context.get(), copy, &merged); - if (!CheckContext(contexts[i].get())) return RESULT::null(); - } else { - merge_fn_(merge_context.get(), intermediates[i], &merged); - UdaTestHarnessUtil::FreeIntermediate( - contexts[i].get(), intermediates[i]); + serialized = serialize_fn_(contexts[i].get()->get(), intermediates[i]); } + INTERMEDIATE copy = + UdaTestHarnessUtil::CopyIntermediate( + result_context->get(), fixed_buffer_byte_size_, serialized); + UdaTestHarnessUtil::FreeIntermediate( + contexts[i].get()->get(), intermediates[i]); + merge_fn_(result_context->get(), copy, &merged); + UdaTestHarnessUtil::FreeIntermediate(result_context->get(), copy); + if (!CheckContext(contexts[i].get()->get())) return RESULT::null(); contexts[i].reset(); } - if (!CheckContext(merge_context.get())) return RESULT::null(); + if (!CheckContext(result_context->get())) return RESULT::null(); - RESULT result = finalize_fn_(merge_context.get(), merged); - UdaTestHarnessUtil::FreeIntermediate(merge_context.get(), merged); - if (!CheckContext(merge_context.get())) return RESULT::null(); + RESULT result = finalize_fn_(result_context->get(), merged); + UdaTestHarnessUtil::FreeIntermediate(result_context->get(), merged); + if (!CheckContext(result_context->get())) return RESULT::null(); return result; } template RESULT UdaTestHarnessBase::ExecuteTwoLevel( - int num1, int num2) { - std::vector > level1_contexts, level2_contexts; + int num1, int num2, ScopedFunctionContext* result_context) { + std::vector > level1_contexts, level2_contexts; std::vector level1_intermediates, level2_intermediates; level1_contexts.resize(num1); level2_contexts.resize(num2); @@ -241,7 +242,7 @@ RESULT UdaTestHarnessBase::ExecuteTwoLevel( // Initialize the intermediate contexts and intermediate result buffers for (int i = 0; i < num1; ++i) { FunctionContext* cxt = UdfTestHarness::CreateTestContext(types); - level1_contexts[i].reset(cxt); + level1_contexts[i].reset(new ScopedFunctionContext(cxt, this)); level1_intermediates.push_back( UdaTestHarnessUtil::CreateIntermediate( cxt, fixed_buffer_byte_size_)); @@ -250,7 +251,7 @@ RESULT UdaTestHarnessBase::ExecuteTwoLevel( } for (int i = 0; i < num2; ++i) { FunctionContext* cxt = UdfTestHarness::CreateTestContext(types); - level2_contexts[i].reset(cxt); + level2_contexts[i].reset(new ScopedFunctionContext(cxt, this)); level2_intermediates.push_back( UdaTestHarnessUtil::CreateIntermediate( cxt, fixed_buffer_byte_size_)); @@ -259,70 +260,63 @@ RESULT UdaTestHarnessBase::ExecuteTwoLevel( } // Initialize the final context and final intermediate buffer - boost::scoped_ptr final_context( - UdfTestHarness::CreateTestContext(types)); INTERMEDIATE final_intermediate = UdaTestHarnessUtil::CreateIntermediate( - final_context.get(), fixed_buffer_byte_size_); - init_fn_(final_context.get(), &final_intermediate); - if (!CheckContext(final_context.get())) return RESULT::null(); + result_context->get(), fixed_buffer_byte_size_); + init_fn_(result_context->get(), &final_intermediate); + if (!CheckContext(result_context->get())) return RESULT::null(); // Assign all the input values to level 1 updates for (int i = 0; i < num_input_values_; ++i) { int target = i % num1; - Update(i, level1_contexts[target].get(), &level1_intermediates[target]); + Update(i, level1_contexts[target].get()->get(), &level1_intermediates[target]); } // Serialize the level 1 intermediates and merge them with a level 2 intermediate for (int i = 0; i < num1; ++i) { - if (!CheckContext(level1_contexts[i].get())) return RESULT::null(); + if (!CheckContext(level1_contexts[i].get()->get())) return RESULT::null(); int target = i % num2; + INTERMEDIATE serialized = level1_intermediates[i]; if (serialize_fn_ != NULL) { - INTERMEDIATE serialized = - serialize_fn_(level1_contexts[i].get(), level1_intermediates[i]); - INTERMEDIATE copy = - UdaTestHarnessUtil::CopyIntermediate( - level1_contexts[i].get(), fixed_buffer_byte_size_, serialized); - UdaTestHarnessUtil::FreeIntermediate( - level1_contexts[i].get(), level1_intermediates[i]); - merge_fn_(level2_contexts[target].get(), - copy, &level2_intermediates[target]); - if (!CheckContext(level1_contexts[i].get())) return RESULT::null(); - } else { - merge_fn_(level2_contexts[target].get(), - level1_intermediates[i], &level2_intermediates[target]); - UdaTestHarnessUtil::FreeIntermediate( - level1_contexts[i].get(), level1_intermediates[i]); + serialized = serialize_fn_(level1_contexts[i].get()->get(), level1_intermediates[i]); } + INTERMEDIATE copy = + UdaTestHarnessUtil::CopyIntermediate( + level2_contexts[target].get()->get(), fixed_buffer_byte_size_, serialized); + UdaTestHarnessUtil::FreeIntermediate( + level1_contexts[i].get()->get(), level1_intermediates[i]); + merge_fn_(level2_contexts[target].get()->get(), + copy, &level2_intermediates[target]); + UdaTestHarnessUtil::FreeIntermediate( + level2_contexts[target].get()->get(), copy); + if (!CheckContext(level1_contexts[i].get()->get())) return RESULT::null(); level1_contexts[i].reset(); } // Merge all the level twos into the final for (int i = 0; i < num2; ++i) { - if (!CheckContext(level2_contexts[i].get())) return RESULT::null(); + if (!CheckContext(level2_contexts[i].get()->get())) return RESULT::null(); + INTERMEDIATE serialized = level2_intermediates[i]; if (serialize_fn_ != NULL) { - INTERMEDIATE serialized = - serialize_fn_(level2_contexts[i].get(), level2_intermediates[i]); - INTERMEDIATE copy = - UdaTestHarnessUtil::CopyIntermediate( - level2_contexts[i].get(), fixed_buffer_byte_size_, serialized); - UdaTestHarnessUtil::FreeIntermediate( - level2_contexts[i].get(), level2_intermediates[i]); - merge_fn_(final_context.get(), copy, &final_intermediate); - if (!CheckContext(level2_contexts[i].get())) return RESULT::null(); - } else { - merge_fn_(final_context.get(), level2_intermediates[i], &final_intermediate); - UdaTestHarnessUtil::FreeIntermediate( - level2_contexts[i].get(), level2_intermediates[i]); + serialized = serialize_fn_(level2_contexts[i].get()->get(), level2_intermediates[i]); } + INTERMEDIATE copy = + UdaTestHarnessUtil::CopyIntermediate( + result_context->get(), fixed_buffer_byte_size_, serialized); + UdaTestHarnessUtil::FreeIntermediate( + level2_contexts[i].get()->get(), level2_intermediates[i]); + merge_fn_(result_context->get(), copy, &final_intermediate); + UdaTestHarnessUtil::FreeIntermediate( + result_context->get(), copy); + if (!CheckContext(level2_contexts[i].get()->get())) return RESULT::null(); level2_contexts[i].reset(); } - if (!CheckContext(final_context.get())) return RESULT::null(); + if (!CheckContext(result_context->get())) return RESULT::null(); - RESULT result = finalize_fn_(final_context.get(), final_intermediate); + RESULT result = finalize_fn_(result_context->get(), final_intermediate); UdaTestHarnessUtil::FreeIntermediate( - final_context.get(), final_intermediate); - if (!CheckContext(final_context.get())) return RESULT::null(); + result_context->get(), final_intermediate); + if (!CheckContext(result_context->get())) return RESULT::null(); return result; } diff --git a/be/src/udf/uda-test-harness.h b/be/src/udf/uda-test-harness.h index 99385e4b6..363b0f5ce 100644 --- a/be/src/udf/uda-test-harness.h +++ b/be/src/udf/uda-test-harness.h @@ -77,6 +77,23 @@ class UdaTestHarnessBase { num_input_values_(0) { } + struct ScopedFunctionContext { + ScopedFunctionContext(FunctionContext* context, UdaTestHarnessBase* harness) + : context_(context), harness_(harness) { } + + ~ScopedFunctionContext() { + UdfTestHarness::CloseContext(context_); + harness_->CheckContext(context_); + delete context_; + } + + FunctionContext* get() { return context_; } + + private: + FunctionContext* context_; + UdaTestHarnessBase* harness_; + }; + // Runs the UDA in all the modes, validating the result is 'expected' each time. bool Execute(const RESULT& expected, UdaExecutionMode mode); @@ -88,16 +105,16 @@ class UdaTestHarnessBase { // Runs the UDA on a single node. The entire execution happens in 1 context. // The UDA does a update on all the input values and then a finalize. - RESULT ExecuteSingleNode(); + RESULT ExecuteSingleNode(ScopedFunctionContext* result_context); // Runs the UDA, simulating a single level aggregation. The values are processed // on num_nodes + 1 contexts. There are num_nodes that do update and serialize. // There is a final context that does merge and finalize. - RESULT ExecuteOneLevel(int num_nodes); + RESULT ExecuteOneLevel(int num_nodes, ScopedFunctionContext* result_context); // Runs the UDA, simulating a two level aggregation with num1 in the first level and // num2 in the second. The values are processed in num1 + num2 contexts. - RESULT ExecuteTwoLevel(int num1, int num2); + RESULT ExecuteTwoLevel(int num1, int num2, ScopedFunctionContext* result_context); virtual void Update(int idx, FunctionContext* context, INTERMEDIATE* dst) = 0; diff --git a/be/src/udf/uda-test.cc b/be/src/udf/uda-test.cc index 5fc720825..108c2914b 100644 --- a/be/src/udf/uda-test.cc +++ b/be/src/udf/uda-test.cc @@ -17,6 +17,7 @@ #include "common/logging.h" #include "udf/uda-test-harness.h" +#include "testutil/test-udas.h" using namespace impala; using namespace impala_udf; @@ -110,6 +111,14 @@ void MinUpdate(FunctionContext* context, const StringVal& input, BufferVal* val) // Serialize the state into the min string const BufferVal MinSerialize(FunctionContext* context, const BufferVal& intermediate) { + MinState* state = reinterpret_cast(intermediate); + if (state->value == NULL) return intermediate; + // Hack to persist the intermediate state's value without leaking. + // TODO: revisit BufferVal and design a better way to do this + StringVal copy_buffer(context, state->len); + memcpy(copy_buffer.ptr, state->value, state->len); + context->Free(state->value); + state->value = copy_buffer.ptr; return intermediate; } @@ -126,6 +135,7 @@ StringVal MinFinalize(FunctionContext* context, const BufferVal& val) { if (state->value == NULL) return StringVal::null(); StringVal result = StringVal(context, state->len); memcpy(result.ptr, state->value, state->len); + context->Free(state->value); return result; } @@ -282,6 +292,21 @@ TEST(MinTest, Basic) { EXPECT_TRUE(test.Execute(values, StringVal("ZZZ"))) << test.GetErrorMsg(); } +TEST(MemTest, Basic) { + UdaTestHarness test( + ::MemTestInit, ::MemTestUpdate, ::MemTestMerge, ::MemTestSerialize, + ::MemTestFinalize); + vector input; + for (int i = 0; i < 10; ++i) { + input.push_back(10); + } + EXPECT_TRUE(test.Execute(input, BigIntVal(100))) << test.GetErrorMsg(); + + UdaTestHarness test_leak( + ::MemTestInit, ::MemTestUpdate, ::MemTestMerge, NULL, ::MemTestFinalize); + EXPECT_FALSE(test_leak.Execute(input, BigIntVal(100))) << test.GetErrorMsg(); +} + int main(int argc, char** argv) { impala::InitGoogleLoggingSafe(argv[0]); ::testing::InitGoogleTest(&argc, argv); diff --git a/be/src/udf/udf-internal.h b/be/src/udf/udf-internal.h index d68673de8..980bab796 100644 --- a/be/src/udf/udf-internal.h +++ b/be/src/udf/udf-internal.h @@ -41,6 +41,11 @@ class FunctionContextImpl { FunctionContextImpl(impala_udf::FunctionContext* parent); + // Checks for any outstanding memory allocations. If there is unfreed memory, adds a + // warning and frees the allocations. Note that local allocations are freed with the + // MemPool backing pool_. + void Close(); + // Allocates a buffer of 'byte_size' with "local" memory management. These // allocations are not freed one by one but freed as a pool by FreeLocalAllocations() // This is used where the lifetime of the allocation is clear. @@ -51,12 +56,6 @@ class FunctionContextImpl { // Frees all allocations returned by AllocateLocal(). void FreeLocalAllocations(); - // Returns true if there are no outstanding allocations. - bool CheckAllocationsEmpty(); - - // Returns true if there are no outstanding local allocations. - bool CheckLocalAlloctionsEmpty(); - // Sets constant_args_. The AnyVal* values are owned by the caller. void SetConstantArgs(const std::vector& constant_args); @@ -107,6 +106,9 @@ class FunctionContextImpl { // indicates that the corresponding argument is non-constant. Otherwise contains the // value of the argument. std::vector constant_args_; + + // Indicates whether this context has been closed. Used for verification/debugging. + bool closed_; }; } diff --git a/be/src/udf/udf-test-harness.cc b/be/src/udf/udf-test-harness.cc index 93b878b69..c51ee891b 100644 --- a/be/src/udf/udf-test-harness.cc +++ b/be/src/udf/udf-test-harness.cc @@ -34,3 +34,7 @@ void UdfTestHarness::SetConstantArgs( } context->impl()->SetConstantArgs(constant_args); } + +void UdfTestHarness::CloseContext(FunctionContext* context) { + context->impl()->Close(); +} diff --git a/be/src/udf/udf-test-harness.h b/be/src/udf/udf-test-harness.h index 096570e21..3af701d34 100644 --- a/be/src/udf/udf-test-harness.h +++ b/be/src/udf/udf-test-harness.h @@ -47,6 +47,10 @@ class UdfTestHarness { static void SetConstantArgs( FunctionContext* context, const std::vector& constant_args); + // Test contexts should be closed in order to check for UDF memory leaks. Leaks cause + // the error to be set on context. + static void CloseContext(FunctionContext* context); + // Template function to execute a UDF and validate the result. They should be // used like: // ValidateUdf(udf_fn, arg1, arg2, ..., expected_result); @@ -65,6 +69,7 @@ class UdfTestHarness { if (!RunPrepareFn(init_fn, context.get())) return false; RET ret = fn(context.get()); RunCloseFn(close_fn, context.get()); + CloseContext(context.get()); return Validate(context.get(), expected, ret); } @@ -270,13 +275,16 @@ class UdfTestHarness { template static bool Validate(FunctionContext* context, const RET& expected, const RET& actual) { - if (!ValidateError(context)) return false; - if (actual == expected) return true; - - std::cerr << "UDF did not return the correct result:" << std::endl - << " Expected: " << DebugString(expected) << std::endl - << " Actual: " << DebugString(actual) << std::endl; - return false; + bool valid = true; + if (!context->has_error() && actual != expected) { + std::cerr << "UDF did not return the correct result:" << std::endl + << " Expected: " << DebugString(expected) << std::endl + << " Actual: " << DebugString(actual) << std::endl; + valid = false; + } + CloseContext(context); + if (!ValidateError(context)) valid = false; + return valid; } static bool RunPrepareFn(UdfPrepare prepare_fn, FunctionContext* context) { @@ -290,7 +298,8 @@ class UdfTestHarness { static void RunCloseFn(UdfClose close_fn, FunctionContext* context) { if (close_fn != NULL) { - close_fn(context, FunctionContext::FRAGMENT_LOCAL); + // TODO: FRAGMENT_LOCAL + close_fn(context, FunctionContext::THREAD_LOCAL); } } }; diff --git a/be/src/udf/udf-test.cc b/be/src/udf/udf-test.cc index 76c12abcc..642b1f482 100644 --- a/be/src/udf/udf-test.cc +++ b/be/src/udf/udf-test.cc @@ -17,6 +17,7 @@ #include #include "common/logging.h" +#include "testutil/test-udfs.h" #include "udf/udf-test-harness.h" using namespace boost; @@ -174,9 +175,9 @@ TEST(UdfTest, TestFunctionContext) { scoped_ptr arg(new SmallIntVal(100)); vector constant_args; constant_args.push_back(arg.get()); - UdfTestHarness::ValidateUdf( + EXPECT_TRUE((UdfTestHarness::ValidateUdf( ValidateSharedState, *arg, SmallIntVal::null(), - ValidateSharedStatePrepare, ValidateSharedStateClose, constant_args); + ValidateSharedStatePrepare, ValidateSharedStateClose, constant_args))); } TEST(UdfTest, TestValidate) { @@ -226,6 +227,23 @@ TEST(UdfTest, TestVarArgs) { NumVarArgs, BigIntVal(0), args, IntVal(args.size())))); } +TEST(UdfTest, MemTest) { + scoped_ptr bytes_arg(new BigIntVal(1000)); + vector constant_args; + constant_args.push_back(bytes_arg.get()); + + EXPECT_TRUE((UdfTestHarness::ValidateUdf( + ::MemTest, *bytes_arg, *bytes_arg, ::MemTestPrepare, ::MemTestClose, + constant_args))); + + EXPECT_FALSE((UdfTestHarness::ValidateUdf( + ::MemTest, *bytes_arg, *bytes_arg, ::MemTestPrepare, NULL, constant_args))); + + EXPECT_FALSE((UdfTestHarness::ValidateUdf( + ::DoubleFreeTest, *bytes_arg, *bytes_arg))); + +} + int main(int argc, char** argv) { impala::InitGoogleLoggingSafe(argv[0]); ::testing::InitGoogleTest(&argc, argv); diff --git a/be/src/udf/udf.cc b/be/src/udf/udf.cc index 6615e5af5..500e1986e 100644 --- a/be/src/udf/udf.cc +++ b/be/src/udf/udf.cc @@ -30,6 +30,12 @@ // When they build their library to a .so, they'd use the version of FunctionContext // in the main binary, which does include FreePool. namespace impala { +class MemTracker { + public: + void Consume(int64_t bytes) { } + void Release(int64_t bytes) { } +}; + class FreePool { public: FreePool(MemPool*) { } @@ -45,6 +51,11 @@ class FreePool { void Free(uint8_t* ptr) { free(ptr); } + + MemTracker* mem_tracker() { return &mem_tracker_; } + + private: + MemTracker mem_tracker_; }; class RuntimeState { @@ -63,6 +74,7 @@ class RuntimeState { } #else #include "runtime/free-pool.h" +#include "runtime/mem-tracker.h" #include "runtime/runtime-state.h" #endif @@ -87,10 +99,7 @@ FunctionContext::FunctionContext() : impl_(new FunctionContextImpl(this)) { } FunctionContext::~FunctionContext() { - // TODO: this needs to free local allocations but there's a mem issue - // in the uda harness now. - impl_->CheckLocalAlloctionsEmpty(); - impl_->CheckAllocationsEmpty(); + assert(impl_->closed_ && "FunctionContext wasn't closed!"); delete impl_->pool_; delete impl_; } @@ -100,7 +109,47 @@ FunctionContextImpl::FunctionContextImpl(FunctionContext* parent) num_warnings_(0), thread_local_fn_state_(NULL), fragment_local_fn_state_(NULL), - external_bytes_tracked_(0) { + external_bytes_tracked_(0), + closed_(false) { +} + +void FunctionContextImpl::Close() { + if (!allocations_.empty()) { + int bytes = 0; + for (map::iterator i = allocations_.begin(); + i != allocations_.end(); ++i) { + bytes += i->second; + } + stringstream ss; + ss << bytes << " bytes leaked via FunctionContext::Allocate()"; + context_->SetError(ss.str().c_str()); +#ifndef IMPALA_UDF_SDK_BUILD + // TODO: this is a stopgap because setting the error in Close() causes it to not be + // displayed in the shell. + LOG(WARNING) << ss.str(); +#endif + } + allocations_.clear(); + + FreeLocalAllocations(); + + if (external_bytes_tracked_ > 0) { + stringstream ss; + ss << external_bytes_tracked_ + << " bytes leaked via FunctionContext::TrackAllocation()"; + context_->SetError(ss.str().c_str()); +#ifndef IMPALA_UDF_SDK_BUILD + // TODO: this is a stopgap because setting the error in Close() causes it to not be + // displayed in the shell. + LOG(WARNING) << ss.str(); +#endif + } + // This isn't ideal because the memory is still leaked, but don't track it so our + // accounting stays sane. + // TODO: we need to modify the memtrackers to allow leaked user-allocated memory. + context_->Free(external_bytes_tracked_); + + closed_ = true; } FunctionContext::ImpalaVersion FunctionContext::version() const { @@ -162,15 +211,26 @@ void FunctionContext::Free(uint8_t* buffer) { } } else { impl_->allocations_.erase(buffer); + impl_->pool_->Free(buffer); } } void FunctionContext::TrackAllocation(int64_t bytes) { impl_->external_bytes_tracked_ += bytes; + impl_->pool_->mem_tracker()->Consume(bytes); } void FunctionContext::Free(int64_t bytes) { + if (bytes > impl_->external_bytes_tracked_) { + stringstream ss; + ss << "FunctionContext::Free() called with " << bytes << " bytes, but only " + << impl_->external_bytes_tracked_ << " bytes are tracked via " + << "FunctionContext::TrackAllocation()"; + SetError(ss.str().c_str()); + return; + } impl_->external_bytes_tracked_ -= bytes; + impl_->pool_->mem_tracker()->Release(bytes); } void FunctionContext::SetError(const char* error_msg) { @@ -187,6 +247,14 @@ bool FunctionContext::AddWarning(const char* warning_msg) { stringstream ss; ss << "UDF WARNING: " << warning_msg; if (impl_->state_ != NULL) { +#ifndef IMPALA_UDF_SDK_BUILD + // If this is called while the query is being closed, the runtime state log will have + // already been displayed to the user. Also log the warning so there's some chance + // the user will actually see it. + // TODO: somehow print the full error log in the shell? This is a problem for any + // function using LogError() during close. + LOG(WARNING) << ss.str(); +#endif return impl_->state_->LogError(ss.str()); } else { cerr << ss.str() << endl; @@ -237,20 +305,6 @@ void FunctionContextImpl::FreeLocalAllocations() { local_allocations_.clear(); } -bool FunctionContextImpl::CheckAllocationsEmpty() { - if (allocations_.empty() && external_bytes_tracked_ == 0) return true; - // TODO: fix this - //if (debug_) context_->SetError("Leaked allocations."); - return false; -} - -bool FunctionContextImpl::CheckLocalAlloctionsEmpty() { - if (local_allocations_.empty()) return true; - // TODO: fix this - //if (debug_) context_->SetError("Leaked local allocations."); - return false; -} - void FunctionContextImpl::SetConstantArgs(const vector& constant_args) { constant_args_ = constant_args; } diff --git a/be/src/udf/udf.h b/be/src/udf/udf.h index c03359414..94ccf0b38 100644 --- a/be/src/udf/udf.h +++ b/be/src/udf/udf.h @@ -275,9 +275,10 @@ typedef void (*UdfClose)(FunctionContext* context, //---------------------------------------------------------------------------- // The UDA execution is broken up into a few steps. The general calling pattern // is one of these: -// 1) Init(), Evaluate() (repeatedly), Serialize() -// 2) Init(), Merge() (repeatedly), Serialize() -// 3) Init(), Finalize() +// 1) Init(), Update() (repeatedly), Serialize() +// 2) Init(), Update() (repeatedly), Finalize() +// 3) Init(), Merge() (repeatedly), Serialize() +// 4) Init(), Merge() (repeatedly), Finalize() // The UDA is registered with three types: the result type, the input type and // the intermediate type. // @@ -289,11 +290,13 @@ typedef void (*UdfClose)(FunctionContext* context, // intermediate type should be string and the UDA can cast the ptr to the structure // it is using. // -// Memory Management: For allocations that are not returned to Impala, the UDA -// should use the FunctionContext::Allocate()/Free() methods. For StringVal allocations -// returned to Impala (e.g. UdaSerialize()), the UDA should allocate the result -// via StringVal(FunctionContext*, int) ctor and Impala will automatically handle -// freeing it. +// Memory Management: For allocations that are not returned to Impala, the UDA should use +// the FunctionContext::Allocate()/Free() methods. In general, Allocate() is called in +// Init(), and then Free() must be called in both Serialize() and Finalize(), since +// either of these functions may be called to clean up the state. For StringVal +// allocations returned to Impala (e.g. returned by UdaSerialize()), the UDA should +// allocate the result via StringVal(FunctionContext*, int) ctor and Impala will +// automatically handle freeing it. // // For clarity in documenting the UDA interface, the various types will be typedefed // here. The actual execution resolves all the types at runtime and none of these types @@ -320,7 +323,7 @@ typedef void (*UdaMerge)(FunctionContext* context, const IntermediateType& src, IntermediateType* dst); // Serialize the intermediate type. The serialized data is then sent across the -// wire. This is not called unless the intermediate type is String. +// wire. // No additional functions will be called with this FunctionContext object and the // UDA should do final clean (e.g. Free()) here. typedef const IntermediateType (*UdaSerialize)(FunctionContext* context, diff --git a/be/src/udf_samples/hyperloglog-uda.cc b/be/src/udf_samples/hyperloglog-uda.cc index 392bcb45b..7d0cec98b 100644 --- a/be/src/udf_samples/hyperloglog-uda.cc +++ b/be/src/udf_samples/hyperloglog-uda.cc @@ -81,6 +81,14 @@ void HllMerge(FunctionContext* ctx, const StringVal& src, StringVal* dst) { } } +StringVal HllSerialize(FunctionContext* ctx, const StringVal& src) { + if (src.is_null) return src; + StringVal result(ctx, src.len); + memcpy(result.ptr, src.ptr, src.len); + ctx->Free(src.ptr); + return result; +} + StringVal HllFinalize(FunctionContext* ctx, const StringVal& src) { assert(!src.is_null); assert(src.len == pow(2, HLL_PRECISION)); @@ -119,6 +127,7 @@ StringVal HllFinalize(FunctionContext* ctx, const StringVal& src) { string out_str = out.str(); StringVal result_str(ctx, out_str.size()); memcpy(result_str.ptr, out_str.c_str(), result_str.len); + ctx->Free(src.ptr); return result_str; } diff --git a/fe/src/main/java/com/cloudera/impala/catalog/Catalog.java b/fe/src/main/java/com/cloudera/impala/catalog/Catalog.java index e1347d68c..b7916a0cc 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/Catalog.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/Catalog.java @@ -542,6 +542,8 @@ public abstract class Catalog { "14InitNullStringEPN10impala_udf15FunctionContextEPNS1_9StringValE"; final String initNull = prefix + "8InitNullEPN10impala_udf15FunctionContextEPNS1_6AnyValE"; + final String stringValSerializeOrFinalize = prefix + + "28StringValSerializeOrFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE"; Db db = builtinsDb_; // Count (*) @@ -566,17 +568,19 @@ public abstract class Catalog { null, null, false)); // Min String minMaxInit = t.isStringType() ? initNullString : initNull; + String minMaxSerializeOrFinalize = t.isStringType() ? + stringValSerializeOrFinalize : null; db.addBuiltin(AggregateFunction.createBuiltin(db, "min", Lists.newArrayList(t), t, t, minMaxInit, prefix + MIN_UPDATE_SYMBOL.get(t), prefix + MIN_UPDATE_SYMBOL.get(t), - null, null, true)); + minMaxSerializeOrFinalize, minMaxSerializeOrFinalize, true)); // Max db.addBuiltin(AggregateFunction.createBuiltin(db, "max", Lists.newArrayList(t), t, t, minMaxInit, prefix + MAX_UPDATE_SYMBOL.get(t), prefix + MAX_UPDATE_SYMBOL.get(t), - null, null, true)); + minMaxSerializeOrFinalize, minMaxSerializeOrFinalize, true)); // NDV // TODO: this needs to switch to CHAR(64) as the intermediate type db.addBuiltin(AggregateFunction.createBuiltin(db, "ndv", @@ -584,7 +588,7 @@ public abstract class Catalog { prefix + "7HllInitEPN10impala_udf15FunctionContextEPNS1_9StringValE", prefix + HLL_UPDATE_SYMBOL.get(t), prefix + "8HllMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_", - null, + stringValSerializeOrFinalize, prefix + "11HllFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE", true)); @@ -595,7 +599,7 @@ public abstract class Catalog { prefix + "6PcInitEPN10impala_udf15FunctionContextEPNS1_9StringValE", prefix + PC_UPDATE_SYMBOL.get(t), prefix + "7PcMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_", - null, + stringValSerializeOrFinalize, prefix + "10PcFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE", true)); @@ -606,7 +610,7 @@ public abstract class Catalog { prefix + "6PcInitEPN10impala_udf15FunctionContextEPNS1_9StringValE", prefix + PCSA_UPDATE_SYMBOL.get(t), prefix + "7PcMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_", - null, + stringValSerializeOrFinalize, prefix + "12PcsaFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE", true)); } @@ -652,7 +656,7 @@ public abstract class Catalog { initNullString, prefix + "12StringConcatEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_", prefix + "12StringConcatEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_", - null, null, false)); + stringValSerializeOrFinalize, stringValSerializeOrFinalize, false)); // Group_concat(string, string) db.addBuiltin(AggregateFunction.createBuiltin(db, "group_concat", Lists.newArrayList(ColumnType.STRING, ColumnType.STRING), @@ -661,6 +665,6 @@ public abstract class Catalog { prefix + "12StringConcatEPN10impala_udf15FunctionContextERKNS1_9StringValES6_PS4_", prefix + "12StringConcatEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_", - null, null, false)); + stringValSerializeOrFinalize, stringValSerializeOrFinalize, false)); } } diff --git a/testdata/workloads/functional-query/queries/QueryTest/uda-mem-limit.test b/testdata/workloads/functional-query/queries/QueryTest/uda-mem-limit.test new file mode 100644 index 000000000..16ac4707a --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/uda-mem-limit.test @@ -0,0 +1,12 @@ +==== +---- QUERY +create database if not exists native_function_test; +use native_function_test; + +drop function if exists agg_memtest(bigint); + +create aggregate function agg_memtest(bigint) returns bigint +location '/test-warehouse/libTestUdas.so' update_fn='MemTestUpdate'; + +select agg_memtest(bigint_col * 10 * 1024 * 1024) from functional.alltypes; +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/udf-mem-limit.test b/testdata/workloads/functional-query/queries/QueryTest/udf-mem-limit.test new file mode 100644 index 000000000..f8df3cc55 --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/udf-mem-limit.test @@ -0,0 +1,14 @@ +==== +---- QUERY +create database if not exists native_function_test; +use native_function_test; + +drop function if exists memtest(bigint); + +create function memtest(bigint) returns bigint +location '/test-warehouse/libTestUdfs.so' symbol='MemTest' +prepare_fn='_Z14MemTestPreparePN10impala_udf15FunctionContextENS0_18FunctionStateScopeE' +close_fn='_Z12MemTestClosePN10impala_udf15FunctionContextENS0_18FunctionStateScopeE'; + +select * from functional.alltypes where bigint_col > memtest(10 * 1024 * 1024) +==== diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py index 8455e7fde..fd85867e3 100644 --- a/tests/query_test/test_udfs.py +++ b/tests/query_test/test_udfs.py @@ -1,6 +1,7 @@ #!/usr/bin/env python # Copyright (c) 2012 Cloudera, Inc. All rights reserved. +from tests.beeswax.impala_beeswax import ImpalaBeeswaxException from tests.common.test_vector import * from tests.common.impala_test_suite import * from tests.common.impala_cluster import ImpalaCluster @@ -106,6 +107,28 @@ class TestUdfs(ImpalaTestSuite): assert results.success assert len(results.data) == 9999 + def test_mem_limits(self, vector): + # Set the mem limit high enough that a simple scan can run + mem_limit = 1024 * 1024 + vector.get_value('exec_option')['mem_limit'] = mem_limit + + try: + self.run_test_case('QueryTest/udf-mem-limit', vector) + assert False, "Query was expected to fail" + except ImpalaBeeswaxException, e: + self.__check_exception(e) + + try: + self.run_test_case('QueryTest/uda-mem-limit', vector) + assert False, "Query was expected to fail" + except ImpalaBeeswaxException, e: + self.__check_exception(e) + + def __check_exception(self, e): + if ('Memory limit exceeded' not in e.inner_exception.message and + 'Cancelled' not in e.inner_exception.message): + raise e + def __run_query_all_impalads(self, exec_options, query, expected): impala_cluster = ImpalaCluster() for impalad in impala_cluster.impalads: