IMPALA-14278: Fix MemTracker data race between ExecEnv and Webserver

In the Webserver, while assigning or closing the compressed buffer's
memory tracker, no lock was being held across threads causing
TSAN build failures.

The critical section for this memory tracker is only necessary during
begining of the Webserver and is used rarely. So, only a general mutex
has been used instead of a shared mutex with concurrent reads.

Change-Id: Ife9198e911e526a9a0e88bdb175b4502a5bc2662
Reviewed-on: http://gerrit.cloudera.org:8080/23250
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Surya Hebbar
2025-08-05 17:35:38 +05:30
committed by Impala Public Jenkins
parent 9d6997b7c0
commit 09a6f0e6cd
2 changed files with 41 additions and 28 deletions

View File

@@ -27,6 +27,7 @@
#include <boost/mem_fn.hpp>
#include <boost/thread/shared_mutex.hpp>
#include <gssapi/gssapi_krb5.h>
#include <shared_mutex>
#include <rapidjson/document.h>
#include <rapidjson/prettywriter.h>
#include <rapidjson/stringbuffer.h>
@@ -365,10 +366,11 @@ Webserver::Webserver(const string& interface, const int port, MetricGroup* metri
}
Webserver::~Webserver() {
Stop();
std::shared_lock<std::shared_mutex> l(compressed_buffer_mem_tracker_lock_);
if (compressed_buffer_mem_tracker_.get() != nullptr) {
compressed_buffer_mem_tracker_->Close();
}
Stop();
}
void Webserver::ErrorHandler(const WebRequest& req, Document* document) {
@@ -566,20 +568,23 @@ Status Webserver::Start() {
// TODO: IMPALA-14179
// Track memory usage of other template generated webpages in the future
// Initialize compressed_buffer_mem_tracker_, if not initialized
if (compressed_buffer_mem_tracker_.get() == nullptr) {
MemTracker* process_mem_tracker = nullptr;
// Except for impala daemon, ExecEnv does not exist for statestore or catalog daemons
if (ExecEnv::GetInstance() != nullptr) {
// When ExecEnv is present, link to the current process_mem_tracker
DCHECK(ExecEnv::GetInstance()->process_mem_tracker() != nullptr);
process_mem_tracker = ExecEnv::GetInstance()->process_mem_tracker();
{
std::unique_lock<std::shared_mutex> l(compressed_buffer_mem_tracker_lock_);
// Initialize compressed_buffer_mem_tracker_, if not initialized
if (compressed_buffer_mem_tracker_.get() == nullptr) {
MemTracker* process_mem_tracker = nullptr;
// Currently, ExecEnv only exists for impalad not for statestore / catalog daemons
if (ExecEnv::GetInstance() != nullptr) {
// When ExecEnv is present, link to the current process_mem_tracker
DCHECK(ExecEnv::GetInstance()->process_mem_tracker() != nullptr);
process_mem_tracker = ExecEnv::GetInstance()->process_mem_tracker();
}
compressed_buffer_mem_tracker_ = std::make_shared<MemTracker>(-1,
"WebserverCompressedBuffer", process_mem_tracker);
}
compressed_buffer_mem_tracker_ = std::make_shared<MemTracker>(-1,
"WebserverCompressedBuffer", process_mem_tracker);
}
LOG(INFO) << "Webserver started";
return Status::OK();
}
@@ -628,23 +633,27 @@ void Webserver::SendResponse(struct sq_connection* connection,
oss << "Content-Type: " << content_type << CRLF;
const char * accepted_encodings = sq_get_header(connection, "Accept-Encoding");
// Include vector's new memory allocations into 'compressed_buffer_mem_tracker_'
MemTrackerAllocator<uint8_t> vector_mem_tracker_allocator(
compressed_buffer_mem_tracker_);
vector<uint8_t> output(0, vector_mem_tracker_allocator);
{
std::shared_lock<std::shared_mutex> l(compressed_buffer_mem_tracker_lock_);
// If accepted, support responses with gzip compression
if (accepted_encodings != NULL && std::string_view(accepted_encodings).find("gzip")
!= string::npos && CompressStringToBuffer(content, output).ok()) {
oss << "Content-Encoding: gzip" << CRLF;
oss << "Content-Length: " << output.size() << CRLF;
oss << CRLF;
// Interpret the data in the necessary form, do not reallocate
oss.write(reinterpret_cast<char*>(output.data()), output.size());
} else {
oss << "Content-Length: " << content.size() << CRLF;
oss << CRLF;
oss.write(content.data(), content.size());
// Include vector's new memory allocations into 'compressed_buffer_mem_tracker_'
MemTrackerAllocator<uint8_t> vector_mem_tracker_allocator(
compressed_buffer_mem_tracker_);
vector<uint8_t> output(0, vector_mem_tracker_allocator);
// If accepted, support responses with gzip compression
if (accepted_encodings != NULL && std::string_view(accepted_encodings).find("gzip")
!= string::npos && CompressStringToBuffer(content, output).ok()) {
oss << "Content-Encoding: gzip" << CRLF;
oss << "Content-Length: " << output.size() << CRLF;
oss << CRLF;
// Interpret the data in the necessary form, do not reallocate
oss.write(reinterpret_cast<char*>(output.data()), output.size());
} else {
oss << "Content-Length: " << content.size() << CRLF;
oss << CRLF;
oss.write(content.data(), content.size());
}
}
string output_str = oss.str();

View File

@@ -21,6 +21,7 @@
#include <string>
#include <boost/function.hpp>
#include <boost/thread/pthread/shared_mutex.hpp>
#include <shared_mutex>
#include <rapidjson/fwd.h>
#include "common/status.h"
@@ -306,6 +307,9 @@ class Webserver {
/// An incoming connection will be accepted if the OAuth token could be verified.
bool use_oauth_ = false;
// Lock guarding the compressed_buffer_mem_tracker
std::shared_mutex compressed_buffer_mem_tracker_lock_;
// Track memory for the comppressed string buffer
std::shared_ptr<MemTracker> compressed_buffer_mem_tracker_;