IMPALA-3647: track runtime filter memory in separate tracker

This change breaks out runtime filter memory consumption from the
query-wide tracker to improve debuggability of memory limit exceeded
errors.

Testing: ran exhaustive tests, ran local and cluster stress tests.

Change-Id: I9f28f3b55b5c62e6f0f9838c5947c9446d444d20
Reviewed-on: http://gerrit.cloudera.org:8080/3247
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Reviewed-by: Michael Ho <kwho@cloudera.com>
Tested-by: Internal Jenkins
This commit is contained in:
Tim Armstrong
2016-05-24 23:37:28 -07:00
parent 7ac341d427
commit 585ee48dc7
5 changed files with 23 additions and 9 deletions

View File

@@ -180,6 +180,7 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) {
runtime_state_->InitMemTrackers(query_id_, &fragment_instance_ctx.request_pool,
bytes_limit, rm_reservation_size_bytes);
RETURN_IF_ERROR(runtime_state_->CreateBlockMgr());
runtime_state_->InitFilterBank();
// Reserve one main thread from the pool
runtime_state_->resource_pool()->AcquireThreadToken();

View File

@@ -62,6 +62,9 @@ RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* s
default_filter_size_ = max<int64_t>(default_filter_size_, min_filter_size_);
default_filter_size_ =
BitUtil::RoundUpToPowerOfTwo(min<int64_t>(default_filter_size_, max_filter_size_));
filter_mem_tracker_.reset(new MemTracker(-1, -1, "Runtime Filter Bank",
state->instance_mem_tracker(), false));
}
RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filter_desc,
@@ -166,7 +169,7 @@ void RuntimeFilterBank::PublishGlobalFilter(int32_t filter_id,
BloomFilter::GetExpectedHeapSpaceUsed(thrift_filter.log_heap_space);
// Silently fail to publish the filter (replacing it with a 0-byte complete one) if
// there's not enough memory for it.
if (!state_->query_mem_tracker()->TryConsume(required_space)) {
if (!filter_mem_tracker_->TryConsume(required_space)) {
VLOG_QUERY << "No memory for global filter: " << filter_id
<< " (fragment instance: " << state_->fragment_instance_id() << ")";
it->second->SetBloomFilter(BloomFilter::ALWAYS_TRUE_FILTER);
@@ -191,7 +194,7 @@ BloomFilter* RuntimeFilterBank::AllocateScratchBloomFilter(int32_t filter_id) {
// Track required space
int64_t log_filter_size = Bits::Log2Ceiling64(it->second->filter_size());
int64_t required_space = BloomFilter::GetExpectedHeapSpaceUsed(log_filter_size);
if (!state_->query_mem_tracker()->TryConsume(required_space)) return NULL;
if (!filter_mem_tracker_->TryConsume(required_space)) return NULL;
BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(log_filter_size));
DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed());
memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
@@ -217,6 +220,7 @@ void RuntimeFilterBank::Close() {
lock_guard<mutex> l(runtime_filter_lock_);
closed_ = true;
obj_pool_.Clear();
state_->query_mem_tracker()->Release(memory_allocated_->value());
filter_mem_tracker_->Release(memory_allocated_->value());
filter_mem_tracker_->UnregisterFromParent();
}

View File

@@ -19,12 +19,14 @@
#include "runtime/types.h"
#include "util/runtime-profile.h"
#include <boost/scoped_ptr.hpp>
#include <boost/thread/lock_guard.hpp>
#include <boost/unordered_map.hpp>
namespace impala {
class BloomFilter;
class MemTracker;
class RuntimeFilter;
class RuntimeState;
class TBloomFilter;
@@ -127,6 +129,9 @@ class RuntimeFilterBank {
/// Object pool to track allocated Bloom filters.
ObjectPool obj_pool_;
/// MemTracker to track Bloom filter memory.
boost::scoped_ptr<MemTracker> filter_mem_tracker_;
/// True iff Close() has been called. Used to prevent races between
/// AllocateScratchBloomFilter() and Close().
bool closed_;

View File

@@ -76,8 +76,7 @@ RuntimeState::RuntimeState(const TExecPlanFragmentParams& fragment_params,
"Fragment " + PrintId(fragment_ctx().fragment_instance_id)),
is_cancelled_(false),
query_resource_mgr_(NULL),
root_node_id_(-1),
filter_bank_(new RuntimeFilterBank(query_ctx(), this)) {
root_node_id_(-1) {
Status status = Init(exec_env);
DCHECK(status.ok()) << status.GetDetail();
}
@@ -91,11 +90,9 @@ RuntimeState::RuntimeState(const TQueryCtx& query_ctx)
profile_(obj_pool_.get(), "<unnamed>"),
is_cancelled_(false),
query_resource_mgr_(NULL),
root_node_id_(-1),
filter_bank_(new RuntimeFilterBank(query_ctx, this)) {
root_node_id_(-1) {
fragment_params_.__set_query_ctx(query_ctx);
fragment_params_.query_ctx.request.query_options
.__set_batch_size(DEFAULT_BATCH_SIZE);
fragment_params_.query_ctx.request.query_options.__set_batch_size(DEFAULT_BATCH_SIZE);
}
RuntimeState::~RuntimeState() {
@@ -161,6 +158,10 @@ void RuntimeState::InitMemTrackers(const TUniqueId& query_id, const string* pool
runtime_profile()->name(), query_mem_tracker_.get()));
}
void RuntimeState::InitFilterBank() {
filter_bank_.reset(new RuntimeFilterBank(query_ctx(), this));
}
Status RuntimeState::CreateBlockMgr() {
DCHECK(block_mgr_.get() == NULL);

View File

@@ -81,6 +81,9 @@ class RuntimeState {
void InitMemTrackers(const TUniqueId& query_id, const std::string* request_pool,
int64_t query_bytes_limit, int64_t query_rm_reservation_limit_bytes = -1);
/// Initializes the runtime filter bank. Must be called after InitMemTrackers().
void InitFilterBank();
/// Gets/Creates the query wide block mgr.
Status CreateBlockMgr();