IMPALA-12908: Add correctness check for tuple cache

The patch adds a feature to the automated correctness check for
tuple cache. The purpose of this feature is to enable the
verification of the correctness of the tuple cache by comparing
caches with the same key across different queries.

The feature consists of two main components: cache dumping and
runtime correctness validation.

During the cache dumping phase, if a tuple cache is detected,
we retrieve the cache from the global cache and dump it to a
subdirectory as a reference file within the specified debug
dumping directory. The subdirectory is using the cache key as
its name. Additionally, data from the child is also read and
dumped to a separate file in the same directory. We expect
these two files to be identical, assuming the results are
deterministic. For non-deterministic cases like TOP-N or others,
we may detect them and exclude them from dumping later.
Furthermore, the cache data will be transformed into a
human-readable text format on a row-by-row basis before dumping.
This approach allows for easier investigation and later analysis.

The verification process starts by comparing the entire file
content sharing with the same key. If the content matches, the
verification is considered successful. However, if the content
doesn't match, we enter a slower mode where we compare all the
rows individually. In the slow mode, we will create a hash map
from the reference cache file, then iterate the current cache
file row by row and search if every row exists in the hash map.
Additionally, a counter is integrated into the hash map to
handle scenarios involving duplicated rows. Once verification is
complete, if no discrepancies are found, both files will be removed.
If discrepancies are detected, the files will be kept and appended
with a '.bad' postfix.

New start flags:
Added a starting flag tuple_cache_debug_dump_dir for specifying
the directory for dumping the result caches. if
tuple_cache_debug_dump_dir is empty, the feature is disabled.

Added a query option enable_tuple_cache_verification to enable
or disable the tuple cache verification. Default is true. Only
valid when tuple_cache_debug_dump_dir is specified.

Tests:
Ran the testcase test_tuple_cache_tpc_queries and caught known
inconsistencies.

Change-Id: Ied074e274ebf99fb57e3ee41a13148725775b77c
Reviewed-on: http://gerrit.cloudera.org:8080/21754
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com>
This commit is contained in:
Yida Wu
2023-05-11 13:34:46 -07:00
committed by Joe McDonnell
parent 4c582fc55b
commit f11172a4a2
21 changed files with 1033 additions and 12 deletions

View File

@@ -111,6 +111,9 @@ add_library(Exec
tuple-cache-node.cc tuple-cache-node.cc
tuple-file-reader.cc tuple-file-reader.cc
tuple-file-writer.cc tuple-file-writer.cc
tuple-text-file-reader.cc
tuple-text-file-writer.cc
tuple-text-file-util.cc
union-node.cc union-node.cc
unnest-node.cc unnest-node.cc
) )
@@ -128,6 +131,7 @@ add_library(ExecTests STATIC
incr-stats-util-test.cc incr-stats-util-test.cc
read-write-util-test.cc read-write-util-test.cc
tuple-file-read-write-test.cc tuple-file-read-write-test.cc
tuple-text-file-util-test.cc
zigzag-test.cc zigzag-test.cc
) )
add_dependencies(ExecTests gen-deps) add_dependencies(ExecTests gen-deps)
@@ -138,6 +142,7 @@ ADD_UNIFIED_BE_LSAN_TEST(hash-table-test HashTableTest.*)
ADD_UNIFIED_BE_LSAN_TEST(delimited-text-parser-test DelimitedTextParser.*) ADD_UNIFIED_BE_LSAN_TEST(delimited-text-parser-test DelimitedTextParser.*)
ADD_UNIFIED_BE_LSAN_TEST(read-write-util-test ReadWriteUtil.*) ADD_UNIFIED_BE_LSAN_TEST(read-write-util-test ReadWriteUtil.*)
ADD_UNIFIED_BE_LSAN_TEST(tuple-file-read-write-test TupleFileReadWriteTest.*) ADD_UNIFIED_BE_LSAN_TEST(tuple-file-read-write-test TupleFileReadWriteTest.*)
ADD_UNIFIED_BE_LSAN_TEST(tuple-text-file-util-test TupleTextFileUtilTest.*)
# Exception to unified be tests: Custom main with global Frontend object # Exception to unified be tests: Custom main with global Frontend object
ADD_BE_LSAN_TEST(row-batch-list-test) ADD_BE_LSAN_TEST(row-batch-list-test)
ADD_BE_LSAN_TEST(scratch-tuple-batch-test) ADD_BE_LSAN_TEST(scratch-tuple-batch-test)

View File

