mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
Improve AtomicInt abstraction and implementation
Improvements: 1) Don't use operator overloading. It makes it very difficult to reason about code that uses atomics. So it's better to have visible hints in the code, and we generally prefer to avoid operator overloading anyway. Also don't allow copy constructor for some reason. Eliminating assignment overloading brought to light a bug in the use of ProgressUpdater - some progress could have been dropped if it occurs before the ProgressUpdater is re-constructed. 2) Make the memory ordering semantics well defined and documented. 3) Make all operators actually atomic. For example, in the old code, 'operator T()' and 'operator=' were not guaranteed to be atomic (though on x64 it would be in practice - but again, memory ordering wasn't defined because of no compiler barrier). 4) Read() was pessimistic and provided full barrier semantics even though this is not what's generally needed. 5) Trim down the interface to the bare minimum. Better to keep something as subtle as atomics as simple as possible. Some could be removed, others were moved into the place that used them when the routines weren't generally useful. For now, let's stick to the operation/barrier combinations that are most commonly needed rather than implement to the full set. Later, if Impala implements more complicated lock-free algorithms, we may need to expand this set, but it's currently sufficient and best to keep things simple. The new implementation is based on atomicops, which we depend on already by SpinLock. Later we can consider using C++11 atomics to implement the underlying value_. Change-Id: If2dc1b258ad7ef7ddc3db4e78de3065012cfa923 Reviewed-on: http://gerrit.cloudera.org:8080/2573 Reviewed-by: Dan Hecht <dhecht@cloudera.com> Tested-by: Internal Jenkins
This commit is contained in:
committed by
Internal Jenkins
parent
e999617719
commit
152c586f39
@@ -23,99 +23,62 @@
|
||||
|
||||
namespace impala {
|
||||
|
||||
// Simple test to make sure there is no obvious error in the usage of the
|
||||
// __sync* operations. This is not intended to test the thread safety.
|
||||
TEST(AtomicTest, Basic) {
|
||||
AtomicInt<int> i1;
|
||||
EXPECT_EQ(i1, 0);
|
||||
i1 = 10;
|
||||
EXPECT_EQ(i1, 10);
|
||||
i1 += 5;
|
||||
EXPECT_EQ(i1, 15);
|
||||
i1 -= 25;
|
||||
EXPECT_EQ(i1, -10);
|
||||
++i1;
|
||||
EXPECT_EQ(i1, -9);
|
||||
--i1;
|
||||
EXPECT_EQ(i1, -10);
|
||||
i1 = 100;
|
||||
EXPECT_EQ(i1, 100);
|
||||
// Simple test to make sure there is no obvious error in the operations. This is not
|
||||
// intended to test the thread safety.
|
||||
template<typename T>
|
||||
static void TestBasic() {
|
||||
AtomicInt<T> i1;
|
||||
EXPECT_EQ(i1.Load(), 0);
|
||||
i1.Store(10);
|
||||
EXPECT_EQ(i1.Load(), 10);
|
||||
i1.Add(5);
|
||||
EXPECT_EQ(i1.Load(), 15);
|
||||
i1.Add(-25);
|
||||
EXPECT_EQ(i1.Load(), -10);
|
||||
i1.Store(100);
|
||||
EXPECT_EQ(i1.Load(), 100);
|
||||
|
||||
i1.UpdateMax(50);
|
||||
EXPECT_EQ(i1, 100);
|
||||
i1.UpdateMax(150);
|
||||
EXPECT_EQ(i1, 150);
|
||||
|
||||
i1.UpdateMin(200);
|
||||
EXPECT_EQ(i1, 150);
|
||||
i1.UpdateMin(-200);
|
||||
EXPECT_EQ(i1, -200);
|
||||
|
||||
bool success = i1.CompareAndSwap(-200, 50);
|
||||
EXPECT_EQ(i1, 50);
|
||||
bool success = i1.CompareAndSwap(100, 50);
|
||||
EXPECT_EQ(i1.Load(), 50);
|
||||
EXPECT_EQ(success, true);
|
||||
success = i1.CompareAndSwap(50, 100);
|
||||
EXPECT_EQ(i1, 100);
|
||||
EXPECT_EQ(i1.Load(), 100);
|
||||
EXPECT_EQ(success, true);
|
||||
|
||||
success = i1.CompareAndSwap(-200, 50);
|
||||
EXPECT_EQ(i1, 100);
|
||||
EXPECT_EQ(i1.Load(), 100);
|
||||
EXPECT_EQ(success, false);
|
||||
success = i1.CompareAndSwap(50, 200);
|
||||
EXPECT_EQ(i1, 100);
|
||||
EXPECT_EQ(i1.Load(), 100);
|
||||
EXPECT_EQ(success, false);
|
||||
|
||||
int retval = i1.CompareAndSwapVal(100, 200);
|
||||
EXPECT_EQ(i1, 200);
|
||||
EXPECT_EQ(retval, 100);
|
||||
retval = i1.CompareAndSwapVal(200, 250);
|
||||
EXPECT_EQ(i1, 250);
|
||||
EXPECT_EQ(retval, 200);
|
||||
|
||||
retval = i1.CompareAndSwapVal(100, 200);
|
||||
EXPECT_EQ(i1, 250);
|
||||
EXPECT_EQ(retval, 250);
|
||||
retval = i1.CompareAndSwapVal(-200, 50);
|
||||
EXPECT_EQ(i1, 250);
|
||||
EXPECT_EQ(retval, 250);
|
||||
|
||||
retval = i1.Swap(300);
|
||||
EXPECT_EQ(i1, 300);
|
||||
EXPECT_EQ(retval, 250);
|
||||
retval = i1.Swap(350);
|
||||
EXPECT_EQ(i1, 350);
|
||||
EXPECT_EQ(retval, 300);
|
||||
}
|
||||
|
||||
TEST(AtomicTest, TestAndSet) {
|
||||
AtomicInt<int> i1;
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
EXPECT_EQ(i + 1, i1.UpdateAndFetch(1));
|
||||
}
|
||||
TEST(AtomicTest, Basic) {
|
||||
TestBasic<int32_t>();
|
||||
}
|
||||
|
||||
i1 = 0;
|
||||
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
EXPECT_EQ(i, i1.FetchAndUpdate(1));
|
||||
}
|
||||
TEST(AtomicTest, Basic64) {
|
||||
TestBasic<int64_t>();
|
||||
}
|
||||
|
||||
// Basic multi-threaded testing
|
||||
typedef function<void (int64_t, int64_t , AtomicInt<int>*)> Fn;
|
||||
|
||||
void IncrementThread(int64_t id, int64_t n, AtomicInt<int>* ai) {
|
||||
template<typename T>
|
||||
static void IncrementThread(int64_t id, int64_t n, AtomicInt<T>* ai) {
|
||||
for (int64_t i = 0; i < n * id; ++i) {
|
||||
++*ai;
|
||||
ai->Add(1);
|
||||
}
|
||||
}
|
||||
|
||||
void DecrementThread(int64_t id, int64_t n, AtomicInt<int>* ai) {
|
||||
template<typename T>
|
||||
static void DecrementThread(int64_t id, int64_t n, AtomicInt<T>* ai) {
|
||||
for (int64_t i = 0; i < n * id; ++i) {
|
||||
--*ai;
|
||||
ai->Add(-1);
|
||||
}
|
||||
}
|
||||
|
||||
TEST(AtomicTest, MultipleThreadsIncDec) {
|
||||
template<typename T>
|
||||
static void TestMultipleThreadsIncDec() {
|
||||
thread_group increments, decrements;
|
||||
vector<int> ops;
|
||||
ops.push_back(1000);
|
||||
@@ -128,47 +91,58 @@ TEST(AtomicTest, MultipleThreadsIncDec) {
|
||||
for (vector<int>::iterator thrit = num_threads.begin(); thrit != num_threads.end();
|
||||
++thrit) {
|
||||
for (vector<int>::iterator opit = ops.begin(); opit != ops.end(); ++opit) {
|
||||
AtomicInt<int> ai = 0;
|
||||
AtomicInt<T> ai(0);
|
||||
for (int i = 0; i < *thrit; ++i) {
|
||||
increments.add_thread( new thread(IncrementThread, i, *opit, &ai));
|
||||
decrements.add_thread( new thread(DecrementThread, i, *opit, &ai));
|
||||
increments.add_thread(new thread(IncrementThread<T>, i, *opit, &ai));
|
||||
decrements.add_thread(new thread(DecrementThread<T>, i, *opit, &ai));
|
||||
}
|
||||
increments.join_all();
|
||||
decrements.join_all();
|
||||
EXPECT_EQ(ai, 0);
|
||||
EXPECT_EQ(ai.Load(), 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void CASIncrementThread(int64_t id, int64_t n, AtomicInt<int>* ai) {
|
||||
int oldval = 0;
|
||||
int newval = 0;
|
||||
TEST(AtomicTest, MultipleThreadsIncDec) {
|
||||
TestMultipleThreadsIncDec<int32_t>();
|
||||
}
|
||||
|
||||
TEST(AtomicTest, MultipleThreadsIncDec64) {
|
||||
TestMultipleThreadsIncDec<int64_t>();
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
static void CASIncrementThread(int64_t id, int64_t n, AtomicInt<T>* ai) {
|
||||
T oldval = 0;
|
||||
T newval = 0;
|
||||
bool success = false;
|
||||
for (int64_t i = 0; i < n * id; ++i) {
|
||||
success = false;
|
||||
while ( !success ) {
|
||||
oldval = ai->Read();
|
||||
while (!success) {
|
||||
oldval = ai->Load();
|
||||
newval = oldval + 1;
|
||||
success = ai->CompareAndSwap(oldval, newval);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void CASDecrementThread(int64_t id, int64_t n, AtomicInt<int>* ai) {
|
||||
int oldval = 0;
|
||||
int newval = 0;
|
||||
template<typename T>
|
||||
static void CASDecrementThread(int64_t id, int64_t n, AtomicInt<T>* ai) {
|
||||
T oldval = 0;
|
||||
T newval = 0;
|
||||
bool success = false;
|
||||
for (int64_t i = 0; i < n * id; ++i) {
|
||||
success = false;
|
||||
while ( !success ) {
|
||||
oldval = ai->Read();
|
||||
while (!success) {
|
||||
oldval = ai->Load();
|
||||
newval = oldval - 1;
|
||||
success = ai->CompareAndSwap(oldval, newval);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST(AtomicTest, MultipleThreadsCASIncDec) {
|
||||
template<typename T>
|
||||
static void TestMultipleThreadsCASIncDec() {
|
||||
thread_group increments, decrements;
|
||||
vector<int> ops;
|
||||
ops.push_back(10);
|
||||
@@ -181,18 +155,97 @@ TEST(AtomicTest, MultipleThreadsCASIncDec) {
|
||||
for (vector<int>::iterator thrit = num_threads.begin(); thrit != num_threads.end();
|
||||
++thrit) {
|
||||
for (vector<int>::iterator opit = ops.begin(); opit != ops.end(); ++opit) {
|
||||
AtomicInt<int> ai = 0;
|
||||
AtomicInt<T> ai(0);
|
||||
for (int i = 0; i < *thrit; ++i) {
|
||||
increments.add_thread( new thread(CASIncrementThread, i, *opit, &ai));
|
||||
decrements.add_thread( new thread(CASDecrementThread, i, *opit, &ai));
|
||||
increments.add_thread(new thread(CASIncrementThread<T>, i, *opit, &ai));
|
||||
decrements.add_thread(new thread(CASDecrementThread<T>, i, *opit, &ai));
|
||||
}
|
||||
increments.join_all();
|
||||
decrements.join_all();
|
||||
EXPECT_EQ(ai, 0);
|
||||
EXPECT_EQ(ai.Load(), 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST(AtomicTest, MultipleThreadsCASIncDec) {
|
||||
TestMultipleThreadsCASIncDec<int32_t>();
|
||||
}
|
||||
|
||||
TEST(AtomicTest, MultipleThreadsCASIncDec64) {
|
||||
TestMultipleThreadsCASIncDec<int64_t>();
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
static void AcquireReleaseThreadA(T id, T other_id, int iters, AtomicInt<T>* control,
|
||||
T* payload) {
|
||||
const int DELAY = 100;
|
||||
for (int it = 0; it < iters; ++it) {
|
||||
// Phase 1
|
||||
// (A-1) This store is not allowed to reorder with the below "store release".
|
||||
*payload = it;
|
||||
control->Store(other_id);
|
||||
|
||||
// Phase 2
|
||||
while (control->Load() != id) ;
|
||||
// (A-2) This store is not allowed to reorder with the above "load acquire".
|
||||
*payload = -it;
|
||||
|
||||
// Give thread B a chance to get started on Phase 1.
|
||||
for (int i = 0; i < DELAY; ++i) {
|
||||
AtomicUtil::CompilerBarrier();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
static void AcquireReleaseThreadB(T id, T other_id, int iters, AtomicInt<T>* control,
|
||||
T* payload) {
|
||||
const int DELAY = 100;
|
||||
for (int it = 0; it < iters; ++it) {
|
||||
T p;
|
||||
// Phase 1
|
||||
// Wait until ThreadA signaled that payload was updated.
|
||||
while (control->Load() != id);
|
||||
// (B-1) This load is not allowed to reorder with the above "load acquire".
|
||||
p = *payload;
|
||||
// Verify ordering for both (A-1) and (B-1).
|
||||
EXPECT_EQ(it, p);
|
||||
|
||||
// Phase 2
|
||||
// Payload should not change until we give ThreadA the go-ahead.
|
||||
for (int i = 0; i < DELAY; ++i) {
|
||||
AtomicUtil::CompilerBarrier();
|
||||
// (B-2) This load is not allowed to reorder with the below "store release".
|
||||
p = *payload;
|
||||
}
|
||||
control->Store(other_id);
|
||||
// Verify ordering for both (A-2) and (B-2).
|
||||
EXPECT_EQ(it, p);
|
||||
}
|
||||
}
|
||||
|
||||
// Test "acquire" and "release" memory ordering semantics. There are two threads, A and
|
||||
// B, and each execute phase 1 and phase 2 in lockstep to verify the various
|
||||
// combinations of memory accesses.
|
||||
template<typename T>
|
||||
static void TestAcquireReleaseLoadStore() {
|
||||
const int ITERS = 1000000;
|
||||
AtomicInt<T> control(-1);
|
||||
T payload = -1;
|
||||
thread* t_a = new thread(AcquireReleaseThreadA<T>, 0, 1, ITERS, &control, &payload);
|
||||
thread* t_b = new thread(AcquireReleaseThreadB<T>, 1, 0, ITERS, &control, &payload);
|
||||
t_a->join();
|
||||
t_b->join();
|
||||
}
|
||||
|
||||
TEST(AtomicTest, MultipleTreadsAcquireReleaseLoadStoreInt) {
|
||||
TestAcquireReleaseLoadStore<int32_t>();
|
||||
}
|
||||
|
||||
TEST(AtomicTest, MultipleTreadsAcquireReleaseLoadStoreInt64) {
|
||||
TestAcquireReleaseLoadStore<int64_t>();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
|
||||
@@ -15,9 +15,9 @@
|
||||
#ifndef IMPALA_COMMON_ATOMIC_H
|
||||
#define IMPALA_COMMON_ATOMIC_H
|
||||
|
||||
#include <algorithm>
|
||||
|
||||
#include "common/compiler-util.h"
|
||||
#include "gutil/atomicops.h"
|
||||
#include "gutil/macros.h"
|
||||
|
||||
namespace impala {
|
||||
|
||||
@@ -29,122 +29,77 @@ class AtomicUtil {
|
||||
/// while (1);
|
||||
/// should be:
|
||||
/// while (1) CpuWait();
|
||||
static inline void CpuWait() {
|
||||
asm volatile("pause\n": : :"memory");
|
||||
static ALWAYS_INLINE void CpuWait() {
|
||||
base::subtle::PauseCPU();
|
||||
}
|
||||
|
||||
static inline void MemoryBarrier() {
|
||||
__sync_synchronize();
|
||||
/// Provides "barrier" semantics (see below) without a memory access.
|
||||
static ALWAYS_INLINE void MemoryBarrier() {
|
||||
base::subtle::MemoryBarrier();
|
||||
}
|
||||
|
||||
/// Provides a compiler barrier. The compiler is not allowed to reorder memory
|
||||
/// accesses across this (but the CPU can). This generates no instructions.
|
||||
static ALWAYS_INLINE void CompilerBarrier() {
|
||||
__asm__ __volatile__("" : : : "memory");
|
||||
}
|
||||
};
|
||||
|
||||
/// Wrapper for atomic integers. This should be switched to c++ 11 when
|
||||
/// we can switch.
|
||||
/// This class overloads operators to behave like a regular integer type
|
||||
/// but all operators and functions are thread safe.
|
||||
/// Atomic integer. 'T' can be either 32-bit or 64-bit signed integer. Each operation
|
||||
/// is performed atomically and has a specified memory-ordering semantic:
|
||||
///
|
||||
/// Acquire: these operations ensure no later memory access by the same thread can be
|
||||
/// reordered ahead of the operation. (C++11: memory_order_relaxed)
|
||||
///
|
||||
/// Release: these operations ensure that no previous memory access by the same thread
|
||||
/// can be reordered after the operation (C++11: memory_order_release).
|
||||
///
|
||||
/// Barrier: these operations have both Acquire and Release semantics (C++11:
|
||||
/// memory_order_acq_rel).
|
||||
///
|
||||
/// NoBarrier: these operations do not guarantee any ordering (C++11:
|
||||
/// memory_order_relaxed). The compiler/CPU is free to reorder memory accesses (as seen
|
||||
/// by other threads) just like any normal variable.
|
||||
///
|
||||
template<typename T>
|
||||
class AtomicInt {
|
||||
public:
|
||||
AtomicInt(T initial = 0) : value_(initial) {}
|
||||
|
||||
operator T() const { return value_; }
|
||||
|
||||
AtomicInt& operator=(T val) {
|
||||
value_ = val;
|
||||
return *this;
|
||||
}
|
||||
AtomicInt& operator=(const AtomicInt<T>& val) {
|
||||
value_ = val.value_;
|
||||
return *this;
|
||||
AtomicInt(T initial = 0) : value_(initial) {
|
||||
DCHECK(sizeof(T) == sizeof(base::subtle::Atomic32) ||
|
||||
sizeof(T) == sizeof(base::subtle::Atomic64));
|
||||
}
|
||||
|
||||
AtomicInt& operator+=(T delta) {
|
||||
__sync_add_and_fetch(&value_, delta);
|
||||
return *this;
|
||||
}
|
||||
AtomicInt& operator-=(T delta) {
|
||||
__sync_add_and_fetch(&value_, -delta);
|
||||
return *this;
|
||||
/// Atomic load with "acquire" memory-ordering semantic.
|
||||
ALWAYS_INLINE T Load() const {
|
||||
return base::subtle::Acquire_Load(&value_);
|
||||
}
|
||||
|
||||
AtomicInt& operator|=(T v) {
|
||||
__sync_or_and_fetch(&value_, v);
|
||||
return *this;
|
||||
}
|
||||
AtomicInt& operator&=(T v) {
|
||||
__sync_and_and_fetch(&value_, v);
|
||||
return *this;
|
||||
/// Atomic store with "release" memory-ordering semantic.
|
||||
ALWAYS_INLINE void Store(T x) {
|
||||
base::subtle::Release_Store(&value_, x);
|
||||
}
|
||||
|
||||
/// These define the preincrement (i.e. --value) operators.
|
||||
AtomicInt& operator++() {
|
||||
__sync_add_and_fetch(&value_, 1);
|
||||
return *this;
|
||||
}
|
||||
AtomicInt& operator--() {
|
||||
__sync_add_and_fetch(&value_, -1);
|
||||
return *this;
|
||||
/// Atomic add with "barrier" memory-ordering semantic. Returns the new value.
|
||||
ALWAYS_INLINE T Add(T x) {
|
||||
return base::subtle::Barrier_AtomicIncrement(&value_, x);
|
||||
}
|
||||
|
||||
/// This is post increment, which needs to return a new object.
|
||||
AtomicInt<T> operator++(int) {
|
||||
T prev = __sync_fetch_and_add(&value_, 1);
|
||||
return AtomicInt<T>(prev);
|
||||
}
|
||||
AtomicInt<T> operator--(int) {
|
||||
T prev = __sync_fetch_and_add(&value_, -1);
|
||||
return AtomicInt<T>(prev);
|
||||
}
|
||||
|
||||
/// Safe read of the value
|
||||
T Read() {
|
||||
return __sync_fetch_and_add(&value_, 0);
|
||||
}
|
||||
|
||||
/// Increments by delta (i.e. += delta) and returns the new val
|
||||
T UpdateAndFetch(T delta) {
|
||||
return __sync_add_and_fetch(&value_, delta);
|
||||
}
|
||||
|
||||
/// Increment by delta and returns the old val
|
||||
T FetchAndUpdate(T delta) {
|
||||
return __sync_fetch_and_add(&value_, delta);
|
||||
}
|
||||
|
||||
/// Updates the int to 'value' if value is larger
|
||||
void UpdateMax(T value) {
|
||||
while (true) {
|
||||
T old_value = value_;
|
||||
T new_value = std::max(old_value, value);
|
||||
if (LIKELY(CompareAndSwap(old_value, new_value))) break;
|
||||
}
|
||||
}
|
||||
void UpdateMin(T value) {
|
||||
while (true) {
|
||||
T old_value = value_;
|
||||
T new_value = std::min(old_value, value);
|
||||
if (LIKELY(CompareAndSwap(old_value, new_value))) break;
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if the atomic compare-and-swap was successful.
|
||||
bool CompareAndSwap(T old_val, T new_val) {
|
||||
return __sync_bool_compare_and_swap(&value_, old_val, new_val);
|
||||
}
|
||||
|
||||
/// Returns the content of value_ before the operation.
|
||||
/// If returnValue == old_val, then the atomic compare-and-swap was successful.
|
||||
T CompareAndSwapVal(T old_val, T new_val) {
|
||||
return __sync_val_compare_and_swap(&value_, old_val, new_val);
|
||||
}
|
||||
|
||||
/// Atomically updates value_ with new_val. Returns the old value_.
|
||||
T Swap(const T& new_val) {
|
||||
return __sync_lock_test_and_set(&value_, new_val);
|
||||
/// Atomically compare 'old_val' to 'value_' and set 'value_' to 'new_val' and return
|
||||
/// true if they compared equal, otherwise return false (and do no updates), with
|
||||
/// "barrier" memory-ordering semantic. That is, atomically performs:
|
||||
/// if (value_ == old_val) {
|
||||
/// value_ = new_val;
|
||||
/// return true;
|
||||
/// }
|
||||
/// return false;
|
||||
ALWAYS_INLINE bool CompareAndSwap(T old_val, T new_val) {
|
||||
return base::subtle::Barrier_CompareAndSwap(&value_, old_val, new_val) == old_val;
|
||||
}
|
||||
|
||||
private:
|
||||
T value_;
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(AtomicInt);
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -277,7 +277,7 @@ Status HdfsScanNode::GetNextInternal(
|
||||
*eos = false;
|
||||
RowBatch* materialized_batch = materialized_row_batches_->GetBatch();
|
||||
if (materialized_batch != NULL) {
|
||||
num_owned_io_buffers_ -= materialized_batch->num_io_buffers();
|
||||
num_owned_io_buffers_.Add(-materialized_batch->num_io_buffers());
|
||||
row_batch->AcquireState(materialized_batch);
|
||||
// Update the number of materialized rows now instead of when they are materialized.
|
||||
// This means that scanners might process and queue up more rows than are necessary
|
||||
@@ -549,7 +549,7 @@ Status HdfsScanNode::Prepare(RuntimeState* state) {
|
||||
file_desc->file_compression = split.file_compression;
|
||||
RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
|
||||
native_file_path, &file_desc->fs, &fs_cache));
|
||||
++num_unqueued_files_;
|
||||
num_unqueued_files_.Add(1);
|
||||
per_type_files_[partition_desc->file_format()].push_back(file_desc);
|
||||
} else {
|
||||
// File already processed
|
||||
@@ -787,7 +787,7 @@ Status HdfsScanNode::Open(RuntimeState* state) {
|
||||
|
||||
stringstream ss;
|
||||
ss << "Splits complete (node=" << id() << "):";
|
||||
progress_ = ProgressUpdater(ss.str(), total_splits);
|
||||
progress_.Init(ss.str(), total_splits);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@@ -809,8 +809,8 @@ void HdfsScanNode::Close(RuntimeState* state) {
|
||||
|
||||
scanner_threads_.JoinAll();
|
||||
|
||||
num_owned_io_buffers_ -= materialized_row_batches_->Cleanup();
|
||||
DCHECK_EQ(num_owned_io_buffers_, 0) << "ScanNode has leaked io buffers";
|
||||
num_owned_io_buffers_.Add(-materialized_row_batches_->Cleanup());
|
||||
DCHECK_EQ(num_owned_io_buffers_.Load(), 0) << "ScanNode has leaked io buffers";
|
||||
|
||||
if (reader_context_ != NULL) {
|
||||
// There may still be io buffers used by parent nodes so we can't unregister the
|
||||
@@ -859,8 +859,8 @@ Status HdfsScanNode::AddDiskIoRanges(const vector<DiskIoMgr::ScanRange*>& ranges
|
||||
int num_files_queued) {
|
||||
RETURN_IF_ERROR(
|
||||
runtime_state_->io_mgr()->AddScanRanges(reader_context_, ranges));
|
||||
num_unqueued_files_ -= num_files_queued;
|
||||
DCHECK_GE(num_unqueued_files_, 0);
|
||||
num_unqueued_files_.Add(-num_files_queued);
|
||||
DCHECK_GE(num_unqueued_files_.Load(), 0);
|
||||
ThreadTokenAvailableCb(runtime_state_->resource_pool());
|
||||
return Status::OK();
|
||||
}
|
||||
@@ -956,7 +956,6 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
|
||||
// TODO: It would be good to have a test case for that.
|
||||
if (!initial_ranges_issued_) return;
|
||||
|
||||
bool started_scanner = false;
|
||||
while (true) {
|
||||
// The lock must be given up between loops in order to give writers to done_,
|
||||
// all_ranges_started_ etc. a chance to grab the lock.
|
||||
@@ -995,13 +994,11 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
|
||||
ss << "scanner-thread(" << num_scanner_threads_started_counter_->value() << ")";
|
||||
scanner_threads_.AddThread(
|
||||
new Thread("hdfs-scan-node", ss.str(), &HdfsScanNode::ScannerThread, this));
|
||||
started_scanner = true;
|
||||
|
||||
if (runtime_state_->query_resource_mgr() != NULL) {
|
||||
runtime_state_->query_resource_mgr()->NotifyThreadUsageChange(1);
|
||||
}
|
||||
}
|
||||
if (!started_scanner) ++num_skipped_tokens_;
|
||||
}
|
||||
|
||||
void HdfsScanNode::ScannerThread() {
|
||||
@@ -1060,7 +1057,9 @@ void HdfsScanNode::ScannerThread() {
|
||||
// Take a snapshot of num_unqueued_files_ before calling GetNextRange().
|
||||
// We don't want num_unqueued_files_ to go to zero between the return from
|
||||
// GetNextRange() and the check for when all ranges are complete.
|
||||
int num_unqueued_files = num_unqueued_files_;
|
||||
int num_unqueued_files = num_unqueued_files_.Load();
|
||||
// TODO: the Load() acts as an acquire barrier. Is this needed? (i.e. any earlier
|
||||
// stores that need to complete?)
|
||||
AtomicUtil::MemoryBarrier();
|
||||
Status status = runtime_state_->io_mgr()->GetNextRange(reader_context_, &scan_range);
|
||||
|
||||
@@ -1307,10 +1306,9 @@ void HdfsScanNode::StopAndFinalizeCounters() {
|
||||
}
|
||||
|
||||
// Output fraction of scanners with codegen enabled
|
||||
ss.str(std::string());
|
||||
ss << "Codegen enabled: " << num_scanners_codegen_enabled_ << " out of "
|
||||
<< (num_scanners_codegen_enabled_ + num_scanners_codegen_disabled_);
|
||||
AddRuntimeExecOption(ss.str());
|
||||
int num_enabled = num_scanners_codegen_enabled_.Load();
|
||||
int total = num_enabled + num_scanners_codegen_disabled_.Load();
|
||||
AddRuntimeExecOption(Substitute("Codegen enabled: $0 out of $1", num_enabled, total));
|
||||
|
||||
if (reader_context_ != NULL) {
|
||||
bytes_read_local_->Set(runtime_state_->io_mgr()->bytes_read_local(reader_context_));
|
||||
|
||||
@@ -192,11 +192,11 @@ class HdfsScanNode : public ScanNode {
|
||||
void* GetCodegenFn(THdfsFileFormat::type);
|
||||
|
||||
inline void IncNumScannersCodegenEnabled() {
|
||||
++num_scanners_codegen_enabled_;
|
||||
num_scanners_codegen_enabled_.Add(1);
|
||||
}
|
||||
|
||||
inline void IncNumScannersCodegenDisabled() {
|
||||
++num_scanners_codegen_disabled_;
|
||||
num_scanners_codegen_disabled_.Add(1);
|
||||
}
|
||||
|
||||
/// Adds a materialized row batch for the scan node. This is called from scanner
|
||||
@@ -426,10 +426,6 @@ class HdfsScanNode : public ScanNode {
|
||||
/// happening in the scanners vs other parts of the execution.
|
||||
AtomicInt<int> num_owned_io_buffers_;
|
||||
|
||||
/// The number of times a token was offered but no scanner threads started.
|
||||
/// This is used for diagnostics only.
|
||||
AtomicInt<int> num_skipped_tokens_;
|
||||
|
||||
/// Counters which track the number of scanners that have codegen enabled for the
|
||||
/// materialize and conjuncts evaluation code paths.
|
||||
AtomicInt<int> num_scanners_codegen_enabled_;
|
||||
|
||||
@@ -103,7 +103,7 @@ void ScannerContext::Stream::ReleaseCompletedResources(RowBatch* batch, bool don
|
||||
// there are too many, we should compact.
|
||||
} else {
|
||||
(*it)->Return();
|
||||
--parent_->scan_node_->num_owned_io_buffers_;
|
||||
parent_->scan_node_->num_owned_io_buffers_.Add(-1);
|
||||
}
|
||||
}
|
||||
parent_->num_completed_io_buffers_ -= completed_io_buffers_.size();
|
||||
@@ -161,7 +161,7 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
|
||||
}
|
||||
|
||||
DCHECK(io_buffer_ != NULL);
|
||||
++parent_->scan_node_->num_owned_io_buffers_;
|
||||
parent_->scan_node_->num_owned_io_buffers_.Add(1);
|
||||
io_buffer_pos_ = reinterpret_cast<uint8_t*>(io_buffer_->buffer());
|
||||
io_buffer_bytes_left_ = io_buffer_->len();
|
||||
if (io_buffer_->len() == 0) {
|
||||
|
||||
@@ -125,11 +125,12 @@ inline Atomic32 Barrier_AtomicIncrement(volatile Atomic32* ptr,
|
||||
return temp + increment;
|
||||
}
|
||||
|
||||
// On x86, the NoBarrier_CompareAndSwap() uses a locked instruction and so also
|
||||
// provides both acquire and release barriers.
|
||||
inline Atomic32 Acquire_CompareAndSwap(volatile Atomic32* ptr,
|
||||
Atomic32 old_value,
|
||||
Atomic32 new_value) {
|
||||
Atomic32 x = NoBarrier_CompareAndSwap(ptr, old_value, new_value);
|
||||
return x;
|
||||
return NoBarrier_CompareAndSwap(ptr, old_value, new_value);
|
||||
}
|
||||
|
||||
inline Atomic32 Release_CompareAndSwap(volatile Atomic32* ptr,
|
||||
@@ -138,6 +139,12 @@ inline Atomic32 Release_CompareAndSwap(volatile Atomic32* ptr,
|
||||
return NoBarrier_CompareAndSwap(ptr, old_value, new_value);
|
||||
}
|
||||
|
||||
inline Atomic32 Barrier_CompareAndSwap(volatile Atomic32* ptr,
|
||||
Atomic32 old_value,
|
||||
Atomic32 new_value) {
|
||||
return NoBarrier_CompareAndSwap(ptr, old_value, new_value);
|
||||
}
|
||||
|
||||
inline void NoBarrier_Store(volatile Atomic32* ptr, Atomic32 value) {
|
||||
CheckNaturalAlignment(ptr);
|
||||
*ptr = value;
|
||||
@@ -464,8 +471,7 @@ inline Atomic64 Release_Load(volatile const Atomic64* ptr) {
|
||||
inline Atomic64 Acquire_CompareAndSwap(volatile Atomic64* ptr,
|
||||
Atomic64 old_value,
|
||||
Atomic64 new_value) {
|
||||
Atomic64 x = NoBarrier_CompareAndSwap(ptr, old_value, new_value);
|
||||
return x;
|
||||
return NoBarrier_CompareAndSwap(ptr, old_value, new_value);
|
||||
}
|
||||
|
||||
inline Atomic64 Release_CompareAndSwap(volatile Atomic64* ptr,
|
||||
@@ -474,6 +480,12 @@ inline Atomic64 Release_CompareAndSwap(volatile Atomic64* ptr,
|
||||
return NoBarrier_CompareAndSwap(ptr, old_value, new_value);
|
||||
}
|
||||
|
||||
inline Atomic64 Barrier_CompareAndSwap(volatile Atomic64* ptr,
|
||||
Atomic64 old_value,
|
||||
Atomic64 new_value) {
|
||||
return NoBarrier_CompareAndSwap(ptr, old_value, new_value);
|
||||
}
|
||||
|
||||
} // namespace subtle
|
||||
} // namespace base
|
||||
|
||||
|
||||
@@ -122,14 +122,14 @@ void RpcEventHandler::Reset(const string& method_name) {
|
||||
MethodMap::iterator it = method_map_.find(method_name);
|
||||
if (it == method_map_.end()) return;
|
||||
it->second->time_stats->Reset();
|
||||
it->second->num_in_flight = 0L;
|
||||
it->second->num_in_flight.Store(0L);
|
||||
}
|
||||
|
||||
void RpcEventHandler::ResetAll() {
|
||||
lock_guard<mutex> l(method_map_lock_);
|
||||
BOOST_FOREACH(const MethodMap::value_type& method, method_map_) {
|
||||
method.second->time_stats->Reset();
|
||||
method.second->num_in_flight = 0L;
|
||||
method.second->num_in_flight.Store(0L);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -150,7 +150,8 @@ void RpcEventHandler::ToJson(Value* server, Document* document) {
|
||||
const string& human_readable = rpc.second->time_stats->ToHumanReadable();
|
||||
Value summary(human_readable.c_str(), document->GetAllocator());
|
||||
method.AddMember("summary", summary, document->GetAllocator());
|
||||
method.AddMember("in_flight", rpc.second->num_in_flight, document->GetAllocator());
|
||||
method.AddMember("in_flight", rpc.second->num_in_flight.Load(),
|
||||
document->GetAllocator());
|
||||
Value server_name(server_name_.c_str(), document->GetAllocator());
|
||||
method.AddMember("server_name", server_name, document->GetAllocator());
|
||||
methods.PushBack(method, document->GetAllocator());
|
||||
@@ -174,7 +175,7 @@ void* RpcEventHandler::getContext(const char* fn_name, void* server_context) {
|
||||
it = method_map_.insert(make_pair(descriptor->name, descriptor)).first;
|
||||
}
|
||||
}
|
||||
++(it->second->num_in_flight);
|
||||
it->second->num_in_flight.Add(1);
|
||||
// TODO: Consider pooling these
|
||||
InvocationContext* ctxt_ptr =
|
||||
new InvocationContext(MonotonicMillis(), cnxn_ctx, it->second);
|
||||
@@ -193,6 +194,6 @@ void RpcEventHandler::postWrite(void* ctx, const char* fn_name, uint32_t bytes)
|
||||
<< PrettyPrinter::Print(elapsed_time * 1000L * 1000L, TUnit::TIME_NS);
|
||||
MethodDescriptor* descriptor = rpc_ctx->method_descriptor;
|
||||
delete rpc_ctx;
|
||||
--descriptor->num_in_flight;
|
||||
descriptor->num_in_flight.Add(-1);
|
||||
descriptor->time_stats->Update(elapsed_time);
|
||||
}
|
||||
|
||||
@@ -84,7 +84,7 @@ class RpcEventHandler : public apache::thrift::TProcessorEventHandler {
|
||||
StatsMetric<double>* time_stats;
|
||||
|
||||
/// Number of invocations in flight
|
||||
AtomicInt<uint32_t> num_in_flight;
|
||||
AtomicInt<int32_t> num_in_flight;
|
||||
};
|
||||
|
||||
/// Map from method name to descriptor
|
||||
|
||||
@@ -454,10 +454,15 @@ Status Coordinator::Exec(QuerySchedule& schedule,
|
||||
|
||||
executor_.reset(NULL);
|
||||
}
|
||||
|
||||
// Initialize the execution profile structures.
|
||||
InitExecProfile(request);
|
||||
|
||||
// Once remote fragments are started, they can start making ReportExecStatus RPCs,
|
||||
// which will update the progress updater. So initialize it before starting remote
|
||||
// fragments.
|
||||
const string& str = Substitute("Query $0", PrintId(query_id_));
|
||||
progress_.Init(str, schedule.num_scan_ranges());
|
||||
|
||||
if (schedule.num_fragment_instances() > 0) {
|
||||
RETURN_IF_ERROR(StartRemoteFragments(&schedule));
|
||||
// If we have a coordinator fragment and remote fragments (the common case), release
|
||||
@@ -469,10 +474,6 @@ Status Coordinator::Exec(QuerySchedule& schedule,
|
||||
}
|
||||
|
||||
PrintFragmentInstanceInfo();
|
||||
|
||||
const string& str = Substitute("Query $0", PrintId(query_id_));
|
||||
progress_ = ProgressUpdater(str, schedule.num_scan_ranges());
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
@@ -134,7 +134,7 @@ Status DataStreamRecvr::SenderQueue::GetBatch(RowBatch** next_batch) {
|
||||
|
||||
DCHECK(!batch_queue_.empty());
|
||||
RowBatch* result = batch_queue_.front().second;
|
||||
recvr_->num_buffered_bytes_ -= batch_queue_.front().first;
|
||||
recvr_->num_buffered_bytes_.Add(-batch_queue_.front().first);
|
||||
VLOG_ROW << "fetched #rows=" << result->num_rows();
|
||||
batch_queue_.pop_front();
|
||||
data_removal__cv_.notify_one();
|
||||
@@ -161,7 +161,7 @@ void DataStreamRecvr::SenderQueue::AddBatch(const TRowBatch& thrift_batch) {
|
||||
while (!batch_queue_.empty() && recvr_->ExceedsLimit(batch_size) && !is_cancelled_) {
|
||||
CANCEL_SAFE_SCOPED_TIMER(recvr_->buffer_full_total_timer_, &is_cancelled_);
|
||||
VLOG_ROW << " wait removal: empty=" << (batch_queue_.empty() ? 1 : 0)
|
||||
<< " #buffered=" << recvr_->num_buffered_bytes_
|
||||
<< " #buffered=" << recvr_->num_buffered_bytes_.Load()
|
||||
<< " batch_size=" << batch_size << "\n";
|
||||
|
||||
// We only want one thread running the timer at any one time. Only
|
||||
@@ -209,7 +209,7 @@ void DataStreamRecvr::SenderQueue::AddBatch(const TRowBatch& thrift_batch) {
|
||||
VLOG_ROW << "added #rows=" << batch->num_rows()
|
||||
<< " batch_size=" << batch_size << "\n";
|
||||
batch_queue_.push_back(make_pair(batch_size, batch));
|
||||
recvr_->num_buffered_bytes_ += batch_size;
|
||||
recvr_->num_buffered_bytes_.Add(batch_size);
|
||||
data_arrival_cv_.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -113,7 +113,7 @@ class DataStreamRecvr {
|
||||
/// Return true if the addition of a new batch of size 'batch_size' would exceed the
|
||||
/// total buffer limit.
|
||||
bool ExceedsLimit(int batch_size) {
|
||||
return num_buffered_bytes_ + batch_size > total_buffer_limit_;
|
||||
return num_buffered_bytes_.Load() + batch_size > total_buffer_limit_;
|
||||
}
|
||||
|
||||
/// DataStreamMgr instance used to create this recvr. (Not owned)
|
||||
|
||||
@@ -322,8 +322,9 @@ class DiskIoMgr::RequestContext {
|
||||
}
|
||||
|
||||
int num_threads_in_op() const {
|
||||
int v = num_threads_in_op_;
|
||||
__sync_synchronize();
|
||||
int v = num_threads_in_op_.Load();
|
||||
// TODO: determine whether this barrier is necessary for any callsites.
|
||||
AtomicUtil::MemoryBarrier();
|
||||
return v;
|
||||
}
|
||||
|
||||
@@ -360,21 +361,19 @@ class DiskIoMgr::RequestContext {
|
||||
/// reader per disk that are in the unlocked hdfs read code section. This is updated
|
||||
/// by multiple threads without a lock so we need to use an atomic int.
|
||||
void IncrementRequestThreadAndDequeue() {
|
||||
++num_threads_in_op_;
|
||||
num_threads_in_op_.Add(1);
|
||||
is_on_queue_ = false;
|
||||
}
|
||||
|
||||
void DecrementRequestThread() {
|
||||
--num_threads_in_op_;
|
||||
num_threads_in_op_.Add(-1);
|
||||
}
|
||||
|
||||
/// Decrement request thread count and do final cleanup if this is the last
|
||||
/// thread. RequestContext lock must be taken before this.
|
||||
void DecrementRequestThreadAndCheckDone(RequestContext* context) {
|
||||
--num_threads_in_op_;
|
||||
// We don't need to worry about reordered loads here because updating
|
||||
// num_threads_in_request_ uses an atomic, which is a barrier.
|
||||
if (!is_on_queue_ && num_threads_in_op_ == 0 && !done_) {
|
||||
num_threads_in_op_.Add(-1); // Also acts as a barrier.
|
||||
if (!is_on_queue_ && num_threads_in_op_.Load() == 0 && !done_) {
|
||||
// This thread is the last one for this reader on this disk, do final cleanup
|
||||
context->DecrementDiskRefCount();
|
||||
done_ = true;
|
||||
@@ -389,7 +388,7 @@ class DiskIoMgr::RequestContext {
|
||||
done_ = true;
|
||||
num_remaining_ranges_ = 0;
|
||||
is_on_queue_ = false;
|
||||
num_threads_in_op_ = 0;
|
||||
num_threads_in_op_.Store(0);
|
||||
next_scan_range_to_start_ = NULL;
|
||||
}
|
||||
|
||||
|
||||
@@ -105,7 +105,7 @@ void DiskIoMgr::RequestContext::AddRequestRange(
|
||||
ScheduleScanRange(scan_range);
|
||||
} else {
|
||||
state.unstarted_scan_ranges()->Enqueue(scan_range);
|
||||
++num_unstarted_scan_ranges_;
|
||||
num_unstarted_scan_ranges_.Add(1);
|
||||
}
|
||||
// If next_scan_range_to_start is NULL, schedule this RequestContext so that it will
|
||||
// be set. If it's not NULL, this context will be scheduled when GetNextRange() is
|
||||
@@ -149,18 +149,18 @@ void DiskIoMgr::RequestContext::Reset(MemTracker* tracker) {
|
||||
state_ = Active;
|
||||
mem_tracker_ = tracker;
|
||||
|
||||
num_unstarted_scan_ranges_ = 0;
|
||||
num_unstarted_scan_ranges_.Store(0);
|
||||
num_disks_with_ranges_ = 0;
|
||||
num_used_buffers_ = 0;
|
||||
num_buffers_in_reader_ = 0;
|
||||
num_ready_buffers_ = 0;
|
||||
total_range_queue_capacity_ = 0;
|
||||
num_finished_ranges_ = 0;
|
||||
num_remote_ranges_ = 0;
|
||||
bytes_read_local_ = 0;
|
||||
bytes_read_short_circuit_ = 0;
|
||||
bytes_read_dn_cache_ = 0;
|
||||
unexpected_remote_bytes_ = 0;
|
||||
num_used_buffers_.Store(0);
|
||||
num_buffers_in_reader_.Store(0);
|
||||
num_ready_buffers_.Store(0);
|
||||
total_range_queue_capacity_.Store(0);
|
||||
num_finished_ranges_.Store(0);
|
||||
num_remote_ranges_.Store(0);
|
||||
bytes_read_local_.Store(0);
|
||||
bytes_read_short_circuit_.Store(0);
|
||||
bytes_read_dn_cache_.Store(0);
|
||||
unexpected_remote_bytes_.Store(0);
|
||||
initial_queue_capacity_ = DiskIoMgr::DEFAULT_QUEUE_CAPACITY;
|
||||
|
||||
DCHECK(ready_to_start_ranges_.empty());
|
||||
@@ -181,10 +181,10 @@ string DiskIoMgr::RequestContext::DebugString() const {
|
||||
if (state_ == RequestContext::Active) ss << "Active";
|
||||
if (state_ != RequestContext::Inactive) {
|
||||
ss << " status_=" << (status_.ok() ? "OK" : status_.GetDetail())
|
||||
<< " #ready_buffers=" << num_ready_buffers_
|
||||
<< " #used_buffers=" << num_used_buffers_
|
||||
<< " #num_buffers_in_reader=" << num_buffers_in_reader_
|
||||
<< " #finished_scan_ranges=" << num_finished_ranges_
|
||||
<< " #ready_buffers=" << num_ready_buffers_.Load()
|
||||
<< " #used_buffers=" << num_used_buffers_.Load()
|
||||
<< " #num_buffers_in_reader=" << num_buffers_in_reader_.Load()
|
||||
<< " #finished_scan_ranges=" << num_finished_ranges_.Load()
|
||||
<< " #disk_with_ranges=" << num_disks_with_ranges_
|
||||
<< " #disks=" << num_disks_with_ranges_;
|
||||
for (int i = 0; i < disk_states_.size(); ++i) {
|
||||
@@ -209,13 +209,13 @@ bool DiskIoMgr::RequestContext::Validate() const {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (num_used_buffers_ < 0) {
|
||||
LOG(WARNING) << "num_used_buffers_ < 0: #used=" << num_used_buffers_;
|
||||
if (num_used_buffers_.Load() < 0) {
|
||||
LOG(WARNING) << "num_used_buffers_ < 0: #used=" << num_used_buffers_.Load();
|
||||
return false;
|
||||
}
|
||||
|
||||
if (num_ready_buffers_ < 0) {
|
||||
LOG(WARNING) << "num_ready_buffers_ < 0: #used=" << num_ready_buffers_;
|
||||
if (num_ready_buffers_.Load() < 0) {
|
||||
LOG(WARNING) << "num_ready_buffers_ < 0: #used=" << num_ready_buffers_.Load();
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -286,9 +286,9 @@ bool DiskIoMgr::RequestContext::Validate() const {
|
||||
}
|
||||
|
||||
if (state_ != RequestContext::Cancelled) {
|
||||
if (total_unstarted_ranges != num_unstarted_scan_ranges_) {
|
||||
if (total_unstarted_ranges != num_unstarted_scan_ranges_.Load()) {
|
||||
LOG(WARNING) << "total_unstarted_ranges=" << total_unstarted_ranges
|
||||
<< " sum_in_states=" << num_unstarted_scan_ranges_;
|
||||
<< " sum_in_states=" << num_unstarted_scan_ranges_.Load();
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -42,14 +42,14 @@ bool DiskIoMgr::ScanRange::EnqueueBuffer(BufferDescriptor* buffer) {
|
||||
if (is_cancelled_) {
|
||||
// Return the buffer, this range has been cancelled
|
||||
if (buffer->buffer_ != NULL) {
|
||||
++io_mgr_->num_buffers_in_readers_;
|
||||
++reader_->num_buffers_in_reader_;
|
||||
io_mgr_->num_buffers_in_readers_.Add(1);
|
||||
reader_->num_buffers_in_reader_.Add(1);
|
||||
}
|
||||
--reader_->num_used_buffers_;
|
||||
reader_->num_used_buffers_.Add(-1);
|
||||
buffer->Return();
|
||||
return false;
|
||||
}
|
||||
++reader_->num_ready_buffers_;
|
||||
reader_->num_ready_buffers_.Add(1);
|
||||
ready_buffers_.push_back(buffer);
|
||||
eosr_queued_ = buffer->eosr();
|
||||
|
||||
@@ -100,10 +100,10 @@ Status DiskIoMgr::ScanRange::GetNext(BufferDescriptor** buffer) {
|
||||
|
||||
// Update tracking counters. The buffer has now moved from the IoMgr to the
|
||||
// caller.
|
||||
++io_mgr_->num_buffers_in_readers_;
|
||||
++reader_->num_buffers_in_reader_;
|
||||
--reader_->num_ready_buffers_;
|
||||
--reader_->num_used_buffers_;
|
||||
io_mgr_->num_buffers_in_readers_.Add(1);
|
||||
reader_->num_buffers_in_reader_.Add(1);
|
||||
reader_->num_ready_buffers_.Add(-1);
|
||||
reader_->num_used_buffers_.Add(-1);
|
||||
|
||||
Status status = (*buffer)->status_;
|
||||
if (!status.ok()) {
|
||||
@@ -114,10 +114,10 @@ Status DiskIoMgr::ScanRange::GetNext(BufferDescriptor** buffer) {
|
||||
|
||||
unique_lock<mutex> reader_lock(reader_->lock_);
|
||||
if (eosr_returned_) {
|
||||
reader_->total_range_queue_capacity_ += ready_buffers_capacity_;
|
||||
++reader_->num_finished_ranges_;
|
||||
reader_->initial_queue_capacity_ =
|
||||
reader_->total_range_queue_capacity_ / reader_->num_finished_ranges_;
|
||||
reader_->total_range_queue_capacity_.Add(ready_buffers_capacity_);
|
||||
reader_->num_finished_ranges_.Add(1);
|
||||
reader_->initial_queue_capacity_ = reader_->total_range_queue_capacity_.Load() /
|
||||
reader_->num_finished_ranges_.Load();
|
||||
}
|
||||
|
||||
DCHECK(reader_->Validate()) << endl << reader_->DebugString();
|
||||
@@ -164,10 +164,10 @@ void DiskIoMgr::ScanRange::Cancel(const Status& status) {
|
||||
|
||||
void DiskIoMgr::ScanRange::CleanupQueuedBuffers() {
|
||||
DCHECK(is_cancelled_);
|
||||
io_mgr_->num_buffers_in_readers_ += ready_buffers_.size();
|
||||
reader_->num_buffers_in_reader_ += ready_buffers_.size();
|
||||
reader_->num_used_buffers_ -= ready_buffers_.size();
|
||||
reader_->num_ready_buffers_ -= ready_buffers_.size();
|
||||
io_mgr_->num_buffers_in_readers_.Add(ready_buffers_.size());
|
||||
reader_->num_buffers_in_reader_.Add(ready_buffers_.size());
|
||||
reader_->num_used_buffers_.Add(-ready_buffers_.size());
|
||||
reader_->num_ready_buffers_.Add(-ready_buffers_.size());
|
||||
|
||||
while (!ready_buffers_.empty()) {
|
||||
BufferDescriptor* buffer = ready_buffers_.front();
|
||||
@@ -302,14 +302,14 @@ void DiskIoMgr::ScanRange::Close() {
|
||||
if (IsDfsPath(file())) {
|
||||
int success = hdfsFileGetReadStatistics(hdfs_file_->file(), &stats);
|
||||
if (success == 0) {
|
||||
reader_->bytes_read_local_ += stats->totalLocalBytesRead;
|
||||
reader_->bytes_read_short_circuit_ += stats->totalShortCircuitBytesRead;
|
||||
reader_->bytes_read_dn_cache_ += stats->totalZeroCopyBytesRead;
|
||||
reader_->bytes_read_local_.Add(stats->totalLocalBytesRead);
|
||||
reader_->bytes_read_short_circuit_.Add(stats->totalShortCircuitBytesRead);
|
||||
reader_->bytes_read_dn_cache_.Add(stats->totalZeroCopyBytesRead);
|
||||
if (stats->totalLocalBytesRead != stats->totalBytesRead) {
|
||||
++reader_->num_remote_ranges_;
|
||||
reader_->num_remote_ranges_.Add(1);
|
||||
if (expected_local_) {
|
||||
int remote_bytes = stats->totalBytesRead - stats->totalLocalBytesRead;
|
||||
reader_->unexpected_remote_bytes_ += remote_bytes;
|
||||
reader_->unexpected_remote_bytes_.Add(remote_bytes);
|
||||
VLOG_FILE << "Unexpected remote HDFS read of "
|
||||
<< PrettyPrinter::Print(remote_bytes, TUnit::BYTES)
|
||||
<< " for file '" << file_ << "'";
|
||||
@@ -448,6 +448,6 @@ Status DiskIoMgr::ScanRange::ReadFromCache(bool* read_succeeded) {
|
||||
COUNTER_ADD(reader_->bytes_read_counter_, bytes_read);
|
||||
}
|
||||
*read_succeeded = true;
|
||||
++reader_->num_used_buffers_;
|
||||
reader_->num_used_buffers_.Add(1);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@@ -144,7 +144,7 @@ class DiskIoMgrTest : public testing::Test {
|
||||
ASSERT_TRUE(status.ok() || status.code() == expected_status.code());
|
||||
if (range == NULL) break;
|
||||
ValidateScanRange(range, expected_result, expected_len, expected_status);
|
||||
++(*num_ranges_processed);
|
||||
num_ranges_processed->Add(1);
|
||||
++num_ranges;
|
||||
}
|
||||
}
|
||||
@@ -381,7 +381,7 @@ TEST_F(DiskIoMgrTest, SingleReader) {
|
||||
}
|
||||
threads.join_all();
|
||||
|
||||
EXPECT_EQ(num_ranges_processed, ranges.size());
|
||||
EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
|
||||
io_mgr.UnregisterContext(reader);
|
||||
EXPECT_EQ(reader_mem_tracker.consumption(), 0);
|
||||
}
|
||||
@@ -452,7 +452,7 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
|
||||
}
|
||||
|
||||
threads.join_all();
|
||||
EXPECT_EQ(num_ranges_processed, len);
|
||||
EXPECT_EQ(num_ranges_processed.Load(), len);
|
||||
io_mgr.UnregisterContext(reader);
|
||||
EXPECT_EQ(reader_mem_tracker.consumption(), 0);
|
||||
}
|
||||
@@ -525,7 +525,7 @@ TEST_F(DiskIoMgrTest, SyncReadTest) {
|
||||
ValidateSyncRead(&io_mgr, reader, complete_range, data);
|
||||
ValidateSyncRead(&io_mgr, reader, complete_range, data);
|
||||
|
||||
EXPECT_EQ(num_ranges_processed, ranges.size());
|
||||
EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
|
||||
io_mgr.UnregisterContext(reader);
|
||||
EXPECT_EQ(reader_mem_tracker.consumption(), 0);
|
||||
}
|
||||
@@ -577,7 +577,7 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) {
|
||||
ScanRangeThread(&io_mgr, reader, data, strlen(data), Status::OK(), 1,
|
||||
&num_ranges_processed);
|
||||
}
|
||||
EXPECT_EQ(num_ranges_processed, num_succesful_ranges);
|
||||
EXPECT_EQ(num_ranges_processed.Load(), num_succesful_ranges);
|
||||
|
||||
// Start up some threads and then cancel
|
||||
thread_group threads;
|
||||
@@ -735,7 +735,7 @@ TEST_F(DiskIoMgrTest, CachedReads) {
|
||||
ValidateSyncRead(&io_mgr, reader, complete_range, data);
|
||||
ValidateSyncRead(&io_mgr, reader, complete_range, data);
|
||||
|
||||
EXPECT_EQ(num_ranges_processed, ranges.size());
|
||||
EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
|
||||
io_mgr.UnregisterContext(reader);
|
||||
EXPECT_EQ(reader_mem_tracker.consumption(), 0);
|
||||
}
|
||||
@@ -908,7 +908,7 @@ TEST_F(DiskIoMgrTest, MultipleReader) {
|
||||
}
|
||||
}
|
||||
threads.join_all();
|
||||
EXPECT_EQ(num_ranges_processed, DATA_LEN * NUM_READERS);
|
||||
EXPECT_EQ(num_ranges_processed.Load(), DATA_LEN * NUM_READERS);
|
||||
for (int i = 0; i < NUM_READERS; ++i) {
|
||||
io_mgr.UnregisterContext(readers[i]);
|
||||
}
|
||||
@@ -942,7 +942,7 @@ TEST_F(DiskIoMgrTest, Buffers) {
|
||||
int64_t buffer_len = 1;
|
||||
char* buf = io_mgr.GetFreeBuffer(&buffer_len);
|
||||
EXPECT_EQ(buffer_len, min_buffer_size);
|
||||
EXPECT_EQ(io_mgr.num_allocated_buffers_, 1);
|
||||
EXPECT_EQ(io_mgr.num_allocated_buffers_.Load(), 1);
|
||||
io_mgr.ReturnFreeBuffer(buf, buffer_len);
|
||||
EXPECT_EQ(mem_tracker.consumption(), min_buffer_size);
|
||||
|
||||
@@ -950,7 +950,7 @@ TEST_F(DiskIoMgrTest, Buffers) {
|
||||
buffer_len = min_buffer_size;
|
||||
buf = io_mgr.GetFreeBuffer(&buffer_len);
|
||||
EXPECT_EQ(buffer_len, min_buffer_size);
|
||||
EXPECT_EQ(io_mgr.num_allocated_buffers_, 1);
|
||||
EXPECT_EQ(io_mgr.num_allocated_buffers_.Load(), 1);
|
||||
io_mgr.ReturnFreeBuffer(buf, buffer_len);
|
||||
EXPECT_EQ(mem_tracker.consumption(), min_buffer_size);
|
||||
|
||||
@@ -958,12 +958,12 @@ TEST_F(DiskIoMgrTest, Buffers) {
|
||||
buffer_len = min_buffer_size + 1;
|
||||
buf = io_mgr.GetFreeBuffer(&buffer_len);
|
||||
EXPECT_EQ(buffer_len, min_buffer_size * 2);
|
||||
EXPECT_EQ(io_mgr.num_allocated_buffers_, 2);
|
||||
EXPECT_EQ(io_mgr.num_allocated_buffers_.Load(), 2);
|
||||
EXPECT_EQ(mem_tracker.consumption(), min_buffer_size * 3);
|
||||
|
||||
// gc unused buffer
|
||||
io_mgr.GcIoBuffers();
|
||||
EXPECT_EQ(io_mgr.num_allocated_buffers_, 1);
|
||||
EXPECT_EQ(io_mgr.num_allocated_buffers_.Load(), 1);
|
||||
EXPECT_EQ(mem_tracker.consumption(), min_buffer_size * 2);
|
||||
|
||||
io_mgr.ReturnFreeBuffer(buf, buffer_len);
|
||||
@@ -972,13 +972,13 @@ TEST_F(DiskIoMgrTest, Buffers) {
|
||||
buffer_len = max_buffer_size;
|
||||
buf = io_mgr.GetFreeBuffer(&buffer_len);
|
||||
EXPECT_EQ(buffer_len, max_buffer_size);
|
||||
EXPECT_EQ(io_mgr.num_allocated_buffers_, 2);
|
||||
EXPECT_EQ(io_mgr.num_allocated_buffers_.Load(), 2);
|
||||
io_mgr.ReturnFreeBuffer(buf, buffer_len);
|
||||
EXPECT_EQ(mem_tracker.consumption(), min_buffer_size * 2 + max_buffer_size);
|
||||
|
||||
// gc buffers
|
||||
io_mgr.GcIoBuffers();
|
||||
EXPECT_EQ(io_mgr.num_allocated_buffers_, 0);
|
||||
EXPECT_EQ(io_mgr.num_allocated_buffers_.Load(), 0);
|
||||
EXPECT_EQ(mem_tracker.consumption(), 0);
|
||||
}
|
||||
|
||||
|
||||
@@ -323,14 +323,14 @@ DiskIoMgr::~DiskIoMgr() {
|
||||
DCHECK(request_context_cache_.get() == NULL ||
|
||||
request_context_cache_->ValidateAllInactive())
|
||||
<< endl << DebugString();
|
||||
DCHECK_EQ(num_buffers_in_readers_, 0);
|
||||
DCHECK_EQ(num_buffers_in_readers_.Load(), 0);
|
||||
|
||||
// Delete all allocated buffers
|
||||
int num_free_buffers = 0;
|
||||
for (int idx = 0; idx < free_buffers_.size(); ++idx) {
|
||||
num_free_buffers += free_buffers_[idx].size();
|
||||
}
|
||||
DCHECK_EQ(num_allocated_buffers_, num_free_buffers);
|
||||
DCHECK_EQ(num_allocated_buffers_.Load(), num_free_buffers);
|
||||
GcIoBuffers();
|
||||
|
||||
for (int i = 0; i < disk_queues_.size(); ++i) {
|
||||
@@ -396,8 +396,8 @@ void DiskIoMgr::UnregisterContext(RequestContext* reader) {
|
||||
|
||||
// All the disks are done with clean, validate nothing is leaking.
|
||||
unique_lock<mutex> reader_lock(reader->lock_);
|
||||
DCHECK_EQ(reader->num_buffers_in_reader_, 0) << endl << reader->DebugString();
|
||||
DCHECK_EQ(reader->num_used_buffers_, 0) << endl << reader->DebugString();
|
||||
DCHECK_EQ(reader->num_buffers_in_reader_.Load(), 0) << endl << reader->DebugString();
|
||||
DCHECK_EQ(reader->num_used_buffers_.Load(), 0) << endl << reader->DebugString();
|
||||
|
||||
DCHECK(reader->Validate()) << endl << reader->DebugString();
|
||||
request_context_cache_->ReturnContext(reader);
|
||||
@@ -454,7 +454,7 @@ void DiskIoMgr::set_disks_access_bitmap(RequestContext* r,
|
||||
}
|
||||
|
||||
int64_t DiskIoMgr::queue_size(RequestContext* reader) const {
|
||||
return reader->num_ready_buffers_;
|
||||
return reader->num_ready_buffers_.Load();
|
||||
}
|
||||
|
||||
Status DiskIoMgr::context_status(RequestContext* context) const {
|
||||
@@ -462,28 +462,24 @@ Status DiskIoMgr::context_status(RequestContext* context) const {
|
||||
return context->status_;
|
||||
}
|
||||
|
||||
int DiskIoMgr::num_unstarted_ranges(RequestContext* reader) const {
|
||||
return reader->num_unstarted_scan_ranges_;
|
||||
}
|
||||
|
||||
int64_t DiskIoMgr::bytes_read_local(RequestContext* reader) const {
|
||||
return reader->bytes_read_local_;
|
||||
return reader->bytes_read_local_.Load();
|
||||
}
|
||||
|
||||
int64_t DiskIoMgr::bytes_read_short_circuit(RequestContext* reader) const {
|
||||
return reader->bytes_read_short_circuit_;
|
||||
return reader->bytes_read_short_circuit_.Load();
|
||||
}
|
||||
|
||||
int64_t DiskIoMgr::bytes_read_dn_cache(RequestContext* reader) const {
|
||||
return reader->bytes_read_dn_cache_;
|
||||
return reader->bytes_read_dn_cache_.Load();
|
||||
}
|
||||
|
||||
int DiskIoMgr::num_remote_ranges(RequestContext* reader) const {
|
||||
return reader->num_remote_ranges_;
|
||||
return reader->num_remote_ranges_.Load();
|
||||
}
|
||||
|
||||
int64_t DiskIoMgr::unexpected_remote_bytes(RequestContext* reader) const {
|
||||
return reader->unexpected_remote_bytes_;
|
||||
return reader->unexpected_remote_bytes_.Load();
|
||||
}
|
||||
|
||||
int64_t DiskIoMgr::GetReadThroughput() {
|
||||
@@ -563,7 +559,7 @@ Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (reader->num_unstarted_scan_ranges_ == 0 &&
|
||||
if (reader->num_unstarted_scan_ranges_.Load() == 0 &&
|
||||
reader->ready_to_start_ranges_.empty() && reader->cached_ranges_.empty()) {
|
||||
// All ranges are done, just return.
|
||||
break;
|
||||
@@ -632,8 +628,8 @@ void DiskIoMgr::ReturnBuffer(BufferDescriptor* buffer_desc) {
|
||||
ReturnFreeBuffer(buffer_desc);
|
||||
}
|
||||
buffer_desc->buffer_ = NULL;
|
||||
--num_buffers_in_readers_;
|
||||
--reader->num_buffers_in_reader_;
|
||||
num_buffers_in_readers_.Add(-1);
|
||||
reader->num_buffers_in_reader_.Add(-1);
|
||||
} else {
|
||||
// A NULL buffer means there was an error in which case there is no buffer
|
||||
// to return.
|
||||
@@ -685,7 +681,7 @@ char* DiskIoMgr::GetFreeBuffer(int64_t* buffer_size) {
|
||||
unique_lock<mutex> lock(free_buffers_lock_);
|
||||
char* buffer = NULL;
|
||||
if (free_buffers_[idx].empty()) {
|
||||
++num_allocated_buffers_;
|
||||
num_allocated_buffers_.Add(1);
|
||||
if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != NULL) {
|
||||
ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(1L);
|
||||
}
|
||||
@@ -716,7 +712,7 @@ void DiskIoMgr::GcIoBuffers() {
|
||||
iter != free_buffers_[idx].end(); ++iter) {
|
||||
int64_t buffer_size = (1 << idx) * min_buffer_size_;
|
||||
process_mem_tracker_->Release(buffer_size);
|
||||
--num_allocated_buffers_;
|
||||
num_allocated_buffers_.Add(-1);
|
||||
delete[] *iter;
|
||||
|
||||
++buffers_freed;
|
||||
@@ -756,7 +752,7 @@ void DiskIoMgr::ReturnFreeBuffer(char* buffer, int64_t buffer_size) {
|
||||
}
|
||||
} else {
|
||||
process_mem_tracker_->Release(buffer_size);
|
||||
--num_allocated_buffers_;
|
||||
num_allocated_buffers_.Add(-1);
|
||||
delete[] buffer;
|
||||
if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != NULL) {
|
||||
ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(-1L);
|
||||
@@ -843,11 +839,11 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
|
||||
// read next. Populate that. We want to have one range waiting to minimize
|
||||
// wait time in GetNextRange.
|
||||
ScanRange* new_range = request_disk_state->unstarted_scan_ranges()->Dequeue();
|
||||
--(*request_context)->num_unstarted_scan_ranges_;
|
||||
(*request_context)->num_unstarted_scan_ranges_.Add(-1);
|
||||
(*request_context)->ready_to_start_ranges_.Enqueue(new_range);
|
||||
request_disk_state->set_next_scan_range_to_start(new_range);
|
||||
|
||||
if ((*request_context)->num_unstarted_scan_ranges_ == 0) {
|
||||
if ((*request_context)->num_unstarted_scan_ranges_.Load() == 0) {
|
||||
// All the ranges have been started, notify everyone blocked on GetNextRange.
|
||||
// Only one of them will get work so make sure to return NULL to the other
|
||||
// caller threads.
|
||||
@@ -1046,10 +1042,10 @@ void DiskIoMgr::ReadRange(DiskQueue* disk_queue, RequestContext* reader,
|
||||
}
|
||||
|
||||
buffer = GetFreeBuffer(&buffer_size);
|
||||
++reader->num_used_buffers_;
|
||||
reader->num_used_buffers_.Add(1);
|
||||
|
||||
// Validate more invariants.
|
||||
DCHECK_GT(reader->num_used_buffers_, 0);
|
||||
DCHECK_GT(reader->num_used_buffers_.Load(), 0);
|
||||
DCHECK(range != NULL);
|
||||
DCHECK(reader != NULL);
|
||||
DCHECK(buffer != NULL);
|
||||
|
||||
@@ -604,9 +604,6 @@ class DiskIoMgr {
|
||||
/// Returns the current status of the context.
|
||||
Status context_status(RequestContext* context) const;
|
||||
|
||||
/// Returns the number of unstarted scan ranges for this reader.
|
||||
int num_unstarted_ranges(RequestContext* reader) const;
|
||||
|
||||
void set_bytes_read_counter(RequestContext*, RuntimeProfile::Counter*);
|
||||
void set_read_timer(RequestContext*, RuntimeProfile::Counter*);
|
||||
void set_active_read_thread_counter(RequestContext*, RuntimeProfile::Counter*);
|
||||
@@ -642,12 +639,6 @@ class DiskIoMgr {
|
||||
/// The disk ID (and therefore disk_queues_ index) used for S3 accesses.
|
||||
int RemoteS3DiskId() const { return num_local_disks() + REMOTE_S3_DISK_OFFSET; }
|
||||
|
||||
/// Returns the number of allocated buffers.
|
||||
int num_allocated_buffers() const { return num_allocated_buffers_; }
|
||||
|
||||
/// Returns the number of buffers currently owned by all readers.
|
||||
int num_buffers_in_readers() const { return num_buffers_in_readers_; }
|
||||
|
||||
/// Dumps the disk IoMgr queues (for readers and disks)
|
||||
std::string DebugString();
|
||||
|
||||
|
||||
@@ -416,6 +416,6 @@ string LibCache::MakeLocalPath(const string& hdfs_path, const string& local_dir)
|
||||
filesystem::path src(hdfs_path);
|
||||
stringstream dst;
|
||||
dst << local_dir << "/" << src.stem().native() << "." << getpid() << "."
|
||||
<< (num_libs_copied_++) << src.extension().native();
|
||||
<< (num_libs_copied_.Add(1) - 1) << src.extension().native();
|
||||
return dst.str();
|
||||
}
|
||||
|
||||
@@ -329,7 +329,7 @@ bool MemTracker::GcMemory(int64_t max_consumption) {
|
||||
|
||||
void MemTracker::GcTcmalloc() {
|
||||
#ifndef ADDRESS_SANITIZER
|
||||
released_memory_since_gc_ = 0;
|
||||
released_memory_since_gc_.Store(0);
|
||||
MallocExtension::instance()->ReleaseFreeMemory();
|
||||
#else
|
||||
// Nothing to do if not using tcmalloc.
|
||||
|
||||
@@ -221,7 +221,7 @@ class MemTracker {
|
||||
return;
|
||||
}
|
||||
|
||||
if (UNLIKELY(released_memory_since_gc_.UpdateAndFetch(bytes)) > GC_RELEASE_SIZE) {
|
||||
if (UNLIKELY(released_memory_since_gc_.Add(bytes) > GC_RELEASE_SIZE)) {
|
||||
GcTcmalloc();
|
||||
}
|
||||
|
||||
|
||||
@@ -422,7 +422,7 @@ void PlanFragmentExecutor::ReportProfile() {
|
||||
|
||||
if (!report_thread_active_) break;
|
||||
|
||||
if (completed_report_sent_.Read() == 0) {
|
||||
if (completed_report_sent_.Load() == 0) {
|
||||
// No complete fragment report has been sent.
|
||||
SendReport(false);
|
||||
}
|
||||
@@ -504,7 +504,7 @@ Status PlanFragmentExecutor::GetNextInternal(RowBatch** batch) {
|
||||
void PlanFragmentExecutor::FragmentComplete() {
|
||||
// Check the atomic flag. If it is set, then a fragment complete report has already
|
||||
// been sent.
|
||||
bool send_report = completed_report_sent_.CompareAndSwap(0,1);
|
||||
bool send_report = completed_report_sent_.CompareAndSwap(0, 1);
|
||||
|
||||
fragment_sw_.Stop();
|
||||
int64_t cpu_and_wait_time = fragment_sw_.ElapsedTime();
|
||||
@@ -525,7 +525,7 @@ void PlanFragmentExecutor::FragmentComplete() {
|
||||
void PlanFragmentExecutor::UpdateStatus(const Status& status) {
|
||||
if (status.ok()) return;
|
||||
|
||||
bool send_report = completed_report_sent_.CompareAndSwap(0,1);
|
||||
bool send_report = completed_report_sent_.CompareAndSwap(0, 1);
|
||||
|
||||
{
|
||||
lock_guard<mutex> l(status_lock_);
|
||||
|
||||
@@ -97,8 +97,8 @@ void QueryResourceMgr::InitVcoreAcquisition(int32_t init_vcores) {
|
||||
// inspects immediately after exiting Expand(), and if true, exits before touching any
|
||||
// of the class-wide state (because the destructor may have finished before this point).
|
||||
|
||||
thread_in_expand_.reset(new AtomicInt<int16_t>());
|
||||
early_exit_.reset(new AtomicInt<int16_t>());
|
||||
thread_in_expand_.reset(new AtomicInt<int32_t>());
|
||||
early_exit_.reset(new AtomicInt<int32_t>());
|
||||
acquire_vcore_thread_.reset(
|
||||
new Thread("resource-mgmt", Substitute("acquire-cpu-$0", PrintId(query_id_)),
|
||||
bind<void>(mem_fn(&QueryResourceMgr::AcquireVcoreResources), this,
|
||||
@@ -170,8 +170,8 @@ Status QueryResourceMgr::RequestMemExpansion(int64_t requested_bytes,
|
||||
}
|
||||
|
||||
void QueryResourceMgr::AcquireVcoreResources(
|
||||
shared_ptr<AtomicInt<int16_t> > thread_in_expand,
|
||||
shared_ptr<AtomicInt<int16_t> > early_exit) {
|
||||
shared_ptr<AtomicInt<int32_t> > thread_in_expand,
|
||||
shared_ptr<AtomicInt<int32_t> > early_exit) {
|
||||
// Take a copy because we'd like to print it in some cases after the destructor.
|
||||
TUniqueId reservation_id = reservation_id_;
|
||||
VLOG_QUERY << "Starting Vcore acquisition for: " << reservation_id;
|
||||
@@ -188,18 +188,18 @@ void QueryResourceMgr::AcquireVcoreResources(
|
||||
VLOG_QUERY << "Expanding VCore allocation: " << reservation_id_;
|
||||
|
||||
// First signal that we are about to enter a blocking Expand() call.
|
||||
thread_in_expand->FetchAndUpdate(1L);
|
||||
thread_in_expand->Add(1L);
|
||||
|
||||
// TODO: Could cause problems if called during or after a system-wide shutdown
|
||||
llama::TAllocatedResource resource;
|
||||
llama::TUniqueId expansion_id;
|
||||
Status status = ExecEnv::GetInstance()->resource_broker()->Expand(reservation_id,
|
||||
res, -1, &expansion_id, &resource);
|
||||
thread_in_expand->FetchAndUpdate(-1L);
|
||||
thread_in_expand->Add(-1L);
|
||||
// If signalled to exit quickly by the destructor, exit the loop now. It's important
|
||||
// to do so without accessing any class variables since they may no longer be valid.
|
||||
// Need to check after setting thread_in_expand to avoid a race.
|
||||
if (early_exit->FetchAndUpdate(0L) != 0) {
|
||||
if (early_exit->Add(0L) != 0) {
|
||||
VLOG_QUERY << "Fragment finished during Expand(): " << reservation_id;
|
||||
break;
|
||||
}
|
||||
@@ -261,8 +261,8 @@ QueryResourceMgr::~QueryResourceMgr() {
|
||||
// First, set the early exit flag. Then check to see if the thread is in Expand(). If
|
||||
// so, the acquisition thread is guaranteed to see early_exit_ == 1L once it finishes
|
||||
// Expand(), and will exit immediately. It's therefore safe not to wait for it.
|
||||
early_exit_->FetchAndUpdate(1L);
|
||||
if (thread_in_expand_->FetchAndUpdate(0L) == 0L) {
|
||||
early_exit_->Add(1L);
|
||||
if (thread_in_expand_->Add(0L) == 0L) {
|
||||
acquire_vcore_thread_->Join();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -195,12 +195,12 @@ class QueryResourceMgr {
|
||||
/// parent QueryResourceMgr has been destroyed.
|
||||
/// TODO: Combine with ShouldExit(), and replace with AtomicBool when we have such a
|
||||
/// thing.
|
||||
boost::shared_ptr<AtomicInt<int16_t> > early_exit_;
|
||||
boost::shared_ptr<AtomicInt<int32_t> > early_exit_;
|
||||
|
||||
/// Signals to the destructor that the vcore acquisition thread is currently in an
|
||||
/// Expand() RPC. If so, the destructor does not need to wait for the acquisition thread
|
||||
/// to exit.
|
||||
boost::shared_ptr<AtomicInt<int16_t> > thread_in_expand_;
|
||||
boost::shared_ptr<AtomicInt<int32_t> > thread_in_expand_;
|
||||
|
||||
/// Creates the llama resource for the memory and/or cores specified, associated with
|
||||
/// the reservation context.
|
||||
@@ -209,8 +209,8 @@ class QueryResourceMgr {
|
||||
/// Run as a thread owned by acquire_cpu_thread_. Waits for notification from
|
||||
/// NotifyThreadUsageChange(), then checks the subscription level to decide if more
|
||||
/// VCores are needed, and starts a new expansion request if so.
|
||||
void AcquireVcoreResources(boost::shared_ptr<AtomicInt<int16_t> > thread_in_expand,
|
||||
boost::shared_ptr<AtomicInt<int16_t> > early_exit);
|
||||
void AcquireVcoreResources(boost::shared_ptr<AtomicInt<int32_t> > thread_in_expand,
|
||||
boost::shared_ptr<AtomicInt<int32_t> > early_exit);
|
||||
|
||||
/// True if thread:VCore subscription is too high, meaning more VCores are required.
|
||||
/// Must be called holding threads_running_ lock.
|
||||
|
||||
@@ -28,7 +28,7 @@ class CountingBarrier {
|
||||
|
||||
/// Sends one notification, decrementing the number of pending notifications by one.
|
||||
void Notify() {
|
||||
if (count_.UpdateAndFetch(-1) == 0) promise_.Set(true);
|
||||
if (count_.Add(-1) == 0) promise_.Set(true);
|
||||
}
|
||||
|
||||
/// Blocks until all notifications are received.
|
||||
|
||||
@@ -134,7 +134,7 @@ bool HdfsOperationSet::Execute(ThreadPool<HdfsOp>* pool,
|
||||
}
|
||||
int64_t num_ops = ops_.size();
|
||||
if (num_ops == 0) return true;
|
||||
num_ops_ += num_ops;
|
||||
num_ops_.Add(num_ops);
|
||||
|
||||
BOOST_FOREACH(const HdfsOp& op, ops_) {
|
||||
pool->Offer(op);
|
||||
@@ -160,7 +160,7 @@ void HdfsOperationSet::AddError(const string& err, const HdfsOp* op) {
|
||||
}
|
||||
|
||||
void HdfsOperationSet::MarkOneOpDone() {
|
||||
if (num_ops_.UpdateAndFetch(-1) == 0) {
|
||||
if (num_ops_.Add(-1) == 0) {
|
||||
promise_.Set(errors().size() == 0);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
#include <gtest/gtest.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include "common/atomic.h"
|
||||
#include "util/internal-queue.h"
|
||||
|
||||
#include "common/names.h"
|
||||
@@ -152,7 +153,7 @@ void ProducerThread(InternalQueue<IntNode>* queue, int num_inserts,
|
||||
vector<IntNode>* nodes, AtomicInt<int32_t>* counter, bool* failed) {
|
||||
for (int i = 0; i < num_inserts && !*failed; ++i) {
|
||||
// Get the next index to queue.
|
||||
AtomicInt<int32_t> value = (*counter)++;
|
||||
int32_t value = counter->Add(1) - 1;
|
||||
nodes->at(value).value = value;
|
||||
queue->Enqueue(&nodes->at(value));
|
||||
if (i % VALIDATE_INTERVAL == 0) {
|
||||
@@ -215,7 +216,7 @@ TEST(InternalQueue, TestSingleProducerSingleConsumer) {
|
||||
ASSERT_TRUE(queue.empty());
|
||||
ASSERT_EQ(results.size(), nodes.size());
|
||||
|
||||
counter = 0;
|
||||
counter.Store(0);
|
||||
results.clear();
|
||||
thread producer_thread(ProducerThread, &queue, nodes.size(), &nodes, &counter, &failed);
|
||||
thread consumer_thread(ConsumerThread, &queue, nodes.size(), 1, &results, &failed);
|
||||
|
||||
@@ -18,10 +18,8 @@
|
||||
|
||||
#include <boost/thread/locks.hpp>
|
||||
|
||||
#include "common/atomic.h"
|
||||
#include "util/spinlock.h"
|
||||
|
||||
|
||||
namespace impala {
|
||||
|
||||
/// Thread safe fifo-queue. This is an internal queue, meaning the links to nodes
|
||||
|
||||
@@ -37,7 +37,7 @@ PeriodicCounterUpdater::PeriodicCounterUpdater() : done_(0) {
|
||||
}
|
||||
|
||||
PeriodicCounterUpdater::~PeriodicCounterUpdater() {
|
||||
done_.Swap(1);
|
||||
done_.Store(1);
|
||||
update_thread_->join();
|
||||
}
|
||||
|
||||
@@ -126,7 +126,7 @@ void PeriodicCounterUpdater::StopTimeSeriesCounter(
|
||||
}
|
||||
|
||||
void PeriodicCounterUpdater::UpdateLoop() {
|
||||
while (done_.Read() == 0) {
|
||||
while (done_.Load() == 0) {
|
||||
system_time before_time = get_system_time();
|
||||
SleepForMs(FLAGS_periodic_counter_update_period_ms);
|
||||
posix_time::time_duration elapsed = get_system_time() - before_time;
|
||||
|
||||
@@ -137,7 +137,7 @@ class PeriodicCounterUpdater {
|
||||
TimeSeriesCounters time_series_counters_;
|
||||
|
||||
/// If 1, tear down the update thread.
|
||||
AtomicInt<uint32_t> done_;
|
||||
AtomicInt<int32_t> done_;
|
||||
|
||||
/// Singleton object that keeps track of all rate counters and the thread
|
||||
/// for updating them.
|
||||
|
||||
@@ -21,26 +21,31 @@
|
||||
|
||||
using namespace impala;
|
||||
|
||||
ProgressUpdater::ProgressUpdater(const string& label, int64_t total, int period) :
|
||||
label_(label), logging_level_(2), total_(total), update_period_(period),
|
||||
ProgressUpdater::ProgressUpdater() :
|
||||
logging_level_(2), total_(-1), update_period_(0),
|
||||
num_complete_(0), last_output_percentage_(0) {
|
||||
}
|
||||
|
||||
ProgressUpdater::ProgressUpdater() :
|
||||
logging_level_(2), total_(0), update_period_(0),
|
||||
num_complete_(0), last_output_percentage_(0) {
|
||||
void ProgressUpdater::Init(const string& label, int64_t total, int update_period) {
|
||||
DCHECK_GE(total, 0);
|
||||
label_ = label;
|
||||
total_ = total;
|
||||
update_period_ = update_period;
|
||||
DCHECK_EQ(num_complete_.Load(), 0) << "Update() should not have been called yet";
|
||||
DCHECK_EQ(last_output_percentage_.Load(), 0);
|
||||
}
|
||||
|
||||
void ProgressUpdater::Update(int64_t delta) {
|
||||
DCHECK_GE(total_, 0) << "Init() should have been called already";
|
||||
DCHECK_GE(delta, 0);
|
||||
if (delta == 0) return;
|
||||
|
||||
num_complete_ += delta;
|
||||
num_complete_.Add(delta);
|
||||
|
||||
// Cache some shared variables to avoid locking. It's possible the progress
|
||||
// update is out of order (e.g. prints 1 out of 10 after 2 out of 10)
|
||||
double old_percentage = last_output_percentage_;
|
||||
int64_t num_complete = num_complete_;
|
||||
double old_percentage = last_output_percentage_.Load();
|
||||
int64_t num_complete = num_complete_.Load();
|
||||
|
||||
if (num_complete >= total_) {
|
||||
// Always print the final 100% complete
|
||||
@@ -53,7 +58,7 @@ void ProgressUpdater::Update(int64_t delta) {
|
||||
int new_percentage = (static_cast<double>(num_complete) / total_) * 100;
|
||||
if (new_percentage - old_percentage > update_period_) {
|
||||
// Only update shared variable if this guy was the latest.
|
||||
__sync_val_compare_and_swap(&last_output_percentage_, old_percentage, new_percentage);
|
||||
last_output_percentage_.CompareAndSwap(old_percentage, new_percentage);
|
||||
VLOG(logging_level_) << label_ << ": " << new_percentage << "\% Complete ("
|
||||
<< num_complete << " out of " << total_ << ")";
|
||||
}
|
||||
@@ -61,7 +66,7 @@ void ProgressUpdater::Update(int64_t delta) {
|
||||
|
||||
string ProgressUpdater::ToString() const {
|
||||
stringstream ss;
|
||||
int64_t num_complete = num_complete_;
|
||||
int64_t num_complete = num_complete_.Load();
|
||||
if (num_complete >= total_) {
|
||||
// Always print the final 100% complete
|
||||
ss << label_ << " 100\% Complete (" << num_complete << " out of " << total_ << ")";
|
||||
|
||||
@@ -25,47 +25,60 @@ namespace impala {
|
||||
|
||||
/// Utility class to update progress. This is split out so a different
|
||||
/// logging level can be set for these updates (GLOG_module)
|
||||
/// This class is thread safe.
|
||||
/// This class is thread safe after Init() is called.
|
||||
/// Example usage:
|
||||
/// ProgressUpdater updater("Task", 100, 10); // 100 items, print every 10%
|
||||
/// ProgressUpdater updater;
|
||||
/// updater.Init("Task", 100, 10); // 100 items, print every 10%
|
||||
/// updater.Update(15); // 15 done, prints 15%
|
||||
/// updater.Update(3); // 18 done, doesn't print
|
||||
/// update.Update(5); // 23 done, prints 23%
|
||||
class ProgressUpdater {
|
||||
public:
|
||||
/// label - label that is printed with each update.
|
||||
/// max - maximum number of work items
|
||||
/// update_period - how often the progress is spewed expressed as a percentage
|
||||
ProgressUpdater(const std::string& label, int64_t max, int update_period = 1);
|
||||
|
||||
ProgressUpdater();
|
||||
|
||||
/// Sets the GLOG level for this progress updater. By default, this will use
|
||||
/// 2 but objects can override it.
|
||||
/// Initialize this ProgressUpdater with:
|
||||
/// 'label' - label that is printed with each update.
|
||||
/// 'total' - maximum number of work items
|
||||
/// 'update_period' - how often the progress is printed expressed as a percentage
|
||||
void Init(const std::string& label, int64_t total, int update_period = 1);
|
||||
|
||||
/// Sets the GLOG level for this progress updater. By default, this will use 2 but
|
||||
/// objects can override it.
|
||||
void set_logging_level(int level) { logging_level_ = level; }
|
||||
|
||||
/// 'delta' more of the work has been complete. Will potentially output to
|
||||
/// VLOG_PROGRESS
|
||||
/// VLOG_PROGRESS. Init() must be called before Update().
|
||||
void Update(int64_t delta);
|
||||
|
||||
/// Returns if all tasks are done.
|
||||
bool done() const { return num_complete_ >= total_; }
|
||||
/// Returns true if all tasks are done.
|
||||
bool done() const { return num_complete() >= total_; }
|
||||
|
||||
int64_t total() const { return total_; }
|
||||
int64_t num_complete() const { return num_complete_; }
|
||||
int64_t num_complete() const { return num_complete_.Load(); }
|
||||
int64_t remaining() const { return total() - num_complete(); }
|
||||
|
||||
/// Returns a string representation of the current progress
|
||||
std::string ToString() const;
|
||||
|
||||
private:
|
||||
/// Label printed with each update.
|
||||
std::string label_;
|
||||
|
||||
/// GLOG level for updates.
|
||||
int logging_level_;
|
||||
|
||||
/// Total number of work items. -1 before Init().
|
||||
int64_t total_;
|
||||
|
||||
/// Number of percentage points between outputs.
|
||||
int update_period_;
|
||||
|
||||
/// Number of completed work items.
|
||||
AtomicInt<int64_t> num_complete_;
|
||||
int last_output_percentage_;
|
||||
|
||||
/// Percentage when the last output was generated.
|
||||
AtomicInt<int> last_output_percentage_;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -222,6 +222,39 @@ TEST(CountersTest, MergeAndUpdate) {
|
||||
profile2.PrettyPrint(&dummy);
|
||||
}
|
||||
|
||||
TEST(CountersTest, HighWaterMarkCounters) {
|
||||
ObjectPool pool;
|
||||
RuntimeProfile profile(&pool, "Profile");
|
||||
RuntimeProfile::HighWaterMarkCounter* bytes_counter =
|
||||
profile.AddHighWaterMarkCounter("bytes", TUnit::BYTES);
|
||||
|
||||
bytes_counter->Set(10);
|
||||
EXPECT_EQ(bytes_counter->current_value(), 10);
|
||||
EXPECT_EQ(bytes_counter->value(), 10);
|
||||
|
||||
bytes_counter->Add(5);
|
||||
EXPECT_EQ(bytes_counter->current_value(), 15);
|
||||
EXPECT_EQ(bytes_counter->value(), 15);
|
||||
|
||||
bytes_counter->Set(5);
|
||||
EXPECT_EQ(bytes_counter->current_value(), 5);
|
||||
EXPECT_EQ(bytes_counter->value(), 15);
|
||||
|
||||
bytes_counter->Add(3);
|
||||
EXPECT_EQ(bytes_counter->current_value(), 8);
|
||||
EXPECT_EQ(bytes_counter->value(), 15);
|
||||
|
||||
bool success = bytes_counter->TryAdd(20, 30);
|
||||
EXPECT_TRUE(success);
|
||||
EXPECT_EQ(bytes_counter->current_value(), 28);
|
||||
EXPECT_EQ(bytes_counter->value(), 28);
|
||||
|
||||
success = bytes_counter->TryAdd(5, 30);
|
||||
EXPECT_FALSE(success);
|
||||
EXPECT_EQ(bytes_counter->current_value(), 28);
|
||||
EXPECT_EQ(bytes_counter->value(), 28);
|
||||
}
|
||||
|
||||
TEST(CountersTest, DerivedCounters) {
|
||||
ObjectPool pool;
|
||||
RuntimeProfile profile(&pool, "Profile");
|
||||
|
||||
@@ -315,7 +315,7 @@ void RuntimeProfile::Divide(int n) {
|
||||
if (iter->second->unit() == TUnit::DOUBLE_VALUE) {
|
||||
iter->second->Set(iter->second->double_value() / n);
|
||||
} else {
|
||||
iter->second->value_ = iter->second->value() / n;
|
||||
iter->second->value_.Store(iter->second->value() / n);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -92,26 +92,32 @@ class RuntimeProfile {
|
||||
virtual ~Counter(){}
|
||||
|
||||
virtual void Add(int64_t delta) {
|
||||
value_ += delta;
|
||||
value_.Add(delta);
|
||||
}
|
||||
|
||||
/// Use this to update if the counter is a bitmap
|
||||
void BitOr(int64_t delta) {
|
||||
value_ |= delta;
|
||||
int64_t old;
|
||||
do {
|
||||
old = value_.Load();
|
||||
if (LIKELY((old | delta) == old)) return; // Bits already set, avoid atomic.
|
||||
} while (UNLIKELY(!value_.CompareAndSwap(old, old | delta)));
|
||||
}
|
||||
|
||||
virtual void Set(int64_t value) { value_ = value; }
|
||||
virtual void Set(int64_t value) { value_.Store(value); }
|
||||
|
||||
virtual void Set(int value) { value_ = value; }
|
||||
virtual void Set(int value) { value_.Store(value); }
|
||||
|
||||
virtual void Set(double value) {
|
||||
value_ = *reinterpret_cast<int64_t*>(&value);
|
||||
DCHECK_EQ(sizeof(value), sizeof(int64_t));
|
||||
value_.Store(*reinterpret_cast<int64_t*>(&value));
|
||||
}
|
||||
|
||||
virtual int64_t value() const { return value_; }
|
||||
virtual int64_t value() const { return value_.Load(); }
|
||||
|
||||
virtual double double_value() const {
|
||||
return *reinterpret_cast<const double*>(&value_);
|
||||
int64_t v = value_.Load();
|
||||
return *reinterpret_cast<const double*>(&v);
|
||||
}
|
||||
|
||||
TUnit::type unit() const { return unit_; }
|
||||
@@ -130,32 +136,43 @@ class RuntimeProfile {
|
||||
HighWaterMarkCounter(TUnit::type unit) : Counter(unit) {}
|
||||
|
||||
virtual void Add(int64_t delta) {
|
||||
int64_t new_val = current_value_.UpdateAndFetch(delta);
|
||||
value_.UpdateMax(new_val);
|
||||
int64_t new_val = current_value_.Add(delta);
|
||||
UpdateMax(new_val);
|
||||
}
|
||||
|
||||
/// Tries to increase the current value by delta. If current_value() + delta
|
||||
/// exceeds max, return false and current_value is not changed.
|
||||
bool TryAdd(int64_t delta, int64_t max) {
|
||||
while (true) {
|
||||
int64_t old_val = current_value_;
|
||||
int64_t old_val = current_value_.Load();
|
||||
int64_t new_val = old_val + delta;
|
||||
if (UNLIKELY(new_val > max)) return false;
|
||||
if (LIKELY(current_value_.CompareAndSwap(old_val, new_val))) {
|
||||
value_.UpdateMax(new_val);
|
||||
UpdateMax(new_val);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
virtual void Set(int64_t v) {
|
||||
current_value_ = v;
|
||||
value_.UpdateMax(v);
|
||||
current_value_.Store(v);
|
||||
UpdateMax(v);
|
||||
}
|
||||
|
||||
int64_t current_value() const { return current_value_; }
|
||||
int64_t current_value() const { return current_value_.Load(); }
|
||||
|
||||
private:
|
||||
/// Set 'value_' to 'v' if 'v' is larger than 'value_'. The entire operation is
|
||||
/// atomic.
|
||||
void UpdateMax(int64_t v) {
|
||||
while (true) {
|
||||
int64_t old_max = value_.Load();
|
||||
int64_t new_max = std::max(old_max, v);
|
||||
if (new_max == old_max) break; // Avoid atomic update.
|
||||
if (LIKELY(value_.CompareAndSwap(old_max, new_max))) break;
|
||||
}
|
||||
}
|
||||
|
||||
/// The current value of the counter. value_ in the super class represents
|
||||
/// the high water mark.
|
||||
AtomicInt<int64_t> current_value_;
|
||||
@@ -202,7 +219,7 @@ class RuntimeProfile {
|
||||
int64_t old_val = 0;
|
||||
if (it != counter_value_map_.end()) {
|
||||
old_val = it->second;
|
||||
it->second = new_counter->value_;
|
||||
it->second = new_counter->value();
|
||||
} else {
|
||||
counter_value_map_[new_counter] = new_counter->value();
|
||||
}
|
||||
@@ -211,10 +228,10 @@ class RuntimeProfile {
|
||||
double old_double_val = *reinterpret_cast<double*>(&old_val);
|
||||
current_double_sum_ += (new_counter->double_value() - old_double_val);
|
||||
double result_val = current_double_sum_ / (double) counter_value_map_.size();
|
||||
value_ = *reinterpret_cast<int64_t*>(&result_val);
|
||||
value_.Store(*reinterpret_cast<int64_t*>(&result_val));
|
||||
} else {
|
||||
current_int_sum_ += (new_counter->value_ - old_val);
|
||||
value_ = current_int_sum_ / counter_value_map_.size();
|
||||
current_int_sum_ += (new_counter->value() - old_val);
|
||||
value_.Store(current_int_sum_ / counter_value_map_.size());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user