IMPALA-9612: Fix race condition in RuntimeFilter::WaitForArrival

In function RuntimeFilter::WaitForArrival, there is a race condition
where condition variable arrival_cv_ may be signaled right after
thread get into the loop and before it call arrival_cv_.WaitFor().
This can cause runtime filter to wait the entire
RUNTIME_FILTER_WAIT_TIME_MS even though the filter has arrived or
canceled earlier than that. This commit avoid the race condition by
making RuntimeFilter::SetFilter and RuntimeFilter::Cancel acquire
arrival_mutex_ first before checking the value of arrival_time_ and
release arrival_mutex_ before signaling arrival_cv_.

Testing:
- Add new be test runtime-filter-test.cc
- Pass core tests.

Change-Id: I7dffa626103ef0af06ad1e89231b0d2ee54bb94a
Reviewed-on: http://gerrit.cloudera.org:8080/15673
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Riza Suminto
2020-04-06 22:24:05 -07:00
committed by Impala Public Jenkins
parent e0ed7d321c
commit 5e69ae1d7d
4 changed files with 133 additions and 12 deletions

View File

@@ -98,6 +98,7 @@ add_library(RuntimeTests STATIC
multi-precision-test.cc
raw-value-test.cc
row-batch-serialize-test.cc
runtime-filter-test.cc
string-buffer-test.cc
string-compare-test.cc
string-search-test.cc
@@ -131,6 +132,7 @@ ADD_UNIFIED_BE_LSAN_TEST(hdfs-fs-cache-test "HdfsFsCacheTest.*")
ADD_UNIFIED_BE_LSAN_TEST(tmp-file-mgr-test "TmpFileMgrTest.*")
ADD_UNIFIED_BE_LSAN_TEST(row-batch-serialize-test "RowBatchSerializeTest.*")
# Exception to unified be tests: Custom main function with global Frontend object
ADD_UNIFIED_BE_LSAN_TEST(runtime-filter-test "RuntimeFilterTest.*")
ADD_BE_LSAN_TEST(row-batch-test)
# Exception to unified be tests: Custom main function with global Frontend object
ADD_BE_LSAN_TEST(collection-value-builder-test)

View File

@@ -0,0 +1,103 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <boost/thread/thread.hpp>
#include "common/init.h"
#include "common/object-pool.h"
#include "runtime/runtime-filter.h"
#include "runtime/runtime-filter.inline.h"
#include "testutil/gtest-util.h"
#include "util/stopwatch.h"
#include "common/names.h"
using namespace impala;
namespace impala {
class RuntimeFilterTest : public testing::Test {
protected:
ObjectPool pool_;
MemTracker tracker_;
virtual void SetUp() {}
virtual void TearDown() { pool_.Clear(); }
void SetDelay(RuntimeFilter* rf, int64_t delay) { rf->injection_delay_ = delay; }
};
struct TestConfig {
RuntimeFilter* runtime_filter;
int64_t injection_delay;
int64_t wait_for_ms;
MinMaxFilter* min_max_filter;
};
// Test that RuntimeFilter stop waiting after it is canceled.
// See IMPALA-9612.
TEST_F(RuntimeFilterTest, Canceled) {
TRuntimeFilterDesc desc;
desc.__set_type(TRuntimeFilterType::MIN_MAX);
RuntimeFilter* rf = pool_.Add(new RuntimeFilter(desc, desc.filter_size_bytes));
TestConfig tc = {rf, 500, 1000, nullptr};
SetDelay(rf, tc.injection_delay);
MonotonicStopWatch sw;
thread_group workers;
sw.Start();
workers.add_thread(
new thread([&tc] { tc.runtime_filter->WaitForArrival(tc.wait_for_ms); }));
SleepForMs(100); // give waiting thread a head start
workers.add_thread(new thread([&tc] { tc.runtime_filter->Cancel(); }));
workers.join_all();
sw.Stop();
ASSERT_GE(tc.runtime_filter->arrival_delay_ms(), tc.injection_delay);
ASSERT_LT(sw.ElapsedTime(), (tc.injection_delay + tc.wait_for_ms) * 1000000);
}
// Test that RuntimeFilter stop waiting after the filter arrived.
// See IMPALA-9612.
TEST_F(RuntimeFilterTest, Arrived) {
TRuntimeFilterDesc desc;
desc.__set_type(TRuntimeFilterType::MIN_MAX);
RuntimeFilter* rf = pool_.Add(new RuntimeFilter(desc, desc.filter_size_bytes));
MinMaxFilter* mmf =
MinMaxFilter::Create(ColumnType(PrimitiveType::TYPE_BOOLEAN), &pool_, &tracker_);
TestConfig tc = {rf, 500, 1000, mmf};
SetDelay(rf, tc.injection_delay);
MonotonicStopWatch sw;
thread_group workers;
sw.Start();
workers.add_thread(
new thread([&tc] { tc.runtime_filter->WaitForArrival(tc.wait_for_ms); }));
SleepForMs(100); // give waiting thread a head start
workers.add_thread(
new thread([&tc] { tc.runtime_filter->SetFilter(nullptr, tc.min_max_filter); }));
workers.join_all();
sw.Stop();
ASSERT_GE(tc.runtime_filter->arrival_delay_ms(), tc.injection_delay);
ASSERT_LT(sw.ElapsedTime(), (tc.injection_delay + tc.wait_for_ms) * 1000000);
}
} // namespace impala

View File

@@ -26,17 +26,20 @@ using namespace impala;
const char* RuntimeFilter::LLVM_CLASS_NAME = "class.impala::RuntimeFilter";
void RuntimeFilter::SetFilter(BloomFilter* bloom_filter, MinMaxFilter* min_max_filter) {
DCHECK(!HasFilter()) << "SetFilter() should not be called multiple times.";
DCHECK(bloom_filter_.Load() == nullptr && min_max_filter_.Load() == nullptr);
if (arrival_time_.Load() != 0) return; // The filter may already have been cancelled.
if (is_bloom_filter()) {
bloom_filter_.Store(bloom_filter);
} else {
DCHECK(is_min_max_filter());
min_max_filter_.Store(min_max_filter);
{
unique_lock<mutex> l(arrival_mutex_);
DCHECK(!HasFilter()) << "SetFilter() should not be called multiple times.";
DCHECK(bloom_filter_.Load() == nullptr && min_max_filter_.Load() == nullptr);
if (arrival_time_.Load() != 0) return; // The filter may already have been cancelled.
if (is_bloom_filter()) {
bloom_filter_.Store(bloom_filter);
} else {
DCHECK(is_min_max_filter());
min_max_filter_.Store(min_max_filter);
}
arrival_time_.Store(MonotonicMillis());
has_filter_.Store(true);
}
arrival_time_.Store(MonotonicMillis());
has_filter_.Store(true);
arrival_cv_.NotifyAll();
}
@@ -64,8 +67,11 @@ void RuntimeFilter::Or(RuntimeFilter* other) {
}
void RuntimeFilter::Cancel() {
if (arrival_time_.Load() != 0) return;
arrival_time_.Store(MonotonicMillis());
{
unique_lock<mutex> l(arrival_mutex_);
if (arrival_time_.Load() != 0) return;
arrival_time_.Store(MonotonicMillis());
}
arrival_cv_.NotifyAll();
}
@@ -75,6 +81,9 @@ bool RuntimeFilter::WaitForArrival(int32_t timeout_ms) const {
int64_t ms_since_registration = MonotonicMillis() - registration_time_;
int64_t ms_remaining = timeout_ms - ms_since_registration;
if (ms_remaining <= 0) break;
#ifndef NDEBUG
if (injection_delay_ > 0) SleepForMs(injection_delay_);
#endif
arrival_cv_.WaitFor(l, ms_remaining * MICROS_PER_MILLI);
}
return arrival_time_.Load() != 0;

View File

@@ -28,6 +28,7 @@
namespace impala {
class BloomFilter;
class RuntimeFilterTest;
/// RuntimeFilters represent set-membership predicates that are computed during query
/// execution (rather than during planning). They can then be sent to other operators to
@@ -117,6 +118,8 @@ class RuntimeFilter {
static const char* LLVM_CLASS_NAME;
private:
friend class RuntimeFilterTest;
/// Membership bloom_filter. May be NULL even after arrival_time_ is set, meaning that
/// it does not filter any rows, either because it was not created
/// (filter_desc_.bloom_filter is false), there was not enough memory, or the false
@@ -148,5 +151,9 @@ class RuntimeFilter {
/// Signalled when a filter arrives or the filter is cancelled. Paired with
/// 'arrival_mutex_'
mutable ConditionVariable arrival_cv_;
/// Injection delay for WaitForArrival. Used in testing only.
/// See IMPALA-9612.
int64_t injection_delay_ = 0;
};
}