@@ -17,10 +17,13 @@
#include <gflags/gflags.h> #include <gflags/gflags.h>
#include "exec/tuple-cache-node.h"
#include "exec/exec-node-util.h" #include "exec/exec-node-util.h"
#include "exec/tuple-cache-node.h"
#include "exec/tuple-file-reader.h" #include "exec/tuple-file-reader.h"
#include "exec/tuple-file-writer.h" #include "exec/tuple-file-writer.h"
#include "exec/tuple-text-file-reader.h"
#include "exec/tuple-text-file-util.h"
#include "exec/tuple-text-file-writer.h"
#include "runtime/exec-env.h" #include "runtime/exec-env.h"
#include "runtime/row-batch.h" #include "runtime/row-batch.h"
#include "runtime/runtime-state.h" #include "runtime/runtime-state.h"
@@ -47,6 +50,11 @@ TupleCacheNode::TupleCacheNode(
TupleCacheNode::~TupleCacheNode() = default; TupleCacheNode::~TupleCacheNode() = default;
static bool TupleCacheVerificationEnabled(RuntimeState* state) {
DCHECK(state != nullptr);
return state->query_options().enable_tuple_cache_verification;
}
Status TupleCacheNode::Prepare(RuntimeState* state) { Status TupleCacheNode::Prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::Prepare(state)); RETURN_IF_ERROR(ExecNode::Prepare(state));
num_hits_counter_ = ADD_COUNTER(runtime_profile(), "NumTupleCacheHits", TUnit::UNIT); num_hits_counter_ = ADD_COUNTER(runtime_profile(), "NumTupleCacheHits", TUnit::UNIT);
@@ -79,14 +87,44 @@ Status TupleCacheNode::Open(RuntimeState* state) {
TupleCacheMgr* tuple_cache_mgr = ExecEnv::GetInstance()->tuple_cache_mgr(); TupleCacheMgr* tuple_cache_mgr = ExecEnv::GetInstance()->tuple_cache_mgr();
handle_ = tuple_cache_mgr->Lookup(combined_key_, true); handle_ = tuple_cache_mgr->Lookup(combined_key_, true);
if (tuple_cache_mgr->IsAvailableForRead(handle_)) { if (tuple_cache_mgr->IsAvailableForRead(handle_)) {
reader_ = make_unique<TupleFileReader>( if (tuple_cache_mgr->DebugDumpEnabled() && TupleCacheVerificationEnabled(state)) {
tuple_cache_mgr->GetPath(handle_), mem_tracker(), runtime_profile()); // We need the original fragment id to construct the path for the reference debug
Status status = reader_->Open(state); // cache file. If it's missing from the metadata, we return an error status
// Clear reader if it's not usable // immediately.
if (!status.ok()) { string org_fragment_id = tuple_cache_mgr->GetFragmentIdForTupleCache(combined_key_);
LOG(WARNING) << "Could not read cache entry for " if (org_fragment_id.empty()) {
<< tuple_cache_mgr->GetPath(handle_); return Status(TErrorCode::TUPLE_CACHE_INCONSISTENCY,
reader_.reset(); Substitute("Metadata of tuple cache '$0' is missing for correctness check",
combined_key_));
}
string ref_sub_dir;
string sub_dir;
string ref_file_path = GetDebugDumpPath(state, org_fragment_id, &ref_sub_dir);
string file_path = GetDebugDumpPath(state, string(), &sub_dir);
DCHECK_EQ(ref_sub_dir, sub_dir);
DCHECK(!ref_sub_dir.empty());
DCHECK(!ref_file_path.empty());
DCHECK(!file_path.empty());
// Create the subdirectory for the debug caches if needed.
RETURN_IF_ERROR(tuple_cache_mgr->CreateDebugDumpSubdir(ref_sub_dir));
// Open the writer for writing the tuple data from the cache entries to be
// the reference cache data.
debug_dump_text_writer_ref_ = make_unique<TupleTextFileWriter>(ref_file_path);
RETURN_IF_ERROR(debug_dump_text_writer_ref_->Open());
// Open the writer for writing the tuple data from children in GetNext() to
// compare with the reference debug cache file.
debug_dump_text_writer_ = make_unique<TupleTextFileWriter>(file_path);
RETURN_IF_ERROR(debug_dump_text_writer_->Open());
} else {
reader_ = make_unique<TupleFileReader>(
tuple_cache_mgr->GetPath(handle_), mem_tracker(), runtime_profile());
Status status = reader_->Open(state);
// Clear reader if it's not usable
if (!status.ok()) {
LOG(WARNING) << "Could not read cache entry for "
<< tuple_cache_mgr->GetPath(handle_);
reader_.reset();
}
} }
} else if (tuple_cache_mgr->IsAvailableForWrite(handle_)) { } else if (tuple_cache_mgr->IsAvailableForWrite(handle_)) {
writer_ = make_unique<TupleFileWriter>(tuple_cache_mgr->GetPath(handle_), writer_ = make_unique<TupleFileWriter>(tuple_cache_mgr->GetPath(handle_),
@@ -123,9 +161,73 @@ Status TupleCacheNode::Open(RuntimeState* state) {
if (!buffer_pool_client()->is_registered()) { if (!buffer_pool_client()->is_registered()) {
RETURN_IF_ERROR(ClaimBufferReservation(state)); RETURN_IF_ERROR(ClaimBufferReservation(state));
} }
return Status::OK(); return Status::OK();
} }
// Helper function to rename the bad file.
static void MoveBadDebugCacheFile(const string& file_path) {
DCHECK(!file_path.empty());
string new_path = file_path + DEBUG_TUPLE_CACHE_BAD_POSTFIX;
int result = rename(file_path.c_str(), new_path.c_str());
if (result != 0) {
string error_msg = GetStrErrMsg();
LOG(ERROR) << "Failed to move debug tuple cache file from " << file_path << " to "
<< new_path << ". Error message: " << error_msg << ": " << result;
} else {
LOG(INFO) << "Moved bad debug tuple cache file from " << file_path << " to "
<< new_path;
}
}
// Move the debug dump cache after verification.
// If the verification passed, we clear the cache file.
// Otherwise, we will move the file with a "bad" postfix.
// The writer will be reset after the function.
static void MoveDebugCache(bool suc, unique_ptr<TupleTextFileWriter>& writer) {
DCHECK(writer != nullptr);
if (suc) {
writer->Delete();
} else {
MoveBadDebugCacheFile(writer->GetPath());
}
writer.reset();
}
Status TupleCacheNode::VerifyAndMoveDebugCache(RuntimeState* state) {
DCHECK(debug_dump_text_writer_ref_ != nullptr);
DCHECK(ExecEnv::GetInstance()->tuple_cache_mgr()->DebugDumpEnabled());
DCHECK(TupleCacheVerificationEnabled(state));
if (debug_dump_text_writer_->IsEmpty()) {
return Status::OK();
}
string ref_file_path = debug_dump_text_writer_ref_->GetPath();
string dump_file_path = debug_dump_text_writer_->GetPath();
bool passed = false;
DCHECK(!ref_file_path.empty());
DCHECK(!dump_file_path.empty());
VLOG_FILE << "Verify debug tuple cache file ref_file_path: " << ref_file_path
<< " and dump_file_path: " << dump_file_path
<< " with cache key:" << combined_key_;
// Fast path to verify the cache.
Status verify_status =
TupleTextFileUtil::VerifyDebugDumpCache(dump_file_path, ref_file_path, &passed);
if (verify_status.ok() && !passed) {
// Slow path to compare all rows in an order-insensitive way if the files are not the
// same.
verify_status = TupleTextFileUtil::VerifyRows(ref_file_path, dump_file_path);
passed = verify_status.ok();
}
// Move or clear the file after verification.
MoveDebugCache(passed, debug_dump_text_writer_ref_);
MoveDebugCache(passed, debug_dump_text_writer_);
return verify_status;
}
Status TupleCacheNode::GetNext( Status TupleCacheNode::GetNext(
RuntimeState* state, RowBatch* output_row_batch, bool* eos) { RuntimeState* state, RowBatch* output_row_batch, bool* eos) {
SCOPED_TIMER(runtime_profile()->total_time_counter()); SCOPED_TIMER(runtime_profile()->total_time_counter());
@@ -188,6 +290,15 @@ Status TupleCacheNode::GetNext(
size_t bytes_written = writer_->BytesWritten(); size_t bytes_written = writer_->BytesWritten();
Status status = writer_->Commit(state); Status status = writer_->Commit(state);
if (status.ok()) { if (status.ok()) {
if (tuple_cache_mgr->DebugDumpEnabled()) {
// Store the metadata whenever the debug dump path is set, regardless of
// whether the correctness verification is enabled in the query option. This
// is because the tuple cache eviction function does not account for the query
// option when removing metadata. Keeping this consistent ensures proper
// handling.
tuple_cache_mgr->StoreMetadataForTupleCache(
combined_key_, PrintId(state->fragment_instance_id()));
}
tuple_cache_mgr->CompleteWrite(move(handle_), bytes_written); tuple_cache_mgr->CompleteWrite(move(handle_), bytes_written);
} else { } else {
writer_->Abort(); writer_->Abort();
@@ -196,6 +307,10 @@ Status TupleCacheNode::GetNext(
writer_.reset(); writer_.reset();
} }
} }
if (debug_dump_text_writer_) {
RETURN_IF_ERROR(debug_dump_text_writer_->Write(output_row_batch));
if (*eos) debug_dump_text_writer_->Commit();
}
} }
// Note: TupleCacheNode does not alter its child's output (or the equivalent // Note: TupleCacheNode does not alter its child's output (or the equivalent
@@ -205,6 +320,28 @@ Status TupleCacheNode::GetNext(
int num_rows_added = output_row_batch->num_rows() - num_rows_before; int num_rows_added = output_row_batch->num_rows() - num_rows_before;
DCHECK_GE(num_rows_added, 0); DCHECK_GE(num_rows_added, 0);
IncrementNumRowsReturned(num_rows_added); IncrementNumRowsReturned(num_rows_added);
if (*eos && debug_dump_text_writer_) {
DCHECK(debug_dump_text_writer_ref_ != nullptr);
TupleFileReader cache_reader(
ExecEnv::GetInstance()->tuple_cache_mgr()->GetPath(handle_), mem_tracker(),
runtime_profile());
RETURN_IF_ERROR(cache_reader.Open(state));
// Read the cache entries from the cache reader, and write as the reference
// debug cache file. If an error occurs, abort the verification, and return the
// error status.
RowBatch row_batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
bool cache_eos = false;
while (!cache_eos) {
RETURN_IF_ERROR(
cache_reader.GetNext(state, buffer_pool_client(), &row_batch, &cache_eos));
DCHECK(row_batch.num_rows() > 0 || cache_eos);
RETURN_IF_ERROR(debug_dump_text_writer_ref_->Write(&row_batch));
row_batch.Reset();
}
DCHECK(cache_eos);
debug_dump_text_writer_ref_->Commit();
RETURN_IF_ERROR(VerifyAndMoveDebugCache(state));
}
COUNTER_SET(rows_returned_counter_, rows_returned()); COUNTER_SET(rows_returned_counter_, rows_returned());
return Status::OK(); return Status::OK();
} }
@@ -232,10 +369,33 @@ void TupleCacheNode::Close(RuntimeState* state) {
writer_->Abort(); writer_->Abort();
tuple_cache_mgr->AbortWrite(move(handle_), false); tuple_cache_mgr->AbortWrite(move(handle_), false);
} }
if (debug_dump_text_writer_) {
debug_dump_text_writer_->Delete();
debug_dump_text_writer_.reset();
}
if (debug_dump_text_writer_ref_) {
debug_dump_text_writer_ref_->Delete();
debug_dump_text_writer_ref_.reset();
}
ReleaseResult(); ReleaseResult();
ExecNode::Close(state); ExecNode::Close(state);
} }
string TupleCacheNode::GetDebugDumpPath(const RuntimeState* state,
const string& org_fragment_id, string* sub_dir_full_path) const {
// The name of the subdirectory is hash key.
// For non-reference files, the file name is the fragment instance id.
// For reference files, the name includes the current fragment instance id, the original
// fragment id, and a "ref" suffix.
string file_name = PrintId(state->fragment_instance_id());
if (!org_fragment_id.empty()) {
// Adds the original fragment id of the cache to the path for debugging purpose.
file_name += "_" + org_fragment_id + "_ref";
}
return ExecEnv::GetInstance()->tuple_cache_mgr()->GetDebugDumpPath(
combined_key_, file_name, sub_dir_full_path);
}
void TupleCacheNode::DebugString(int indentation_level, stringstream* out) const { void TupleCacheNode::DebugString(int indentation_level, stringstream* out) const {
*out << string(indentation_level * 2, ' '); *out << string(indentation_level * 2, ' ');
*out << "TupleCacheNode(" << combined_key_; *out << "TupleCacheNode(" << combined_key_;

View File

@@ -26,6 +26,7 @@ namespace impala {
class TupleFileReader; class TupleFileReader;
class TupleFileWriter; class TupleFileWriter;
class TupleTextFileWriter;
class TupleCachePlanNode : public PlanNode { class TupleCachePlanNode : public PlanNode {
public: public:
@@ -83,6 +84,17 @@ private:
TupleCacheMgr::UniqueHandle handle_; TupleCacheMgr::UniqueHandle handle_;
std::unique_ptr<TupleFileReader> reader_; std::unique_ptr<TupleFileReader> reader_;
std::unique_ptr<TupleFileWriter> writer_; std::unique_ptr<TupleFileWriter> writer_;
std::unique_ptr<TupleTextFileWriter> debug_dump_text_writer_;
std::unique_ptr<TupleTextFileWriter> debug_dump_text_writer_ref_;
/// Helper function to generate the path for debug dumping the tuple cache.
/// If sub_dir_full_path is not nullptr, the subdirectory path will be returned,
/// allowing the caller to create the subdirectory if necessary.
string GetDebugDumpPath(const RuntimeState* state, const string& fragment_id,
string* sub_dir_full_path = nullptr) const;
/// Helper function to verify the correctness of the debug tuple cache.
Status VerifyAndMoveDebugCache(RuntimeState* state);
}; };
} }

View File

@@ -0,0 +1,74 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "tuple-text-file-reader.h"
#include "common/names.h"
namespace impala {
TupleTextFileReader::TupleTextFileReader(const std::string& path) : path_(path) {}
Status TupleTextFileReader::Open() {
if (!reader_.is_open()) {
reader_.open(path_, std::ios::in | std::ios::binary);
if (!reader_.is_open()) {
return Status(TErrorCode::DISK_IO_ERROR,
"open tuple text reader on " + path_ + " failed", GetStrErrMsg());
}
reader_.seekg(0, std::ios::end);
file_size_ = reader_.tellg();
reader_.seekg(0, std::ios::beg);
}
return Status();
}
void TupleTextFileReader::Close() {
if (reader_.is_open()) {
reader_.close();
}
}
int TupleTextFileReader::GetFileSize() const {
if (!reader_.is_open()) return TUPLE_TEXT_FILE_SIZE_ERROR;
return file_size_;
}
void TupleTextFileReader::Rewind() {
if (reader_.is_open()) {
reader_.clear();
reader_.seekg(0, std::ios::beg);
}
}
Status TupleTextFileReader::GetNext(string* output, bool* eos) {
DCHECK(output != nullptr);
DCHECK(eos != nullptr);
DCHECK(reader_.is_open());
if (reader_.eof()) {
*eos = true;
return Status::OK();
}
getline(reader_, *output);
if (!reader_.eof() && !reader_.good()) {
return Status(TErrorCode::DISK_IO_ERROR, "tuple reader on " + path_, GetStrErrMsg());
}
*eos = false;
return Status::OK();
}
} // namespace impala

View File

@@ -0,0 +1,63 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <fstream>
#include "common/status.h"
namespace impala {
static const int TUPLE_TEXT_FILE_SIZE_ERROR = -1;
/// The TupleTextFileReader is responsible for reading text files created by
/// TupleTextFileWriter line by line. It opens the specified file and allows
/// for sequential access to the content. This class does not support
/// multithreaded access, meaning that all operations should be performed
/// in a single thread to avoid unexpected behaviors.
class TupleTextFileReader {
public:
TupleTextFileReader(const std::string& path);
~TupleTextFileReader() {}
// Opens the file for reading.
Status Open();
// Closes the file if it is open.
void Close();
// Returns the size of the file in bytes.
int GetFileSize() const;
// Resets the stream position to the beginning of the file.
// Should be called before GetNext() if someone wants to ensure starting
// reading from the beginning.
void Rewind();
// Reads one line from the file.
Status GetNext(string* output, bool* eos);
const string& GetPath() const { return path_; }
private:
// Destination path.
const std::string path_;
size_t file_size_ = 0;
std::ifstream reader_;
};
} // namespace impala

View File

@@ -0,0 +1,158 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <fstream>
#include <gtest/gtest.h>
#include "exec/tuple-text-file-util.h"
#include "runtime/test-env.h"
#include "util/filesystem-util.h"
using namespace std;
namespace impala {
class TupleTextFileUtilTest : public ::testing::Test {
protected:
void SetUp() override {
temp_dir_ = "/tmp/tuple_text_file_util_test";
ASSERT_OK(FileSystemUtil::RemoveAndCreateDirectory(temp_dir_));
}
void TearDown() override {
vector<string> temp_dirs;
temp_dirs.push_back(temp_dir_);
ASSERT_OK(FileSystemUtil::RemovePaths(temp_dirs));
}
string CreateTestFile(const string& filename, const vector<string>& lines) {
string full_path = temp_dir_ + "/" + filename;
ofstream file(full_path);
for (const auto& line : lines) {
file << line << endl;
}
file.close();
return full_path;
}
string temp_dir_;
};
TEST_F(TupleTextFileUtilTest, VerifyDebugDumpCache_IdenticalFiles) {
vector<string> lines = {"Line 1", "Line 2", "Line 3"};
string file1 = CreateTestFile("file1.txt", lines);
string file2 = CreateTestFile("file2.txt", lines);
bool passed = false;
ASSERT_OK(TupleTextFileUtil::VerifyDebugDumpCache(file1, file2, &passed));
EXPECT_TRUE(passed);
}
TEST_F(TupleTextFileUtilTest, VerifyDebugDumpCache_DifferentFiles) {
vector<string> lines1 = {"Line 1", "Line 2", "Line 3"};
vector<string> lines2 = {"Line 1", "Different Line", "Line 3"};
string file1 = CreateTestFile("file1.txt", lines1);
string file2 = CreateTestFile("file2.txt", lines2);
bool passed = false;
Status status = TupleTextFileUtil::VerifyDebugDumpCache(file1, file2, &passed);
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.code(), TErrorCode::TUPLE_CACHE_INCONSISTENCY);
EXPECT_FALSE(passed);
}
TEST_F(TupleTextFileUtilTest, VerifyDebugDumpCache_DifferentOrders) {
vector<string> lines1 = {"Line 1", "Line 2", "Line 3"};
vector<string> lines2 = {"Line 1", "Line 3", "Line 2"};
string file1 = CreateTestFile("file1.txt", lines1);
string file2 = CreateTestFile("file2.txt", lines2);
bool passed = true;
ASSERT_OK(TupleTextFileUtil::VerifyDebugDumpCache(file1, file2, &passed));
EXPECT_FALSE(passed);
}
TEST_F(TupleTextFileUtilTest, VerifyDebugDumpCache_NonexistentFile) {
vector<string> lines = {"Line 1", "Line 2", "Line 3"};
string file1 = CreateTestFile("file1.txt", lines);
string file2 = temp_dir_ + "/nonexistent.txt";
bool passed = true;
Status status = TupleTextFileUtil::VerifyDebugDumpCache(file1, file2, &passed);
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.code(), TErrorCode::TUPLE_CACHE_INCONSISTENCY);
}
TEST_F(TupleTextFileUtilTest, VerifyRows_IdenticalFiles) {
vector<string> lines = {"Row 1", "Row 2", "Row 3"};
string ref_file = CreateTestFile("ref.txt", lines);
string cmp_file = CreateTestFile("cmp.txt", lines);
ASSERT_OK(TupleTextFileUtil::VerifyRows(cmp_file, ref_file));
ASSERT_OK(TupleTextFileUtil::VerifyRows(ref_file, cmp_file));
}
TEST_F(TupleTextFileUtilTest, VerifyRows_DifferentRowCount) {
vector<string> ref_lines = {"Row 1", "Row 2", "Row 3"};
vector<string> cmp_lines = {"Row 1", "Row 2"};
string ref_file = CreateTestFile("ref.txt", ref_lines);
string cmp_file = CreateTestFile("cmp.txt", cmp_lines);
Status status = TupleTextFileUtil::VerifyRows(cmp_file, ref_file);
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.code(), TErrorCode::TUPLE_CACHE_INCONSISTENCY);
status = TupleTextFileUtil::VerifyRows(ref_file, cmp_file);
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.code(), TErrorCode::TUPLE_CACHE_INCONSISTENCY);
}
TEST_F(TupleTextFileUtilTest, VerifyRows_DifferentContent) {
vector<string> ref_lines = {"Row 1", "Row 2", "Row 3"};
vector<string> cmp_lines = {"Row 1", "Different Row", "Row 3"};
string ref_file = CreateTestFile("ref.txt", ref_lines);
string cmp_file = CreateTestFile("cmp.txt", cmp_lines);
Status status = TupleTextFileUtil::VerifyRows(cmp_file, ref_file);
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.code(), TErrorCode::TUPLE_CACHE_INCONSISTENCY);
status = TupleTextFileUtil::VerifyRows(ref_file, cmp_file);
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.code(), TErrorCode::TUPLE_CACHE_INCONSISTENCY);
}
TEST_F(TupleTextFileUtilTest, VerifyRows_DifferentOrders) {
vector<string> ref_lines = {"Row 1", "Row 2", "Row 3"};
vector<string> cmp_lines = {"Row 1", "Row 3", "Row 2"};
string ref_file = CreateTestFile("ref.txt", ref_lines);
string cmp_file = CreateTestFile("cmp.txt", cmp_lines);
ASSERT_OK(TupleTextFileUtil::VerifyRows(cmp_file, ref_file));
ASSERT_OK(TupleTextFileUtil::VerifyRows(ref_file, cmp_file));
}
TEST_F(TupleTextFileUtilTest, VerifyRows_NonexistentFile) {
vector<string> lines = {"Row 1", "Row 2", "Row 3"};
string ref_file = CreateTestFile("ref.txt", lines);
string cmp_file = temp_dir_ + "/nonexistent.txt";
Status status = TupleTextFileUtil::VerifyRows(cmp_file, ref_file);
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.code(), TErrorCode::DISK_IO_ERROR);
status = TupleTextFileUtil::VerifyRows(ref_file, cmp_file);
EXPECT_FALSE(status.ok());
EXPECT_EQ(status.code(), TErrorCode::DISK_IO_ERROR);
}
} // namespace impala

