IMPALA-14276: Fix memory leak by removing AdmissionState on rejection
Normally, AdmissionState entries in admissiond are cleaned up when a query is released. However, for rejected requests ReleaseQuery() is never called, so their AdmissionState was not removed from admission_state_map_, resulting in a memory leak over time.

This leak was less noticeable because AdmissionState entries were relatively small. However, when admissiond runs as a standalone process, each AdmissionState includes a profile sidecar, which can be large, making the leak much more significant.

This change adds logic to remove AdmissionState entries when the admission request is rejected.

Testing: Added test_admission_state_map_mem_leak as a regression test.

Change-Id: I9fba4f176c648ed7811225f7f94c91342a724d10
Reviewed-on: http://gerrit.cloudera.org:8080/23257
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Reviewed-by: Abhishek Rawat <arawat@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
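To make the failure mode concrete, here is a minimal standalone sketch of the pattern the message describes. The class and member names are simplified stand-ins, not Impala's actual code: every admission request creates a map entry, but before this fix only the admitted path ever reached the release-time cleanup.

// A minimal standalone model (hypothetical names, not Impala's real classes)
// of the leak pattern described above.
#include <cstdint>
#include <string>
#include <unordered_map>

struct AdmissionState {
  bool admission_done = false;
  bool admit_ok = false;
  std::string profile_sidecar;  // Can be large when admissiond runs standalone.
};

class Admissiond {
 public:
  // Every admission request creates an entry.
  void Admit(int64_t query_id) { map_[query_id] = AdmissionState{}; }

  // Models GetQueryStatus(): once a rejection has been reported back to the
  // coordinator, the entry will never be released through ReleaseQuery(), so
  // the fix erases it here instead of leaking it.
  void ReportStatus(int64_t query_id) {
    const AdmissionState& state = map_.at(query_id);
    if (state.admission_done && !state.admit_ok) {
      map_.erase(query_id);  // Without this erase, rejected entries leak.
    }
  }

  // Models ReleaseQuery(): the normal cleanup path, reached only by queries
  // that were actually admitted.
  void Release(int64_t query_id) { map_.erase(query_id); }

  size_t Count() const { return map_.size(); }

 private:
  std::unordered_map<int64_t, AdmissionState> map_;
};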
@@ -212,6 +212,16 @@ void AdmissionControlService::GetQueryStatus(const GetQueryStatusRequestPB* req,
   }
 
   RespondAndReleaseRpc(status, resp, rpc_context);
+  if (admission_state->admission_done && !admission_state->admit_status.ok()) {
+    LOG(INFO) << "Query " << req->query_id()
+              << " was rejected. Removing admission state to free resources.";
+    // If this RPC fails and the admission state is already removed,
+    // a retry may fail with an "Invalid handle" error because the entry is gone.
+    // This is okay and doesn't cause any real problem.
+    // To make it more robust, we may delay the removal using a time-based approach.
+    discard_result(admission_state_map_.Delete(req->query_id()));
+    VLOG(3) << "Current admission state map size: " << admission_state_map_.Count();
+  }
 }
 
 void AdmissionControlService::ReleaseQuery(const ReleaseQueryRequestPB* req,
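The last comment in the added block suggests delaying removal with a time-based approach for extra robustness against RPC retries. That is not implemented in this commit; purely as an illustration of the idea, a grace-period scheme could look roughly like the following (all names here are hypothetical). Entries marked for removal would survive long enough for a retried RPC to find them, and a periodic reaper would drop the expired ones.

// Hypothetical sketch of the time-based removal mentioned in the comment
// above; not part of this commit.
#include <chrono>
#include <cstdint>
#include <mutex>
#include <unordered_map>

class DelayedRemovalQueue {
 public:
  using Clock = std::chrono::steady_clock;

  // Instead of deleting the admission state immediately, record a deadline.
  void MarkForRemoval(int64_t query_id, std::chrono::seconds grace) {
    std::lock_guard<std::mutex> l(lock_);
    deadlines_[query_id] = Clock::now() + grace;
  }

  // Run periodically from a maintenance thread: erase entries whose grace
  // period has elapsed (the matching AdmissionState would be erased too).
  void Reap() {
    std::lock_guard<std::mutex> l(lock_);
    const auto now = Clock::now();
    for (auto it = deadlines_.begin(); it != deadlines_.end();) {
      if (it->second <= now) {
        it = deadlines_.erase(it);
      } else {
        ++it;
      }
    }
  }

 private:
  std::mutex lock_;
  std::unordered_map<int64_t, Clock::time_point> deadlines_;
};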
@@ -2327,6 +2327,76 @@ class TestAdmissionControllerWithACService(TestAdmissionController):
     except subprocess.CalledProcessError as e:
       assert "cluster_membership_retained_removed_coords" in str(e)
 
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args=("--vmodule=admission-control-service=3 --default_pool_max_requests=1 "
+                  "--queue_wait_timeout_ms=1"),
+    disable_log_buffering=True)
+  def test_admission_state_map_mem_leak(self):
+    """
+    Regression test to reproduce IMPALA-14276.
+    Steps:
+    1. Submit a long-running query to coord1 and let it run.
+    2. Repeatedly submit short queries to coord2 that get queued and time out due to
+       admission limits.
+    3. Get memory usage before and after to check for a possible memory leak in
+       admissiond.
+    """
+
+    # Long-running query that blocks a request slot.
+    long_query = "select count(*) from functional.alltypes where int_col = sleep(10000)"
+    # Simple short query used to trigger queuing and timeout.
+    short_query = "select count(*) from functional.alltypes limit 1"
+
+    # Max timeout for waiting on query state transitions.
+    timeout_s = 10
+
+    ac = self.cluster.admissiond
+    all_coords = self.cluster.get_all_coordinators()
+    assert len(all_coords) >= 2, "Test requires at least two coordinators"
+
+    coord1, coord2 = all_coords[0], all_coords[1]
+
+    # Submit the long query to coord1 to occupy the admission slot.
+    client1 = coord1.service.create_hs2_client()
+    handle1 = client1.execute_async(long_query)
+    client1.wait_for_impala_state(handle1, RUNNING, timeout_s)
+
+    # Allow some time for the system to stabilize.
+    sleep(5)
+    # Capture memory usage before stressing the system.
+    old_total_bytes = ac.service.get_metric_value("tcmalloc.bytes-in-use")
+    assert old_total_bytes != 0
+
+    # Submit short queries to coord2, which will be queued and time out.
+    client2 = coord2.service.create_hs2_client()
+    number_of_iterations = 500
+    for i in range(number_of_iterations):
+      handle2 = client2.execute_async(short_query)
+      self._wait_for_change_to_profile(
+        handle2,
+        "Query Status: Admission for query exceeded timeout",
+        client=client2,
+        timeout=timeout_s)
+      client2.close_query(handle2)
+
+    # Capture memory usage after the test.
+    new_total_bytes = ac.service.get_metric_value("tcmalloc.bytes-in-use")
+
+    # Ensure memory usage has not grown more than 10%, indicating no leak.
+    assert new_total_bytes < old_total_bytes * 1.1
+    # Check that the admission state map size stays at 1 the whole time; the one
+    # remaining entry is the long-running query.
+    admissiond_log = self.get_ac_log_name()
+    self.assert_log_contains(admissiond_log, 'INFO',
+        "Current admission state map size: {}".format(1),
+        expected_count=number_of_iterations)
+
+    # Cleanup clients.
+    client1.close()
+    client2.close()
+
 @SkipIfNotHdfsMinicluster.tuned_for_minicluster
 @pytest.mark.execute_serially
 @CustomClusterTestSuite.with_args(