IMPALA-10594: Handle failed coordinators in admissiond

This patch adds a statestore callback for the admissiond that monitors
for coordinators that have been removed from the cluster membership
and releases all of the resources for queries running on those
coordinators.

Testing:
- Added a custom cluster test that kills a coordinator and verifies
  that resources for queries running on it are eventually released.

Change-Id: I883f323bb765680ef24b3c3f51fb209dea15f0b0
Reviewed-on: http://gerrit.cloudera.org:8080/17209
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Thomas Tauber-Marshall
2021-03-18 10:42:41 -07:00
committed by Impala Public Jenkins
parent 8ac761348a
commit 9adb093ae0
6 changed files with 125 additions and 9 deletions

View File

@@ -1141,9 +1141,12 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
self.wait_for_state(queued_query_handle, QueryState.EXCEPTION, 20),
self.close_query(queued_query_handle)
def _wait_for_change_to_profile(self, query_handle, search_string, timeout=20):
def _wait_for_change_to_profile(
self, query_handle, search_string, timeout=20, client=None):
if client is None:
client = self.client
for _ in range(timeout * 10):
profile = self.client.get_runtime_profile(query_handle)
profile = client.get_runtime_profile(query_handle)
if search_string in profile:
return
sleep(0.1)
@@ -1456,6 +1459,36 @@ class TestAdmissionControllerWithACService(TestAdmissionController):
self.wait_for_state(
handle2, self.client.QUERY_STATES['RUNNING'], timeout_s)
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args="--vmodule admission-controller=3 --default_pool_max_requests=1")
def test_coordinator_failed(self):
"""Tests that if a coordinator fails, the resources for queries running at that
coordinator are eventually released."""
# Query designed to run for a few minutes.
query = "select count(*) from functional.alltypes where int_col = sleep(10000)"
impalad1 = self.cluster.impalads[0]
client1 = impalad1.service.create_beeswax_client()
handle1 = client1.execute_async(query)
timeout_s = 10
# Make sure the first query has been admitted.
self.wait_for_state(
handle1, self.client.QUERY_STATES['RUNNING'], timeout_s, client=client1)
# Run another query with a different coordinator. This query should be queued because
# only 1 query is allowed in the default pool.
impalad2 = self.cluster.impalads[1]
client2 = impalad2.service.create_beeswax_client()
handle2 = client2.execute_async(query)
self._wait_for_change_to_profile(handle2, "Admission result: Queued", client=client2)
# Kill the coordinator for the first query. The resources for the query should get
# cleaned up and the second query should be admitted.
impalad1.kill()
self.wait_for_state(
handle2, self.client.QUERY_STATES['RUNNING'], timeout_s, client=client2)
class TestAdmissionControllerStress(TestAdmissionControllerBase):
"""Submits a number of queries (parameterized) with some delay between submissions
(parameterized) and the ability to submit to one impalad or many in a round-robin