diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc index ef3920669..1d828ef73 100644 --- a/be/src/runtime/plan-fragment-executor.cc +++ b/be/src/runtime/plan-fragment-executor.cc @@ -180,6 +180,7 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) { runtime_state_->InitMemTrackers(query_id_, &fragment_instance_ctx.request_pool, bytes_limit, rm_reservation_size_bytes); RETURN_IF_ERROR(runtime_state_->CreateBlockMgr()); + runtime_state_->InitFilterBank(); // Reserve one main thread from the pool runtime_state_->resource_pool()->AcquireThreadToken(); diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc index 91f12c994..36f7b5fd4 100644 --- a/be/src/runtime/runtime-filter-bank.cc +++ b/be/src/runtime/runtime-filter-bank.cc @@ -62,6 +62,9 @@ RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* s default_filter_size_ = max(default_filter_size_, min_filter_size_); default_filter_size_ = BitUtil::RoundUpToPowerOfTwo(min(default_filter_size_, max_filter_size_)); + + filter_mem_tracker_.reset(new MemTracker(-1, -1, "Runtime Filter Bank", + state->instance_mem_tracker(), false)); } RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filter_desc, @@ -166,7 +169,7 @@ void RuntimeFilterBank::PublishGlobalFilter(int32_t filter_id, BloomFilter::GetExpectedHeapSpaceUsed(thrift_filter.log_heap_space); // Silently fail to publish the filter (replacing it with a 0-byte complete one) if // there's not enough memory for it. - if (!state_->query_mem_tracker()->TryConsume(required_space)) { + if (!filter_mem_tracker_->TryConsume(required_space)) { VLOG_QUERY << "No memory for global filter: " << filter_id << " (fragment instance: " << state_->fragment_instance_id() << ")"; it->second->SetBloomFilter(BloomFilter::ALWAYS_TRUE_FILTER); @@ -191,7 +194,7 @@ BloomFilter* RuntimeFilterBank::AllocateScratchBloomFilter(int32_t filter_id) { // Track required space int64_t log_filter_size = Bits::Log2Ceiling64(it->second->filter_size()); int64_t required_space = BloomFilter::GetExpectedHeapSpaceUsed(log_filter_size); - if (!state_->query_mem_tracker()->TryConsume(required_space)) return NULL; + if (!filter_mem_tracker_->TryConsume(required_space)) return NULL; BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(log_filter_size)); DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed()); memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed()); @@ -217,6 +220,7 @@ void RuntimeFilterBank::Close() { lock_guard l(runtime_filter_lock_); closed_ = true; obj_pool_.Clear(); - state_->query_mem_tracker()->Release(memory_allocated_->value()); + filter_mem_tracker_->Release(memory_allocated_->value()); + filter_mem_tracker_->UnregisterFromParent(); } diff --git a/be/src/runtime/runtime-filter-bank.h b/be/src/runtime/runtime-filter-bank.h index 4703a0f5e..1bf38703f 100644 --- a/be/src/runtime/runtime-filter-bank.h +++ b/be/src/runtime/runtime-filter-bank.h @@ -19,12 +19,14 @@ #include "runtime/types.h" #include "util/runtime-profile.h" +#include #include #include namespace impala { class BloomFilter; +class MemTracker; class RuntimeFilter; class RuntimeState; class TBloomFilter; @@ -127,6 +129,9 @@ class RuntimeFilterBank { /// Object pool to track allocated Bloom filters. ObjectPool obj_pool_; + /// MemTracker to track Bloom filter memory. + boost::scoped_ptr filter_mem_tracker_; + /// True iff Close() has been called. Used to prevent races between /// AllocateScratchBloomFilter() and Close(). bool closed_; diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc index 3701907e7..afbe3b6eb 100644 --- a/be/src/runtime/runtime-state.cc +++ b/be/src/runtime/runtime-state.cc @@ -76,8 +76,7 @@ RuntimeState::RuntimeState(const TExecPlanFragmentParams& fragment_params, "Fragment " + PrintId(fragment_ctx().fragment_instance_id)), is_cancelled_(false), query_resource_mgr_(NULL), - root_node_id_(-1), - filter_bank_(new RuntimeFilterBank(query_ctx(), this)) { + root_node_id_(-1) { Status status = Init(exec_env); DCHECK(status.ok()) << status.GetDetail(); } @@ -91,11 +90,9 @@ RuntimeState::RuntimeState(const TQueryCtx& query_ctx) profile_(obj_pool_.get(), ""), is_cancelled_(false), query_resource_mgr_(NULL), - root_node_id_(-1), - filter_bank_(new RuntimeFilterBank(query_ctx, this)) { + root_node_id_(-1) { fragment_params_.__set_query_ctx(query_ctx); - fragment_params_.query_ctx.request.query_options - .__set_batch_size(DEFAULT_BATCH_SIZE); + fragment_params_.query_ctx.request.query_options.__set_batch_size(DEFAULT_BATCH_SIZE); } RuntimeState::~RuntimeState() { @@ -161,6 +158,10 @@ void RuntimeState::InitMemTrackers(const TUniqueId& query_id, const string* pool runtime_profile()->name(), query_mem_tracker_.get())); } +void RuntimeState::InitFilterBank() { + filter_bank_.reset(new RuntimeFilterBank(query_ctx(), this)); +} + Status RuntimeState::CreateBlockMgr() { DCHECK(block_mgr_.get() == NULL); diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h index 22cecafe8..f1047249f 100644 --- a/be/src/runtime/runtime-state.h +++ b/be/src/runtime/runtime-state.h @@ -81,6 +81,9 @@ class RuntimeState { void InitMemTrackers(const TUniqueId& query_id, const std::string* request_pool, int64_t query_bytes_limit, int64_t query_rm_reservation_limit_bytes = -1); + /// Initializes the runtime filter bank. Must be called after InitMemTrackers(). + void InitFilterBank(); + /// Gets/Creates the query wide block mgr. Status CreateBlockMgr();