mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
IMPALA-13040: Add waiting mechanism in UpdateFilterFromRemote
It is possible for an UpdateFilterFromRemote RPC to arrive at an impalad executor before the QueryState of the destination query has been created or has completed initialization. This patch adds a wait mechanism to the UpdateFilterFromRemote RPC endpoint that waits for a few milliseconds until the QueryState exists and has completed initialization. The wait time is fixed at 500ms, with an exponentially increasing sleep period in between checks. If the wait time has passed and the QueryState is still not found or not initialized, the UpdateFilterFromRemote RPC is deemed failed and query execution moves on without the complete filter. Testing: - Add BE tests in network-util-test.cc - Add test_runtime_filter_aggregation.py::TestLateQueryStateInit - Pass exhaustive runs of test_runtime_filter_aggregation.py, test_query_live.py, and test_query_log.py Change-Id: I156d1f0c694b91ba34be70bc53ae9bacf924b3b9 Reviewed-on: http://gerrit.cloudera.org:8080/21383 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
committed by
Impala Public Jenkins
parent
d1d28c0eec
commit
09d2f10f4d
@@ -53,6 +53,8 @@ DEFINE_int32(query_exec_mgr_cancellation_thread_pool_size, 1,
|
||||
"(Advanced) Size of the QueryExecMgr thread-pool processing cancellations due to "
|
||||
"coordinator failure");
|
||||
|
||||
DECLARE_int32(krpc_port);
|
||||
|
||||
const uint32_t QUERY_EXEC_MGR_MAX_CANCELLATION_QUEUE_SIZE = 65536;
|
||||
|
||||
QueryExecMgr::QueryExecMgr() {
|
||||
@@ -78,6 +80,10 @@ Status QueryExecMgr::StartQuery(const ExecQueryFInstancesRequestPB* request,
|
||||
bool dummy;
|
||||
QueryState* qs =
|
||||
GetOrCreateQueryState(query_ctx, request->per_backend_mem_limit(), &dummy);
|
||||
RETURN_IF_ERROR(DebugAction(query_ctx.client_request.query_options.debug_action,
|
||||
"QUERY_STATE_BEFORE_INIT_GLOBAL"));
|
||||
RETURN_IF_ERROR(DebugAction(query_ctx.client_request.query_options.debug_action,
|
||||
"QUERY_STATE_BEFORE_INIT", {std::to_string(FLAGS_krpc_port)}));
|
||||
Status status = qs->Init(request, fragment_info);
|
||||
if (!status.ok()) {
|
||||
qs->ReleaseBackendResourceRefcount(); // Release refcnt acquired in Init().
|
||||
|
||||
@@ -841,6 +841,11 @@ bool QueryState::codegen_cache_enabled() const {
|
||||
&& ExecEnv::GetInstance()->codegen_cache_enabled();
|
||||
}
|
||||
|
||||
bool QueryState::is_initialized() {
|
||||
std::lock_guard<std::mutex> l(init_lock_);
|
||||
return is_initialized_;
|
||||
}
|
||||
|
||||
bool QueryState::StartFInstances() {
|
||||
VLOG(2) << "StartFInstances(): query_id=" << PrintId(query_id())
|
||||
<< " #instances=" << fragment_info_.fragment_instance_ctxs.size();
|
||||
|
||||
@@ -168,6 +168,7 @@ class QueryState {
|
||||
return query_ctx_.client_request.query_options;
|
||||
}
|
||||
bool codegen_cache_enabled() const;
|
||||
bool is_initialized();
|
||||
MemTracker* query_mem_tracker() const { return query_mem_tracker_; }
|
||||
RuntimeProfile* host_profile() const { return host_profile_; }
|
||||
const NodeToFileSchedulings* node_to_file_schedulings() const {
|
||||
@@ -320,6 +321,16 @@ class QueryState {
|
||||
return fragment_state_map_;
|
||||
}
|
||||
|
||||
/// Returns true if the query has reached a terminal state.
|
||||
bool IsTerminalState() const {
|
||||
// Read into local variable to protect against concurrent modification
|
||||
// of backend_exec_state_.
|
||||
BackendExecState exec_state = backend_exec_state_;
|
||||
return exec_state == BackendExecState::FINISHED
|
||||
|| exec_state == BackendExecState::CANCELLED
|
||||
|| exec_state == BackendExecState::ERROR;
|
||||
}
|
||||
|
||||
private:
|
||||
friend class QueryExecMgr;
|
||||
|
||||
@@ -548,13 +559,6 @@ class QueryState {
|
||||
return !overall_status_.ok() && !overall_status_.IsCancelled();
|
||||
}
|
||||
|
||||
/// Returns true if the query has reached a terminal state.
|
||||
bool IsTerminalState() const {
|
||||
return backend_exec_state_ == BackendExecState::FINISHED
|
||||
|| backend_exec_state_ == BackendExecState::CANCELLED
|
||||
|| backend_exec_state_ == BackendExecState::ERROR;
|
||||
}
|
||||
|
||||
/// Updates the BackendExecState based on 'overall_status_'. Should only be called when
|
||||
/// the current state is a non-terminal state. The transition can either be to the next
|
||||
/// legal state or ERROR if 'overall_status_' is an error. Called by the query state
|
||||
|
||||
@@ -557,6 +557,10 @@ void RuntimeFilterBank::DistributeCompleteFilter(
|
||||
++num_inflight_rpcs_;
|
||||
}
|
||||
|
||||
int32_t remaining_wait_time_ms =
|
||||
max(0, GetRuntimeFilterWaitTime() - complete_filter->TimeSinceRegistrationMs());
|
||||
params.set_remaining_filter_wait_time_ms(remaining_wait_time_ms);
|
||||
|
||||
if (to_coordinator) {
|
||||
proxy->UpdateFilterAsync(params, res, controller,
|
||||
boost::bind(
|
||||
@@ -743,10 +747,7 @@ vector<unique_lock<SpinLock>> RuntimeFilterBank::LockAllFilters() {
|
||||
}
|
||||
|
||||
void RuntimeFilterBank::SendIncompleteFilters() {
|
||||
int32_t wait_time_ms = FLAGS_runtime_filter_wait_time_ms;
|
||||
if (query_state_->query_options().runtime_filter_wait_time_ms > 0) {
|
||||
wait_time_ms = query_state_->query_options().runtime_filter_wait_time_ms;
|
||||
}
|
||||
int32_t wait_time_ms = GetRuntimeFilterWaitTime();
|
||||
|
||||
bool try_wait_aggregation = !cancelled_;
|
||||
for (auto& entry : filters_) {
|
||||
@@ -845,6 +846,14 @@ void RuntimeFilterBank::Close() {
|
||||
filter_mem_tracker_->Close();
|
||||
}
|
||||
|
||||
int32_t RuntimeFilterBank::GetRuntimeFilterWaitTime() const {
|
||||
int32_t wait_time_ms = FLAGS_runtime_filter_wait_time_ms;
|
||||
if (query_state_->query_options().runtime_filter_wait_time_ms > 0) {
|
||||
wait_time_ms = query_state_->query_options().runtime_filter_wait_time_ms;
|
||||
}
|
||||
return wait_time_ms;
|
||||
}
|
||||
|
||||
RuntimeFilterBank::ProducedFilter::ProducedFilter(
|
||||
int pending_producers, int pending_remotes, RuntimeFilter* result_filter)
|
||||
: result_filter(result_filter),
|
||||
|
||||
@@ -373,6 +373,8 @@ class RuntimeFilterBank {
|
||||
/// Disable a bloom filter by replacing it with an ALWAYS_TRUE_FILTER.
|
||||
/// Return a pointer to the new runtime filter.
|
||||
RuntimeFilter* DisableBloomFilter(std::unique_ptr<RuntimeFilter>& bloom_filter);
|
||||
|
||||
int32_t GetRuntimeFilterWaitTime() const;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -56,6 +56,10 @@ using namespace apache::thrift;
|
||||
using namespace org::apache::impala::fb;
|
||||
using namespace strings;
|
||||
|
||||
DEFINE_bool_hidden(sort_runtime_filter_aggregator_candidates, false,
|
||||
"Control whether to sort intermediate runtime filter aggregator candidates based on "
|
||||
"their KRPC address. Only used for testing.");
|
||||
|
||||
namespace impala {
|
||||
|
||||
static const string LOCAL_ASSIGNMENTS_KEY("simple-scheduler.local-assignments.total");
|
||||
@@ -334,7 +338,18 @@ void Scheduler::ComputeRandomKrpcForAggregation(const ExecutorConfig& executor_c
|
||||
int num_agg = (int)ceil((double)num_non_coordinator_host / num_filters_per_host);
|
||||
DCHECK_GT(num_agg, 0);
|
||||
|
||||
std::shuffle(instance_groups.begin(), instance_groups.end(), *state->rng());
|
||||
if (UNLIKELY(FLAGS_sort_runtime_filter_aggregator_candidates)) {
|
||||
sort(instance_groups.begin(), instance_groups.end(),
|
||||
[src_state](InstanceToAggPairs a, InstanceToAggPairs b) {
|
||||
int idx_a = a[0].first;
|
||||
int idx_b = b[0].first;
|
||||
return CompareNetworkAddressPB(src_state->instance_states[idx_a].krpc_host,
|
||||
src_state->instance_states[idx_b].krpc_host)
|
||||
< 0;
|
||||
});
|
||||
} else {
|
||||
std::shuffle(instance_groups.begin(), instance_groups.end(), *state->rng());
|
||||
}
|
||||
if (coordinator_instances.size() > 0) {
|
||||
// Put coordinator group behind so that coordinator won't be selected as intermediate
|
||||
// aggregator.
|
||||
|
||||
@@ -51,6 +51,9 @@ DEFINE_string(datastream_service_queue_mem_limit, "5%", QUEUE_LIMIT_MSG.c_str())
|
||||
DEFINE_int32(datastream_service_num_svc_threads, 0, "Number of threads for processing "
|
||||
"datastream services' RPCs. If left at default value 0, it will be set to number of "
|
||||
"CPU cores. Set it to a positive value to change from the default.");
|
||||
DEFINE_int32_hidden(update_filter_min_wait_time_ms, 500,
|
||||
"Minimum time for UpdateFilterFromRemote RPC to wait until destination QueryState is "
|
||||
"ready.");
|
||||
DECLARE_string(debug_actions);
|
||||
|
||||
namespace impala {
|
||||
@@ -128,20 +131,52 @@ void DataStreamService::UpdateFilterFromRemote(
|
||||
DCHECK(req->has_query_id());
|
||||
DCHECK(
|
||||
req->has_bloom_filter() || req->has_min_max_filter() || req->has_in_list_filter());
|
||||
QueryState::ScopedRef qs(ProtoToQueryId(req->query_id()));
|
||||
int64_t arrival_time = MonotonicMillis();
|
||||
|
||||
if (qs.get() != nullptr) {
|
||||
qs->UpdateFilterFromRemote(*req, context);
|
||||
RespondAndReleaseRpc(Status::OK(), resp, context, mem_tracker_.get());
|
||||
} else {
|
||||
// Query state for requested query_id might have been cancelled or closed.
|
||||
// i.e., RUNTIME_FILTER_WAIT_TIME_MS has passed and all fragment instances of
|
||||
// query_id has complete their execution.
|
||||
string err_msg = Substitute("Query State not found for query_id=$0",
|
||||
PrintId(ProtoToQueryId(req->query_id())));
|
||||
LOG(INFO) << err_msg;
|
||||
RespondAndReleaseRpc(Status(err_msg), resp, context, mem_tracker_.get());
|
||||
// Loop until destination QueryState is ready to accept filter update from remote.
|
||||
// Sleep a few milliseconds in between and break once the 500ms grace period has passed.
|
||||
// The grace period is short so that RPC thread is not blocked for too long.
|
||||
// This is a much simpler mechanism than KrpcDataStreamMgr::AddData.
|
||||
// TODO: Revisit this with more sophisticated deferral mechanism if needed.
|
||||
bool query_found = false;
|
||||
int64_t total_wait_time = 0;
|
||||
int32_t sleep_duration_ms = 2;
|
||||
if (req->remaining_filter_wait_time_ms() < FLAGS_update_filter_min_wait_time_ms) {
|
||||
LOG(INFO) << "UpdateFilterFromRemote RPC called with remaining wait time "
|
||||
<< req->remaining_filter_wait_time_ms() << " ms, less than "
|
||||
<< FLAGS_update_filter_min_wait_time_ms << " ms minimum wait time.";
|
||||
}
|
||||
|
||||
do {
|
||||
{
|
||||
QueryState::ScopedRef qs(ProtoToQueryId(req->query_id()));
|
||||
query_found |= (qs.get() != nullptr);
|
||||
if (query_found) {
|
||||
if (qs.get() == nullptr || qs->IsTerminalState()) {
|
||||
// Query was found, but now is either missing or in terminal state.
|
||||
// Break the loop and response with an error.
|
||||
break;
|
||||
} else if (qs->is_initialized()) {
|
||||
qs->UpdateFilterFromRemote(*req, context);
|
||||
RespondAndReleaseRpc(Status::OK(), resp, context, mem_tracker_.get());
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
usleep(sleep_duration_ms * 1000);
|
||||
// double sleep time for next iteration up to 128ms.
|
||||
if (2 * sleep_duration_ms <= 128) sleep_duration_ms *= 2;
|
||||
total_wait_time = MonotonicMillis() - arrival_time;
|
||||
} while (total_wait_time < FLAGS_update_filter_min_wait_time_ms);
|
||||
|
||||
// Query state for requested query_id might have been cancelled, closed, or not ready.
|
||||
// i.e., RUNTIME_FILTER_WAIT_TIME_MS has passed and all fragment instances of
|
||||
// query_id has complete their execution.
|
||||
string err_msg = Substitute("QueryState for query_id=$0 $1 after $2 ms",
|
||||
PrintId(ProtoToQueryId(req->query_id())),
|
||||
query_found ? "no longer running" : "not found", total_wait_time);
|
||||
LOG(INFO) << err_msg;
|
||||
RespondAndReleaseRpc(Status(err_msg), resp, context, mem_tracker_.get());
|
||||
}
|
||||
|
||||
void DataStreamService::PublishFilter(
|
||||
|
||||
@@ -221,10 +221,7 @@ QueryStateExpanded::QueryStateExpanded(const ClientRequestState& exec_state,
|
||||
// Per-Host Metrics
|
||||
for (int i =0; i < exec_state.schedule()->backend_exec_params_size(); i++) {
|
||||
const BackendExecParamsPB& b = exec_state.schedule()->backend_exec_params(i);
|
||||
TNetworkAddress host;
|
||||
host.hostname = b.address().hostname();
|
||||
host.uds_address = b.address().uds_address();
|
||||
host.port = b.address().port();
|
||||
TNetworkAddress host = FromNetworkAddressPB(b.address());
|
||||
|
||||
PerHostState state;
|
||||
state.fragment_instance_count = b.instance_params_size();
|
||||
@@ -445,4 +442,4 @@ EventsTimelineIterator EventsTimelineIterator::end() {
|
||||
return EventsTimelineIterator(labels_, timestamps_, labels_->size());
|
||||
}
|
||||
|
||||
} // namespace impala
|
||||
} // namespace impala
|
||||
|
||||
@@ -41,6 +41,12 @@ TEST(NetworkUtil, NetAddrCompHostnameDiff) {
|
||||
|
||||
ASSERT_TRUE(fixture(first, second));
|
||||
ASSERT_FALSE(fixture(second, first));
|
||||
ASSERT_TRUE(
|
||||
CompareNetworkAddressPB(FromTNetworkAddress(first), FromTNetworkAddress(second))
|
||||
< 0);
|
||||
ASSERT_TRUE(
|
||||
CompareNetworkAddressPB(FromTNetworkAddress(second), FromTNetworkAddress(first))
|
||||
> 0);
|
||||
}
|
||||
|
||||
// Assert where host fields are equal but port is different.
|
||||
@@ -59,6 +65,12 @@ TEST(NetworkUtil, NetAddrCompPortDiff) {
|
||||
|
||||
ASSERT_TRUE(fixture(first, second));
|
||||
ASSERT_FALSE(fixture(second, first));
|
||||
ASSERT_TRUE(
|
||||
CompareNetworkAddressPB(FromTNetworkAddress(first), FromTNetworkAddress(second))
|
||||
< 0);
|
||||
ASSERT_TRUE(
|
||||
CompareNetworkAddressPB(FromTNetworkAddress(second), FromTNetworkAddress(first))
|
||||
> 0);
|
||||
}
|
||||
|
||||
// Assert where host and port fields are equal but uds address is different.
|
||||
@@ -77,6 +89,12 @@ TEST(NetworkUtil, NetAddrCompUDSAddrDiff) {
|
||||
|
||||
ASSERT_TRUE(fixture(first, second));
|
||||
ASSERT_FALSE(fixture(second, first));
|
||||
ASSERT_TRUE(
|
||||
CompareNetworkAddressPB(FromTNetworkAddress(first), FromTNetworkAddress(second))
|
||||
< 0);
|
||||
ASSERT_TRUE(
|
||||
CompareNetworkAddressPB(FromTNetworkAddress(second), FromTNetworkAddress(first))
|
||||
> 0);
|
||||
}
|
||||
|
||||
// Assert where all three comparison fields are equal.
|
||||
@@ -95,6 +113,81 @@ TEST(NetworkUtil, NetAddrUDSAddrSame) {
|
||||
|
||||
ASSERT_FALSE(fixture(first, second));
|
||||
ASSERT_FALSE(fixture(second, first));
|
||||
ASSERT_TRUE(
|
||||
CompareNetworkAddressPB(FromTNetworkAddress(first), FromTNetworkAddress(second))
|
||||
== 0);
|
||||
ASSERT_TRUE(
|
||||
CompareNetworkAddressPB(FromTNetworkAddress(second), FromTNetworkAddress(first))
|
||||
== 0);
|
||||
}
|
||||
|
||||
// Assert where host and port fields are equal first address does not have
|
||||
// uds address set.
|
||||
TEST(NetworkUtil, NetAddrOneMissUDSAddr) {
|
||||
TNetworkAddressComparator fixture;
|
||||
TNetworkAddress first;
|
||||
TNetworkAddress second;
|
||||
|
||||
first.__set_hostname("host");
|
||||
first.__set_port(0);
|
||||
|
||||
second.__set_hostname("host");
|
||||
second.__set_port(0);
|
||||
second.__set_uds_address("");
|
||||
|
||||
ASSERT_TRUE(fixture(first, second));
|
||||
ASSERT_FALSE(fixture(second, first));
|
||||
ASSERT_TRUE(
|
||||
CompareNetworkAddressPB(FromTNetworkAddress(first), FromTNetworkAddress(second))
|
||||
< 0);
|
||||
ASSERT_TRUE(
|
||||
CompareNetworkAddressPB(FromTNetworkAddress(second), FromTNetworkAddress(first))
|
||||
> 0);
|
||||
}
|
||||
|
||||
// Assert where host and port fields are equal and both address does not have
|
||||
// uds address set.
|
||||
TEST(NetworkUtil, NetAddrAllMissUDSAddr) {
|
||||
TNetworkAddressComparator fixture;
|
||||
TNetworkAddress first;
|
||||
TNetworkAddress second;
|
||||
|
||||
first.__set_hostname("host");
|
||||
first.__set_port(0);
|
||||
|
||||
second.__set_hostname("host");
|
||||
second.__set_port(0);
|
||||
|
||||
ASSERT_FALSE(fixture(first, second));
|
||||
ASSERT_FALSE(fixture(second, first));
|
||||
ASSERT_TRUE(
|
||||
CompareNetworkAddressPB(FromTNetworkAddress(first), FromTNetworkAddress(second))
|
||||
== 0);
|
||||
ASSERT_TRUE(
|
||||
CompareNetworkAddressPB(FromTNetworkAddress(second), FromTNetworkAddress(first))
|
||||
== 0);
|
||||
}
|
||||
|
||||
// Converts 'thrift_address' to NetworkAddressPB, back to TNetworkAddress, and to
// NetworkAddressPB once more, then asserts that each representation compares equal to
// its round-tripped counterpart.
void CheckTranslation(TNetworkAddress thrift_address) {
  const NetworkAddressPB pb_first = FromTNetworkAddress(thrift_address);
  const TNetworkAddress thrift_round_trip = FromNetworkAddressPB(pb_first);
  const NetworkAddressPB pb_second = FromTNetworkAddress(thrift_round_trip);

  // Thrift side: neither address orders before the other.
  TNetworkAddressComparator less_than;
  ASSERT_FALSE(less_than(thrift_address, thrift_round_trip));
  ASSERT_FALSE(less_than(thrift_round_trip, thrift_address));

  // Protobuf side: the three-way comparison reports equality in both directions.
  ASSERT_TRUE(CompareNetworkAddressPB(pb_first, pb_second) == 0);
  ASSERT_TRUE(CompareNetworkAddressPB(pb_second, pb_first) == 0);
}
|
||||
|
||||
// Assert consistent translation between TNetworkAddress and NetworkAddressPB.
|
||||
TEST(NetworkUtil, NetAddrTranslation) {
|
||||
TNetworkAddress addr;
|
||||
addr.__set_hostname("host");
|
||||
addr.__set_port(0);
|
||||
CheckTranslation(addr);
|
||||
addr.__set_uds_address("uds");
|
||||
CheckTranslation(addr);
|
||||
}
|
||||
|
||||
} // namespace impala
|
||||
|
||||
@@ -240,7 +240,7 @@ TNetworkAddress FromNetworkAddressPB(const NetworkAddressPB& address) {
|
||||
TNetworkAddress t_address;
|
||||
t_address.__set_hostname(address.hostname());
|
||||
t_address.__set_port(address.port());
|
||||
t_address.__set_uds_address(address.uds_address());
|
||||
if (address.has_uds_address()) t_address.__set_uds_address(address.uds_address());
|
||||
return t_address;
|
||||
}
|
||||
|
||||
@@ -248,7 +248,7 @@ NetworkAddressPB FromTNetworkAddress(const TNetworkAddress& address) {
|
||||
NetworkAddressPB address_pb;
|
||||
address_pb.set_hostname(address.hostname);
|
||||
address_pb.set_port(address.port);
|
||||
address_pb.set_uds_address(address.uds_address);
|
||||
if (address.__isset.uds_address) address_pb.set_uds_address(address.uds_address);
|
||||
return address_pb;
|
||||
}
|
||||
|
||||
@@ -270,7 +270,16 @@ bool TNetworkAddressComparator::operator()(const TNetworkAddress& a,
|
||||
}
|
||||
|
||||
// Hostnames and ports were the same, compare on uds address.
|
||||
return a.uds_address.compare(b.uds_address) < 0;
|
||||
if (a.__isset.uds_address) {
|
||||
if (b.__isset.uds_address) {
|
||||
return a.uds_address.compare(b.uds_address) < 0;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
} else if (b.__isset.uds_address) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/// Pick a random port in the range of ephemeral ports
|
||||
|
||||
@@ -109,10 +109,29 @@ struct TNetworkAddressComparator {
|
||||
/// a free ephemeral port can't be found after 100 tries.
|
||||
int FindUnusedEphemeralPort();
|
||||
|
||||
/// Compare function for two NetworkAddressPB.
|
||||
/// The order is decided first by hostname, then by port, then by uds address.
|
||||
inline int CompareNetworkAddressPB(
|
||||
const NetworkAddressPB& lhs, const NetworkAddressPB& rhs) {
|
||||
int comp = lhs.hostname().compare(rhs.hostname());
|
||||
if (comp == 0) comp = lhs.port() - rhs.port();
|
||||
if (comp == 0) {
|
||||
if (lhs.has_uds_address()) {
|
||||
if (rhs.has_uds_address()) {
|
||||
comp = lhs.uds_address().compare(rhs.uds_address());
|
||||
} else {
|
||||
comp = 1; // lhs preceed rhs
|
||||
}
|
||||
} else if (rhs.has_uds_address()) {
|
||||
comp = -1; // rhs preceed lhs
|
||||
}
|
||||
}
|
||||
return comp;
|
||||
}
|
||||
|
||||
/// Return true if two NetworkAddressPB are match.
|
||||
inline bool KrpcAddressEqual(const NetworkAddressPB& lhs, const NetworkAddressPB& rhs) {
|
||||
return lhs.hostname() == rhs.hostname() && lhs.port() == rhs.port()
|
||||
&& lhs.uds_address() == rhs.uds_address();
|
||||
return CompareNetworkAddressPB(lhs, rhs) == 0;
|
||||
}
|
||||
|
||||
extern const std::string LOCALHOST_IP_STR;
|
||||
|
||||
@@ -124,6 +124,9 @@ message UpdateFilterParamsPB {
|
||||
optional MinMaxFilterPB min_max_filter = 4;
|
||||
|
||||
optional InListFilterPB in_list_filter = 5;
|
||||
|
||||
// Remaining filter wait time as understood by sender.
|
||||
optional int32 remaining_filter_wait_time_ms = 6;
|
||||
}
|
||||
|
||||
message UpdateFilterResultPB {
|
||||
|
||||
@@ -21,7 +21,10 @@ import pytest
|
||||
|
||||
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
|
||||
from tests.common.environ import build_flavor_timeout, ImpalaTestClusterProperties
|
||||
from tests.common.test_dimensions import add_mandatory_exec_option
|
||||
from tests.common.test_dimensions import (
|
||||
add_mandatory_exec_option,
|
||||
add_exec_option_dimension
|
||||
)
|
||||
|
||||
# slow_build_timeout is set to 200000 to avoid failures like IMPALA-8064 where the
|
||||
# runtime filters don't arrive in time.
|
||||
@@ -43,6 +46,9 @@ class TestRuntimeFilterAggregation(CustomClusterTestSuite):
|
||||
def add_test_dimensions(cls):
|
||||
super(TestRuntimeFilterAggregation, cls).add_test_dimensions()
|
||||
add_mandatory_exec_option(cls, 'max_num_filters_aggregated_per_host', 2)
|
||||
# Exercise small, non-fatal jitters.
|
||||
add_exec_option_dimension(
|
||||
cls, 'debug_action', ['', 'QUERY_STATE_BEFORE_INIT_GLOBAL:JITTER@200]'])
|
||||
# Enable query option ASYNC_CODEGEN for slow build
|
||||
if build_runs_slowly:
|
||||
add_mandatory_exec_option(cls, "async_codegen", 1)
|
||||
@@ -60,3 +66,64 @@ class TestRuntimeFilterAggregation(CustomClusterTestSuite):
|
||||
}
|
||||
self.run_test_case('QueryTest/runtime_filters', vector, test_file_vars=vars)
|
||||
self.run_test_case('QueryTest/bloom_filters', vector)
|
||||
|
||||
|
||||
class TestLateQueryStateInit(CustomClusterTestSuite):
  """Test that distributed runtime filter aggregation still works
  when remote query state of intermediate aggregator node is late to initialize."""
  _wait_time = WAIT_TIME_MS // 20
  _init_delay = [100, 3000]

  @classmethod
  def get_workload(cls):
    return 'functional-query'

  @classmethod
  def add_test_dimensions(cls):
    super(TestLateQueryStateInit, cls).add_test_dimensions()
    add_mandatory_exec_option(cls, 'max_num_filters_aggregated_per_host', 2)
    add_mandatory_exec_option(cls, 'runtime_filter_wait_time_ms', cls._wait_time)
    # The sleep is injected in the second impalad: with
    # sort_runtime_filter_aggregator_candidates=true the first one (coordinator) is never
    # selected as intermediate aggregator.
    action_template = "QUERY_STATE_BEFORE_INIT:27001:SLEEP@{0}"
    debug_actions = [action_template.format(delay) for delay in cls._init_delay]
    add_exec_option_dimension(cls, 'debug_action', debug_actions)
    # Enable query option ASYNC_CODEGEN for slow build
    if build_runs_slowly:
      add_mandatory_exec_option(cls, "async_codegen", 1)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args="--sort_runtime_filter_aggregator_candidates=true --logbuflevel=-1")
  def test_late_query_state_init(self, vector):
    """Run a shuffle join while the query state of the second impalad is delayed and
    verify the query result and the expected INFO log lines of that impalad."""
    query = ('select count(*) from functional.alltypes p '
             'join [SHUFFLE] functional.alltypestiny b '
             'on p.month = b.int_col and b.month = 1 and b.string_col = "1"')
    exec_options = vector.get_value('exec_option')
    result = self.execute_query_expect_success(self.client, query, exec_options)
    assert result.data[0] == '620'

    all_blocked = 'UpdateFilterFromRemote RPC called with remaining wait time'
    preagg_blocked = 'QueryState for query_id={0} no'.format(result.query_id)
    long_delay = str(self._init_delay[-1]) in exec_options['debug_action']

    if not long_delay:
      # Short init delay: none of the two messages is expected in the log.
      expected = 0
      log_pattern = '({0}|{1})'.format(all_blocked, preagg_blocked)
    elif 'Filter 0 inflight for final aggregation' in result.runtime_profile:
      # Long init delay, case 1 (probability 1/3): the build scanner in the first
      # impalad exchanges all build rows (1) to the second impalad, which is blocked at
      # QueryExecMgr::StartQuery. This indirectly delays all impalads, because every
      # JOIN BUILD fragment needs to wait for the EOS signal from the exchange sender.
      expected = -1
      log_pattern = all_blocked
    else:
      # Long init delay, case 2 (probability 2/3): the build scanner exchanges all
      # build rows to impalads other than the second one. The JOIN BUILD fragments in
      # the first and third impalads immediately receive the EOS signal, complete their
      # build and send their filter update to the second impalad. The second impalad
      # stays blocked at QueryExecMgr::StartQuery, so the filter update has to wait
      # until it gives up.
      expected = -1
      log_pattern = preagg_blocked
    self.assert_log_contains('impalad_node1', 'INFO', log_pattern, expected)
|
||||
|
||||
@@ -359,8 +359,10 @@ def assert_query(query_tbl, client, expected_cluster_id, raw_profile=None, impal
|
||||
perhost_frags = re.search(r'\n\s+Per Host Number of Fragment Instances:\s+(.*?)\n',
|
||||
profile_text)
|
||||
assert perhost_frags is not None
|
||||
assert data[index] == ",".join(sorted(perhost_frags.group(1).replace("(", "=")
|
||||
.replace(")", "").split(" "))), "per-host fragment instances incorrect"
|
||||
expected = ",".join(sorted(perhost_frags.group(1).replace("(", "=")
|
||||
.replace(")", "").split(" ")))
|
||||
assert data[index] == expected, ('per-host fragment instances incorrect.'
|
||||
' expected="{0}" actual="{1}"').format(expected, data[index])
|
||||
|
||||
# Backends Count
|
||||
index += 1
|
||||
|
||||
Reference in New Issue
Block a user