# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. from __future__ import absolute_import, division, print_function import pytest import tests.common.cluster_config as cluster_config from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.test_result_verifier import error_msg_startswith from tests.util.cancel_util import ( QueryToKill, assert_kill_error, assert_kill_ok ) class TestKillQuery(CustomClusterTestSuite): @pytest.mark.execute_serially def test_coordinator_unreachable(self): """ The coordinator of the query to kill is unreachable. It is required that each impalad in the cluster is a coordinator. """ protocol = 'hs2' with self.create_client_for_nth_impalad(0, protocol) as client, \ QueryToKill( self, protocol, check_on_exit=False, nth_impalad=2) as query_id_to_kill: coordinator_to_kill = self.cluster.impalads[2] coordinator_to_kill.kill() assert_kill_error( client, "KillQuery() RPC failed: Network error:", query_id=query_id_to_kill, ) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args(force_restart=True) def test_another_coordinator_unreachable(self): """ A coordinator other than the one of the query to kill is unreachable. It is required that each impalad in the cluster is a coordinator. """ protocol = 'hs2' with self.create_client_for_nth_impalad(0, protocol) as client, \ QueryToKill(self, protocol, nth_impalad=2) as query_id_to_kill: coordinator_to_kill = self.cluster.impalads[1] # impalad 1 is between 0 and 2. coordinator_to_kill.kill() assert_kill_ok(client, query_id_to_kill) @pytest.mark.execute_serially @cluster_config.single_coordinator def test_single_coordinator(self): """ Test when there is only one coordinator in the cluster. """ protocol = 'hs2' with self.create_client_for_nth_impalad(0, protocol) as client: assert_kill_error( client, "Could not find query on any coordinator.", query_id='123:456') @pytest.mark.execute_serially @cluster_config.admit_one_query_at_a_time def test_admit_one_query_at_a_time(self): """ Make sure queries can be killed when only one query is allowed to run at a time. """ protocol = 'hs2' with self.create_client_for_nth_impalad(0, protocol) as client, \ QueryToKill(self, protocol) as query_id_to_kill: assert_kill_ok(client, query_id_to_kill) @pytest.mark.execute_serially @cluster_config.admit_no_query def test_admit_no_query(self): """ Make sure KILL QUERY statement can be executed when no query will be admitted. This is to show that KILL QUERY statements are not subject to admission control. """ protocol = 'hs2' with self.create_client_for_nth_impalad(0, protocol) as client: try: client.execute("SELECT 1") except Exception as e: expected_msg = ( "Rejected query from pool default-pool: " "disabled by requests limit set to 0" ) assert error_msg_startswith(str(e), expected_msg) assert_kill_error( client, "Could not find query on any coordinator", query_id="123:456" ) @cluster_config.enable_authorization class TestKillQueryAuthorization(CustomClusterTestSuite): @pytest.mark.execute_serially def test_kill_as_admin(self): # ImpylaHS2Connection does not support authentication yet. protocol = 'beeswax' with self.create_client_for_nth_impalad(0, protocol) as client, \ QueryToKill(self, protocol, user="user1") as query_id_to_kill: assert_kill_ok(client, query_id_to_kill, user=cluster_config.ADMIN) @pytest.mark.execute_serially def test_kill_as_non_admin(self): # ImpylaHS2Connection does not support authentication yet. protocol = 'beeswax' user1, user2 = "user1", "user2" with self.create_client_for_nth_impalad(0, protocol) as user1_client, \ self.create_client_for_nth_impalad(0, protocol) as user2_client, \ QueryToKill(self, protocol, user=user1) as query_id_to_kill: assert_kill_error( user2_client, "User '{0}' is not authorized to kill the query.".format(user2), query_id=query_id_to_kill, user=user2, ) assert_kill_ok(user1_client, query_id_to_kill, user=user1)