View File

@@ -0,0 +1,193 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <unordered_map>
#include "exec/tuple-text-file-reader.h"
#include "exec/tuple-text-file-util.h"
#include "common/names.h"
using std::unordered_map;
namespace impala {
const char* DEBUG_TUPLE_CACHE_BAD_POSTFIX = ".bad";
struct ReferenceRowCount {
uint64_t reference_rows_count;
uint64_t comparison_rows_count;
ReferenceRowCount() : reference_rows_count(0), comparison_rows_count(0) {}
void setCounts(uint64_t ref_rows_cnt, uint64_t cmp_rows_cnt) {
reference_rows_count = ref_rows_cnt;
comparison_rows_count = cmp_rows_cnt;
}
};
typedef unordered_map<string, ReferenceRowCount> ReferenceRowsMap;
typedef std::function<Status(const string&)> TupleCacheRowFunction;
// Applies fn to each row read from reader. Returns the number of rows read.
static Status ForEachRow(
TupleTextFileReader* reader, TupleCacheRowFunction& fn, int64_t* row_count) {
DCHECK(reader != nullptr);
DCHECK(row_count != nullptr);
bool eos = false;
int64_t num_rows = 0;
Status read_status;
string row_str;
do {
RETURN_IF_ERROR(reader->GetNext(&row_str, &eos));
RETURN_IF_ERROR(fn(row_str));
num_rows++;
} while (!eos);
*row_count = num_rows;
return Status::OK();
}
static Status VerifyRowsFromComparisonFile(const string& ref_file_path,
ReferenceRowsMap& cache, TupleTextFileReader* reader, int64_t* row_count) {
TupleCacheRowFunction verify_fn = [&ref_file_path, &cache, reader](const string& str) {
auto iter = cache.find(str);
if (iter == cache.end()) {
return Status(TErrorCode::TUPLE_CACHE_INCONSISTENCY,
Substitute("Result '$0' of file '$1' doesn't exist in the reference file: '$2'",
str, reader->GetPath() + DEBUG_TUPLE_CACHE_BAD_POSTFIX,
ref_file_path + DEBUG_TUPLE_CACHE_BAD_POSTFIX));
}
iter->second.comparison_rows_count++;
return Status::OK();
};
return ForEachRow(reader, verify_fn, row_count);
}
static Status InsertRowsFromReferenceFile(
ReferenceRowsMap& cache, TupleTextFileReader* reader, int64_t* row_count) {
TupleCacheRowFunction insert_fn = [&cache](const string& str) {
cache[str].reference_rows_count++;
return Status::OK();
};
return ForEachRow(reader, insert_fn, row_count);
}
static Status VerifyRowCount(ReferenceRowsMap& cache) {
for (const auto& it : cache) {
DCHECK_GE(it.second.reference_rows_count, 0);
if (it.second.reference_rows_count == 0) {
return Status(TErrorCode::TUPLE_CACHE_INCONSISTENCY,
Substitute("Row count abnormal for key '$0', '$1' vs '$2'", it.first,
it.second.reference_rows_count, it.second.comparison_rows_count));
}
if (it.second.reference_rows_count != it.second.comparison_rows_count) {
return Status(TErrorCode::TUPLE_CACHE_INCONSISTENCY,
Substitute("Row count doesn't match for key '$0', '$1' vs '$2'", it.first,
it.second.reference_rows_count, it.second.comparison_rows_count));
}
}
return Status::OK();
}
Status TupleTextFileUtil::VerifyRows(
const string& cmp_file_path, const string& ref_file_path) {
ReferenceRowsMap cache;
int64_t ref_row_count = 0;
int64_t cmp_row_count = 0;
{
TupleTextFileReader ref_reader(ref_file_path);
RETURN_IF_ERROR(ref_reader.Open());
RETURN_IF_ERROR(InsertRowsFromReferenceFile(cache, &ref_reader, &ref_row_count));
}
// Verify all the rows.
{
TupleTextFileReader cmp_reader(cmp_file_path);
RETURN_IF_ERROR(cmp_reader.Open());
RETURN_IF_ERROR(
VerifyRowsFromComparisonFile(ref_file_path, cache, &cmp_reader, &cmp_row_count));
}
// Verify all the row counts.
if (ref_row_count != cmp_row_count) {
return Status(TErrorCode::TUPLE_CACHE_INCONSISTENCY,
Substitute(
"Row count different. Reference file '$0': '$1', comparison file '$2': '$3'",
ref_file_path + DEBUG_TUPLE_CACHE_BAD_POSTFIX, ref_row_count,
cmp_file_path + DEBUG_TUPLE_CACHE_BAD_POSTFIX, cmp_row_count));
}
return VerifyRowCount(cache);
}
static Status CacheFileCmp(
const std::string& path_a, const std::string& path_b, bool* passed) {
DCHECK(passed != nullptr);
*passed = false;
// Create readers for the two files.
TupleTextFileReader reader_a(path_a);
TupleTextFileReader reader_b(path_b);
// Open both files.
if (!reader_a.Open().ok()) {
return Status(TErrorCode::TUPLE_CACHE_INCONSISTENCY,
Substitute("Failed to open file '$0'", path_a + DEBUG_TUPLE_CACHE_BAD_POSTFIX));
}
if (!reader_b.Open().ok()) {
return Status(TErrorCode::TUPLE_CACHE_INCONSISTENCY,
Substitute("Failed to open file '$0'", path_b + DEBUG_TUPLE_CACHE_BAD_POSTFIX));
}
// Compare file sizes.
int file1_length = reader_a.GetFileSize();
int file2_length = reader_b.GetFileSize();
if (file1_length != file2_length || file1_length == TUPLE_TEXT_FILE_SIZE_ERROR) {
return Status(TErrorCode::TUPLE_CACHE_INCONSISTENCY,
Substitute("Size of file '$0' (size: $1) and '$2' (size: $3) are different",
path_a + DEBUG_TUPLE_CACHE_BAD_POSTFIX, file1_length,
path_b + DEBUG_TUPLE_CACHE_BAD_POSTFIX, file2_length));
}
// Reset readers to the beginning of the files.
reader_a.Rewind();
reader_b.Rewind();
// Compare the content line by line.
string line_a, line_b;
bool eos_a = false, eos_b = false;
*passed = true;
while (true) {
// Read the next line from each file.
RETURN_IF_ERROR(reader_a.GetNext(&line_a, &eos_a));
RETURN_IF_ERROR(reader_b.GetNext(&line_b, &eos_b));
// If both files reached the end, the comparison is complete.
if (eos_a && eos_b) break;
DCHECK_EQ(eos_a, eos_b);
// If the lines differ, the files are not identical.
if (line_a != line_b) {
*passed = false;
break;
}
}
return Status::OK();
}
Status TupleTextFileUtil::VerifyDebugDumpCache(
const string& file_name, const string& ref_file_name, bool* passed) {
DCHECK(passed != nullptr);
*passed = true;
DCHECK(!ref_file_name.empty());
DCHECK(!file_name.empty());
// This is the fast path to compare the content of the files, passed will be set
// to true if all contents are the same.
return CacheFileCmp(ref_file_name, file_name, passed);
}
} // namespace impala

