IMPALA-11858: Cap per backend memory estimate to its memory limit for admission

Admission controller caps memory estimates for a given query to
its physical memory. The memory estimates should instead be capped to
the backend's memory limit for admission, which is computed during
daemon initialization in ExecEnv::Init().

With this patch, for a given query schedule, the coordinator backend's
memory limit is used for capping memory to admit on the coordinator,
and the minimum of all executor backends' memory limits is used for
capping memory to admit on the executors. A config option
'clamp_query_mem_limit_backend_mem_limit' is also added to revert to
the old behavior, where queries requesting more memory than the
backend's admission limit get rejected.

The memory requested by a query when MEM_LIMIT or MEM_LIMIT_EXECUTORS is
set is also capped to the memory limit for admission on the backends.

Also fixed an issue with excessive logging in query profiles when
using the global admission controller: if the query was queued, the
remote admission controller client logged a 'Queued' status in the
profile every time it checked the query status, even when the status
had not changed.

Testing:
- Updated existing unit tests in admission-controller-test.cc
- Added new checks in existing tests in executor-group-test.cc
- Updated custom_cluster tests in test_admission_controller.py
- Ran exhaustive tests

Change-Id: I3b1f6e530785ef832dbc831d7cc6793133f3335c
Reviewed-on: http://gerrit.cloudera.org:8080/19533
Reviewed-by: Abhishek Rawat <arawat@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Abhishek Rawat
2023-02-09 13:21:06 -08:00
committed by Impala Public Jenkins
parent 39fea06f2b
commit c810c51fa7
12 changed files with 299 additions and 92 deletions

View File

@@ -489,7 +489,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1,
pool_max_mem=10 * PROC_MEM_TEST_LIMIT, proc_mem_limit=PROC_MEM_TEST_LIMIT),
pool_max_mem=10 * PROC_MEM_TEST_LIMIT, proc_mem_limit=PROC_MEM_TEST_LIMIT)
+ " -clamp_query_mem_limit_backend_mem_limit=false",
num_exclusive_coordinators=1)
def test_mem_limit_dedicated_coordinator(self, vector):
"""Regression test for IMPALA-8469: coordinator fragment should be admitted on
@@ -510,6 +511,32 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
"1.10 GB is greater than memory available for admission 1.00 GB" in
str(ex)), str(ex)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1,
pool_max_mem=10 * PROC_MEM_TEST_LIMIT, proc_mem_limit=PROC_MEM_TEST_LIMIT)
+ " -clamp_query_mem_limit_backend_mem_limit=true",
num_exclusive_coordinators=1,
cluster_size=2)
def test_clamp_query_mem_limit_backend_mem_limit_flag(self, vector):
"""If a query requests more memory than backend's memory limit for admission, the
query gets admitted with the max memory for admission on backend."""
query = "select * from functional.alltypesagg limit 10"
exec_options = vector.get_value('exec_option')
# Requested mem_limit is more than the memory limit for admission on backends.
# mem_limit will be clamped to the mem limit for admission on backends.
exec_options['mem_limit'] = int(self.PROC_MEM_TEST_LIMIT * 1.1)
result = self.execute_query_expect_success(self.client, query, exec_options)
assert "Cluster Memory Admitted: 2.00 GB" in str(result.runtime_profile), \
str(result.runtime_profile)
# Request mem_limit more than memory limit for admission on executors. Executor's
# memory limit will be clamped to the mem limit for admission on executor.
exec_options['mem_limit'] = 0
exec_options['mem_limit_executors'] = int(self.PROC_MEM_TEST_LIMIT * 1.1)
result = self.execute_query_expect_success(self.client, query, exec_options)
assert "Cluster Memory Admitted: 1.10 GB" in str(result.runtime_profile), \
str(result.runtime_profile)
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
@@ -695,8 +722,9 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
@CustomClusterTestSuite.with_args(
impalad_args=impalad_admission_ctrl_flags(max_requests=2, max_queued=1,
pool_max_mem=10 * PROC_MEM_TEST_LIMIT,
queue_wait_timeout_ms=2 * STATESTORE_RPC_FREQUENCY_MS),
start_args="--per_impalad_args=-mem_limit=3G;-mem_limit=3G;-mem_limit=2G",
queue_wait_timeout_ms=2 * STATESTORE_RPC_FREQUENCY_MS)
+ " -clamp_query_mem_limit_backend_mem_limit=false",
start_args="--per_impalad_args=-mem_limit=3G;-mem_limit=3G;-mem_limit=2G;",
statestored_args=_STATESTORED_ARGS)
def test_heterogeneous_proc_mem_limit(self, vector):
""" Test to ensure that the admission controller takes into account the actual proc
@@ -720,14 +748,12 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
exec_options['num_nodes'] = "1"
self.execute_query_expect_success(self.client, query, exec_options)
# Exercise rejection checks in admission controller.
try:
exec_options = copy(vector.get_value('exec_option'))
exec_options['mem_limit'] = "3G"
self.execute_query(query, exec_options)
except ImpalaBeeswaxException as e:
assert re.search("Rejected query from pool \S+: request memory needed 3.00 GB"
" is greater than memory available for admission 2.00 GB of \S+", str(e)), \
str(e)
exec_options = copy(vector.get_value('exec_option'))
exec_options['mem_limit'] = "3G"
ex = self.execute_query_expect_failure(self.client, query, exec_options)
assert ("Rejected query from pool default-pool: request memory needed "
"3.00 GB is greater than memory available for admission 2.00 GB" in
str(ex)), str(ex)
# Exercise queuing checks in admission controller.
try:
# Wait for previous queries to finish to avoid flakiness.