mirror of
https://github.com/apache/impala.git
synced 2025-12-22 11:28:09 -05:00
In the past, Impala had a very simple 'fault injection' framework for simulating failed rpcs between impalads. With the move to KRPC, that framework was not carried over, and we lost the ability to test certain failure scenarios. This patch reintroduces this functionality. It removes the prior fault injection framework in favor of the existing debug action framework, which is more flexible. To facilitate this, a few modifications are made to the debug action framework: - In addition to matching on a label, debug actions may now match on optional arguments. In this patch, the debug action IMPALA_SERVICE_POOL takes the arguments 'host', 'port', and 'rpc name' to allow simulating the failure of specific rpcs to specific impalads. - The FAIL action now takes an optional 'error message' parameter. In this patch, the debug action IMPALA_SERVICE_POOL uses this to simulate different types of rpc errors, e.g. 'service too busy'. - The FAIL action increments a metric, 'impala.debug_action.fail', so that tests can check that it has actually been hit. Prior to this patch the tests in test_rpc_exception.py were all passing spuriously as the faults they were supposed to be testing were no longer being injected. This patch uses these new mechanisms to introduce tests that simulate failures in DataStreamService rpcs. Follow-up patches will add test cases for ControlService rpcs. Change-Id: I9c047ebce6d32c5ae461f70279391fa2df4c2029 Reviewed-on: http://gerrit.cloudera.org:8080/14641 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
124 lines
5.5 KiB
Python
124 lines
5.5 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import pytest
|
|
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
|
|
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
|
|
from tests.common.skip import SkipIf, SkipIfBuildType
|
|
|
|
@SkipIfBuildType.not_dev_build
class TestRPCException(CustomClusterTestSuite):
  """Tests Impala exception handling in the DataStreamService rpcs TransmitData() and
  EndDataStream().

  Failures are injected with the IMPALA_SERVICE_POOL debug action on the impalad whose
  krpc port is KRPC_PORT. Each test checks that the query either succeeds with the
  expected result (e.g. no duplicated row batches are sent when an rpc is retried) or
  fails with the expected error, and that the debug action was actually hit."""
  # DataStreamService rpc names
  TRANSMIT_DATA_RPC = "TransmitData"
  END_DATA_STREAM_RPC = "EndDataStream"

  # Error to specify for ImpalaServicePool to reject rpcs with a 'server too busy' error.
  REJECT_TOO_BUSY_MSG = "REJECT_TOO_BUSY"

  # The BE krpc port of the impalad these tests simulate rpc errors at.
  KRPC_PORT = 27002

  # This query ends up calling TransmitData() more than 2048 times to ensure
  # proper test coverage.
  TEST_QUERY = (
      "select count(*) from tpch_parquet.lineitem t1, tpch_parquet.lineitem t2 "
      "where t1.l_orderkey = t2.l_orderkey")
  EXPECTED_RESULT = ['30012985']

  @classmethod
  def get_workload(cls):
    return 'functional-query'

  @classmethod
  def setup_class(cls):
    # These tests restart the cluster with custom flags, so only run them in
    # exhaustive mode.
    if cls.exploration_strategy() != 'exhaustive':
      pytest.skip('runs only in exhaustive')
    super(TestRPCException, cls).setup_class()

  def _get_num_fails(self, impalad):
    """Returns the value of the 'impala.debug_action.fail' metric on 'impalad', or 0 if
    the metric is not reported."""
    num_fails = impalad.service.get_metric_value("impala.debug_action.fail")
    if num_fails is None:
      return 0
    return num_fails

  def execute_test_query(self, exception_string):
    """Executes TEST_QUERY repeatedly until the FAIL debug action has been hit on the
    impalad at KRPC_PORT, making at most 10 attempts.

    If 'exception_string' is None, every run is expected to complete successfully with a
    result matching EXPECTED_RESULT. Otherwise, every run is expected to either succeed
    with EXPECTED_RESULT or fail with an error containing 'exception_string'.

    A successful run does not imply that the action was not hit. A FAIL action that
    rejects an rpc with REJECT_TOO_BUSY_MSG also increments the fail metric, and the
    rejected rpc is retried, so the query may still succeed.

    Finally asserts that the debug action was hit at least once."""
    impalad = self.cluster.impalads[2]
    assert impalad.service.krpc_port == self.KRPC_PORT
    # Re-run the query until the metrics show that we hit the debug action or we've run 10
    # times. Each test in this file has at least a 50% chance of hitting the action per
    # run, so there's at most a (1/2)^10 chance that this loop will fail spuriously.
    i = 0
    while self._get_num_fails(impalad) == 0 and i < 10:
      i += 1
      try:
        result = self.client.execute(self.TEST_QUERY)
        assert result.data == self.EXPECTED_RESULT, \
            "Query returned unexpected results: {0}".format(result.data)
      except ImpalaBeeswaxException as e:
        if exception_string is None:
          # Bare 'raise' preserves the original traceback (unlike 'raise e' on Python 2).
          raise
        assert exception_string in str(e), str(e)
    assert self._get_num_fails(impalad) > 0, \
        "Debug action wasn't hit after {0} attempts.".format(i)

  # NOTE: intentionally a plain function rather than a method. It is called while the
  # class body executes (in the with_args() decorators below), where 'KRPC_PORT' is
  # resolved as the default value of 'port'.
  def _get_fail_action(rpc, error=None, port=KRPC_PORT, p=0.1):
    """Returns a debug action that causes rpcs with the name 'rpc' that are sent to the
    impalad at 'port' to FAIL with probability 'p' and return 'error' if specified."""
    debug_action = "IMPALA_SERVICE_POOL:127.0.0.1:{port}:{rpc}:FAIL@{probability}" \
        .format(rpc=rpc, probability=p, port=port)
    if error is not None:
      debug_action += "@" + error
    return debug_action

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args("--debug_actions=" +
      _get_fail_action(rpc=TRANSMIT_DATA_RPC, error=REJECT_TOO_BUSY_MSG))
  def test_transmit_data_retry(self):
    """Run a query where TransmitData may fail with a "server too busy" error. We should
    always retry in this case, so the query should always eventually succeed."""
    self.execute_test_query(None)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args("--debug_actions=" +
      _get_fail_action(rpc=TRANSMIT_DATA_RPC, error=REJECT_TOO_BUSY_MSG) +
      "|" + _get_fail_action(rpc=TRANSMIT_DATA_RPC))
  def test_transmit_data_error(self):
    """Run a query where TransmitData may fail with a "server too busy" or with a generic
    error. The query should either succeed or fail with the given error."""
    self.execute_test_query("Debug Action: IMPALA_SERVICE_POOL:FAIL@0.1")

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args("--debug_actions=" +
      _get_fail_action(rpc=END_DATA_STREAM_RPC, error=REJECT_TOO_BUSY_MSG, p=0.5))
  def test_end_data_stream_retry(self):
    """Run a query where EndDataStream may fail with a "server too busy" error. We should
    always retry in this case, so the query should always eventually succeed."""
    self.execute_test_query(None)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args("--debug_actions=" +
      _get_fail_action(rpc=END_DATA_STREAM_RPC, error=REJECT_TOO_BUSY_MSG, p=0.5) +
      "|" + _get_fail_action(rpc=END_DATA_STREAM_RPC, p=0.5))
  def test_end_data_stream_error(self):
    """Run a query where EndDataStream may fail with a "server too busy" or with a generic
    error. The query should either succeed or fail with the given error."""
    self.execute_test_query("Debug Action: IMPALA_SERVICE_POOL:FAIL@0.5")
|