View File

@@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "common/status.h"
namespace impala {
/// Utility class for verifying the tuple text files.
class TupleTextFileUtil {
public:
/// Verify whether two debug dump caches are the same.
/// If the files are with different sizes or not able to open the files, will return
/// an error status to stop the query.
/// If the files content are different, return status Ok but set the passed as false
/// to allow further comparison.
/// If files are the same or there is no need for a comparison, like verification is
/// off or cache doesn't exist, return status Ok and set the passed as true.
static Status VerifyDebugDumpCache(
const std::string& file_name, const std::string& ref_file_name, bool* passed);
/// Slow path to varify the debug dump caches row by row.
static Status VerifyRows(const string& cmp_file_path, const string& ref_file_path);
};
} // namespace impala

View File

@@ -0,0 +1,81 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/tuple-text-file-writer.h"
#include "common/logging.h"
#include "common/names.h"
#include "kudu/util/env.h"
#include "runtime/row-batch.h"
#include "util/debug-util.h"
namespace impala {
TupleTextFileWriter::TupleTextFileWriter(std::string path)
: path_(std::move(path)), bytes_written_(0) {}
TupleTextFileWriter::~TupleTextFileWriter() {
if (writer_.is_open()) writer_.close();
}
Status TupleTextFileWriter::Open() {
VLOG_FILE << "Tuple Cache Debug: Opening " << path_ << " for text writing";
writer_.open(path_, std::ios::out | std::ios::binary | std::ios::trunc);
if (!writer_.is_open() || writer_.fail()) {
return Status(TErrorCode::DISK_IO_ERROR,
"Open tuple text writer on " + path_ + " failed", GetStrErrMsg());
}
return Status::OK();
}
Status TupleTextFileWriter::Write(RowBatch* row_batch) {
FOREACH_ROW(row_batch, 0, build_batch_iter) {
TupleRow* build_row = build_batch_iter.Get();
DCHECK(build_row != nullptr);
// Convert to the human-readable representation of the row.
std::string row_str = PrintRow(build_row, *(row_batch->row_desc())) + '\n';
writer_ << row_str;
bytes_written_ += row_str.size();
}
return Status::OK();
}
void TupleTextFileWriter::Delete() {
if (writer_.is_open()) writer_.close();
// Delete the file directly using the actual path.
kudu::Status status = kudu::Env::Default()->DeleteFile(path_);
if (!status.ok()) {
LOG(WARNING) << Substitute("Failed to delete $0: $1", path_, status.ToString());
}
}
void TupleTextFileWriter::Commit() {
DCHECK(writer_.is_open());
writer_.close();
if (writer_.fail()) {
// An error occurred while writing or closing the file.
string err_msg = GetStrErrMsg();
LOG(WARNING) << "Failed to flush " << path_ << ": " << err_msg;
return;
}
LOG(INFO) << "Tuple Cache Debug: Commit completed successfully for " << path_;
}
bool TupleTextFileWriter::IsEmpty() const {
return BytesWritten() == 0;
}
} // namespace impala

