mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
IMPALA-13478: Sync tuple cache files to disk asynchronously
When a tuple cache entry is first being written, we want to sync the contents to disk. Currently, that happens on the fast path and delays the query results, sometimes significantly. This moves the Sync() call off of the fast path by passing the work to a thread pool. The threads in the pool open the file, sync it to disk, then close the file. If anything goes wrong, the cache entry is evicted. The tuple cache can generate writes very quickly, so this needs a backpressure mechanism to avoid overwhelming the disk. In particular, it needs to avoid accumulating dirty buffers to the point that the OS throttles new writes, delaying the query fast path. This implements a limit on outstanding writes (i.e. writes that have not been flushed to disk). To enforce it, writers now call UpdateWriteSize() to reserve space before writing. UpdateWriteSize() can fail if it hits the limit on outstanding writes or if this particular cache entry has hit the maximum size. When it fails, the writer should abort writing the cache entry. Since UpdateWriteSize() is updating the charge in the cache, the outstanding writes are being counted against the capacity, triggering evictions. This improves the tuple cache's adherence to the capacity limit. The outstanding writes limits is configured via the tuple_cache_outstanding_write_limit startup flag, which is either a specific size string (e.g. 1GB) or a percentage of the process memory limit. To avoid updating the cache charge very frequently, this has an update chunk size specified by tuple_cache_outstanding_write_chunk_bytes. This adds counters at the daemon level: - outstanding write bytes - number of writes halted due to backpressure - number of sync calls that fail (due to IO errors) - number of sync calls dropped due to queue backpressure The runtime profile adds a NumTupleCacheBackpressureHalted counter that is set when a write hits the outstanding write limit. This has a startup option to add randomness to the tuple cache keys to make it easy to test a scenario with no cache hits. Testing: - Added unit tests to tuple-cache-mgr-test - Testing with TPC-DS on a cluster with fast NVME SSDs showed a significant improvement in the first-run times due to the asynchronous syncs. - Testing with TPC-H on a system with a slow disk and zero cache hits showed improved behavior with the backpressure Change-Id: I646bb56300656d8b8ac613cb8fe2f85180b386d3 Reviewed-on: http://gerrit.cloudera.org:8080/22215 Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com> Reviewed-by: Michael Smith <michael.smith@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
@@ -60,6 +60,8 @@ Status TupleCacheNode::Prepare(RuntimeState* state) {
|
||||
num_hits_counter_ = ADD_COUNTER(runtime_profile(), "NumTupleCacheHits", TUnit::UNIT);
|
||||
num_halted_counter_ =
|
||||
ADD_COUNTER(runtime_profile(), "NumTupleCacheHalted", TUnit::UNIT);
|
||||
num_backpressure_halted_counter_ =
|
||||
ADD_COUNTER(runtime_profile(), "NumTupleCacheBackpressureHalted", TUnit::UNIT);
|
||||
num_skipped_counter_ =
|
||||
ADD_COUNTER(runtime_profile(), "NumTupleCacheSkipped", TUnit::UNIT);
|
||||
|
||||
@@ -128,7 +130,10 @@ Status TupleCacheNode::Open(RuntimeState* state) {
|
||||
}
|
||||
} else if (tuple_cache_mgr->IsAvailableForWrite(handle_)) {
|
||||
writer_ = make_unique<TupleFileWriter>(tuple_cache_mgr->GetPath(handle_),
|
||||
mem_tracker(), runtime_profile(), tuple_cache_mgr->MaxSize());
|
||||
mem_tracker(), runtime_profile(),
|
||||
[this, tuple_cache_mgr] (size_t new_size) {
|
||||
return tuple_cache_mgr->RequestWriteSize(&this->handle_, new_size);
|
||||
});
|
||||
Status status = writer_->Open(state);
|
||||
if (!status.ok()) {
|
||||
LOG(WARNING) << "Could not write cache entry for "
|
||||
@@ -282,18 +287,23 @@ Status TupleCacheNode::GetNext(
|
||||
// If there was an error or we exceeded the file size limit, stop caching but
|
||||
// continue reading from the child node.
|
||||
if (!status.ok()) {
|
||||
if (writer_->ExceededMaxSize()) {
|
||||
bool set_tombstone = false;
|
||||
if (status.code() == TErrorCode::TUPLE_CACHE_ENTRY_SIZE_LIMIT_EXCEEDED) {
|
||||
VLOG_FILE << "Tuple Cache entry for " << combined_key_
|
||||
<< " hit the maximum file size: " << status.GetDetail();
|
||||
COUNTER_ADD(num_halted_counter_, 1);
|
||||
tuple_cache_mgr->IncrementMetric(TupleCacheMgr::MetricType::HALTED);
|
||||
writer_->Abort();
|
||||
tuple_cache_mgr->AbortWrite(move(handle_), true);
|
||||
set_tombstone = true;
|
||||
} else if (status.code() ==
|
||||
TErrorCode::TUPLE_CACHE_OUTSTANDING_WRITE_LIMIT_EXCEEDED) {
|
||||
VLOG_FILE << "Tuple Cache entry for " << combined_key_
|
||||
<< " hit the outstanding writes limit: " << status.GetDetail();
|
||||
COUNTER_ADD(num_backpressure_halted_counter_, 1);
|
||||
} else {
|
||||
// This is an unknown error (e.g. an IO error), so write a warning.
|
||||
LOG(WARNING) << "Unable to write cache file: " << status.GetDetail();
|
||||
writer_->Abort();
|
||||
tuple_cache_mgr->AbortWrite(move(handle_), false);
|
||||
}
|
||||
writer_->Abort();
|
||||
tuple_cache_mgr->AbortWrite(move(handle_), set_tombstone);
|
||||
writer_.reset();
|
||||
} else if (*eos) {
|
||||
// If we hit end of stream, then we can complete the cache entry
|
||||
|
||||
@@ -66,6 +66,9 @@ private:
|
||||
RuntimeProfile::Counter* num_hits_counter_ = nullptr;
|
||||
/// Number of results that were too large for the cache
|
||||
RuntimeProfile::Counter* num_halted_counter_ = nullptr;
|
||||
/// Number of results that were halted due to backpressure (i.e. hitting the
|
||||
/// outstanding writes limit)
|
||||
RuntimeProfile::Counter* num_backpressure_halted_counter_ = nullptr;
|
||||
/// Number of results that skip the cache due to a tombstone
|
||||
RuntimeProfile::Counter* num_skipped_counter_ = nullptr;
|
||||
|
||||
|
||||
@@ -246,7 +246,13 @@ TEST_F(TupleFileReadWriteTest, TestExceedMaxFileSize) {
|
||||
// Limit the file to 20 bytes
|
||||
size_t max_size = 20;
|
||||
|
||||
TupleFileWriter writer(path, tracker(), profile(), max_size);
|
||||
TupleFileWriter writer(path, tracker(), profile(),
|
||||
[max_size] (size_t requested_size) {
|
||||
if (requested_size > max_size) {
|
||||
return Status("exceed the maximum file size");
|
||||
}
|
||||
return Status::OK();
|
||||
});
|
||||
|
||||
Status status = writer.Open(runtime_state());
|
||||
ASSERT_OK(status);
|
||||
@@ -279,7 +285,13 @@ TEST_F(TupleFileReadWriteTest, TestExactMaxFileSize) {
|
||||
// Now, run the same thing with the max size set to the number of bytes written.
|
||||
string path2 = Path("exact-max-size-file");
|
||||
filesystem::remove(path2);
|
||||
TupleFileWriter limited_writer(path2, tracker(), profile(), max_size);
|
||||
TupleFileWriter limited_writer(path2, tracker(), profile(),
|
||||
[max_size] (size_t requested_size) {
|
||||
if (requested_size > max_size) {
|
||||
return Status("exceed the maximum file size");
|
||||
}
|
||||
return Status::OK();
|
||||
});
|
||||
|
||||
status = limited_writer.Open(runtime_state());
|
||||
ASSERT_OK(status);
|
||||
|
||||
@@ -39,7 +39,8 @@ namespace impala {
|
||||
static const char* UNIQUE_PATH_SUFFIX = ".%%%%";
|
||||
|
||||
TupleFileWriter::TupleFileWriter(
|
||||
std::string path, MemTracker* parent, RuntimeProfile* profile, size_t max_file_size)
|
||||
std::string path, MemTracker* parent, RuntimeProfile* profile,
|
||||
RequestWriteSizeCb request_write_size_cb)
|
||||
: path_(move(path)),
|
||||
temp_suffix_(filesystem::unique_path(UNIQUE_PATH_SUFFIX).string()),
|
||||
tracker_(new MemTracker(-1, "TupleFileWriter", parent)),
|
||||
@@ -49,7 +50,7 @@ TupleFileWriter::TupleFileWriter(
|
||||
serialize_timer_(profile ? ADD_TIMER(profile, "TupleCacheSerializeTime") : nullptr),
|
||||
bytes_written_(profile ?
|
||||
ADD_COUNTER(profile, "TupleCacheBytesWritten", TUnit::BYTES) : nullptr),
|
||||
max_file_size_(max_file_size) {}
|
||||
request_write_size_cb_(move(request_write_size_cb)) {}
|
||||
|
||||
TupleFileWriter::~TupleFileWriter() {
|
||||
if (state_ != State::Uninitialized) {
|
||||
@@ -135,11 +136,11 @@ Status TupleFileWriter::Write(RuntimeState* state, RowBatch* row_batch) {
|
||||
for (auto slice : slices) {
|
||||
num_bytes_to_write += slice.size();
|
||||
}
|
||||
if (BytesWritten() + num_bytes_to_write > max_file_size_) {
|
||||
exceeded_max_size_ = true;
|
||||
return Status(
|
||||
Substitute("Write of size $0 would cause $1 to exceed the maximum file size $2",
|
||||
num_bytes_to_write, TempPath(), max_file_size_));
|
||||
|
||||
if (request_write_size_cb_ != nullptr) {
|
||||
// If the request_write_size_cb is set, call it before writing to get permission.
|
||||
// It will return a not-OK status if this writer should stop.
|
||||
RETURN_IF_ERROR(request_write_size_cb_(BytesWritten() + num_bytes_to_write));
|
||||
}
|
||||
|
||||
RETURN_IF_ERROR(DebugAction(state->query_options(), "TUPLE_FILE_WRITER_WRITE"));
|
||||
@@ -187,7 +188,7 @@ Status TupleFileWriter::Commit(RuntimeState* state) {
|
||||
|
||||
RETURN_IF_ERROR(DebugAction(state->query_options(), "TUPLE_FILE_WRITER_COMMIT"));
|
||||
|
||||
KUDU_RETURN_IF_ERROR(tmp_file_->Sync(), "Failed to sync cache file");
|
||||
// Sync() is called asychnronously by TupleCacheMgr's sync thread pool.
|
||||
KUDU_RETURN_IF_ERROR(tmp_file_->Close(), "Failed to close cache file");
|
||||
|
||||
std::string src = TempPath();
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <fstream>
|
||||
#include <functional>
|
||||
#include <limits>
|
||||
#include <memory>
|
||||
|
||||
@@ -37,6 +38,8 @@ class RowBatch;
|
||||
class RuntimeState;
|
||||
class TupleReadWriteTest;
|
||||
|
||||
using RequestWriteSizeCb = std::function<Status (size_t)>;
|
||||
|
||||
/// The TupleFileWriter is used to serialize a stream of RowBatches to a local file
|
||||
/// for the tuple cache. It uses the standard RowBatch serialization used for KRPC
|
||||
/// data streams (i.e. RowBatch::Serialize()). The files can be read back using the
|
||||
@@ -48,9 +51,10 @@ class TupleReadWriteTest;
|
||||
/// Commit(), it runs Abort() and any associated file is deleted. The user can
|
||||
/// proactively call Abort() to delete any associated files, but it is not required.
|
||||
///
|
||||
/// The TupleFileWriter enforces a maximum file size and will fail Write() calls that
|
||||
/// would exceed this limit. It provides a way for the caller to get how many bytes
|
||||
/// have been written for accounting purposes.
|
||||
/// The TupleFileWriter calls the request_write_size_cb requesting a new write size
|
||||
/// before each write and will fail Write() if this callback returns an error.
|
||||
/// It provides a way for the caller to get how many bytes have been written for
|
||||
/// accounting purposes.
|
||||
///
|
||||
/// Currently, the TupleFileWriter does not embed the actual tuple layout into the
|
||||
/// file. It relies on the corresponding TupleFileReader reading with the same
|
||||
@@ -59,7 +63,7 @@ class TupleReadWriteTest;
|
||||
class TupleFileWriter {
|
||||
public:
|
||||
TupleFileWriter(std::string path, MemTracker* parent, RuntimeProfile* profile,
|
||||
size_t max_file_size = std::numeric_limits<size_t>::max());
|
||||
RequestWriteSizeCb request_write_size_cb = nullptr);
|
||||
~TupleFileWriter();
|
||||
|
||||
Status Open(RuntimeState* state);
|
||||
@@ -69,8 +73,6 @@ public:
|
||||
// call Write() or Commit().
|
||||
Status Write(RuntimeState* state, RowBatch* row_batch);
|
||||
|
||||
bool ExceededMaxSize() const { return exceeded_max_size_; }
|
||||
|
||||
// Number of bytes written to file. Must be called before Commit/Abort.
|
||||
size_t BytesWritten() const;
|
||||
|
||||
@@ -102,10 +104,8 @@ private:
|
||||
RuntimeProfile::Counter* serialize_timer_;
|
||||
// Total bytes written
|
||||
RuntimeProfile::Counter* bytes_written_;
|
||||
// Maximum size for the resulting file
|
||||
size_t max_file_size_;
|
||||
// True if the file reached the maximum size
|
||||
bool exceeded_max_size_ = false;
|
||||
// Callback to request an increase to the write size
|
||||
RequestWriteSizeCb request_write_size_cb_;
|
||||
|
||||
// This writes to a temporary file, only moving it into the final location with
|
||||
// Commit(). tmp_file_ is the file abstraction used for writing the temporary file.
|
||||
|
||||
@@ -404,7 +404,7 @@ Status ExecEnv::Init() {
|
||||
}
|
||||
|
||||
// Initialize the tuple cache
|
||||
RETURN_IF_ERROR(tuple_cache_mgr_->Init());
|
||||
RETURN_IF_ERROR(tuple_cache_mgr_->Init(bytes_limit));
|
||||
|
||||
LOG(INFO) << "Admit memory limit: "
|
||||
<< PrettyPrinter::Print(admit_mem_limit_, TUnit::BYTES);
|
||||
|
||||
@@ -19,9 +19,11 @@
|
||||
#include <boost/filesystem.hpp>
|
||||
|
||||
#include "gutil/strings/substitute.h"
|
||||
#include "kudu/util/env.h"
|
||||
#include "runtime/tuple-cache-mgr.h"
|
||||
#include "testutil/gtest-util.h"
|
||||
#include "util/filesystem-util.h"
|
||||
#include "util/time.h"
|
||||
|
||||
#include "common/names.h"
|
||||
|
||||
@@ -47,12 +49,17 @@ public:
|
||||
}
|
||||
|
||||
TupleCacheMgr GetCache(const string& cache_dir, const string& capacity = "1MB",
|
||||
string eviction_policy = "LRU", uint8_t debug_pos = 0) {
|
||||
string eviction_policy = "LRU", uint8_t debug_pos = TupleCacheMgr::NO_FILES,
|
||||
uint32_t sync_pool_size = 0, uint32_t sync_pool_queue_depth = 1000,
|
||||
string outstanding_write_limit_str = "1GB",
|
||||
uint32_t outstanding_write_chunk_bytes = 0) {
|
||||
string cache_config;
|
||||
if (!cache_dir.empty()) {
|
||||
cache_config = Substitute("$0:$1", cache_dir, capacity);
|
||||
}
|
||||
return TupleCacheMgr{move(cache_config), move(eviction_policy), &metrics_, debug_pos};
|
||||
return TupleCacheMgr{move(cache_config), move(eviction_policy), &metrics_, debug_pos,
|
||||
sync_pool_size, sync_pool_queue_depth, move(outstanding_write_limit_str),
|
||||
outstanding_write_chunk_bytes};
|
||||
}
|
||||
|
||||
TupleCacheMgr GetCache() {
|
||||
@@ -60,11 +67,13 @@ public:
|
||||
}
|
||||
|
||||
TupleCacheMgr GetFailAllocateCache() {
|
||||
return GetCache(GetCacheDir(), "1MB", "LRU", TupleCacheMgr::FAIL_ALLOCATE);
|
||||
return GetCache(GetCacheDir(), "1MB", "LRU",
|
||||
TupleCacheMgr::FAIL_ALLOCATE | TupleCacheMgr::NO_FILES);
|
||||
}
|
||||
|
||||
TupleCacheMgr GetFailInsertCache() {
|
||||
return GetCache(GetCacheDir(), "1MB", "LRU", TupleCacheMgr::FAIL_INSERT);
|
||||
return GetCache(GetCacheDir(), "1MB", "LRU",
|
||||
TupleCacheMgr::FAIL_INSERT | TupleCacheMgr::NO_FILES);
|
||||
}
|
||||
|
||||
std::string GetCacheDir() const { return cache_dir_; }
|
||||
@@ -303,4 +312,286 @@ TEST_F(TupleCacheMgrTest, TestMaxSize) {
|
||||
EXPECT_EQ(1024, cache.MaxSize());
|
||||
}
|
||||
|
||||
TEST_F(TupleCacheMgrTest, TestRequestWriteSize) {
|
||||
FLAGS_cache_force_single_shard = true;
|
||||
TupleCacheMgr cache = GetCache(GetCacheDir(), "1KB");
|
||||
ASSERT_OK(cache.Init());
|
||||
|
||||
// Write 5 entries of 200 bytes each
|
||||
for (int i = 0; i < 5; ++i) {
|
||||
TupleCacheMgr::UniqueHandle handle = cache.Lookup(Substitute("a_key_$0", i), true);
|
||||
EXPECT_TRUE(cache.IsAvailableForWrite(handle));
|
||||
cache.CompleteWrite(move(handle), 200);
|
||||
}
|
||||
|
||||
TupleCacheMgr::UniqueHandle handle = cache.Lookup("update_entry_then_abort", true);
|
||||
EXPECT_TRUE(cache.IsAvailableForWrite(handle));
|
||||
// Update to 200 bytes. This should evict one entry.
|
||||
Status status = cache.RequestWriteSize(&handle, 200);
|
||||
EXPECT_OK(status);
|
||||
EXPECT_EQ(cache.tuple_cache_entries_evicted_->GetValue(), 1);
|
||||
EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 200);
|
||||
|
||||
// Update to 900. This will evict all the others
|
||||
status = cache.RequestWriteSize(&handle, 900);
|
||||
EXPECT_OK(status);
|
||||
EXPECT_EQ(cache.tuple_cache_entries_evicted_->GetValue(), 5);
|
||||
EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 900);
|
||||
|
||||
// Update to MaxSize(). This will succeed.
|
||||
status = cache.RequestWriteSize(&handle, cache.MaxSize());
|
||||
EXPECT_OK(status);
|
||||
EXPECT_EQ(cache.tuple_cache_entries_evicted_->GetValue(), 5);
|
||||
EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), cache.MaxSize());
|
||||
|
||||
// Try to update to MaxSize() + 1. This will fail.
|
||||
status = cache.RequestWriteSize(&handle, cache.MaxSize() + 1);
|
||||
EXPECT_FALSE(status.ok());
|
||||
EXPECT_EQ(status.code(), TErrorCode::TUPLE_CACHE_ENTRY_SIZE_LIMIT_EXCEEDED);
|
||||
EXPECT_EQ(cache.tuple_cache_entries_evicted_->GetValue(), 5);
|
||||
EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), cache.MaxSize());
|
||||
|
||||
// Need to test the three state transitions out of IN_PROGRESS
|
||||
// Path #1: AbortWrite without tombstone
|
||||
cache.AbortWrite(move(handle), /* tombstone */ false);
|
||||
EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 0);
|
||||
|
||||
// Path #2: AbortWrite with tombstone
|
||||
handle = cache.Lookup("update_entry_then_tombstone", true);
|
||||
EXPECT_TRUE(cache.IsAvailableForWrite(handle));
|
||||
status = cache.RequestWriteSize(&handle, 900);
|
||||
EXPECT_OK(status);
|
||||
EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 900);
|
||||
cache.AbortWrite(move(handle), /* tombstone */ true);
|
||||
EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 0);
|
||||
|
||||
// Path #3: CompleteWrite
|
||||
handle = cache.Lookup("update_entry_then_complete", true);
|
||||
EXPECT_TRUE(cache.IsAvailableForWrite(handle));
|
||||
status = cache.RequestWriteSize(&handle, 900);
|
||||
EXPECT_OK(status);
|
||||
EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 900);
|
||||
cache.CompleteWrite(move(handle), 900);
|
||||
EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 0);
|
||||
}
|
||||
|
||||
TEST_F(TupleCacheMgrTest, TestOutstandingWriteLimit) {
|
||||
FLAGS_cache_force_single_shard = true;
|
||||
// Set up a cache with an outstanding write limit of 1KB
|
||||
TupleCacheMgr cache = GetCache(GetCacheDir(), "1KB", "LRU", 0, 0, 0, "1KB");
|
||||
ASSERT_OK(cache.Init());
|
||||
|
||||
// Open two handles
|
||||
TupleCacheMgr::UniqueHandle handle1 = cache.Lookup("outstanding_write_limit_1", true);
|
||||
EXPECT_TRUE(cache.IsAvailableForWrite(handle1));
|
||||
TupleCacheMgr::UniqueHandle handle2 = cache.Lookup("outstanding_write_limit_2", true);
|
||||
EXPECT_TRUE(cache.IsAvailableForWrite(handle2));
|
||||
|
||||
// UpdateWrite size to 512 bytes for each, so it is equal to the limit and succeeds.
|
||||
Status status = cache.RequestWriteSize(&handle1, 512);
|
||||
EXPECT_OK(status);
|
||||
EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 512);
|
||||
|
||||
status = cache.RequestWriteSize(&handle2, 512);
|
||||
EXPECT_OK(status);
|
||||
EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 1024);
|
||||
|
||||
// Going one byte past the limit should fail
|
||||
// This does not set exceeded_max_size
|
||||
status = cache.RequestWriteSize(&handle1, 513);
|
||||
EXPECT_FALSE(status.ok());
|
||||
EXPECT_EQ(status.code(), TErrorCode::TUPLE_CACHE_OUTSTANDING_WRITE_LIMIT_EXCEEDED);
|
||||
EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 1024);
|
||||
|
||||
// Clean up
|
||||
cache.AbortWrite(move(handle1), /* tombstone */ false);
|
||||
cache.AbortWrite(move(handle2), /* tombstone */ false);
|
||||
}
|
||||
|
||||
TEST_F(TupleCacheMgrTest, TestOutstandingWriteLimitConcurrent) {
|
||||
FLAGS_cache_force_single_shard = true;
|
||||
// Set up a cache with a low outstanding write limit of 1KB to make it easy to hit
|
||||
// the limit.
|
||||
TupleCacheMgr cache = GetCache(GetCacheDir(), "100KB", "LRU", 0, 0, 0, "1KB");
|
||||
ASSERT_OK(cache.Init());
|
||||
|
||||
// This attempts to do 100 512-byte writes to the cache with 64 byte request chunks.
|
||||
// The cache is big enough to fit all of the writes, so the only reason they should
|
||||
// fail is when they hit the outstanding write limit.
|
||||
vector<future<bool>> results;
|
||||
results.reserve(100);
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
results.emplace_back(async(launch::async, [&cache, i]() {
|
||||
TupleCacheMgr::UniqueHandle handle = cache.Lookup(Substitute("write$0", i), true);
|
||||
EXPECT_TRUE(cache.IsAvailableForWrite(handle));
|
||||
// Write in 64 byte chunks, 8 chunks = 512 bytes
|
||||
for (int num_chunks = 1; num_chunks <= 8; ++num_chunks) {
|
||||
Status status = cache.RequestWriteSize(&handle, num_chunks * 64);
|
||||
if (!status.ok()) {
|
||||
cache.AbortWrite(move(handle), /* tombstone */ false);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
cache.CompleteWrite(move(handle), 512);
|
||||
return true;
|
||||
}));
|
||||
}
|
||||
|
||||
// Wait for all threads to complete and count the number of failures
|
||||
uint32_t num_failures = 0;
|
||||
for (auto& result : results) {
|
||||
result.wait();
|
||||
if (!result.get()) num_failures++;
|
||||
}
|
||||
|
||||
// This test case has race conditions. We expect the failures to line up with the
|
||||
// number of backpressure halted. We expect at least one thread to succeed.
|
||||
// There are scenarios where all the threads can succeed, so this doesn't require
|
||||
// num_failures > 0.
|
||||
EXPECT_EQ(cache.tuple_cache_backpressure_halted_->GetValue(), num_failures);
|
||||
EXPECT_LT(num_failures, 100);
|
||||
}
|
||||
|
||||
TEST_F(TupleCacheMgrTest, TestOutstandingWriteChunkSize) {
|
||||
FLAGS_cache_force_single_shard = true;
|
||||
uint32_t chunk_size = 250;
|
||||
// Set up a cache with an outstanding write limit of 1KB and a chunk size of 250
|
||||
// The chunk size is specifically not a clean divisor of 1KB.
|
||||
TupleCacheMgr cache =
|
||||
GetCache(GetCacheDir(), "1KB", "LRU", 0, 0, 0, "2KB", chunk_size);
|
||||
ASSERT_OK(cache.Init());
|
||||
|
||||
TupleCacheMgr::UniqueHandle handle = cache.Lookup("outstanding_chunk_then_abort", true);
|
||||
EXPECT_TRUE(cache.IsAvailableForWrite(handle));
|
||||
|
||||
// Update write size to 1, but this is counted as the chunk size
|
||||
Status status = cache.RequestWriteSize(&handle, 1);
|
||||
EXPECT_OK(status);
|
||||
EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), chunk_size);
|
||||
|
||||
// Request write size to be equal to the chunk size. This doesn't change the outstanding
|
||||
// write bytes.
|
||||
status = cache.RequestWriteSize(&handle, chunk_size);
|
||||
EXPECT_OK(status);
|
||||
EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), chunk_size);
|
||||
|
||||
// Request write size to be one above the chunk size. This grabs a second chunk.
|
||||
status = cache.RequestWriteSize(&handle, chunk_size + 1);
|
||||
EXPECT_OK(status);
|
||||
EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 2 * chunk_size);
|
||||
|
||||
// The chunk size avoids conflicts with the MaxSize(). This requests a size that
|
||||
// would round to larger than MaxSize (the chunk size is not a clean divisor of the
|
||||
// cache size), but it does not result in an error. Instead, it reserves MaxSize().
|
||||
status = cache.RequestWriteSize(&handle,
|
||||
((cache.MaxSize() / chunk_size) * chunk_size) + 1);
|
||||
EXPECT_OK(status);
|
||||
EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), cache.MaxSize());
|
||||
|
||||
// Request size can go all the way to MaxSize() even with chunk size.
|
||||
status = cache.RequestWriteSize(&handle, cache.MaxSize());
|
||||
EXPECT_OK(status);
|
||||
EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), cache.MaxSize());
|
||||
|
||||
// Need to test the three state transitions out of IN_PROGRESS
|
||||
// Path #1: AbortWrite without tombstone
|
||||
cache.AbortWrite(move(handle), /* tombstone */ false);
|
||||
EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 0);
|
||||
|
||||
// Path #2: AbortWrite with tombstone
|
||||
handle = cache.Lookup("outstanding_chunk_then_tombstone", true);
|
||||
EXPECT_TRUE(cache.IsAvailableForWrite(handle));
|
||||
status = cache.RequestWriteSize(&handle, chunk_size + 1);
|
||||
EXPECT_OK(status);
|
||||
EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 2 * chunk_size);
|
||||
cache.AbortWrite(move(handle), /* tombstone */ true);
|
||||
EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 0);
|
||||
|
||||
// Path #3: CompleteWrite
|
||||
handle = cache.Lookup("outstanding_chunk_then_complete", true);
|
||||
EXPECT_TRUE(cache.IsAvailableForWrite(handle));
|
||||
status = cache.RequestWriteSize(&handle, chunk_size + 1);
|
||||
EXPECT_OK(status);
|
||||
EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 2 * chunk_size);
|
||||
cache.CompleteWrite(move(handle), chunk_size + 1);
|
||||
EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 0);
|
||||
}
|
||||
|
||||
TEST_F(TupleCacheMgrTest, TestSyncToDisk) {
|
||||
// Need the debug_pos to be zero so that DebugPos::NO_FILES is not set.
|
||||
TupleCacheMgr cache =
|
||||
GetCache(GetCacheDir(), "1KB", "LRU", /* debug_pos */ 0, /* sync_pool_size */ 10);
|
||||
ASSERT_OK(cache.Init());
|
||||
|
||||
// Error case: If there is no file, then the thread doing sync will get an error
|
||||
// when trying to open the file. This causes the entry to be evicted.
|
||||
TupleCacheMgr::UniqueHandle handle = cache.Lookup("key_without_file", true);
|
||||
EXPECT_TRUE(cache.IsAvailableForWrite(handle));
|
||||
cache.CompleteWrite(move(handle), 100);
|
||||
// Sleep a bit to let the thread pool process the entry
|
||||
SleepForMs(100);
|
||||
handle = cache.Lookup("key_without_file", false);
|
||||
EXPECT_FALSE(cache.IsAvailableForRead(handle));
|
||||
|
||||
// Success case: If there is a file that can be synced to disk, everything behaves
|
||||
// normally.
|
||||
handle = cache.Lookup("key_with_file", true);
|
||||
std::string file_path = cache.GetPath(handle);
|
||||
std::unique_ptr<kudu::WritableFile> cache_file;
|
||||
kudu::Status s = kudu::Env::Default()->NewWritableFile(file_path, &cache_file);
|
||||
EXPECT_TRUE(s.ok());
|
||||
std::string data("data");
|
||||
cache_file->Append(Slice(data));
|
||||
cache.CompleteWrite(move(handle), 100);
|
||||
// Sleep a bit to let the thread pool process the entry
|
||||
SleepForMs(100);
|
||||
handle = cache.Lookup("key_with_file", false);
|
||||
EXPECT_TRUE(cache.IsAvailableForRead(handle));
|
||||
}
|
||||
|
||||
TEST_F(TupleCacheMgrTest, TestDroppedSyncs) {
|
||||
// Need the debug_pos to be zero so that DebugPos::NO_FILES is not set.
|
||||
// We set a small sync_pool_size (1) and the bare minimum sync_pool_queue_depth (1)
|
||||
// to force some syncs to be dropped.
|
||||
FLAGS_cache_force_single_shard = true;
|
||||
TupleCacheMgr cache = GetCache(GetCacheDir(), "10KB", "LRU", /* debug_pos */ 0,
|
||||
/* sync_pool_size */ 1, /* sync_pool_queue_depth */ 1);
|
||||
ASSERT_OK(cache.Init());
|
||||
|
||||
// Attempt to write entries to the cache concurrently to stress the sync pool.
|
||||
// This uses many writers, but the writes are small and can all fit into the
|
||||
// cache. The only reason something would fail to write to the cache is if the
|
||||
// sync pool gets overwhelmed.
|
||||
vector<future<bool>> results;
|
||||
results.reserve(100);
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
results.emplace_back(async(launch::async, [&cache, i]() {
|
||||
TupleCacheMgr::UniqueHandle handle = cache.Lookup(Substitute("write$0", i), true);
|
||||
EXPECT_TRUE(cache.IsAvailableForWrite(handle));
|
||||
std::string file_path = cache.GetPath(handle);
|
||||
std::unique_ptr<kudu::WritableFile> cache_file;
|
||||
kudu::Status s = kudu::Env::Default()->NewWritableFile(file_path, &cache_file);
|
||||
EXPECT_TRUE(s.ok());
|
||||
std::string data("data");
|
||||
cache_file->Append(Slice(data));
|
||||
cache.CompleteWrite(move(handle), 100);
|
||||
// CompleteWrite doesn't return status, so we can only tell if the sync failed
|
||||
// by looking up the entry.
|
||||
handle = cache.Lookup(Substitute("write$0", i), false);
|
||||
return cache.IsAvailableForRead(handle);
|
||||
}));
|
||||
}
|
||||
|
||||
// Wait for all threads to complete and count the number of failures
|
||||
uint32_t num_failures = 0;
|
||||
for (auto& result : results) {
|
||||
result.wait();
|
||||
if (!result.get()) num_failures++;
|
||||
}
|
||||
// The sync pool should get overwhelmed and the number of dropped syncs should match
|
||||
// the number of failures.
|
||||
EXPECT_GT(cache.tuple_cache_dropped_sync_->GetValue(), 0);
|
||||
EXPECT_EQ(cache.tuple_cache_dropped_sync_->GetValue(), num_failures);
|
||||
}
|
||||
|
||||
} // namespace impala
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
|
||||
#include <boost/filesystem.hpp>
|
||||
|
||||
#include "common/constant-strings.h"
|
||||
#include "common/logging.h"
|
||||
#include "exec/tuple-file-reader.h"
|
||||
#include "exec/tuple-text-file-reader.h"
|
||||
@@ -54,6 +55,20 @@ DEFINE_string(tuple_cache_eviction_policy, "LRU",
|
||||
DEFINE_string(tuple_cache_debug_dump_dir, "",
|
||||
"Directory for dumping the intermediate query result tuples for debugging purpose.");
|
||||
|
||||
DEFINE_uint32(tuple_cache_sync_pool_size, 10,
|
||||
"(Advanced) Size of the thread pool syncing cache files to disk asynchronously. "
|
||||
"If set to 0, cache files are flushed sychronously.");
|
||||
DEFINE_uint32(tuple_cache_sync_pool_queue_depth, 1000,
|
||||
"(Advanced) Maximum queue depth for the thread pool syncing cache files to disk");
|
||||
|
||||
static const string OUTSTANDING_WRITE_LIMIT_MSG =
|
||||
"(Advanced) Limit on the size of outstanding tuple cache writes. " +
|
||||
Substitute(MEM_UNITS_HELP_MSG, "the process memory limit");
|
||||
DEFINE_string(tuple_cache_outstanding_write_limit, "1GB",
|
||||
OUTSTANDING_WRITE_LIMIT_MSG.c_str());
|
||||
DEFINE_uint32(tuple_cache_outstanding_write_chunk_bytes, 128 * 1024,
|
||||
"(Advanced) Chunk size for incrementing the outstanding tuple cache write size");
|
||||
|
||||
// Global feature flag for tuple caching. If false, enable_tuple_cache cannot be true
|
||||
// and the coordinator cannot produce plans with TupleCacheNodes. The tuple_cache
|
||||
// parameter also cannot be specified.
|
||||
@@ -91,31 +106,47 @@ static string ConstructTupleCacheDebugDumpPath() {
|
||||
}
|
||||
|
||||
TupleCacheMgr::TupleCacheMgr(MetricGroup* metrics)
|
||||
: TupleCacheMgr(FLAGS_tuple_cache, FLAGS_tuple_cache_eviction_policy, metrics, 0) {}
|
||||
: TupleCacheMgr(FLAGS_tuple_cache, FLAGS_tuple_cache_eviction_policy, metrics,
|
||||
/* debug_pos */ 0, FLAGS_tuple_cache_sync_pool_size,
|
||||
FLAGS_tuple_cache_sync_pool_queue_depth,
|
||||
FLAGS_tuple_cache_outstanding_write_limit,
|
||||
FLAGS_tuple_cache_outstanding_write_chunk_bytes) {}
|
||||
|
||||
TupleCacheMgr::TupleCacheMgr(string cache_config, string eviction_policy_str,
|
||||
MetricGroup* metrics, uint8_t debug_pos)
|
||||
TupleCacheMgr::TupleCacheMgr(
|
||||
string cache_config, string eviction_policy_str, MetricGroup* metrics,
|
||||
uint8_t debug_pos, uint32_t sync_pool_size, uint32_t sync_pool_queue_depth,
|
||||
string outstanding_write_limit_str, uint32_t outstanding_write_chunk_bytes)
|
||||
: cache_config_(move(cache_config)),
|
||||
eviction_policy_str_(move(eviction_policy_str)),
|
||||
outstanding_write_limit_str_(move(outstanding_write_limit_str)),
|
||||
cache_debug_dump_dir_(ConstructTupleCacheDebugDumpPath()),
|
||||
debug_pos_(debug_pos),
|
||||
sync_pool_size_(sync_pool_size),
|
||||
sync_pool_queue_depth_(sync_pool_queue_depth),
|
||||
outstanding_write_chunk_bytes_(outstanding_write_chunk_bytes),
|
||||
tuple_cache_hits_(metrics->AddCounter("impala.tuple-cache.hits", 0)),
|
||||
tuple_cache_misses_(metrics->AddCounter("impala.tuple-cache.misses", 0)),
|
||||
tuple_cache_skipped_(metrics->AddCounter("impala.tuple-cache.skipped", 0)),
|
||||
tuple_cache_halted_(metrics->AddCounter("impala.tuple-cache.halted", 0)),
|
||||
tuple_cache_backpressure_halted_(
|
||||
metrics->AddCounter("impala.tuple-cache.backpressure-halted", 0)),
|
||||
tuple_cache_entries_evicted_(
|
||||
metrics->AddCounter("impala.tuple-cache.entries-evicted", 0)),
|
||||
tuple_cache_failed_sync_(metrics->AddCounter("impala.tuple-cache.failed-syncs", 0)),
|
||||
tuple_cache_dropped_sync_(metrics->AddCounter("impala.tuple-cache.dropped-syncs", 0)),
|
||||
tuple_cache_entries_in_use_(
|
||||
metrics->AddGauge("impala.tuple-cache.entries-in-use", 0)),
|
||||
tuple_cache_entries_in_use_bytes_(
|
||||
metrics->AddGauge("impala.tuple-cache.entries-in-use-bytes", 0)),
|
||||
tuple_cache_tombstones_in_use_(
|
||||
metrics->AddGauge("impala.tuple-cache.tombstones-in-use", 0)),
|
||||
tuple_cache_outstanding_writes_bytes_(
|
||||
metrics->AddGauge("impala.tuple-cache.outstanding-writes-bytes", 0)),
|
||||
tuple_cache_entry_size_stats_(metrics->RegisterMetric(
|
||||
new HistogramMetric(MetricDefs::Get("impala.tuple-cache.entry-sizes"),
|
||||
STATS_MAX_TUPLE_CACHE_ENTRY_SIZE, 3))) {}
|
||||
|
||||
Status TupleCacheMgr::Init() {
|
||||
Status TupleCacheMgr::Init(int64_t process_bytes_limit) {
|
||||
if (cache_config_.empty()) {
|
||||
LOG(INFO) << "Tuple Cache is disabled.";
|
||||
return Status::OK();
|
||||
@@ -183,12 +214,36 @@ Status TupleCacheMgr::Init() {
|
||||
eviction_policy_str_));
|
||||
}
|
||||
|
||||
// The outstanding write limit can either be a specific value, or it can be a
|
||||
// percentage of the process bytes limit. If the process bytes limit is zero,
|
||||
// a percentage is not allowed.
|
||||
outstanding_write_limit_ = ParseUtil::ParseMemSpec(outstanding_write_limit_str_,
|
||||
&is_percent, process_bytes_limit);
|
||||
if (outstanding_write_limit_ <= 0) {
|
||||
CLEAN_EXIT_WITH_ERROR(
|
||||
Substitute("Invalid tuple cache outstanding write limit configuration: $0.",
|
||||
FLAGS_tuple_cache_outstanding_write_limit));
|
||||
}
|
||||
|
||||
// Setting sync_pool_size == 0 results in synchronous flushing to disk. This is
|
||||
// mainly used for backend tests
|
||||
if (sync_pool_size_ > 0) {
|
||||
sync_thread_pool_.reset(new ThreadPool<string>("tuple-cache-mgr", "sync-worker",
|
||||
sync_pool_size_, sync_pool_queue_depth_,
|
||||
[this] (int thread_id, const string& filename) {
|
||||
this->SyncFileToDisk(filename);
|
||||
}));
|
||||
RETURN_IF_ERROR(sync_thread_pool_->Init());
|
||||
}
|
||||
|
||||
cache_.reset(NewCache(policy, capacity, "Tuple_Cache"));
|
||||
|
||||
RETURN_IF_ERROR(cache_->Init());
|
||||
|
||||
LOG(INFO) << "Tuple Cache initialized at " << cache_dir_
|
||||
<< " with capacity " << PrettyPrinter::Print(capacity, TUnit::BYTES);
|
||||
<< " with capacity " << PrettyPrinter::Print(capacity, TUnit::BYTES)
|
||||
<< " and outstanding write limit: "
|
||||
<< PrettyPrinter::Print(outstanding_write_limit_, TUnit::BYTES);
|
||||
enabled_ = true;
|
||||
return Status::OK();
|
||||
}
|
||||
@@ -207,36 +262,41 @@ Status TupleCacheMgr::Init() {
|
||||
// again, if found return it (IsAvailableForWrite()=false),
|
||||
// else create a new entry (IsAvailableForWrite()=true).
|
||||
//
|
||||
// entry found
|
||||
// Lookup(acquire_state=true) ---> [ ... ] returns any of the states below
|
||||
// entry found
|
||||
// Lookup(acquire_state=true) ---> [ ... ] returns any of the states below
|
||||
// |
|
||||
// | entry absent: create new entry
|
||||
// v CompleteWrite
|
||||
// [ IN_PROGRESS, false ] ---> [ COMPLETE, true ]
|
||||
// v CompleteWrite SyncFileToDisk
|
||||
// [ IN_PROGRESS, false ] ---> [ COMPLETE_UNSYNCED, true ] ---> [ COMPLETE, true ]
|
||||
// |
|
||||
// | AbortWrite
|
||||
// v tombstone=true
|
||||
// [ IN_PROGRESS, false ] ---> [ TOMBSTONE, false ]
|
||||
// v tombstone=true
|
||||
// [ IN_PROGRESS, false ] ---> [ TOMBSTONE, false ]
|
||||
// |
|
||||
// | tombstone=false
|
||||
// v
|
||||
// [ IN_PROGRESS, false ] Scheduled for eviction, will be deleted once ref count=0.
|
||||
//
|
||||
|
||||
enum class TupleCacheState { IN_PROGRESS, TOMBSTONE, COMPLETE };
|
||||
enum class TupleCacheState { IN_PROGRESS, TOMBSTONE, COMPLETE_UNSYNCED, COMPLETE };
|
||||
|
||||
// An entry consists of a TupleCacheEntry followed by a C-string for the path.
|
||||
struct TupleCacheEntry {
|
||||
std::atomic<TupleCacheState> state{TupleCacheState::IN_PROGRESS};
|
||||
size_t size = 0;
|
||||
// Charge in the cache when there is a file associated with this entry. This is zero for
|
||||
// TOMBSTONE and IN_PROGRESS before the first UpdateWriteSize call, but those states
|
||||
// still have a base charge in the cache. During IN_PROGRESS, this is a reservation that
|
||||
// exceeds the current size of the file.
|
||||
size_t charge = 0;
|
||||
};
|
||||
|
||||
struct TupleCacheMgr::Handle {
|
||||
Cache::UniqueHandle cache_handle;
|
||||
bool is_writer = false;
|
||||
// Minimum charge to use if this entry becomes a TOMBSTONE
|
||||
size_t base_charge = 0;
|
||||
};
|
||||
|
||||
|
||||
void TupleCacheMgr::HandleDeleter::operator()(Handle* ptr) const { delete ptr; }
|
||||
|
||||
static uint8_t* getHandleData(const Cache* cache, TupleCacheMgr::Handle* handle) {
|
||||
@@ -244,22 +304,37 @@ static uint8_t* getHandleData(const Cache* cache, TupleCacheMgr::Handle* handle)
|
||||
return cache->Value(handle->cache_handle).mutable_data();
|
||||
}
|
||||
|
||||
static TupleCacheState GetState(const Cache* cache, TupleCacheMgr::Handle* handle) {
|
||||
uint8_t* data = getHandleData(cache, handle);
|
||||
TupleCacheState TupleCacheMgr::GetState(TupleCacheMgr::Handle* handle) const {
|
||||
uint8_t* data = getHandleData(cache_.get(), handle);
|
||||
return reinterpret_cast<TupleCacheEntry*>(data)->state;
|
||||
}
|
||||
|
||||
// Returns true if state was updated.
|
||||
static bool UpdateState(const Cache* cache, TupleCacheMgr::Handle* handle,
|
||||
bool TupleCacheMgr::UpdateState(TupleCacheMgr::Handle* handle,
|
||||
TupleCacheState requiredState, TupleCacheState newState) {
|
||||
uint8_t* data = getHandleData(cache, handle);
|
||||
uint8_t* data = getHandleData(cache_.get(), handle);
|
||||
return reinterpret_cast<TupleCacheEntry*>(data)->
|
||||
state.compare_exchange_strong(requiredState, newState);
|
||||
}
|
||||
|
||||
static void UpdateSize(Cache* cache, TupleCacheMgr::Handle* handle, size_t size) {
|
||||
uint8_t* data = getHandleData(cache, handle);
|
||||
reinterpret_cast<TupleCacheEntry*>(data)->size = size;
|
||||
size_t TupleCacheMgr::GetCharge(TupleCacheMgr::Handle* handle) const {
|
||||
uint8_t* data = getHandleData(cache_.get(), handle);
|
||||
return reinterpret_cast<TupleCacheEntry*>(data)->charge;
|
||||
}
|
||||
|
||||
void TupleCacheMgr::UpdateWriteSize(TupleCacheMgr::Handle* handle,
|
||||
size_t charge) {
|
||||
uint8_t* data = getHandleData(cache_.get(), handle);
|
||||
// We can only adjust the cache charge while an entry is IN_PROGRESS
|
||||
DCHECK(TupleCacheState::IN_PROGRESS == GetState(handle));
|
||||
TupleCacheEntry* entry = reinterpret_cast<TupleCacheEntry*>(data);
|
||||
int64_t diff = charge - entry->charge;
|
||||
entry->charge = charge;
|
||||
cache_->UpdateCharge(handle->cache_handle, charge);
|
||||
if (diff < 0) {
|
||||
DCHECK_LE(-diff, tuple_cache_outstanding_writes_bytes_->GetValue());
|
||||
}
|
||||
tuple_cache_outstanding_writes_bytes_->Increment(diff);
|
||||
}
|
||||
|
||||
static Cache::UniquePendingHandle CreateEntry(
|
||||
@@ -282,8 +357,8 @@ static Cache::UniquePendingHandle CreateEntry(
|
||||
}
|
||||
|
||||
// If the entry exists, the Handle pins it so it doesn't go away, but the entry may be in
|
||||
// any state (IN PROGRESS, TOMBSTONE, COMPLETE). If the entry doesn't exist and
|
||||
// acquire_write is true, it's created with the state IN_PROGRESS.
|
||||
// any state (IN PROGRESS, TOMBSTONE, COMPLETE_UNSYNCED, COMPLETE). If the entry doesn't
|
||||
// exist and acquire_write is true, it's created with the state IN_PROGRESS.
|
||||
TupleCacheMgr::UniqueHandle TupleCacheMgr::Lookup(
|
||||
const Slice& key, bool acquire_write) {
|
||||
if (!enabled_) return nullptr;
|
||||
@@ -319,6 +394,7 @@ TupleCacheMgr::UniqueHandle TupleCacheMgr::Lookup(
|
||||
VLOG_FILE << "Tuple Cache Entry created for " << path;
|
||||
handle->cache_handle = move(chandle);
|
||||
handle->is_writer = true;
|
||||
handle->base_charge = sizeof(TupleCacheEntry) + path.size() + 1;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -327,45 +403,123 @@ TupleCacheMgr::UniqueHandle TupleCacheMgr::Lookup(
|
||||
|
||||
bool TupleCacheMgr::IsAvailableForRead(UniqueHandle& handle) const {
|
||||
if (!handle || !handle->cache_handle) return false;
|
||||
return TupleCacheState::COMPLETE == GetState(cache_.get(), handle.get());
|
||||
TupleCacheState state = GetState(handle.get());
|
||||
return TupleCacheState::COMPLETE_UNSYNCED == state ||
|
||||
TupleCacheState::COMPLETE == state;
|
||||
}
|
||||
|
||||
bool TupleCacheMgr::IsAvailableForWrite(UniqueHandle& handle) const {
|
||||
if (!handle || !handle->cache_handle) return false;
|
||||
return handle->is_writer &&
|
||||
TupleCacheState::IN_PROGRESS == GetState(cache_.get(), handle.get());
|
||||
return handle->is_writer && TupleCacheState::IN_PROGRESS == GetState(handle.get());
|
||||
}
|
||||
|
||||
void TupleCacheMgr::CompleteWrite(UniqueHandle handle, size_t size) {
|
||||
DCHECK(enabled_);
|
||||
DCHECK(handle != nullptr && handle->cache_handle != nullptr);
|
||||
DCHECK(handle->is_writer);
|
||||
DCHECK_LE(size, MaxSize());
|
||||
DCHECK_GE(size, 0);
|
||||
if (sync_pool_size_ > 0 &&
|
||||
sync_thread_pool_->GetQueueSize() >= sync_pool_queue_depth_) {
|
||||
// The sync_thread_pool_ has reached its max queue size. This should almost never
|
||||
// happen, as the outstanding writes limit should kick in before this is overwhelmed.
|
||||
// If it does happen, bail out.
|
||||
AbortWrite(move(handle), false);
|
||||
tuple_cache_dropped_sync_->Increment(1);
|
||||
return;
|
||||
}
|
||||
VLOG_FILE << "Tuple Cache: Complete " << GetPath(handle) << " (" << size << ")";
|
||||
CHECK(UpdateState(cache_.get(), handle.get(),
|
||||
TupleCacheState::IN_PROGRESS, TupleCacheState::COMPLETE));
|
||||
UpdateSize(cache_.get(), handle.get(), size);
|
||||
cache_->UpdateCharge(handle->cache_handle, size);
|
||||
UpdateWriteSize(handle.get(), size);
|
||||
CHECK(UpdateState(handle.get(),
|
||||
TupleCacheState::IN_PROGRESS, TupleCacheState::COMPLETE_UNSYNCED));
|
||||
tuple_cache_entries_in_use_bytes_->Increment(size);
|
||||
tuple_cache_entry_size_stats_->Update(size);
|
||||
// When the sync_pool_size_ is 0, there is no thread pool and this does the sync
|
||||
// directly. This is used for backend tests to avoid race conditions.
|
||||
if (sync_pool_size_ > 0) {
|
||||
// Offer the cache key to the thread pool.
|
||||
bool success = sync_thread_pool_->Offer(cache_->Key(handle->cache_handle).ToString());
|
||||
if (!success) {
|
||||
// The queue is full, so evict this entry
|
||||
VLOG_FILE << "Tuple Cache: Sync thread pool queue full. Evicting "
|
||||
<< GetPath(handle);
|
||||
cache_->Erase(cache_->Key(handle->cache_handle));
|
||||
tuple_cache_dropped_sync_->Increment(1);
|
||||
}
|
||||
} else {
|
||||
SyncFileToDisk(cache_->Key(handle->cache_handle).ToString());
|
||||
}
|
||||
}
|
||||
|
||||
void TupleCacheMgr::AbortWrite(UniqueHandle handle, bool tombstone) {
|
||||
DCHECK(enabled_);
|
||||
DCHECK(handle != nullptr && handle->cache_handle != nullptr);
|
||||
DCHECK(handle->is_writer);
|
||||
if (tombstone) {
|
||||
VLOG_FILE << "Tuple Cache: Tombstone " << GetPath(handle);
|
||||
tuple_cache_tombstones_in_use_->Increment(1);
|
||||
CHECK(UpdateState(cache_.get(), handle.get(),
|
||||
// We update the write size to 0 to remove the existing cache charge
|
||||
// (and decrement the outstanding writes counter)
|
||||
UpdateWriteSize(handle.get(), 0);
|
||||
CHECK(UpdateState(handle.get(),
|
||||
TupleCacheState::IN_PROGRESS, TupleCacheState::TOMBSTONE));
|
||||
// We want the tombstone cache entry to have a base charge, so set that now
|
||||
// (without counting towards the outstanding writes).
|
||||
cache_->UpdateCharge(handle->cache_handle, handle->base_charge);
|
||||
} else {
|
||||
// Remove the cache entry. Leaves state IN_PROGRESS so entry won't be reused until
|
||||
// successfully evicted.
|
||||
DCHECK(TupleCacheState::IN_PROGRESS == GetState(cache_.get(), handle.get()));
|
||||
DCHECK(TupleCacheState::IN_PROGRESS == GetState(handle.get()));
|
||||
cache_->Erase(cache_->Key(handle->cache_handle));
|
||||
}
|
||||
}
|
||||
|
||||
Status TupleCacheMgr::RequestWriteSize(UniqueHandle* handle, size_t new_size) {
|
||||
// The handle better be from a writer
|
||||
DCHECK((*handle)->is_writer);
|
||||
|
||||
uint8_t* data = getHandleData(cache_.get(), handle->get());
|
||||
size_t cur_charge = reinterpret_cast<TupleCacheEntry*>(data)->charge;
|
||||
if (new_size > cur_charge) {
|
||||
// Need to increase the charge, which can fail
|
||||
// 1. There is a maximum size for any given entry
|
||||
// 2. There is a maximum amount of outstanding writes (i.e. dirty buffers)
|
||||
// The chunk size limits the frequency of incrementing the counter in the cache
|
||||
// itself. The chunk size is disabled for unit tests to have exact counter values.
|
||||
// The chunk size does not impact enforcement of the maximum entry size.
|
||||
|
||||
// An individual entry cannot exceed the MaxSize()
|
||||
if (new_size > MaxSize()) {
|
||||
tuple_cache_halted_->Increment(1);
|
||||
return Status(TErrorCode::TUPLE_CACHE_ENTRY_SIZE_LIMIT_EXCEEDED, MaxSize());
|
||||
}
|
||||
|
||||
size_t new_charge = new_size;
|
||||
if (outstanding_write_chunk_bytes_ != 0) {
|
||||
new_charge = ((new_size / outstanding_write_chunk_bytes_) + 1) *
|
||||
outstanding_write_chunk_bytes_;
|
||||
// The chunk size should not change the behavior of the MaxSize(), so limit the
|
||||
// new_charge to MaxSize() if it would otherwise exceed it.
|
||||
if (new_charge > MaxSize()) {
|
||||
new_charge = MaxSize();
|
||||
}
|
||||
}
|
||||
int64_t diff = new_charge - cur_charge;
|
||||
DCHECK_GT(new_charge, cur_charge);
|
||||
DCHECK_GE(new_charge, new_size);
|
||||
|
||||
// Limit the total outstanding writes to avoid excessive dirty buffers for the OS
|
||||
if (tuple_cache_outstanding_writes_bytes_->GetValue() + diff >
|
||||
outstanding_write_limit_) {
|
||||
tuple_cache_backpressure_halted_->Increment(1);
|
||||
return Status(TErrorCode::TUPLE_CACHE_OUTSTANDING_WRITE_LIMIT_EXCEEDED,
|
||||
outstanding_write_limit_);
|
||||
}
|
||||
UpdateWriteSize(handle->get(), new_charge);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
const char* TupleCacheMgr::GetPath(UniqueHandle& handle) const {
|
||||
DCHECK(enabled_);
|
||||
DCHECK(handle != nullptr && handle->cache_handle != nullptr);
|
||||
@@ -426,9 +580,22 @@ void TupleCacheMgr::EvictedEntry(Slice key, Slice value) {
|
||||
DCHECK(tuple_cache_entries_in_use_bytes_ != nullptr);
|
||||
tuple_cache_entries_evicted_->Increment(1);
|
||||
tuple_cache_entries_in_use_->Increment(-1);
|
||||
tuple_cache_entries_in_use_bytes_->Increment(-entry->size);
|
||||
DCHECK_GE(tuple_cache_entries_in_use_->GetValue(), 0);
|
||||
DCHECK_GE(tuple_cache_entries_in_use_bytes_->GetValue(), 0);
|
||||
// entries_in_use_bytes is incremented only when the entry reaches the
|
||||
// COMPLETE_UNSYNCED state
|
||||
if (TupleCacheState::COMPLETE_UNSYNCED == entry->state ||
|
||||
TupleCacheState::COMPLETE == entry->state) {
|
||||
tuple_cache_entries_in_use_bytes_->Increment(-entry->charge);
|
||||
DCHECK_GE(tuple_cache_entries_in_use_bytes_->GetValue(), 0);
|
||||
}
|
||||
// Outstanding write bytes are accumulated during IN_PROGRESS, and remain set until
|
||||
// the transition from COMPLETE_UNSYNCED to COMPLETE.
|
||||
if (TupleCacheState::COMPLETE_UNSYNCED == entry->state ||
|
||||
TupleCacheState::IN_PROGRESS == entry->state) {
|
||||
DCHECK(tuple_cache_outstanding_writes_bytes_ != nullptr);
|
||||
DCHECK_GE(tuple_cache_outstanding_writes_bytes_->GetValue(), entry->charge);
|
||||
tuple_cache_outstanding_writes_bytes_->Increment(-entry->charge);
|
||||
}
|
||||
} else {
|
||||
DCHECK(tuple_cache_tombstones_in_use_ != nullptr);
|
||||
tuple_cache_tombstones_in_use_->Increment(-1);
|
||||
@@ -465,4 +632,66 @@ Status TupleCacheMgr::DeleteExistingFiles() const {
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void TupleCacheMgr::SyncFileToDisk(const string& cache_key) {
|
||||
Cache::UniqueHandle pos = cache_->Lookup(cache_key);
|
||||
// The entry can be evicted while waiting to be synced to disk. If the entry no longer
|
||||
// exists, there is nothing to do.
|
||||
if (pos == nullptr) return;
|
||||
UniqueHandle handle{new Handle()};
|
||||
handle->cache_handle = move(pos);
|
||||
// If the entry has a state other than COMPLETE_UNSYNCED, it could have been
|
||||
// evicted and recreated. There is nothing to do.
|
||||
if (TupleCacheState::COMPLETE_UNSYNCED != GetState(handle.get())) {
|
||||
return;
|
||||
}
|
||||
bool success = true;
|
||||
// Some unit tests don't create a real file when testing the TupleCacheMgr, so
|
||||
// only do the sync if there is a backing file
|
||||
bool has_backing_file = !(debug_pos_ & DebugPos::NO_FILES);
|
||||
if (has_backing_file) {
|
||||
// Open the cache file associated with this key, then call Sync() on it, and
|
||||
// close it.
|
||||
std::string file_path = GetPath(handle);
|
||||
std::unique_ptr<kudu::RWFile> file_to_sync;
|
||||
kudu::RWFileOptions opts;
|
||||
opts.mode = kudu::Env::OpenMode::MUST_EXIST;
|
||||
kudu::Status s = kudu::Env::Default()->NewRWFile(opts, file_path, &file_to_sync);
|
||||
if (!s.ok()) {
|
||||
LOG(WARNING) << Substitute("SyncFileToDisk: Failed to open file $0: $1", file_path,
|
||||
s.ToString());
|
||||
success = false;
|
||||
} else {
|
||||
s = file_to_sync->Sync();
|
||||
if (!s.ok()) {
|
||||
LOG(WARNING) << Substitute("SyncFileToDisk: Failed to sync file $0: $1",
|
||||
file_path, s.ToString());
|
||||
success = false;
|
||||
}
|
||||
// Close the file even if Sync() fails
|
||||
s = file_to_sync->Close();
|
||||
if (!s.ok()) {
|
||||
LOG(WARNING) << Substitute("SyncFileToDisk: Failed to close file $0: $1",
|
||||
file_path, s.ToString());
|
||||
success = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (success) {
|
||||
bool update_succeeded = UpdateState(handle.get(),
|
||||
TupleCacheState::COMPLETE_UNSYNCED, TupleCacheState::COMPLETE);
|
||||
if (update_succeeded) {
|
||||
tuple_cache_outstanding_writes_bytes_->Increment(-GetCharge(handle.get()));
|
||||
}
|
||||
// Only crash for a failed state change on debug builds. The sync completed
|
||||
// and the state change doesn't really impact external behavior. It isn't
|
||||
// worth crashing on a release build.
|
||||
DCHECK(update_succeeded);
|
||||
} else {
|
||||
// In case of any error, erase this cache entry
|
||||
VLOG_FILE << "Tuple Cache: SyncFileToDisk failed. Evicting " << GetPath(handle);
|
||||
cache_->Erase(cache_->Key(handle->cache_handle));
|
||||
tuple_cache_failed_sync_->Increment(1);
|
||||
}
|
||||
}
|
||||
} // namespace impala
|
||||
|
||||
@@ -24,6 +24,7 @@
|
||||
#include "runtime/bufferpool/buffer-pool.h"
|
||||
#include "util/cache/cache.h"
|
||||
#include "util/metrics.h"
|
||||
#include "util/thread-pool.h"
|
||||
|
||||
namespace impala {
|
||||
|
||||
@@ -33,6 +34,8 @@ class TupleReader;
|
||||
// Declaration of the debug tuple cache bad postfix constant.
|
||||
extern const char* DEBUG_TUPLE_CACHE_BAD_POSTFIX;
|
||||
|
||||
enum class TupleCacheState;
|
||||
|
||||
/// The TupleCacheMgr maintains per-daemon settings and metadata for the tuple cache.
|
||||
/// This it used by the various TupleCacheNodes from queries to lookup the cache
|
||||
/// entries or write cache entries. The TupleCacheMgr maintains the capacity constraint
|
||||
@@ -58,14 +61,15 @@ public:
|
||||
~TupleCacheMgr() = default;
|
||||
|
||||
// Initialize the TupleCacheMgr. Must be called before any of the other APIs.
|
||||
Status Init() WARN_UNUSED_RESULT;
|
||||
// The process_bytes_limit is used to scale a percentage value for the outstanding
|
||||
// writes limit. If it is set to 0, a percentage value is not allowed.
|
||||
Status Init(int64_t process_bytes_limit = 0) WARN_UNUSED_RESULT;
|
||||
|
||||
/// Enum for metric type.
|
||||
enum class MetricType {
|
||||
HIT,
|
||||
MISS,
|
||||
SKIPPED,
|
||||
HALTED,
|
||||
};
|
||||
|
||||
struct DebugDumpCacheMetaData {
|
||||
@@ -108,6 +112,12 @@ public:
|
||||
// queries.
|
||||
void AbortWrite(UniqueHandle handle, bool tombstone);
|
||||
|
||||
// Request an increase to the outstanding write size. This should be called before
|
||||
// writing more data to a tuple cache file. If the new_size exceeds the maximum size
|
||||
// for a cache entry, this returns TUPLE_CACHE_ENTRY_SIZE_LIMIT_EXCEEDED. This returns
|
||||
// TUPLE_CACHE_OUTSTANDING_WRITE_LIMIT_EXCEEDED if it hits the outstanding writes limit.
|
||||
Status RequestWriteSize(UniqueHandle* handle, size_t new_size);
|
||||
|
||||
/// Get path to read/write.
|
||||
const char* GetPath(UniqueHandle&) const;
|
||||
|
||||
@@ -126,9 +136,6 @@ public:
|
||||
case MetricType::SKIPPED:
|
||||
tuple_cache_skipped_->Increment(1);
|
||||
break;
|
||||
case MetricType::HALTED:
|
||||
tuple_cache_halted_->Increment(1);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -161,35 +168,69 @@ public:
|
||||
TupleCacheMgr& operator=(const TupleCacheMgr&) = delete;
|
||||
|
||||
friend class TupleCacheMgrTest;
|
||||
FRIEND_TEST(TupleCacheMgrTest, TestRequestWriteSize);
|
||||
FRIEND_TEST(TupleCacheMgrTest, TestOutstandingWriteLimit);
|
||||
FRIEND_TEST(TupleCacheMgrTest, TestOutstandingWriteLimitConcurrent);
|
||||
FRIEND_TEST(TupleCacheMgrTest, TestOutstandingWriteChunkSize);
|
||||
FRIEND_TEST(TupleCacheMgrTest, TestDroppedSyncs);
|
||||
|
||||
// Constructor for tests
|
||||
enum DebugPos {
|
||||
FAIL_ALLOCATE = 1 << 0,
|
||||
FAIL_INSERT = 1 << 1,
|
||||
NO_FILES = 1 << 2,
|
||||
};
|
||||
TupleCacheMgr(string cache_config, string eviction_policy_str,
|
||||
MetricGroup* metrics, uint8_t debug_pos);
|
||||
TupleCacheMgr(string cache_config, string eviction_policy_str, MetricGroup* metrics,
|
||||
uint8_t debug_pos, uint32_t sync_pool_size, uint32_t sync_pool_queue_depth,
|
||||
string outstanding_write_limit_str, uint32_t outstanding_write_chunk_bytes);
|
||||
|
||||
// Delete any existing files in the cache directory to start fresh
|
||||
Status DeleteExistingFiles() const;
|
||||
|
||||
// Sync file for cache key to disk
|
||||
void SyncFileToDisk(const std::string& cache_key);
|
||||
|
||||
// Get the current state for a cache handle
|
||||
TupleCacheState GetState(Handle* handle) const;
|
||||
|
||||
// Update a handle's state to newState, verifying that it matches the requredState.
|
||||
// If the update fails, return false.
|
||||
bool UpdateState(Handle* handle, TupleCacheState requiredState,
|
||||
TupleCacheState newState);
|
||||
|
||||
// Get the current charge for this handle.
|
||||
size_t GetCharge(Handle* handle) const;
|
||||
|
||||
// Update the current charge for this handle and adjust the outstanding writes
|
||||
// counter accordingly.
|
||||
void UpdateWriteSize(Handle* handle, size_t charge);
|
||||
|
||||
const std::string cache_config_;
|
||||
const std::string eviction_policy_str_;
|
||||
const std::string outstanding_write_limit_str_;
|
||||
|
||||
std::string cache_dir_;
|
||||
std::string cache_debug_dump_dir_;
|
||||
bool enabled_ = false;
|
||||
uint8_t debug_pos_;
|
||||
uint32_t sync_pool_size_;
|
||||
uint32_t sync_pool_queue_depth_;
|
||||
uint32_t outstanding_write_chunk_bytes_;
|
||||
int64_t outstanding_write_limit_ = 0;
|
||||
|
||||
/// Metrics for the tuple cache in the daemon level.
|
||||
IntCounter* tuple_cache_hits_;
|
||||
IntCounter* tuple_cache_misses_;
|
||||
IntCounter* tuple_cache_skipped_;
|
||||
IntCounter* tuple_cache_halted_;
|
||||
IntCounter* tuple_cache_backpressure_halted_;
|
||||
IntCounter* tuple_cache_entries_evicted_;
|
||||
IntCounter* tuple_cache_failed_sync_;
|
||||
IntCounter* tuple_cache_dropped_sync_;
|
||||
IntGauge* tuple_cache_entries_in_use_;
|
||||
IntGauge* tuple_cache_entries_in_use_bytes_;
|
||||
IntGauge* tuple_cache_tombstones_in_use_;
|
||||
IntGauge* tuple_cache_outstanding_writes_bytes_;
|
||||
|
||||
/// Statistics for the tuple cache sizes allocated.
|
||||
HistogramMetric* tuple_cache_entry_size_stats_;
|
||||
@@ -206,6 +247,9 @@ public:
|
||||
/// An in-memory presentation for metadata of tuple caches for debug verification.
|
||||
/// The key is the key of the tuple cache.
|
||||
std::unordered_map<std::string, DebugDumpCacheMetaData> debug_dump_caches_metadata_;
|
||||
|
||||
/// Thread pool for syncing files to disk
|
||||
std::unique_ptr<ThreadPool<std::string>> sync_thread_pool_;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -38,6 +38,7 @@
|
||||
#include "thirdparty/datasketches/MurmurHash3.h"
|
||||
#include "util/debug-util.h"
|
||||
#include "util/parse-util.h"
|
||||
#include "util/time.h"
|
||||
|
||||
DECLARE_int64(min_buffer_size);
|
||||
|
||||
@@ -55,6 +56,10 @@ DEFINE_bool_hidden(tuple_cache_ignore_query_options, false,
|
||||
"If true, don't compute TQueryOptionsHash for tuple caching to allow testing tuple "
|
||||
"caching failure modes.");
|
||||
|
||||
DEFINE_bool_hidden(tuple_cache_query_options_random_seed, false,
|
||||
"Inject randomness into the TQueryOptionsHash to force zero hits. This is for "
|
||||
"testing only.");
|
||||
|
||||
DEFINE_string_hidden(tuple_cache_exempt_query_options, "",
|
||||
"A comma-separated list of additional query options to exclude from the tuple cache "
|
||||
"key. Option names must be lower-case.");
|
||||
@@ -1511,7 +1516,13 @@ TQueryOptionsHash impala::QueryOptionsResultHash(const TQueryOptions& query_opti
|
||||
exempt = Split(FLAGS_tuple_cache_exempt_query_options, ",", SkipEmpty());
|
||||
}
|
||||
|
||||
HashState hash{QUERY_OPTION_HASH_SEED, QUERY_OPTION_HASH_SEED};
|
||||
uint64_t seed = QUERY_OPTION_HASH_SEED;
|
||||
// To allow testing scenarios with zero hits, the random seed flags incorporates the
|
||||
// current time into the query option hash.
|
||||
if (FLAGS_tuple_cache_query_options_random_seed) {
|
||||
seed = static_cast<uint64_t>(MonotonicNanos());
|
||||
}
|
||||
HashState hash{seed, seed};
|
||||
#define QUERY_OPT_FN(NAME, ENUM, LEVEL) \
|
||||
if (query_options.__isset.NAME && exempt.count(#NAME) == 0) \
|
||||
HashQueryOptionValue(query_options.NAME, hash);
|
||||
|
||||
@@ -494,7 +494,13 @@ error_codes = (
|
||||
|
||||
("TUPLE_CACHE_INCONSISTENCY", 160, "Inconsistent tuple cache found: $0."),
|
||||
|
||||
("OAUTH_VERIFY_FAILED", 161, "Error verifying OAuth Token: $0.")
|
||||
("OAUTH_VERIFY_FAILED", 161, "Error verifying OAuth Token: $0."),
|
||||
|
||||
("TUPLE_CACHE_ENTRY_SIZE_LIMIT_EXCEEDED", 162, "Exceeded the maximum size for a tuple "
|
||||
"cache entry ($0 bytes)"),
|
||||
|
||||
("TUPLE_CACHE_OUTSTANDING_WRITE_LIMIT_EXCEEDED", 163, "Outstanding tuple cache writes "
|
||||
"exceeded the limit ($0 bytes)")
|
||||
)
|
||||
|
||||
import sys
|
||||
|
||||
@@ -119,6 +119,46 @@
|
||||
"kind": "COUNTER",
|
||||
"key": "impala.tuple-cache.halted"
|
||||
},
|
||||
{
|
||||
"description": "The total number of Tuple Cache that halted writing due to backpressure",
|
||||
"contexts": [
|
||||
"IMPALAD"
|
||||
],
|
||||
"label": "Tuple Cache Halted Writing Due to Backpressure",
|
||||
"units": "UNIT",
|
||||
"kind": "COUNTER",
|
||||
"key": "impala.tuple-cache.backpressure-halted"
|
||||
},
|
||||
{
|
||||
"description": "The total number of Tuple Cache syncs to disk that failed",
|
||||
"contexts": [
|
||||
"IMPALAD"
|
||||
],
|
||||
"label": "Tuple Cache Failed Syncs",
|
||||
"units": "UNIT",
|
||||
"kind": "COUNTER",
|
||||
"key": "impala.tuple-cache.failed-syncs"
|
||||
},
|
||||
{
|
||||
"description": "The total number of Tuple Cache syncs to disk dropped due to backpressure",
|
||||
"contexts": [
|
||||
"IMPALAD"
|
||||
],
|
||||
"label": "Tuple Cache Dropped Syncs",
|
||||
"units": "UNIT",
|
||||
"kind": "COUNTER",
|
||||
"key": "impala.tuple-cache.dropped-syncs"
|
||||
},
|
||||
{
|
||||
"description": "The total outstanding bytes of Tuple Cache Writes",
|
||||
"contexts": [
|
||||
"IMPALAD"
|
||||
],
|
||||
"label": "Outstanding Tuple Cache Writes total bytes",
|
||||
"units": "BYTES",
|
||||
"kind": "GAUGE",
|
||||
"key": "impala.tuple-cache.outstanding-writes-bytes"
|
||||
},
|
||||
{
|
||||
"description": "The number of in-use Tuple Cache Entries",
|
||||
"contexts": [
|
||||
|
||||
Reference in New Issue
Block a user