IMPALA-10866: Add test cases for failure cases involving the admission service

The admission service uses the statestore as the only source of
truth to determine whether a coordinator is down. If the statestore
reports a coordinator is down, all running and queued queries
associated with it should be cancelled or rejected.

In IMPALA-12057, we introduced logic to reject queued queries if
the corresponding coordinator has been removed, along with tests
for that behavior.

This patch adds additional test cases to cover other failure
scenarios, such as the coordinator or the statestore going down
with running queries, and verifies that the behavior is as expected
in each case.

Tests:
Passed exhaustive tests.

Change-Id: If617326cbc6fe2567857d6323c6413d98c92d009
Reviewed-on: http://gerrit.cloudera.org:8080/23217
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Reviewed-by: Abhishek Rawat <arawat@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Yida Wu
2025-07-24 21:58:39 -07:00
committed by Yida Wu
parent 19f662301c
commit 59fdd7169a

View File

@@ -2221,6 +2221,82 @@ class TestAdmissionControllerWithACService(TestAdmissionController):
self.assert_log_contains_multiline(self.get_ac_log_name(), 'INFO',
"The coordinator no longer exists")
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args="--vmodule admission-controller=3 --default_pool_max_requests=1 "
"--queue_wait_timeout_ms=60000 ")
def test_kill_statestore_with_queries_running(self):
long_query = "select count(*), sleep(10000) from functional.alltypes limit 1"
short_query = "select count(*) from functional.alltypes limit 1"
timeout_s = 60
handle1 = self.client.execute_async(long_query)
# Make sure the first query has been admitted.
self.client.wait_for_impala_state(handle1, RUNNING, timeout_s)
# Run another query. This query should be queued because only 1 query is allowed in
# the default pool.
handle2 = self.client.execute_async(short_query)
self._wait_for_change_to_profile(handle2, "Admission result: Queued")
# Restart the statestore while queries are running/queued.
statestore = self.cluster.statestored
statestore.kill()
statestore.start()
# Verify that both queries eventually complete.
self.client.wait_for_impala_state(handle1, FINISHED, timeout_s)
self.client.close_query(handle1)
self.client.wait_for_impala_state(handle2, FINISHED, timeout_s)
self.client.close_query(handle2)
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
    impalad_args="--vmodule admission-controller=3 --default_pool_max_requests=1 "
    "--queue_wait_timeout_ms=60000 ", disable_log_buffering=True)
def test_kill_coord_with_queries_running(self):
  """Kills a coordinator that owns a running query while another is queued.

  The admission service should notice (via the statestore) that the
  coordinator is gone, release the dead query's resources, and then admit the
  query queued on the surviving coordinator.
  """
  slow_sql = "select count(*), sleep(1000000000) from functional.alltypes limit 1"
  fast_sql = "select count(*) from functional.alltypes limit 1"
  wait_limit_s = 10
  coordinators = self.cluster.get_all_coordinators()
  assert len(coordinators) >= 2, "Test requires at least two coordinators"
  victim_coord, survivor_coord = coordinators[0], coordinators[1]
  # Start the long query on the coordinator we will kill, and confirm that
  # admission control has admitted it.
  victim_client = victim_coord.service.create_hs2_client()
  running_handle = victim_client.execute_async(slow_sql)
  victim_client.wait_for_impala_state(running_handle, RUNNING, wait_limit_s)
  released_query_id = victim_client.handle_id(running_handle)
  # Submit a second query via the surviving coordinator; with a single slot in
  # the default pool it must end up queued.
  survivor_client = survivor_coord.service.create_hs2_client()
  queued_handle = survivor_client.execute_async(fast_sql)
  self._wait_for_change_to_profile(
      queued_handle, "Admission result: Queued", client=survivor_client)
  # Take down the coordinator that owns the running query.
  victim_coord.kill()
  try:
    # Best-effort close: the coordinator is dead, so this may well fail.
    victim_client.close_query(running_handle)
  except Exception:
    pass
  # The admission service should release the dead coordinator's query, which
  # frees the slot and lets the queued query proceed to completion.
  admissiond_log = self.get_ac_log_name()
  self.assert_log_contains(admissiond_log, 'INFO',
      "Released query id={}".format(released_query_id), expected_count=1)
  survivor_client.wait_for_impala_state(queued_handle, FINISHED, wait_limit_s)
  survivor_client.close_query(queued_handle)
  # Cleanup.
  victim_client.close()
  survivor_client.close()
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
@pytest.mark.execute_serially
def test_retained_removed_coords_size(self):