View File

@@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <fstream>
#include "common/status.h"
namespace impala {
class RowBatch;
/// The TupleTextFileWriter is responsible for serializing a stream of RowBatches to a
/// local text file for the tuple cache. It uses a human-readable format to represent
/// the rows, making the data accessible and easy to interpret.
///
/// The TupleTextFileWriter writes to a specified location. Commit() is to ensure that
/// the file persists beyond the life of the writer.
/// If the TupleTextFileWriter is destructed without calling Commit(), it automatically
/// persists the file.
///
/// This class does not support multithreaded access, meaning that all operations should
/// be performed in a single thread to avoid unexpected behaviors.
class TupleTextFileWriter {
public:
TupleTextFileWriter(std::string path);
~TupleTextFileWriter();
Status Open();
// Writes a row batch to file. This holds no references to memory from the RowBatch.
// If Write() returns a non-OK Status, it is not recoverable and the caller should not
// call Write() or Commit().
Status Write(RowBatch* row_batch);
// Return true if nothing is written.
bool IsEmpty() const;
// Number of bytes written to file. Must be called before Commit/Delete.
size_t BytesWritten() const { return bytes_written_; }
// Stop writing and delete written data.
void Delete();
// Ensure data is available for future reads.
void Commit();
const std::string GetPath() const { return path_; }
private:
// Destination path
const std::string path_;
// Bytes of written data
size_t bytes_written_;
std::ofstream writer_;
};
} // namespace impala

