From 6cdcdb12ff74d38ab83f3257b8646ec516fbf014 Mon Sep 17 00:00:00 2001
From: Tim Armstrong
Date: Wed, 24 Feb 2016 13:47:00 -0800
Subject: [PATCH] Test for IMPALA-2987

Add a custom cluster test that tests for delays in registering data
stream receivers. We add a stress option to artificially delay this
registration to ensure that it can be handled correctly.

Change-Id: Id5f5746b6023c301bacfa305c525846cdde822c9
Reviewed-on: http://gerrit.cloudera.org:8080/2306
Tested-by: Internal Jenkins
Reviewed-by: Silvius Rus
---
 be/src/common/global-flags.cc                |  2 +
 be/src/exec/exchange-node.cc                 | 10 ++++
 .../queries/QueryTest/exchange-delays.test   | 10 ++++
 tests/custom_cluster/test_exchange_delays.py | 46 +++++++++++++++++++
 4 files changed, 68 insertions(+)
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/exchange-delays.test
 create mode 100644 tests/custom_cluster/test_exchange_delays.py

diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 19e981f69..b63abca1b 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -72,4 +72,6 @@ DEFINE_string(redaction_rules_file, "", "Absolute path to sensitive data redacti
 DEFINE_int32(stress_free_pool_alloc, 0, "A stress option which causes memory allocations "
     "to fail once every n allocations where n is the value of this flag. Effective in "
     "debug builds only.");
+DEFINE_int32(stress_datastream_recvr_delay_ms, 0, "A stress option that causes data "
+    "stream receiver registration to be delayed. Effective in debug builds only.");
 #endif
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index c7e9fab1a..b2c3f4e05 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -22,10 +22,13 @@
 #include "runtime/row-batch.h"
 #include "util/debug-util.h"
 #include "util/runtime-profile.h"
+#include "util/time.h"
 #include "gen-cpp/PlanNodes_types.h"
 
 #include "common/names.h"
 
+DECLARE_int32(stress_datastream_recvr_delay_ms);
+
 using namespace impala;
 
 DEFINE_int32(exchg_node_buffer_size_bytes, 1024 * 1024 * 10,
@@ -61,6 +64,13 @@ Status ExchangeNode::Init(const TPlanNode& tnode, RuntimeState* state) {
 Status ExchangeNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Prepare(state));
   convert_row_batch_timer_ = ADD_TIMER(runtime_profile(), "ConvertRowBatchTime");
+
+#ifndef NDEBUG
+  if (FLAGS_stress_datastream_recvr_delay_ms > 0) {
+    SleepForMs(FLAGS_stress_datastream_recvr_delay_ms);
+  }
+#endif
+
   // TODO: figure out appropriate buffer size
   DCHECK_GT(num_senders_, 0);
   stream_recvr_ = ExecEnv::GetInstance()->stream_mgr()->CreateRecvr(state,
diff --git a/testdata/workloads/functional-query/queries/QueryTest/exchange-delays.test b/testdata/workloads/functional-query/queries/QueryTest/exchange-delays.test
new file mode 100644
index 000000000..0dac1d932
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/exchange-delays.test
@@ -0,0 +1,10 @@
+====
+---- QUERY
+select count(*)
+from tpch.lineitem
+  inner join tpch.orders on l_orderkey = o_orderkey
+---- RESULTS
+6001215
+---- CATCH
+Sender timed out waiting for receiver fragment instance
+====
diff --git a/tests/custom_cluster/test_exchange_delays.py b/tests/custom_cluster/test_exchange_delays.py
new file mode 100644
index 000000000..c46be0177
--- /dev/null
+++ b/tests/custom_cluster/test_exchange_delays.py
@@ -0,0 +1,46 @@
+# Copyright (c) 2016 Cloudera, Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.skip import SkipIfNotDebugBuild
+
+@SkipIfNotDebugBuild.debug_only
+class TestExchangeDelays(CustomClusterTestSuite):
+  """Tests for handling delays in finding data stream receivers"""
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--stress_datastream_recvr_delay_ms=1000"
+        " --datastream_sender_timeout_ms=500")
+  def test_exchange_small_delay(self, vector):
+    """Test delays in registering data stream receivers where the first one or two
+    batches will time out before the receiver registers, but subsequent batches will
+    arrive after the receiver registers. Before IMPALA-2987, this scenario resulted in
+    incorrect results.
+    """
+    self.run_test_case('QueryTest/exchange-delays', vector)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--stress_datastream_recvr_delay_ms=10000"
+        " --datastream_sender_timeout_ms=1")
+  def test_exchange_large_delay(self, vector):
+    """Test delays in registering data stream receivers where all of the batches sent
+    will time out before the receiver registers. Before IMPALA-2987, this scenario
+    resulted in the query hanging.
+    """
+    self.run_test_case('QueryTest/exchange-delays', vector)