mirror of
https://github.com/apache/impala.git
synced 2025-12-22 19:35:22 -05:00
This change adds a flag "--use_krpc" to start-impala-cluster.py. The flag is currently passed as an argument to the impalad daemon. In the future it will also enable KRPC for the catalogd and statestored daemons.

This change also adds a flag "--test_krpc" to pytest. When running tests using "impala-py.test --test_krpc", the test cluster will be started by passing "--use_krpc" to start-impala-cluster.py (see above).

This change also adds a SkipIf to skip tests based on whether the cluster was started with KRPC support or not.
- SkipIf.not_krpc can be used to mark a test that depends on KRPC.
- SkipIf.not_thrift can be used to mark a test that depends on Thrift RPC.

This change adds a meta test to make sure that the new SkipIf decorators work correctly. The test should be removed as soon as real tests have been added with the new decorators.

Change-Id: Ie01a5de2afac4a0f43d5fceff283f6108ad6a3ab
Reviewed-on: http://gerrit.cloudera.org:8080/9291
Reviewed-by: David Knupp <dknupp@cloudera.com>
Tested-by: Impala Public Jenkins
107 lines
4.2 KiB
Python
107 lines
4.2 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import pytest
|
|
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
|
|
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
|
|
from tests.common.skip import SkipIf, SkipIfBuildType
|
|
|
|
@SkipIfBuildType.not_dev_build
class TestRPCException(CustomClusterTestSuite):
  """Tests Impala exception handling in TransmitData() RPC to make sure no
  duplicated row batches are sent.

  Every test method passes a different --fault_injection_rpc_exception_type
  value through CustomClusterTestSuite.with_args and then runs TEST_QUERY via
  execute_test_query()."""

  # This query ends up calling TransmitData() more than 2048 times to ensure
  # proper test coverage.
  TEST_QUERY = (
      "select count(*) from tpch_parquet.lineitem t1, tpch_parquet.lineitem t2 "
      "where t1.l_orderkey = t2.l_orderkey")
  EXPECTED_RESULT = ['30012985']

  @classmethod
  def get_workload(cls):
    """Returns the name of the workload these tests belong to."""
    return 'functional-query'

  @classmethod
  def setup_class(cls):
    """Skips the whole class unless the exploration strategy is 'exhaustive';
    otherwise defers to the parent class setup."""
    if cls.exploration_strategy() != 'exhaustive':
      pytest.skip('runs only in exhaustive')
    super(TestRPCException, cls).setup_class()

  def execute_test_query(self, exception_string):
    """Executes TEST_QUERY and checks the outcome.

    If 'exception_string' is None, the query is expected to complete
    successfully with a result matching EXPECTED_RESULT. Otherwise, it is
    expected to fail with an ImpalaBeeswaxException whose text contains
    'exception_string'."""
    try:
      result = self.client.execute(self.TEST_QUERY)
      assert result.data == self.EXPECTED_RESULT
      # Reaching this point means the query succeeded, so no failure may have
      # been expected by the caller.
      assert not exception_string, \
          "Query succeeded but was expected to fail with: %s" % exception_string
    except ImpalaBeeswaxException as e:
      if exception_string is None:
        # Bare 'raise' re-raises the active exception with its original
        # traceback instead of restarting the traceback at this line.
        raise
      assert exception_string in str(e)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=1")
  def test_rpc_send_closed_connection(self, vector):
    self.execute_test_query(None)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=2")
  def test_rpc_send_stale_connection(self, vector):
    self.execute_test_query(None)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=3")
  def test_rpc_send_timed_out(self, vector):
    self.execute_test_query(None)

  # The expected error text is only checked when the test is not skipped by
  # SkipIf.not_thrift.
  @SkipIf.not_thrift
  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=4")
  def test_rpc_recv_closed_connection(self, vector):
    self.execute_test_query("Called read on non-open socket")

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=5")
  def test_rpc_recv_timed_out(self, vector):
    self.execute_test_query(None)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=6")
  def test_rpc_secure_send_closed_connection(self, vector):
    self.execute_test_query(None)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=7")
  def test_rpc_secure_send_stale_connection(self, vector):
    self.execute_test_query(None)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=8")
  def test_rpc_secure_send_timed_out(self, vector):
    self.execute_test_query(None)

  # The expected error text is only checked when the test is not skipped by
  # SkipIf.not_thrift.
  @SkipIf.not_thrift
  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=9")
  def test_rpc_secure_recv_closed_connection(self, vector):
    self.execute_test_query("TTransportException: Transport not open")

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=10")
  def test_rpc_secure_recv_timed_out(self, vector):
    self.execute_test_query(None)
|