View File

@@ -20,6 +20,8 @@
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
#include "common/logging.h" #include "common/logging.h"
#include "exec/tuple-file-reader.h"
#include "exec/tuple-text-file-reader.h"
#include "gutil/strings/split.h" #include "gutil/strings/split.h"
#include "gutil/strings/substitute.h" #include "gutil/strings/substitute.h"
#include "kudu/util/env.h" #include "kudu/util/env.h"
@@ -49,6 +51,9 @@ DEFINE_string(tuple_cache, "", "The configuration string for the tuple cache. "
DEFINE_string(tuple_cache_eviction_policy, "LRU", DEFINE_string(tuple_cache_eviction_policy, "LRU",
"(Advanced) The cache eviction policy to use for the tuple cache. " "(Advanced) The cache eviction policy to use for the tuple cache. "
"Either 'LRU' (default) or 'LIRS' (experimental)"); "Either 'LRU' (default) or 'LIRS' (experimental)");
DEFINE_string(tuple_cache_debug_dump_dir, "",
"Directory for dumping the intermediate query result tuples for debugging purpose.");
// Global feature flag for tuple caching. If false, enable_tuple_cache cannot be true // Global feature flag for tuple caching. If false, enable_tuple_cache cannot be true
// and the coordinator cannot produce plans with TupleCacheNodes. The tuple_cache // and the coordinator cannot produce plans with TupleCacheNodes. The tuple_cache
// parameter also cannot be specified. // parameter also cannot be specified.
@@ -65,6 +70,25 @@ static const char* MIN_TUPLE_CACHE_CAPACITY_STR = "64MB";
static constexpr int64_t STATS_MAX_TUPLE_CACHE_ENTRY_SIZE = 128L << 20; static constexpr int64_t STATS_MAX_TUPLE_CACHE_ENTRY_SIZE = 128L << 20;
static const char* CACHE_FILE_PREFIX = "tuple-cache-"; static const char* CACHE_FILE_PREFIX = "tuple-cache-";
static const char* DUBUG_DUMP_SUB_DIR_NAME = "tuple-cache-debug-dump";
static string ConstructTupleCacheDebugDumpPath() {
// Construct and return the path for debug dumping the tuple cache if configured.
string& cache_debug_dump_dir = FLAGS_tuple_cache_debug_dump_dir;
if (cache_debug_dump_dir != "") {
filesystem::path path(cache_debug_dump_dir);
path /= DUBUG_DUMP_SUB_DIR_NAME;
// Remove and recreate the subdirectory if it exists.
Status cr_status = FileSystemUtil::RemoveAndCreateDirectory(path.string());
if (cr_status.ok()) {
LOG(INFO) << "Created tuple cache debug dump path in " << path.string();
return path.string();
}
LOG(WARNING) << "Unable to create directory for tuple cache dumping: "
<< cr_status.GetDetail();
}
return string();
}
TupleCacheMgr::TupleCacheMgr(MetricGroup* metrics) TupleCacheMgr::TupleCacheMgr(MetricGroup* metrics)
: TupleCacheMgr(FLAGS_tuple_cache, FLAGS_tuple_cache_eviction_policy, metrics, 0) {} : TupleCacheMgr(FLAGS_tuple_cache, FLAGS_tuple_cache_eviction_policy, metrics, 0) {}
@@ -73,6 +97,7 @@ TupleCacheMgr::TupleCacheMgr(string cache_config, string eviction_policy_str,
MetricGroup* metrics, uint8_t debug_pos) MetricGroup* metrics, uint8_t debug_pos)
: cache_config_(move(cache_config)), : cache_config_(move(cache_config)),
eviction_policy_str_(move(eviction_policy_str)), eviction_policy_str_(move(eviction_policy_str)),
cache_debug_dump_dir_(ConstructTupleCacheDebugDumpPath()),
debug_pos_(debug_pos), debug_pos_(debug_pos),
tuple_cache_hits_(metrics->AddCounter("impala.tuple-cache.hits", 0)), tuple_cache_hits_(metrics->AddCounter("impala.tuple-cache.hits", 0)),
tuple_cache_misses_(metrics->AddCounter("impala.tuple-cache.misses", 0)), tuple_cache_misses_(metrics->AddCounter("impala.tuple-cache.misses", 0)),
@@ -348,6 +373,51 @@ const char* TupleCacheMgr::GetPath(UniqueHandle& handle) const {
return reinterpret_cast<const char*>(data + sizeof(TupleCacheEntry)); return reinterpret_cast<const char*>(data + sizeof(TupleCacheEntry));
} }
Status TupleCacheMgr::CreateDebugDumpSubdir(const string& sub_dir) {
DCHECK(!sub_dir.empty());
bool path_exists = false;
std::lock_guard<std::mutex> l(debug_dump_subdir_lock_);
// Try to create the subdir if it doesn't already exist.
RETURN_IF_ERROR(FileSystemUtil::PathExists(sub_dir, &path_exists));
if (!path_exists) {
RETURN_IF_ERROR(FileSystemUtil::CreateDirectory(sub_dir));
}
return Status::OK();
}
string TupleCacheMgr::GetDebugDumpPath(
const string& sub_dir, const string& file_name, string* sub_dir_full_path) const {
filesystem::path full_path(cache_debug_dump_dir_);
full_path /= sub_dir;
if (sub_dir_full_path != nullptr) *sub_dir_full_path = full_path.string();
full_path /= file_name;
return full_path.string();
}
void TupleCacheMgr::StoreMetadataForTupleCache(
const string& cache_key, const string& fragment_id) {
std::lock_guard<std::mutex> l(debug_dump_lock_);
DebugDumpCacheMetaData& cache = debug_dump_caches_metadata_[cache_key];
cache.fragment_id = fragment_id;
}
string TupleCacheMgr::GetFragmentIdForTupleCache(const string& cache_key) {
std::lock_guard<std::mutex> l(debug_dump_lock_);
auto iter = debug_dump_caches_metadata_.find(cache_key);
if (iter == debug_dump_caches_metadata_.end()) {
return string();
}
return iter->second.fragment_id;
}
void TupleCacheMgr::RemoveMetadataForTupleCache(const string& cache_key) {
std::lock_guard<std::mutex> l(debug_dump_lock_);
auto it = debug_dump_caches_metadata_.find(cache_key);
if (it != debug_dump_caches_metadata_.end()) {
debug_dump_caches_metadata_.erase(it);
}
}
void TupleCacheMgr::EvictedEntry(Slice key, Slice value) { void TupleCacheMgr::EvictedEntry(Slice key, Slice value) {
const TupleCacheEntry* entry = reinterpret_cast<const TupleCacheEntry*>(value.data()); const TupleCacheEntry* entry = reinterpret_cast<const TupleCacheEntry*>(value.data());
if (TupleCacheState::TOMBSTONE != entry->state) { if (TupleCacheState::TOMBSTONE != entry->state) {
@@ -374,6 +444,11 @@ void TupleCacheMgr::EvictedEntry(Slice key, Slice value) {
LOG(WARNING) << LOG(WARNING) <<
Substitute("Failed to unlink $0: $1", value.ToString(), status.ToString()); Substitute("Failed to unlink $0: $1", value.ToString(), status.ToString());
} }
// If correctness checking is enabled, remove the associated metadata as well.
if (DebugDumpEnabled()) {
string key_str(reinterpret_cast<const char*>(key.data()), key.size());
RemoveMetadataForTupleCache(key_str);
}
} }
Status TupleCacheMgr::DeleteExistingFiles() const { Status TupleCacheMgr::DeleteExistingFiles() const {
@@ -390,5 +465,4 @@ Status TupleCacheMgr::DeleteExistingFiles() const {
} }
return Status::OK(); return Status::OK();
} }
} // namespace impala } // namespace impala

