IMPALA-14163: (Addendum) Always reset max-query-mem-limit
test_pool_config_change_while_queued consistently passes in TestAdmissionController
but fails in TestAdmissionControllerWithACService. The root cause is that
copy-mem-limit-test-llama-site.xml is copied only once for both test suites.
TestAdmissionController left max-query-mem-limit of invalidTestPool at 25MB without
resetting it back to 0, which then caused the test failure in
TestAdmissionControllerWithACService.

This patch improves the test by always setting max-query-mem-limit of invalidTestPool
to 0, both at the beginning and at the end of the test. It changes ResourcePoolConfig
to use mem_limit_coordinators and mem_limit_executors because, unlike the mem_limit
option, they are not subject to pool-level memory clamping. It disables the
--clamp_query_mem_limit_backend_mem_limit flag so that coord_backend_mem_limit is not
clamped to the coordinator's process limit. It removes the make_copy parameter from
test_pool_mem_limit_configs, since that test does not mutate the config files, and
adds more detail to the logs in admission-controller.cc to make it easier to
associate log lines with queries.

Testing:
- Looped and passed the test in an ARM build.

Change-Id: I41f671b8fb3eabf263041a834b54740fbacda68e
Reviewed-on: http://gerrit.cloudera.org:8080/23106
Reviewed-by: Yida Wu <wydbaggio000@gmail.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
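The shape of the fix, reduced to a sketch: names such as ResourcePoolConfig,
RESOURCES_DIR, and set_config_value come from the test code in the diff below, while
the try/finally framing is an editorial illustration (the patch itself simply sets the
value at both ends of the test body):

    # Keep max-query-mem-limit of invalidTestPool at 0 (unclamped) at both ends
    # of the test, so a later suite that reuses the shared copy of
    # copy-mem-limit-test-llama-site.xml does not inherit a stale 25MB limit.
    llama_site_path = os.path.join(RESOURCES_DIR, "copy-mem-limit-test-llama-site.xml")
    config = ResourcePoolConfig(
        self.cluster.impalads[0].service, self.get_ac_process().service, llama_site_path)
    config.set_config_value("invalidTestPool", "max-query-mem-limit", 0)
    try:
      pass  # test body: queue queries, flip the config, assert on admission results
    finally:
      config.set_config_value("invalidTestPool", "max-query-mem-limit", 0)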
@@ -1634,7 +1634,7 @@ Status AdmissionController::SubmitForAdmission(const AdmissionRequest& request,
       stats->metrics()->total_rejected->Increment(1);
       const ErrorMsg& rejected_msg = ErrorMsg(TErrorCode::ADMISSION_REJECTED,
           queue_node->pool_name, queue_node->not_admitted_reason);
-      VLOG_QUERY << rejected_msg.msg();
+      VLOG_QUERY << "query_id=" << PrintId(request.query_id) << " " << rejected_msg.msg();
       return Status::Expected(rejected_msg);
     }
 
@@ -1767,7 +1767,8 @@ Status AdmissionController::WaitOnQueued(const UniqueIdPB& query_id,
         PROFILE_INFO_KEY_ADMISSION_RESULT, PROFILE_INFO_VAL_REJECTED);
     const ErrorMsg& rejected_msg = ErrorMsg(TErrorCode::ADMISSION_REJECTED,
         queue_node->pool_name, queue_node->not_admitted_reason);
-    VLOG_QUERY << rejected_msg.msg();
+    VLOG_QUERY << "query_id=" << PrintId(queue_node->admission_request.query_id) << " "
+               << rejected_msg.msg();
     return Status::Expected(rejected_msg);
   } else if (outcome == AdmissionOutcome::TIMED_OUT) {
     bool removed = queue->Remove(queue_node);
@@ -2313,6 +2314,18 @@ Status AdmissionController::ComputeGroupScheduleStates(
   return Status::OK();
 }
 
+static inline string PrintScheduleStateMemInfo(ScheduleState* state) {
+  return Substitute("{per_backend_mem_limit=$0, per_backend_mem_to_admit=$1, "
+      "per_backend_mem_to_admit_source=$2, coord_backend_mem_limit=$3, "
+      "coord_backend_mem_to_admit=$4, coord_backend_mem_to_admit_source=$5}",
+      PrintBytes(state->per_backend_mem_limit()),
+      PrintBytes(state->per_backend_mem_to_admit()),
+      MemLimitSourcePB_Name(state->per_backend_mem_to_admit_source()),
+      PrintBytes(state->coord_backend_mem_limit()),
+      PrintBytes(state->coord_backend_mem_to_admit()),
+      MemLimitSourcePB_Name(state->coord_backend_mem_to_admit_source()));
+}
+
 bool AdmissionController::FindGroupToAdmitOrReject(
     ClusterMembershipMgr::SnapshotPtr& membership_snapshot,
     const TPoolConfig& pool_config, const TPoolConfig& root_cfg, bool admit_from_queue,
@@ -2371,7 +2384,8 @@ bool AdmissionController::FindGroupToAdmitOrReject(
              << PrintBytes(state->GetDedicatedCoordMemoryEstimate())
              << " max_requests=" << max_requests << " max_queued=" << max_queued
              << " max_mem=" << PrintBytes(max_mem) << " is_trivial_query="
-             << PrettyPrinter::Print(state->GetIsTrivialQuery(), TUnit::NONE);
+             << PrettyPrinter::Print(state->GetIsTrivialQuery(), TUnit::NONE)
+             << " ScheduleState=" << PrintScheduleStateMemInfo(state);
   VLOG_QUERY << "Stats: " << pool_stats->DebugString();
 
   if (state->GetIsTrivialQuery() && CanAdmitTrivialRequest(*state)) {
@@ -2674,15 +2688,8 @@ AdmissionController::PoolStats* AdmissionController::GetPoolStats(
 void AdmissionController::AdmitQuery(
     QueueNode* node, string& user, bool was_queued, bool is_trivial) {
   ScheduleState* state = node->admitted_schedule.get();
-  VLOG_RPC << "For Query " << PrintId(state->query_id())
-           << " per_backend_mem_limit set to: "
-           << PrintBytes(state->per_backend_mem_limit())
-           << " per_backend_mem_to_admit set to: "
-           << PrintBytes(state->per_backend_mem_to_admit())
-           << " coord_backend_mem_limit set to: "
-           << PrintBytes(state->coord_backend_mem_limit())
-           << " coord_backend_mem_to_admit set to: "
-           << PrintBytes(state->coord_backend_mem_to_admit());
+  VLOG_RPC << "Admitting query " << PrintId(state->query_id())
+           << " ScheduleState=" << PrintScheduleStateMemInfo(state);
 
   // Update memory and number of queries.
   bool track_per_user = HasQuotaConfig(node->pool_cfg) || HasQuotaConfig(node->root_cfg);
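The query_id prefix added to the two VLOG_QUERY calls above is what enables the better
log association mentioned in the commit message. As a hedged sketch of how a test
could exploit it (assert_impalad_log_contains is a CustomClusterTestSuite helper;
handle_id and the rejection text are assumptions about the framework and the new log
format, not verbatim from this patch):

    # Illustrative only: tie a rejection log line to one specific query.
    handle = self.client.execute_async("select 1")
    query_id = self.client.handle_id(handle)  # hypothetical accessor
    self.assert_impalad_log_contains(
        "INFO", "query_id={0} .*Rejected query".format(query_id), expected_count=-1)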
@@ -63,9 +63,13 @@ class ResourcePoolConfig(object):
     metric_str = self.CONFIG_TO_METRIC_STR_MAPPING[config_str]
     client = self.impala_service.create_hs2_client()
     client.set_configuration_option('request_pool', pool_name)
-    # set mem_limit to something above the proc limit so that the query always gets
-    # rejected.
-    client.set_configuration_option('mem_limit', '128G')
+    # set mem limit to something above the proc limit so that the query always gets
+    # rejected. Use mem_limit_coordinators and mem_limit_executors because they
+    # take precedence over the pool mem limit. The mem_limit option, on the other
+    # hand, is clamped to the pool's min and max mem limit (see
+    # ScheduleState::UpdateMemoryRequirements).
+    client.set_configuration_option('mem_limit_coordinators', '128G')
+    client.set_configuration_option('mem_limit_executors', '128G')
     client.set_configuration_option("enable_trivial_query_for_admission", "false")
     metric_key = "admission-controller.{0}.root.{1}".format(metric_str, pool_name)
     start_time = time()
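Why the hunk above switches query options, as a sketch: the clamping description
paraphrases ScheduleState::UpdateMemoryRequirements, and the option names are the real
query options from the hunk:

    # mem_limit is clamped into [min-query-mem-limit, max-query-mem-limit] of the
    # pool, so '128G' can silently shrink below the proc limit and the intended
    # rejection never fires.
    client.set_configuration_option('mem_limit', '128G')
    # mem_limit_coordinators and mem_limit_executors bypass the pool clamp, so
    # '128G' stays above the proc limit and admission reliably rejects the query.
    client.set_configuration_option('mem_limit_coordinators', '128G')
    client.set_configuration_option('mem_limit_executors', '128G')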
@@ -1273,7 +1273,7 @@ class TestAdmissionController(TestAdmissionControllerBase):
   @CustomClusterTestSuite.with_args(
       impalad_args=impalad_admission_ctrl_config_args(
           fs_allocation_file="mem-limit-test-fair-scheduler.xml",
-          llama_site_file="mem-limit-test-llama-site.xml", make_copy=True),
+          llama_site_file="mem-limit-test-llama-site.xml"),
       statestored_args=_STATESTORED_ARGS)
   def test_pool_mem_limit_configs(self, vector):
     """Runs functional tests for the max/min_query_mem_limit pool config attributes"""
@@ -1453,7 +1453,9 @@ class TestAdmissionController(TestAdmissionControllerBase):
       impalad_args=impalad_admission_ctrl_config_args(
           fs_allocation_file="mem-limit-test-fair-scheduler.xml",
           llama_site_file="mem-limit-test-llama-site.xml",
-          additional_args="-default_pool_max_requests 1", make_copy=True),
+          additional_args=("-clamp_query_mem_limit_backend_mem_limit=false "
+                           "-default_pool_max_requests 1"),
+          make_copy=True),
       statestored_args=_STATESTORED_ARGS)
   def test_pool_config_change_while_queued(self):
     """Tests that the invalid checks work even if the query is queued. Makes sure that a
@@ -1463,8 +1465,15 @@ class TestAdmissionController(TestAdmissionControllerBase):
     # by not involving BufferedPlanRootSink.
     self.client.set_configuration_option('spool_query_results', 'false')
 
+    # Instantiate ResourcePoolConfig modifier and initialize 'max-query-mem-limit' to 0
+    # (unclamped).
     pool_name = "invalidTestPool"
     config_str = "max-query-mem-limit"
+    llama_site_path = os.path.join(RESOURCES_DIR, "copy-mem-limit-test-llama-site.xml")
+    config = ResourcePoolConfig(
+        self.cluster.impalads[0].service, self.get_ac_process().service, llama_site_path)
+    config.set_config_value(pool_name, config_str, 0)
+
     self.client.set_configuration_option('request_pool', pool_name)
     self.client.set_configuration_option('enable_trivial_query_for_admission', 'false')
     # Setup to queue a query.
@@ -1475,10 +1484,9 @@ class TestAdmissionController(TestAdmissionControllerBase):
     queued_query_handle = self.client.execute_async("select 2")
     self._wait_for_change_to_profile(queued_query_handle, "Admission result: Queued")
 
-    # Change config to be invalid.
-    llama_site_path = os.path.join(RESOURCES_DIR, "copy-mem-limit-test-llama-site.xml")
-    config = ResourcePoolConfig(
-        self.cluster.impalads[0].service, self.get_ac_process().service, llama_site_path)
+    # Change config to be invalid (max-query-mem-limit < min-query-mem-limit).
+    # impala.admission-control.min-query-mem-limit.root.invalidTestPool is set to
+    # 25MB in mem-limit-test-llama-site.xml.
     config.set_config_value(pool_name, config_str, 1)
     # Close running query so the queued one gets a chance.
     self.client.close_query(sleep_query_handle)
@@ -1499,7 +1507,8 @@ class TestAdmissionController(TestAdmissionControllerBase):
         "select * from functional_parquet.alltypes limit 1")
     self._wait_for_change_to_profile(queued_query_handle, "Admission result: Queued")
     # Change config to something less than what is required to accommodate the
-    # largest min_reservation (which in this case is 32.09 MB.
+    # largest min_reservation (which in this case is 32.09 MB).
+    # Setting max-query-mem-limit = min-query-mem-limit = 25MB is sufficient.
     config.set_config_value(pool_name, config_str, 25 * 1024 * 1024)
     # Close running query so the queued one gets a chance.
     self.client.close_query(sleep_query_handle)
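The 25MB value above sits just below the largest min_reservation quoted in the
comment, so the queued query must fail admission once the new limit takes effect; the
arithmetic:

    # 25MB limit vs. the 32.09MB largest min_reservation from the comment above.
    assert 25 * 1024 * 1024 < int(32.09 * 1024 * 1024)  # 26,214,400 < 33,648,803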
@@ -1508,6 +1517,9 @@ class TestAdmissionController(TestAdmissionControllerBase):
     self.client.wait_for_impala_state(queued_query_handle, ERROR, 20),
     self.client.close_query(queued_query_handle)
 
+    # Change the config back to a valid value
+    config.set_config_value(pool_name, config_str, 0)
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1,