IMPALA-5904: Add full_tsan option and fix several TSAN bugs

This patch adds a new build flag, -full_tsan, alongside the existing
-tsan build flag. -full_tsan is equivalent to the current -tsan behavior,
while -tsan now sets the ignore_noninstrumented_modules flag to true.
ignore_noninstrumented_modules causes TSAN to ignore any modules that are
not TSAN-instrumented. This is necessary to get TSAN to play nicely with
Java, since Java is not TSAN-instrumented (see
https://wiki.openjdk.java.net/display/tsan/Main and JDK-8208520). While
this might decrease the number of issues surfaced by TSAN, it drastically
reduces the noise TSAN produces, because the JVM does not run
TSAN-instrumented code. Without this flag set to true, almost every
backend test fails with the error:

WARNING: ThreadSanitizer: data race (pid=12939)
  Write of size 1 at 0x7fcbe379c4c6 by thread T31:
    #0 strncpy /mnt/source/llvm/llvm-5.0.1.src-p2/projects/compiler-rt/lib/tsan/rtl/tsan_interceptors.cc:650 (unifiedbetests+0x1b2a4ad)
    #1 <null> <null> (libjvm.so+0x90e706)
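
For reference, a hypothetical pair of build invocations (-tsan and
-full_tsan are added by this patch; -skiptests is the usual buildall.sh
option to skip test execution):

  # Regular TSAN build: ignore_noninstrumented_modules=1 (JVM noise ignored)
  ./buildall.sh -tsan -skiptests
  # Full TSAN build: ignore_noninstrumented_modules=0 (all modules checked)
  ./buildall.sh -full_tsan -skiptests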

This patch fixes various TSAN bugs (e.g. data races) reported while
running the backend and E2E tests against a TSAN build (it does not make
Impala completely TSAN-clean). This patch makes the following changes:
* Fixes several bugs where shared variables were updated across threads
  without proper synchronization
* Fixes a few race conditions in test classes
* Where possible, existing locks are used to fix data races; in cases
  where the locking logic is non-trivial, atomics are used
* A few variables were marked as 'volatile', presumably for
  synchronization purposes; TSAN flags these 'volatile' variables as
  unsafe, and according to
  https://github.com/isocpp/CppCoreGuidelines/blob/master/CppCoreGuidelines.md#Rconc-volatile
  using 'volatile' for synchronization is dangerous, so these variables
  are changed to atomics (see the sketch after this list)
* Adds a suppression file (bin/tsan-suppressions.txt) similar to the
  UBSAN suppression file (bin/ubsan-suppressions.txt)
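
A minimal sketch of the 'volatile' to atomic change, with std::atomic
standing in for Impala's AtomicBool wrapper (the actual patch uses
AtomicBool's Load()/Store() methods):

  #include <atomic>
  #include <thread>

  // Before: volatile bool shutdown_ = false;  // TSAN reports a data race.
  // 'volatile' only prevents compiler caching; it provides no atomicity or
  // ordering guarantees. An atomic does.
  std::atomic<bool> shutdown_{false};

  void Worker() {
    while (!shutdown_.load()) { /* do work */ }
  }

  int main() {
    std::thread t(Worker);
    shutdown_.store(true);  // Signal shutdown; the store is visible to Worker().
    t.join();
  }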

Testing:
* Ran exhaustive tests
* Ran core tests w/ ASAN build
* Manually re-ran backend tests against a TSAN build and made sure the
  reported errors are gone

Change-Id: I3d7ef5c228afd5882e145e6f53885b355d6c25a0
Reviewed-on: http://gerrit.cloudera.org:8080/15116
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author: Sahil Takiar
Date: 2020-01-20 18:06:32 -08:00
Committer: Impala Public Jenkins
Parent: ebc2c366f5
Commit: ca6c8d43d7
29 changed files with 240 additions and 121 deletions


@@ -232,7 +232,8 @@ if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG"
OR "${CMAKE_BUILD_TYPE}" STREQUAL "TIDY"
OR "${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN"
OR "${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN_FULL"
OR "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN")
OR "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN"
OR "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN_FULL")
# Use the LLVM libraries with assertions for debug builds.
set(LLVM_ROOT ${LLVM_DEBUG_ROOT})
endif()


@@ -130,7 +130,8 @@ SET(CXX_FLAGS_UBSAN "${CXX_FLAGS_UBSAN} -O0")
# Set the flags to the thread sanitizer, also known as "tsan"
# Turn on sanitizer and debug symbols to get stack traces:
SET(CXX_FLAGS_TSAN "${CXX_CLANG_FLAGS} -Werror -O1 -ggdb3 -fno-omit-frame-pointer")
SET(CXX_FLAGS_TSAN "${CXX_FLAGS_TSAN} -fsanitize=thread -DTHREAD_SANITIZER")
SET(CXX_FLAGS_TSAN "${CXX_FLAGS_TSAN} -fsanitize=thread -DTHREAD_SANITIZER -DDYNAMIC_ANNOTATIONS_ENABLED")
SET(CXX_FLAGS_TSAN "${CXX_FLAGS_TSAN} -DTHREAD_SANITIZER_SUPPRESSIONS=\\\"$ENV{IMPALA_HOME}/bin/tsan-suppressions.txt\\\"")
SET(CXX_FLAGS_TIDY "${CXX_CLANG_FLAGS}")
# Catching unused variables requires an optimization level greater than 0
@@ -163,6 +164,9 @@ elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN_FULL")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DUNDEFINED_SANITIZER_FULL")
elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "TSAN")
SET(CMAKE_CXX_FLAGS "${CXX_FLAGS_TSAN}")
elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "TSAN_FULL")
SET(CMAKE_CXX_FLAGS "${CXX_FLAGS_TSAN}")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DTHREAD_SANITIZER_FULL")
else()
message(FATAL_ERROR "Unknown build type: ${CMAKE_BUILD_TYPE}")
endif()
@@ -188,7 +192,8 @@ if (CCACHE AND NOT DEFINED ENV{DISABLE_CCACHE})
OR "${CMAKE_BUILD_TYPE}" STREQUAL "TIDY"
OR "${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN"
OR "${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN_FULL"
OR "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN")
OR "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN"
OR "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN_FULL")
# Need to set CCACHE_CPP so that ccache calls clang with the original source file for
# both preprocessing and compilation. Otherwise, ccache will use clang to preprocess
# the file and then call clang with the preprocessed output if not cached. However,
@@ -337,7 +342,8 @@ if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG" OR
"${CMAKE_BUILD_TYPE}" STREQUAL "ADDRESS_SANITIZER" OR
"${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN" OR
"${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN_FULL" OR
"${CMAKE_BUILD_TYPE}" STREQUAL "TSAN")
"${CMAKE_BUILD_TYPE}" STREQUAL "TSAN" OR
"${CMAKE_BUILD_TYPE}" STREQUAL "TSAN_FULL")
set(BUILD_OUTPUT_ROOT_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/build/debug/")
else()
set(BUILD_OUTPUT_ROOT_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/build/release/")
@@ -542,7 +548,8 @@ endif()
# malloc/free)
set (IMPALA_LINK_LIBS_NO_TCMALLOC ${IMPALA_LINK_LIBS})
if (NOT "${CMAKE_BUILD_TYPE}" STREQUAL "ADDRESS_SANITIZER" AND
NOT "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN")
NOT "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN" AND
NOT "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN_FULL")
set (IMPALA_LINK_LIBS ${IMPALA_LINK_LIBS} tcmallocstatic)
set (UNIFIED_TEST_LINK_LIBS ${UNIFIED_TEST_LINK_LIBS} tcmallocstatic)
endif()


@@ -416,10 +416,23 @@ extern "C" const char *__asan_default_options() {
#endif
#if defined(THREAD_SANITIZER)
// Default TSAN_OPTIONS. Override by setting environment variable $TSAN_OPTIONS.
extern "C" const char *__tsan_default_options() {
// Note that backend test should re-configure to halt_on_error=1
return "halt_on_error=0 history_size=7";
extern "C" const char* __tsan_default_options() {
// Default TSAN_OPTIONS. Override by setting environment variable $TSAN_OPTIONS.
// TSAN and Java don't play nicely together because JVM code is not instrumented with
// TSAN. TSAN requires all libs to be compiled with '-fsanitize=thread' (see
// https://github.com/google/sanitizers/wiki/ThreadSanitizerCppManual#non-instrumented-code),
// which is not currently possible for Java code. See
// https://wiki.openjdk.java.net/display/tsan/Main and JDK-8208520 for efforts to get
// TSAN to run against Java code. The flag ignore_noninstrumented_modules tells TSAN to
// ignore all interceptors called from any non-instrumented libraries (e.g. Java).
return "ignore_noninstrumented_modules="
#if defined(THREAD_SANITIZER_FULL)
"0 "
#else
"1 "
#endif
"halt_on_error=0 history_size=7 allocator_may_return_null=1 "
"suppressions=" THREAD_SANITIZER_SUPPRESSIONS;
}
#endif
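
Since __tsan_default_options() only supplies defaults, they can still be
overridden at runtime via the environment; a hypothetical invocation (the
binary path is illustrative):

  TSAN_OPTIONS="halt_on_error=1 history_size=7" \
      ./be/build/debug/service/unifiedbetests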


@@ -564,8 +564,8 @@ void HdfsScanNodeBase::Close(RuntimeState* state) {
}
Status HdfsScanNodeBase::IssueInitialScanRanges(RuntimeState* state) {
DCHECK(!initial_ranges_issued_);
initial_ranges_issued_ = true;
DCHECK(!initial_ranges_issued_.Load());
initial_ranges_issued_.Store(true);
// We want to decrement this remaining_scan_range_submissions in all cases.
auto remaining_scan_range_submissions_trigger =
MakeScopeExitTrigger([&](){ UpdateRemainingScanRangeSubmissions(-1); });


@@ -490,7 +490,7 @@ class HdfsScanNodeBase : public ScanNode {
/// Set to true when the initial scan ranges are issued to the IoMgr. This happens on
/// the first call to GetNext(). The token manager, in a different thread, will read
/// this variable.
bool initial_ranges_issued_ = false;
AtomicBool initial_ranges_issued_;
/// When this counter drops to 0, AddDiskIoRanges() will not be called again, and
/// therefore scanner threads that can't get work should exit. For most


@@ -51,7 +51,7 @@ Status HdfsScanNodeMt::Prepare(RuntimeState* state) {
Status HdfsScanNodeMt::Open(RuntimeState* state) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
RETURN_IF_ERROR(HdfsScanNodeBase::Open(state));
DCHECK(!initial_ranges_issued_);
DCHECK(!initial_ranges_issued_.Load());
RETURN_IF_ERROR(IssueInitialScanRanges(state));
return Status::OK();
}


@@ -81,7 +81,7 @@ Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
SCOPED_TIMER(runtime_profile_->total_time_counter());
ScopedGetNextEventAdder ea(this, eos);
if (!initial_ranges_issued_) {
if (!initial_ranges_issued_.Load()) {
// We do this in GetNext() to maximise the amount of work we can do while waiting for
// runtime filters to show up. The scanner threads have already started (in Open()),
// so we need to tell them there is work to do.
@@ -181,7 +181,7 @@ void HdfsScanNode::Close(RuntimeState* state) {
// At this point, the other threads have been joined, and
// remaining_scan_range_submissions_ should be 0, if the
// query started and wasn't cancelled or exited early.
if (ranges_issued_barrier_.pending() == 0 && initial_ranges_issued_
if (ranges_issued_barrier_.pending() == 0 && initial_ranges_issued_.Load()
&& progress_.done()) {
DCHECK_EQ(remaining_scan_range_submissions_.Load(), 0);
}
@@ -269,7 +269,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
// Case 4. We have not issued the initial ranges so don't start a scanner thread.
// Issuing ranges will call this function and we'll start the scanner threads then.
// TODO: It would be good to have a test case for that.
if (!initial_ranges_issued_) return;
if (!initial_ranges_issued_.Load()) return;
ScannerMemLimiter* scanner_mem_limiter =
runtime_state_->query_state()->scanner_mem_limiter();


@@ -57,7 +57,7 @@ void ThriftThread::join() {
boost::shared_ptr<atc::Thread> ThriftThreadFactory::newThread(
boost::shared_ptr<atc::Runnable> runnable) const {
stringstream name;
name << prefix_ << "-" << count_++;
name << prefix_ << "-" << count_.Add(1);
boost::shared_ptr<ThriftThread> result =
boost::shared_ptr<ThriftThread>(new ThriftThread(group_, name.str(), runnable));
runnable->thread(result);


@@ -36,7 +36,7 @@ class ThriftThreadFactory : public apache::thrift::concurrency::ThreadFactory {
/// Group is the thread group for new threads to be assigned to, and prefix is the
/// per-thread prefix (threads are named "prefix-<count_>-<tid>").
ThriftThreadFactory(const std::string& group, const std::string& prefix)
: group_(group), prefix_(prefix), count_(0) { }
: group_(group), prefix_(prefix), count_(-1) { }
/// (From ThreadFactory) - creates a new ThriftThread to run the supplied Runnable.
virtual boost::shared_ptr<apache::thrift::concurrency::Thread> newThread(
@@ -55,7 +55,7 @@ class ThriftThreadFactory : public apache::thrift::concurrency::ThreadFactory {
/// Marked mutable because we want to increment it inside newThread, which for some
/// reason is const.
mutable int64_t count_;
mutable AtomicInt64 count_;
};
/// A ThriftThread is a Thrift-compatible wrapper for Impala's Thread class, so that all


@@ -45,14 +45,14 @@ void ReservationTracker::InitRootTracker(
DCHECK(!initialized_);
parent_ = nullptr;
mem_tracker_ = nullptr;
reservation_limit_ = reservation_limit;
reservation_ = 0;
used_reservation_ = 0;
child_reservations_ = 0;
reservation_limit_.Store(reservation_limit);
reservation_.Store(0);
used_reservation_.Store(0);
child_reservations_.Store(0);
initialized_ = true;
InitCounters(profile, reservation_limit_);
COUNTER_SET(counters_.peak_reservation, reservation_);
InitCounters(profile, reservation_limit);
COUNTER_SET(counters_.peak_reservation, 0);
CheckConsistency();
}
@@ -69,10 +69,10 @@ void ReservationTracker::InitChildTracker(RuntimeProfile* profile,
mem_tracker_ = mem_tracker;
mem_limit_mode_ = mem_limit_mode;
reservation_limit_ = reservation_limit;
reservation_ = 0;
used_reservation_ = 0;
child_reservations_ = 0;
reservation_limit_.Store(reservation_limit);
reservation_.Store(0);
used_reservation_.Store(0);
child_reservations_.Store(0);
initialized_ = true;
if (mem_tracker_ != nullptr) {
@@ -95,7 +95,7 @@ void ReservationTracker::InitChildTracker(RuntimeProfile* profile,
}
}
InitCounters(profile, reservation_limit_);
InitCounters(profile, reservation_limit);
CheckConsistency();
}
@@ -126,10 +126,10 @@ void ReservationTracker::Close() {
lock_guard<SpinLock> l(lock_);
if (!initialized_) return;
CheckConsistency();
DCHECK_EQ(used_reservation_, 0);
DCHECK_EQ(child_reservations_, 0);
DCHECK_EQ(used_reservation_.Load(), 0);
DCHECK_EQ(child_reservations_.Load(), 0);
// Release any reservation to parent.
if (parent_ != nullptr) DecreaseReservationLocked(reservation_, false);
if (parent_ != nullptr) DecreaseReservationLocked(reservation_.Load(), false);
mem_tracker_ = nullptr;
parent_ = nullptr;
initialized_ = false;
@@ -175,7 +175,7 @@ bool ReservationTracker::IncreaseReservationInternalLocked(int64_t bytes,
Substitute("Debug random failure mode is turned on: Reservation of $0 denied.",
PrettyPrinter::Print(bytes, TUnit::BYTES)));
}
} else if (reservation_ + reservation_increase > reservation_limit_) {
} else if (reservation_.Load() + reservation_increase > reservation_limit_.Load()) {
granted = false;
if (error_status != nullptr) {
MemTracker* mem_tracker = mem_tracker_;
@@ -190,10 +190,10 @@ bool ReservationTracker::IncreaseReservationInternalLocked(int64_t bytes,
"reservation=$3 used_reservation=$4 child_reservations=$5",
PrettyPrinter::Print(bytes, TUnit::BYTES),
mem_tracker == nullptr ? "Process" : mem_tracker->label(),
PrettyPrinter::Print(reservation_limit_, TUnit::BYTES),
PrettyPrinter::Print(reservation_, TUnit::BYTES),
PrettyPrinter::Print(used_reservation_, TUnit::BYTES),
PrettyPrinter::Print(child_reservations_, TUnit::BYTES));
PrettyPrinter::Print(reservation_limit_.Load(), TUnit::BYTES),
PrettyPrinter::Print(reservation_.Load(), TUnit::BYTES),
PrettyPrinter::Print(used_reservation_.Load(), TUnit::BYTES),
PrettyPrinter::Print(child_reservations_.Load(), TUnit::BYTES));
string top_n_queries = mem_tracker->LogTopNQueries(5);
if (!top_n_queries.empty()) {
error_msg = Substitute(
@@ -228,7 +228,7 @@ bool ReservationTracker::IncreaseReservationInternalLocked(int64_t bytes,
// The reservation was granted and state updated in all ancestors: we can modify
// this tracker's state now.
UpdateReservation(reservation_increase);
if (is_child_reservation) child_reservations_ += bytes;
if (is_child_reservation) child_reservations_.Add(bytes);
}
CheckConsistency();
@@ -270,9 +270,9 @@ void ReservationTracker::DecreaseReservation(int64_t bytes, bool is_child_reserv
void ReservationTracker::DecreaseReservationLocked(
int64_t bytes, bool is_child_reservation) {
DCHECK(initialized_);
DCHECK_GE(reservation_, bytes);
DCHECK_GE(reservation_.Load(), bytes);
if (bytes == 0) return;
if (is_child_reservation) child_reservations_ -= bytes;
if (is_child_reservation) child_reservations_.Add(-bytes);
UpdateReservation(-bytes);
ReleaseToMemTracker(bytes);
// The reservation should be returned up the tree.
@@ -325,7 +325,9 @@ bool ReservationTracker::TransferReservationTo(ReservationTracker* other, int64_
// Check reservation limits will not be violated before applying any updates.
for (ReservationTracker* tracker : other_path_to_common) {
if (tracker->reservation_ + bytes > tracker->reservation_limit_) return false;
if (tracker->reservation_.Load() + bytes > tracker->reservation_limit_.Load()) {
return false;
}
}
// Do the updates now that we have checked the limits. We're holding all the locks
@@ -337,10 +339,10 @@ bool ReservationTracker::TransferReservationTo(ReservationTracker* other, int64_
DCHECK(tracker->mem_tracker_ == nullptr || !tracker->mem_tracker_->has_limit());
bool success = tracker->TryConsumeFromMemTracker(bytes, MemLimit::HARD);
DCHECK(success);
if (tracker != other_path_to_common[0]) tracker->child_reservations_ += bytes;
if (tracker != other_path_to_common[0]) tracker->child_reservations_.Add(bytes);
}
for (ReservationTracker* tracker : path_to_common) {
if (tracker != path_to_common[0]) tracker->child_reservations_ -= bytes;
if (tracker != path_to_common[0]) tracker->child_reservations_.Add(-bytes);
tracker->UpdateReservation(-bytes);
tracker->ReleaseToMemTracker(bytes);
}
@@ -348,15 +350,19 @@ bool ReservationTracker::TransferReservationTo(ReservationTracker* other, int64_
// Update the 'child_reservations_' on the common ancestor if needed.
// Case 1: reservation was pushed up to 'other'.
if (common_ancestor == other) {
other->child_reservations_.Add(-bytes);
#ifndef NDEBUG
lock_guard<SpinLock> l(other->lock_);
other->child_reservations_ -= bytes;
other->CheckConsistency();
#endif
}
// Case 2: reservation was pushed down below 'this'.
if (common_ancestor == this) {
child_reservations_.Add(bytes);
#ifndef NDEBUG
lock_guard<SpinLock> l(lock_);
child_reservations_ += bytes;
CheckConsistency();
#endif
}
return true;
}
@@ -388,7 +394,7 @@ void ReservationTracker::ReleaseTo(int64_t bytes) {
lock_guard<SpinLock> l(lock_);
DCHECK(initialized_);
DCHECK_GE(bytes, 0);
DCHECK_LE(bytes, used_reservation_);
DCHECK_LE(bytes, used_reservation_.Load());
UpdateUsedReservation(-bytes);
CheckConsistency();
}
@@ -397,14 +403,14 @@ int64_t ReservationTracker::GetReservation() {
// Don't acquire lock - there is no point in holding it for this function only since
// the value read can change as soon as we release it.
DCHECK(initialized_);
return base::subtle::Acquire_Load(&reservation_);
return reservation_.Load();
}
int64_t ReservationTracker::GetUsedReservation() {
// Don't acquire lock - there is no point in holding it for this function only since
// the value read can change as soon as we release it.
DCHECK(initialized_);
return base::subtle::Acquire_Load(&used_reservation_);
return used_reservation_.Load();
}
int64_t ReservationTracker::GetUnusedReservation() {
@@ -417,35 +423,35 @@ int64_t ReservationTracker::GetChildReservations() {
// Don't acquire lock - there is no point in holding it for this function only since
// the value read can change as soon as we release it.
DCHECK(initialized_);
return base::subtle::Acquire_Load(&child_reservations_);
return child_reservations_.Load();
}
void ReservationTracker::CheckConsistency() const {
// Check internal invariants.
DCHECK_GE(reservation_, 0);
DCHECK_LE(reservation_, reservation_limit_);
DCHECK_GE(child_reservations_, 0);
DCHECK_GE(used_reservation_, 0);
DCHECK_LE(used_reservation_ + child_reservations_, reservation_);
DCHECK_GE(reservation_.Load(), 0);
DCHECK_LE(reservation_.Load(), reservation_limit_.Load());
DCHECK_GE(child_reservations_.Load(), 0);
DCHECK_GE(used_reservation_.Load(), 0);
DCHECK_LE(used_reservation_.Load() + child_reservations_.Load(), reservation_.Load());
DCHECK_EQ(reservation_, counters_.peak_reservation->current_value());
DCHECK_LE(reservation_, counters_.peak_reservation->value());
DCHECK_EQ(used_reservation_, counters_.peak_used_reservation->current_value());
DCHECK_LE(used_reservation_, counters_.peak_used_reservation->value());
DCHECK_EQ(reservation_.Load(), counters_.peak_reservation->current_value());
DCHECK_LE(reservation_.Load(), counters_.peak_reservation->value());
DCHECK_EQ(used_reservation_.Load(), counters_.peak_used_reservation->current_value());
DCHECK_LE(used_reservation_.Load(), counters_.peak_used_reservation->value());
if (counters_.reservation_limit != nullptr) {
DCHECK_EQ(reservation_limit_, counters_.reservation_limit->value());
DCHECK_EQ(reservation_limit_.Load(), counters_.reservation_limit->value());
}
}
void ReservationTracker::UpdateUsedReservation(int64_t delta) {
used_reservation_ += delta;
COUNTER_SET(counters_.peak_used_reservation, used_reservation_);
int64_t used_reservation = used_reservation_.Add(delta);
COUNTER_SET(counters_.peak_used_reservation, used_reservation);
CheckConsistency();
}
void ReservationTracker::UpdateReservation(int64_t delta) {
reservation_ += delta;
COUNTER_SET(counters_.peak_reservation, reservation_);
int64_t reservation = reservation_.Add(delta);
COUNTER_SET(counters_.peak_reservation, reservation);
CheckConsistency();
}
@@ -457,7 +463,7 @@ string ReservationTracker::DebugString() {
return Substitute(
"<ReservationTracker>: reservation_limit $0 reservation $1 used_reservation $2 "
"child_reservations $3 parent:\n$4",
reservation_limit_, reservation_, used_reservation_, child_reservations_,
parent_debug_string);
reservation_limit_.Load(), reservation_.Load(), used_reservation_.Load(),
child_reservations_.Load(), parent_debug_string);
}
}


@@ -189,7 +189,7 @@ class ReservationTracker {
private:
/// Returns the amount of 'reservation_' that is unused.
inline int64_t unused_reservation() const {
return reservation_ - used_reservation_ - child_reservations_;
return reservation_.Load() - used_reservation_.Load() - child_reservations_.Load();
}
/// Returns the parent's memtracker if 'parent_' is non-NULL, or NULL otherwise.
@@ -308,23 +308,23 @@ class ReservationTracker {
/// reservation increases.
MemLimit mem_limit_mode_;
/// The maximum reservation in bytes that this tracker can have. Can be read with an
/// atomic load without holding lock.
int64_t reservation_limit_;
/// The maximum reservation in bytes that this tracker can have. Can be read without
/// holding lock.
AtomicInt64 reservation_limit_;
/// This tracker's current reservation in bytes. 'reservation_' <= 'reservation_limit_'.
/// Can be read with an atomic load without holding lock.
int64_t reservation_;
/// Can be read without holding lock.
AtomicInt64 reservation_;
/// Total reservation of children in bytes. This is included in 'reservation_'.
/// 'used_reservation_' + 'child_reservations_' <= 'reservation_'.
/// Can be read with an atomic load without holding lock.
int64_t child_reservations_;
/// 'used_reservation_' + 'child_reservations_' <= 'reservation_'. Can be read without
/// holding lock.
AtomicInt64 child_reservations_;
/// The amount of the reservation currently used by this tracker in bytes.
/// 'used_reservation_' + 'child_reservations_' <= 'reservation_'.
/// Can be read with an atomic load without holding lock.
int64_t used_reservation_;
/// Can be read without holding lock.
AtomicInt64 used_reservation_;
};
}
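
The pattern used throughout this class: writes happen while holding lock_
so that multi-field invariants hold, while getters read the atomics
without the lock and may observe a slightly stale value. A minimal sketch
(std::atomic standing in for AtomicInt64; names are illustrative):

  #include <atomic>
  #include <cstdint>
  #include <mutex>

  class Tracker {
   public:
    void Increase(int64_t bytes) {
      // Writers lock so that reservation_ stays consistent with any
      // related fields updated in the same critical section.
      std::lock_guard<std::mutex> l(lock_);
      reservation_.fetch_add(bytes);
    }
    int64_t GetReservation() const {
      // Hot-path readers skip the lock; a slightly stale read is fine here.
      return reservation_.load();
    }
   private:
    std::mutex lock_;
    std::atomic<int64_t> reservation_{0};
  };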


@@ -121,9 +121,9 @@ Status SystemAllocator::AllocateViaMalloc(int64_t len, uint8_t** buffer_mem) {
// This ensures that it can be backed by whole pages, rather than parts of pages.
size_t alignment = use_huge_pages ? HUGE_PAGE_SIZE : SMALL_PAGE_SIZE;
int rc = posix_memalign(reinterpret_cast<void**>(buffer_mem), alignment, len);
#ifdef ADDRESS_SANITIZER
// Workaround ASAN bug where posix_memalign returns 0 even when allocation fails.
// It should instead return ENOMEM. See https://bugs.llvm.org/show_bug.cgi?id=32968.
#ifdef THREAD_SANITIZER
// Workaround TSAN bug where posix_memalign returns 0 even when allocation fails. It
// should instead return ENOMEM. See https://reviews.llvm.org/D35690.
if (rc == 0 && *buffer_mem == nullptr && len != 0) rc = ENOMEM;
#endif
if (rc != 0) {


@@ -175,7 +175,15 @@ void Coordinator::BackendState::Exec(const DebugOptions& debug_options,
const kudu::Slice& serialized_query_ctx, CountingBarrier* exec_complete_barrier) {
const auto trigger = MakeScopeExitTrigger([&]() {
// Ensure that 'last_report_time_ms_' is set prior to the barrier being notified.
last_report_time_ms_ = GenerateReportTimestamp();
{
// Since last_report_time_ms_ is protected by lock_, lock_ must be acquired
// before updating it. lock_ is guaranteed not to be held by this method when
// the lambda runs: the lambda executes in the destructor of the scope-exit
// trigger, which was created before any lock_guard in this method, and C++
// destroys objects in the reverse order of their creation.
lock_guard<mutex> lock(lock_);
last_report_time_ms_ = GenerateReportTimestamp();
}
exec_complete_barrier->Notify();
});


@@ -677,7 +677,7 @@ Status Coordinator::FinalizeHdfsDml() {
// All instances must have reported their final statuses before finalization, which is a
// post-condition of Wait. If the query was not successful, still try to clean up the
// staging directory.
DCHECK(has_called_wait_);
DCHECK(has_called_wait_.Load());
DCHECK(finalize_params() != nullptr);
bool is_transactional = finalize_params()->__isset.write_id;
@@ -730,8 +730,8 @@ void Coordinator::WaitForBackends() {
Status Coordinator::Wait() {
lock_guard<SpinLock> l(wait_lock_);
SCOPED_TIMER(query_profile_->total_time_counter());
if (has_called_wait_) return Status::OK();
has_called_wait_ = true;
if (has_called_wait_.Load()) return Status::OK();
has_called_wait_.Store(true);
if (stmt_type_ == TStmtType::QUERY) {
DCHECK(coord_instance_ != nullptr);
@@ -756,7 +756,7 @@ Status Coordinator::Wait() {
Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos,
int64_t block_on_wait_time_us) {
VLOG_ROW << "GetNext() query_id=" << PrintId(query_id());
DCHECK(has_called_wait_);
DCHECK(has_called_wait_.Load());
SCOPED_TIMER(query_profile_->total_time_counter());
if (ReturnedAllResults()) {
@@ -978,7 +978,7 @@ void Coordinator::ComputeQuerySummary() {
// In this case, the query did not even get to start all fragment instances.
// Some of the state that is used below might be uninitialized. In this case,
// the query has made so little progress, reporting a summary is not very useful.
if (!has_called_wait_) return;
if (!has_called_wait_.Load()) return;
if (backend_states_.empty()) return;
// make sure fragment_stats_ are up-to-date


@@ -290,7 +290,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
/// ensures single-threaded execution of Wait(). See lock ordering class comment.
SpinLock wait_lock_;
bool has_called_wait_ = false; // if true, Wait() was called; protected by wait_lock_
/// If true, Wait() was called. When read / written in Wait() the wait_lock_ should
/// still be acquired to ensure Wait() is only executed once.
AtomicBool has_called_wait_;
BackendResourceState* backend_resource_state_ = nullptr;


@@ -517,7 +517,7 @@ class DataStreamTest : public testing::Test {
sender_info_.emplace_back(make_unique<SenderInfo>());
sender_info_.back()->thread_handle.reset(
new thread(&DataStreamTest::Sender, this, num_senders, channel_buffer_size,
partition_type));
partition_type, sender_info_[num_senders].get()));
}
void JoinSenders() {
@@ -527,8 +527,8 @@ class DataStreamTest : public testing::Test {
}
}
void Sender(
int sender_num, int channel_buffer_size, TPartitionType::type partition_type) {
void Sender(int sender_num, int channel_buffer_size,
TPartitionType::type partition_type, SenderInfo* info) {
RuntimeState state(TQueryCtx(), exec_env_.get(), desc_tbl_);
VLOG_QUERY << "create sender " << sender_num;
const TDataSink& sink = GetSink(partition_type);
@@ -545,7 +545,6 @@ class DataStreamTest : public testing::Test {
EXPECT_OK(sender->Prepare(&state, &tracker_));
EXPECT_OK(sender->Open(&state));
scoped_ptr<RowBatch> batch(CreateRowBatch());
SenderInfo* info = sender_info_[sender_num].get();
int next_val = 0;
for (int i = 0; i < NUM_BATCHES; ++i) {
GetNextBatch(batch.get(), &next_val);
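
A note on the race fixed here: sender threads used to index into
sender_info_ themselves while the main thread kept growing that vector
with emplace_back, which can reallocate its backing storage. Resolving the
SenderInfo pointer before the thread starts avoids touching the vector
concurrently. A minimal sketch of the hazard:

  #include <memory>
  #include <thread>
  #include <vector>

  struct SenderInfo { int value = 0; };

  int main() {
    std::vector<std::unique_ptr<SenderInfo>> infos;
    infos.emplace_back(std::make_unique<SenderInfo>());
    // Racy: a thread reading infos[0] while the main thread reallocates
    // the vector below would race on the vector's internal state.
    // Fixed: capture the element pointer before starting the thread.
    SenderInfo* info = infos.back().get();
    std::thread t([info] { info->value = 1; });
    infos.emplace_back(std::make_unique<SenderInfo>());  // 'info' stays valid
    t.join();
    return 0;
  }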


@@ -267,17 +267,18 @@ class DataCache::CacheFile {
int64_t Allocate(int64_t len, const std::unique_lock<SpinLock>& partition_lock) {
DCHECK(partition_lock.owns_lock());
DCHECK_EQ(len % PAGE_SIZE, 0);
DCHECK_EQ(current_offset_ % PAGE_SIZE, 0);
int64_t current_offset = current_offset_.Load();
DCHECK_EQ(current_offset % PAGE_SIZE, 0);
// Hold the lock in shared mode to check if 'file_' is not closed already.
kudu::shared_lock<rw_spinlock> lock(lock_.get_lock());
if (!allow_append_ || (current_offset_ + len > FLAGS_data_cache_file_max_size_bytes &&
current_offset_ > 0)) {
if (!allow_append_ || (current_offset + len > FLAGS_data_cache_file_max_size_bytes &&
current_offset > 0)) {
allow_append_ = false;
return -1;
}
DCHECK(file_);
int64_t insertion_offset = current_offset_;
current_offset_ += len;
int64_t insertion_offset = current_offset;
current_offset_.Add(len);
return insertion_offset;
}
@@ -289,7 +290,7 @@ class DataCache::CacheFile {
// Hold the lock in shared mode to check if 'file_' is not closed already.
kudu::shared_lock<rw_spinlock> lock(lock_.get_lock());
if (UNLIKELY(!file_)) return false;
DCHECK_LE(offset + bytes_to_read, current_offset_);
DCHECK_LE(offset + bytes_to_read, current_offset_.Load());
kudu::Status status = file_->Read(offset, Slice(buffer, bytes_to_read));
if (UNLIKELY(!status.ok())) {
LOG(ERROR) << Substitute("Failed to read from $0 at offset $1 for $2 bytes: $3",
@@ -304,11 +305,11 @@ class DataCache::CacheFile {
// already closed.
bool Write(int64_t offset, const uint8_t* buffer, int64_t buffer_len) {
DCHECK_EQ(offset % PAGE_SIZE, 0);
DCHECK_LE(offset, current_offset_);
DCHECK_LE(offset, current_offset_.Load());
// Hold the lock in shared mode to check if 'file_' is not closed already.
kudu::shared_lock<rw_spinlock> lock(lock_.get_lock());
if (UNLIKELY(!file_)) return false;
DCHECK_LE(offset + buffer_len, current_offset_);
DCHECK_LE(offset + buffer_len, current_offset_.Load());
kudu::Status status = file_->Write(offset, Slice(buffer, buffer_len));
if (UNLIKELY(!status.ok())) {
LOG(ERROR) << Substitute("Failed to write to $0 at offset $1 for $2 bytes: $3",
@@ -324,7 +325,7 @@ class DataCache::CacheFile {
// Hold the lock in shared mode to check if 'file_' is not closed already.
kudu::shared_lock<rw_spinlock> lock(lock_.get_lock());
if (UNLIKELY(!file_)) return;
DCHECK_LE(offset + hole_size, current_offset_);
DCHECK_LE(offset + hole_size, current_offset_.Load());
kudu::Status status = file_->PunchHole(offset, hole_size);
if (UNLIKELY(!status.ok())) {
LOG(DFATAL) << Substitute("Failed to punch hole in $0 at offset $1 for $2 $3",
@@ -345,7 +346,7 @@ class DataCache::CacheFile {
bool allow_append_ = true;
/// The current offset in the file to append to on next insert.
int64_t current_offset_ = 0;
AtomicInt64 current_offset_;
/// This is a reader-writer lock used for synchronization with the deleter thread.
/// It is taken in write mode in Close() and shared mode everywhere else. It's expected


@@ -111,7 +111,7 @@ void DiskIoMgrStress::ClientThread(int client_id) {
Status status;
char read_buffer[MAX_FILE_LEN];
while (!shutdown_) {
while (!shutdown_.Load()) {
bool eos = false;
int bytes_read = 0;
@@ -193,7 +193,7 @@ void DiskIoMgrStress::CancelRandomReader() {
}
void DiskIoMgrStress::Run(int sec) {
shutdown_ = false;
shutdown_.Store(false);
for (int i = 0; i < num_clients_; ++i) {
readers_.add_thread(
new thread(&DiskIoMgrStress::ClientThread, this, i));
@@ -210,7 +210,7 @@ void DiskIoMgrStress::Run(int sec) {
}
// Signal shutdown for the client threads
shutdown_ = true;
shutdown_.Store(true);
for (int i = 0; i < num_clients_; ++i) {
unique_lock<mutex> lock(clients_[i].lock);


@@ -97,7 +97,7 @@ class DiskIoMgrStress {
bool includes_cancellation_;
/// Flag to signal that client reader threads should exit
volatile bool shutdown_;
AtomicBool shutdown_;
/// Helper to initialize a new reader client, registering a new reader with the
/// io mgr and initializing the scan ranges


@@ -240,11 +240,14 @@ ReadOutcome ScanRange::DoRead(DiskQueue* queue, int disk_id) {
return ReadOutcome::CANCELLED;
}
bytes_read_ += buffer_desc->len();
DCHECK_LE(bytes_read_, bytes_to_read_);
{
unique_lock<mutex> lock(lock_);
bytes_read_ += buffer_desc->len();
DCHECK_LE(bytes_read_, bytes_to_read_);
// It is end of stream if it is end of file, or read all the bytes.
buffer_desc->eosr_ = eof || bytes_read_ == bytes_to_read_;
// It is end of stream if it is end of file, or read all the bytes.
buffer_desc->eosr_ = eof || bytes_read_ == bytes_to_read_;
}
// After calling EnqueueReadyBuffer(), it is no longer valid to touch 'buffer_desc'.
// Store the state we need before calling EnqueueReadyBuffer().


@@ -51,6 +51,8 @@ DECLARE_int32(beeswax_port);
// instead of destroying it to avoid destroying the contained objects.
static ObjectPool* perm_objects;
namespace impala {
TEST(SessionTest, TestExpiry) {
const int NUM_SESSIONS = 5;
const int MAX_IDLE_TIMEOUT_MS = 4000;
@@ -121,6 +123,8 @@ TEST(SessionTest, TestExpiry) {
statestore->ShutdownForTesting();
}
} // namespace impala
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);


@@ -53,6 +53,7 @@ TEST(StatestoreTest, SmokeTest) {
// Port already in use
Statestore* statestore_wont_start = perm_objects->Add(new Statestore(metrics_2));
ASSERT_FALSE(statestore_wont_start->Init(statestore->port()).ok());
statestore_wont_start->ShutdownForTesting();
StatestoreSubscriber* sub_will_start = perm_objects->Add(
new StatestoreSubscriber("sub1", MakeNetworkAddress("localhost", 0),


@@ -826,7 +826,7 @@ class TimerCounterTest {
struct DummyWorker {
thread* thread_handle;
bool done;
AtomicBool done;
DummyWorker()
: thread_handle(NULL), done(false) {}
@@ -835,9 +835,12 @@ class TimerCounterTest {
Stop();
}
DummyWorker(const DummyWorker& dummy_worker)
: thread_handle(dummy_worker.thread_handle), done(dummy_worker.done.Load()) {}
void Stop() {
if (!done && thread_handle != NULL) {
done = true;
if (!done.Load() && thread_handle != NULL) {
done.Store(true);
thread_handle->join();
delete thread_handle;
thread_handle = NULL;
@@ -848,7 +851,7 @@ class TimerCounterTest {
void Run(DummyWorker* worker) {
SCOPED_CONCURRENT_STOP_WATCH(&csw_);
SCOPED_CONCURRENT_COUNTER(&timercounter_);
while (!worker->done) {
while (!worker->done.Load()) {
SleepForMs(10);
// Each test case should be no more than one second.
// Consider test failed if timer is more than 3 seconds.
@@ -970,6 +973,9 @@ TEST(TimerCounterTest, CountersTestRandom) {
ValidateLapTime(&tester, MonotonicStopWatch::Now() - lap_time_start);
}
// Don't run TestAddClearRace against TSAN builds as it is expected to have race
// conditions.
#ifndef THREAD_SANITIZER
TEST(TimeSeriesCounterTest, TestAddClearRace) {
ObjectPool pool;
@@ -1010,6 +1016,8 @@ TEST(TimeSeriesCounterTest, TestAddClearRace) {
EXPECT_EQ(num_samples, 0);
}
#endif
/// Stops the periodic counter updater in 'profile' and then clears the samples in
/// 'counter'.
void StopAndClearCounter(RuntimeProfile* profile,


@@ -227,13 +227,16 @@ class ConcurrentStopWatch {
return lap_duration;
}
uint64_t TotalRunningTime() const { return msw_.ElapsedTime(); }
uint64_t TotalRunningTime() const {
boost::lock_guard<SpinLock> l(thread_counter_lock_);
return msw_.ElapsedTime();
}
private:
MonotonicStopWatch msw_;
/// Lock with busy_threads_.
SpinLock thread_counter_lock_;
mutable SpinLock thread_counter_lock_;
/// Track how many threads are currently busy.
int busy_threads_;
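
Why the lock becomes mutable: TotalRunningTime() is const, but acquiring a
lock mutates it. Marking synchronization members mutable is the standard
idiom, since the lock is not part of the object's logical state. A minimal
sketch:

  #include <cstdint>
  #include <mutex>

  class Stopwatch {
   public:
    uint64_t TotalRunningTime() const {
      // Locking in a const method is allowed only because lock_ is mutable.
      std::lock_guard<std::mutex> l(lock_);
      return elapsed_;
    }
   private:
    mutable std::mutex lock_;
    uint64_t elapsed_ = 0;
  };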


@@ -38,7 +38,19 @@ cd ${IMPALA_BE_DIR}
cd ..
export CTEST_OUTPUT_ON_FAILURE=1
# Override default TSAN_OPTIONS so that halt_on_error is set.
export TSAN_OPTIONS="halt_on_error=1 history_size=7"
# See be/src/common/init.cc and
# https://github.com/google/sanitizers/wiki/ThreadSanitizerCppManual#non-instrumented-code
# for an explanation of what this flag does.
IGNORE_NONINSTRUMENTED_MODULES="ignore_noninstrumented_modules="
if [ "${TSAN_FULL+x}" ]; then
IGNORE_NONINSTRUMENTED_MODULES+="0"
else
IGNORE_NONINSTRUMENTED_MODULES+="1"
fi
export TSAN_OPTIONS="${IGNORE_NONINSTRUMENTED_MODULES} halt_on_error=1 history_size=7
allocator_may_return_null=1 suppressions=${IMPALA_HOME}/bin/tsan-suppressions.txt"
export PATH="${IMPALA_TOOLCHAIN}/llvm-${IMPALA_LLVM_VERSION}/bin:${PATH}"
"${MAKE_CMD:-make}" test ARGS="${BE_TEST_ARGS}"

bin/tsan-suppressions.txt (new file, 32 lines)

@@ -0,0 +1,32 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# This file suppresses TSAN errors, following
# https://github.com/google/sanitizers/wiki/ThreadSanitizerSuppressions
# This method in Boost's UUID library operates on static state with impunity,
# triggering (harmless) data races in TSAN when boost::uuids::random_generator
# instances are created across threads (this suppression and the corresponding
# explanation were copied from Kudu's sanitizer_options.cc).
race:boost::uuids::detail::seed_rng::sha1_random_digest_
# TODO: IMPALA-9314: The ThriftServer used by the Statestore has various race conditions
# during the shutdown process (and leaks threads)
race:impala::StatestoreTest_SmokeTest_Test
thread:impala::StatestoreTest_SmokeTest_Test
race:impala::SessionTest_TestExpiry_Test
race:impala::StatestoreSslTest_ValidCertSmokeTest_Test


@@ -70,6 +70,7 @@ BUILD_TIDY=0
BUILD_UBSAN=0
BUILD_UBSAN_FULL=0
BUILD_TSAN=0
BUILD_TSAN_FULL=0
BUILD_SHARED_LIBS=0
# Export MAKE_CMD so it is visible in scripts that invoke make, e.g. copy-udfs-udas.sh
export MAKE_CMD=make
@@ -141,6 +142,9 @@ do
-tsan)
BUILD_TSAN=1
;;
-full_tsan)
BUILD_TSAN_FULL=1
;;
-testpairwise)
EXPLORATION_STRATEGY=pairwise
;;
@@ -206,10 +210,18 @@ do
echo "[-codecoverage] : Build with code coverage [Default: False]"
echo "[-asan] : Address sanitizer build [Default: False]"
echo "[-tidy] : clang-tidy build [Default: False]"
echo "[-tsan] : Thread sanitizer build, runs with"\
"ignore_noninstrumented_modules=1. When this flag is true, TSAN ignores"\
"memory accesses from non-instrumented libraries. This decreases the number"\
"of false positives, but might miss real issues. -full_tsan disables this"\
"flag [Default: False]"
echo "[-full_tsan] : Thread sanitizer build, runs with"\
"ignore_noninstrumented_modules=0 (see the -tsan description for an"\
"explanation of what this flag does) [Default: False]"
echo "[-ubsan] : Undefined behavior sanitizer build [Default: False]"
echo "[-full_ubsan] : Undefined behavior sanitizer build, including code generated"\
" by cross-compilation to LLVM IR. Much slower queries than plain -ubsan "\
" [Default: False]"
"by cross-compilation to LLVM IR. Much slower queries than plain -ubsan"\
"[Default: False]"
echo "[-skiptests] : Skips execution of all tests"
echo "[-notests] : Skips building and execution of all tests"
echo "[-start_minicluster] : Start test cluster including Impala and all"\
@@ -299,6 +311,10 @@ fi
if [[ ${BUILD_TSAN} -eq 1 ]]; then
CMAKE_BUILD_TYPE_LIST+=(TSAN)
fi
if [[ ${BUILD_TSAN_FULL} -eq 1 ]]; then
CMAKE_BUILD_TYPE_LIST+=(TSAN_FULL)
export TSAN_FULL=1
fi
if [[ -n "${CMAKE_BUILD_TYPE_LIST:+1}" ]]; then
if [[ ${#CMAKE_BUILD_TYPE_LIST[@]} -gt 1 ]]; then
echo "ERROR: more than one CMake build type defined: ${CMAKE_BUILD_TYPE_LIST[@]}"
@@ -433,7 +449,8 @@ generate_cmake_files() {
|| ("$build_type" == "TIDY") \
|| ("$build_type" == "UBSAN") \
|| ("$build_type" == "UBSAN_FULL") \
|| ("$build_type" == "TSAN") ]]; then
|| ("$build_type" == "TSAN") \
|| ("$build_type" == "TSAN_FULL") ]]; then
CMAKE_ARGS+=(-DCMAKE_TOOLCHAIN_FILE=$IMPALA_HOME/cmake_modules/clang_toolchain.cmake)
else
CMAKE_ARGS+=(-DCMAKE_TOOLCHAIN_FILE=$IMPALA_HOME/cmake_modules/toolchain.cmake)


@@ -113,13 +113,15 @@ class ImpalaBuildFlavors:
TIDY = 'tidy'
# ./buildall.sh -tsan
TSAN = 'tsan'
# ./buildall.sh -full_tsan
TSAN_FULL = 'tsan_full'
# ./buildall.sh -ubsan
UBSAN = 'ubsan'
# ./buildall.sh -full_ubsan
UBSAN_FULL = 'ubsan_full'
VALID_BUILD_TYPES = [ADDRESS_SANITIZER, DEBUG, CODE_COVERAGE_DEBUG, RELEASE,
CODE_COVERAGE_RELEASE, TIDY, TSAN, UBSAN, UBSAN_FULL]
CODE_COVERAGE_RELEASE, TIDY, TSAN, TSAN_FULL, UBSAN, UBSAN_FULL]
class LinkTypes:


@@ -75,7 +75,7 @@ class TestWebPage(ImpalaTestSuite):
assert build_flags["is_ndebug"] in ["true", "false"]
assert "cmake_build_type" in build_flags
assert build_flags["cmake_build_type"] in ["debug", "release", "address_sanitizer",
"tidy", "ubsan", "ubsan_full", "tsan", "code_coverage_release",
"tidy", "ubsan", "ubsan_full", "tsan", "tsan_full", "code_coverage_release",
"code_coverage_debug"]
assert "library_link_type" in build_flags
assert build_flags["library_link_type"] in ["dynamic", "static"]