View File

@@ -30,6 +30,8 @@ namespace impala {
class HistogramMetric; class HistogramMetric;
class TupleReader; class TupleReader;
// Declaration of the debug tuple cache bad postfix constant.
extern const char* DEBUG_TUPLE_CACHE_BAD_POSTFIX;
/// The TupleCacheMgr maintains per-daemon settings and metadata for the tuple cache. /// The TupleCacheMgr maintains per-daemon settings and metadata for the tuple cache.
/// This it used by the various TupleCacheNodes from queries to lookup the cache /// This it used by the various TupleCacheNodes from queries to lookup the cache
@@ -66,6 +68,11 @@ public:
HALTED, HALTED,
}; };
struct DebugDumpCacheMetaData {
/// The fragment id of the tuple cache for debug purpose.
std::string fragment_id;
};
struct Handle; struct Handle;
class HandleDeleter { class HandleDeleter {
public: public:
@@ -130,6 +137,24 @@ public:
/// meta-data of where the cached data is stored. /// meta-data of where the cached data is stored.
virtual void EvictedEntry(kudu::Slice key, kudu::Slice value) override; virtual void EvictedEntry(kudu::Slice key, kudu::Slice value) override;
/// Returns the full file path for debug dumping of the tuple cache.
/// If sub_dir_full_path is not nullptr, returns the full path of the subdirectory of
/// the debug tuple cache.
string GetDebugDumpPath(const string& sub_dir, const string& file_name,
string* sub_dir_full_path = nullptr) const;
/// Return whether debug dumping is enabled.
bool DebugDumpEnabled() const { return cache_debug_dump_dir_ != ""; }
/// Store, retrieve or remove the metadata of the stored tuple cache for correctness
/// verification.
void StoreMetadataForTupleCache(const string& cache_key, const string& fragment_id);
string GetFragmentIdForTupleCache(const string& cache_key);
void RemoveMetadataForTupleCache(const string& cache_key);
/// Create the subdirectory for debug dumping of the tuple cache if needed.
Status CreateDebugDumpSubdir(const string& sub_dir);
private: private:
// Disallow copy and assign // Disallow copy and assign
TupleCacheMgr(const TupleCacheMgr&) = delete; TupleCacheMgr(const TupleCacheMgr&) = delete;
@@ -152,6 +177,7 @@ public:
const std::string eviction_policy_str_; const std::string eviction_policy_str_;
std::string cache_dir_; std::string cache_dir_;
std::string cache_debug_dump_dir_;
bool enabled_ = false; bool enabled_ = false;
uint8_t debug_pos_; uint8_t debug_pos_;
@@ -171,6 +197,15 @@ public:
/// The instance of the cache. /// The instance of the cache.
mutable std::mutex creation_lock_; mutable std::mutex creation_lock_;
std::unique_ptr<Cache> cache_; std::unique_ptr<Cache> cache_;
/// Used by CreateDebugDumpSubdir() to ensure no two threads are creating the same
/// sub directory.
std::mutex debug_dump_subdir_lock_;
/// Protects the debug_dump_caches_metadata_.
std::mutex debug_dump_lock_;
/// An in-memory presentation for metadata of tuple caches for debug verification.
/// The key is the key of the tuple cache.
std::unordered_map<std::string, DebugDumpCacheMetaData> debug_dump_caches_metadata_;
}; };
} }

View File

@@ -1259,6 +1259,10 @@ Status impala::SetQueryOption(TImpalaQueryOptions::type option, const string& va
query_options->__set_enable_tuple_cache(enable_tuple_cache); query_options->__set_enable_tuple_cache(enable_tuple_cache);
break; break;
} }
case TImpalaQueryOptions::ENABLE_TUPLE_CACHE_VERIFICATION: {
query_options->__set_enable_tuple_cache_verification(IsTrue(value));
break;
}
case TImpalaQueryOptions::DISABLE_OPTIMIZED_JSON_COUNT_STAR: { case TImpalaQueryOptions::DISABLE_OPTIMIZED_JSON_COUNT_STAR: {
query_options->__set_disable_optimized_json_count_star(IsTrue(value)); query_options->__set_disable_optimized_json_count_star(IsTrue(value));
break; break;

View File

@@ -52,7 +52,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
// time we add or remove a query option to/from the enum TImpalaQueryOptions. // time we add or remove a query option to/from the enum TImpalaQueryOptions.
#define QUERY_OPTS_TABLE \ #define QUERY_OPTS_TABLE \
DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(), \ DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(), \
TImpalaQueryOptions::LONG_POLLING_TIME_MS + 1); \ TImpalaQueryOptions::ENABLE_TUPLE_CACHE_VERIFICATION + 1); \
REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \ REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR) \ QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR) \
REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS) \ REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS) \
@@ -338,6 +338,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
QUERY_OPT_FN(disable_optimized_json_count_star, DISABLE_OPTIMIZED_JSON_COUNT_STAR, \ QUERY_OPT_FN(disable_optimized_json_count_star, DISABLE_OPTIMIZED_JSON_COUNT_STAR, \
TQueryOptionLevel::ADVANCED) \ TQueryOptionLevel::ADVANCED) \
QUERY_OPT_FN(long_polling_time_ms, LONG_POLLING_TIME_MS, TQueryOptionLevel::REGULAR) \ QUERY_OPT_FN(long_polling_time_ms, LONG_POLLING_TIME_MS, TQueryOptionLevel::REGULAR) \
QUERY_OPT_FN(enable_tuple_cache_verification, ENABLE_TUPLE_CACHE_VERIFICATION, \
TQueryOptionLevel::ADVANCED) \
; ;
/// Enforce practical limits on some query options to avoid undesired query state. /// Enforce practical limits on some query options to avoid undesired query state.

View File

@@ -97,6 +97,11 @@ Status FileSystemUtil::RemoveAndCreateDirectory(const string& directory) {
"removing directory '$0': $1", directory, errcode.message()))); "removing directory '$0': $1", directory, errcode.message())));
} }
} }
return CreateDirectory(directory);
}
Status FileSystemUtil::CreateDirectory(const string& directory) {
error_code errcode;
filesystem::create_directories(directory, errcode); filesystem::create_directories(directory, errcode);
if (errcode != errc::success) { if (errcode != errc::success) {
return Status(ErrorMsg(TErrorCode::RUNTIME_ERROR, Substitute( return Status(ErrorMsg(TErrorCode::RUNTIME_ERROR, Substitute(

View File

@@ -34,6 +34,10 @@ class FileSystemUtil {
/// Returns Status::OK if successful, or a runtime error with a message otherwise. /// Returns Status::OK if successful, or a runtime error with a message otherwise.
static Status RemoveAndCreateDirectory(const std::string& directory) WARN_UNUSED_RESULT; static Status RemoveAndCreateDirectory(const std::string& directory) WARN_UNUSED_RESULT;
/// Create the specified directory and any ancestor directories that do not exist yet.
/// Returns Status::OK if successful, or a runtime error with a message otherwise.
static Status CreateDirectory(const string& directory) WARN_UNUSED_RESULT;
/// Create a file at the specified path. /// Create a file at the specified path.
static Status CreateFile(const std::string& file_path) WARN_UNUSED_RESULT; static Status CreateFile(const std::string& file_path) WARN_UNUSED_RESULT;

View File

@@ -85,6 +85,8 @@ fi
: ${TUPLE_CACHE_DIR:=} : ${TUPLE_CACHE_DIR:=}
# Tuple cache capacity. # Tuple cache capacity.
: ${TUPLE_CACHE_CAPACITY:=} : ${TUPLE_CACHE_CAPACITY:=}
# Tuple cache debug dump directory location.
: ${TUPLE_CACHE_DEBUG_DUMP_DIR:=}
if [[ "${TARGET_FILESYSTEM}" == "local" ]]; then if [[ "${TARGET_FILESYSTEM}" == "local" ]]; then
# TODO: Remove abort_on_config_error flag from here and create-load-data.sh once # TODO: Remove abort_on_config_error flag from here and create-load-data.sh once
# checkConfiguration() accepts the local filesystem (see IMPALA-1850). # checkConfiguration() accepts the local filesystem (see IMPALA-1850).
@@ -120,6 +122,10 @@ if [[ -n "${TUPLE_CACHE_DIR}" && -n "${TUPLE_CACHE_CAPACITY}" ]]; then
`"--tuple_cache_dir=${TUPLE_CACHE_DIR} " `"--tuple_cache_dir=${TUPLE_CACHE_DIR} "
TEST_START_CLUSTER_ARGS="${TEST_START_CLUSTER_ARGS} "` TEST_START_CLUSTER_ARGS="${TEST_START_CLUSTER_ARGS} "`
`"--tuple_cache_capacity=${TUPLE_CACHE_CAPACITY} " `"--tuple_cache_capacity=${TUPLE_CACHE_CAPACITY} "
if [[ -n "${TUPLE_CACHE_DEBUG_DUMP_DIR}" ]]; then
TEST_START_CLUSTER_ARGS="${TEST_START_CLUSTER_ARGS} "`
`"--tuple_cache_debug_dump_dir=${TUPLE_CACHE_DEBUG_DUMP_DIR}"
fi
fi fi
if [[ "${ERASURE_CODING}" = true ]]; then if [[ "${ERASURE_CODING}" = true ]]; then

View File

@@ -187,6 +187,10 @@ parser.add_option("--tuple_cache_capacity", dest="tuple_cache_capacity",
default=os.environ.get("TUPLE_CACHE_CAPACITY", "1GB"), default=os.environ.get("TUPLE_CACHE_CAPACITY", "1GB"),
help="This specifies the maximum storage usage of the tuple cache " help="This specifies the maximum storage usage of the tuple cache "
"each Impala daemon can use.") "each Impala daemon can use.")
parser.add_option("--tuple_cache_debug_dump_dir", dest="tuple_cache_debug_dump_dir",
default=os.environ.get("TUPLE_CACHE_DEBUG_DUMP_DIR", None),
help="Specifies a base directory for the dumping tuple cache files "
"for debug purposes")
parser.add_option("--tuple_cache_eviction_policy", dest="tuple_cache_eviction_policy", parser.add_option("--tuple_cache_eviction_policy", dest="tuple_cache_eviction_policy",
default="LRU", help="This specifies the cache eviction policy to use " default="LRU", help="This specifies the cache eviction policy to use "
"for the tuple cache.") "for the tuple cache.")
@@ -612,6 +616,23 @@ def build_impalad_arg_lists(cluster_size, num_coordinators, use_exclusive_coordi
args = "-tuple_cache_eviction_policy={policy} {args}".format( args = "-tuple_cache_eviction_policy={policy} {args}".format(
policy=options.tuple_cache_eviction_policy, args=args) policy=options.tuple_cache_eviction_policy, args=args)
if options.tuple_cache_debug_dump_dir:
# create the base directory
tuple_cache_debug_dump_path = \
os.path.join(options.tuple_cache_debug_dump_dir,
"impala-tuplecache-debugdump-{0}".format(str(i)))
# Try creating the directory if it doesn't exist already. May raise exception.
if not os.path.exists(tuple_cache_debug_dump_path):
os.makedirs(tuple_cache_debug_dump_path)
if options.docker_network is None:
tuple_cache_debug_dump_path_arg = tuple_cache_debug_dump_path
else:
# The cache directory will always be mounted at the same path inside the
# container. Reuses the data cache dedicated mount.
tuple_cache_debug_dump_path_arg = DATA_CACHE_CONTAINER_PATH
args = "-tuple_cache_debug_dump_dir={dir} {args}".format(
dir=tuple_cache_debug_dump_path_arg, args=args)
if options.enable_admission_service: if options.enable_admission_service:
args = "{args} -admission_service_host={host}".format( args = "{args} -admission_service_host={host}".format(
args=args, host=admissiond_host) args=args, host=admissiond_host)

View File

@@ -962,6 +962,12 @@ enum TImpalaQueryOptions {
// notification when the query completes and avoid added latency from waiting on the // notification when the query completes and avoid added latency from waiting on the
// client side. This defaults to off (0ms). // client side. This defaults to off (0ms).
LONG_POLLING_TIME_MS = 182 LONG_POLLING_TIME_MS = 182
// Enables the verification process for intermediate result caching.
// Tuple cache verification is performed only when the startup flag
// tuple_cache_debug_dump_dir is specified and enable_tuple_cache_verification is set
// to true.
ENABLE_TUPLE_CACHE_VERIFICATION = 183
} }
// The summary of a DML statement. // The summary of a DML statement.

View File

@@ -744,6 +744,9 @@ struct TQueryOptions {
// See comment in ImpalaService.thrift // See comment in ImpalaService.thrift
183: optional i32 long_polling_time_ms = 0; 183: optional i32 long_polling_time_ms = 0;
// See comment in ImpalaService.thrift
184: optional bool enable_tuple_cache_verification = true;
} }
// Impala currently has three types of sessions: Beeswax, HiveServer2 and external // Impala currently has three types of sessions: Beeswax, HiveServer2 and external

View File

@@ -489,7 +489,9 @@ error_codes = (
"Subscriber '$0' has incompatible protocol version V$1 conflicting with statestored's " "Subscriber '$0' has incompatible protocol version V$1 conflicting with statestored's "
"version V$2"), "version V$2"),
("JDBC_CONFIGURATION_ERROR", 159, "Error in JDBC table configuration: $0.") ("JDBC_CONFIGURATION_ERROR", 159, "Error in JDBC table configuration: $0."),
("TUPLE_CACHE_INCONSISTENCY", 160, "Inconsistent tuple cache found: $0.")
) )
import sys import sys