mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
IMPALA-14493: Cap memory usage of global admission service
The global admission service can experience OOM errors under high concurrency because its process memory tracker is inaccurate and doesn't account for all memory allocations. Because ensuring that the memory tracker accurately accounts for every allocation could be difficult, this patch uses a simpler solution: it introduces a hard memory cap based on tcmalloc statistics, which accurately reflect the true process memory usage. If a new query is submitted while tcmalloc memory usage is over the process limit, the query will be rejected immediately to protect against OOM. Adds a new flag, enable_admission_service_mem_safeguard, allowing this feature to be enabled or disabled. By default, this feature is turned on. Tests: Added test test_admission_service_low_mem_limit. Passed exhaustive tests. Change-Id: I2ee2c942a73fcd69358851fc2fdc0fc4fe531c73 Reviewed-on: http://gerrit.cloudera.org:8080/23542 Reviewed-by: Abhishek Rawat <arawat@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
@@ -26,6 +26,7 @@
|
||||
#include "runtime/bufferpool/reservation-util.h"
|
||||
#include "runtime/exec-env.h"
|
||||
#include "runtime/mem-tracker.h"
|
||||
#include "scheduling/admissiond-env.h"
|
||||
#include "scheduling/cluster-membership-mgr.h"
|
||||
#include "scheduling/executor-group.h"
|
||||
#include "scheduling/schedule-state.h"
|
||||
@@ -36,6 +37,7 @@
|
||||
#include "util/bit-util.h"
|
||||
#include "util/collection-metrics.h"
|
||||
#include "util/debug-util.h"
|
||||
#include "util/memory-metrics.h"
|
||||
#include "util/metrics.h"
|
||||
#include "util/pretty-printer.h"
|
||||
#include "util/runtime-profile-counters.h"
|
||||
@@ -266,6 +268,9 @@ const string REASON_NO_EXECUTOR_GROUPS =
|
||||
"Waiting for executors to start. Only DDL queries and queries scheduled only on the "
|
||||
"coordinator (either NUM_NODES set to 1 or when small query optimization is "
|
||||
"triggered) can currently run.";
|
||||
const string REASON_EXCEED_MEMORY_LIMIT =
|
||||
"Admission rejected due to memory pressure in admissiond. Current usage: $0 bytes, "
|
||||
"limit: $1 bytes";
|
||||
|
||||
// The name of the root pool.
|
||||
const string ROOT_POOL = "root";
|
||||
@@ -1646,6 +1651,21 @@ Status AdmissionController::SubmitForAdmission(const AdmissionRequest& request,
|
||||
return Status::Expected(rejected_msg);
|
||||
}
|
||||
|
||||
int64_t bytes_inuse = TcmallocMetric::BYTES_IN_USE->GetValue();
|
||||
if (!is_trivial && AdmissiondEnv::GetInstance() != nullptr
|
||||
&& AdmissiondEnv::GetInstance()->admission_service_mem_limit() > 0
|
||||
&& bytes_inuse > AdmissiondEnv::GetInstance()->admission_service_mem_limit()) {
|
||||
queue_node->not_admitted_reason = Substitute(REASON_EXCEED_MEMORY_LIMIT,
|
||||
bytes_inuse, AdmissiondEnv::GetInstance()->admission_service_mem_limit());
|
||||
request.summary_profile->AddInfoString(
|
||||
PROFILE_INFO_KEY_ADMISSION_RESULT, PROFILE_INFO_VAL_REJECTED);
|
||||
stats->metrics()->total_rejected->Increment(1);
|
||||
const ErrorMsg& rejected_msg = ErrorMsg(TErrorCode::ADMISSION_REJECTED,
|
||||
queue_node->pool_name, queue_node->not_admitted_reason);
|
||||
VLOG_QUERY << "query_id=" << PrintId(request.query_id) << " " << rejected_msg.msg();
|
||||
return Status::Expected(rejected_msg);
|
||||
}
|
||||
|
||||
string user;
|
||||
RETURN_IF_ERROR(GetEffectiveShortUser(
|
||||
queue_node->admission_request.request.query_ctx.session, &user));
|
||||
|
||||
@@ -30,6 +30,7 @@
|
||||
#include "util/mem-info.h"
|
||||
#include "util/memory-metrics.h"
|
||||
#include "util/metrics.h"
|
||||
#include "util/pretty-printer.h"
|
||||
#include "util/uid-util.h"
|
||||
|
||||
#include "common/names.h"
|
||||
@@ -45,6 +46,10 @@ DEFINE_validator(
|
||||
<< "' must be greater than 0 and less than or equal to 1000.";
|
||||
return false;
|
||||
});
|
||||
DEFINE_bool(enable_admission_service_mem_safeguard, true,
|
||||
"When true, enables a hard memory limit safeguard for the admission service. "
|
||||
"This rejects new queries if the in-use process memory from tcmalloc exceeds "
|
||||
"admission_service_mem_limit to prevent OOM.");
|
||||
|
||||
DECLARE_string(state_store_host);
|
||||
DECLARE_int32(state_store_port);
|
||||
@@ -107,6 +112,11 @@ Status AdmissiondEnv::Init() {
|
||||
new MemTracker(AggregateMemoryMetrics::TOTAL_USED, bytes_limit, "Process"));
|
||||
mem_tracker_->RegisterMetrics(
|
||||
DaemonEnv::GetInstance()->metrics(), "mem-tracker.process");
|
||||
if (FLAGS_enable_admission_service_mem_safeguard) {
|
||||
admission_mem_limit_ = bytes_limit;
|
||||
LOG(INFO) << "Set admission service memory limit to "
|
||||
<< PrettyPrinter::Print(admission_mem_limit_, TUnit::BYTES);
|
||||
}
|
||||
|
||||
http_handler_->RegisterHandlers(DaemonEnv::GetInstance()->webserver());
|
||||
if (DaemonEnv::GetInstance()->metrics_webserver() != nullptr) {
|
||||
|
||||
@@ -61,6 +61,7 @@ class AdmissiondEnv {
|
||||
RpcMgr* rpc_mgr() { return rpc_mgr_.get(); }
|
||||
Scheduler* scheduler() { return scheduler_.get(); }
|
||||
StatestoreSubscriber* subscriber() { return statestore_subscriber_.get(); }
|
||||
/// Returns the memory limit for the admission service (admission_mem_limit_). Callers treat a value greater than 0 as an active limit; 0 (the default) means no limit has been set.
int64_t admission_service_mem_limit() { return admission_mem_limit_; }
|
||||
|
||||
private:
|
||||
static AdmissiondEnv* admissiond_env_;
|
||||
@@ -80,6 +81,11 @@ class AdmissiondEnv {
|
||||
std::unique_ptr<StatestoreSubscriber> statestore_subscriber_;
|
||||
|
||||
MetricGroup* rpc_metrics_ = nullptr;
|
||||
|
||||
/// Memory limit for the admission service. If admission_mem_limit_ is set to a value
|
||||
/// over 0, new admission requests are rejected when the tcmalloc in-use bytes are over
|
||||
/// this limit.
|
||||
int64_t admission_mem_limit_ = 0;
|
||||
};
|
||||
|
||||
} // namespace impala
|
||||
|
||||
@@ -1100,7 +1100,8 @@ class TestAdmissionController(TestAdmissionControllerBase):
|
||||
@CustomClusterTestSuite.with_args(
|
||||
impalad_args=impalad_admission_ctrl_flags(max_requests=10, max_queued=10,
|
||||
pool_max_mem=10 * 1024 * 1024, proc_mem_limit=2 * 1024 * 1024,
|
||||
queue_wait_timeout_ms=1000),
|
||||
queue_wait_timeout_ms=1000)
|
||||
+ " --enable_admission_service_mem_safeguard=false",
|
||||
statestored_args=_STATESTORED_ARGS)
|
||||
def test_timeout_reason_host_memory(self):
|
||||
self.client.set_configuration_option('enable_trivial_query_for_admission', 'false')
|
||||
@@ -1134,7 +1135,8 @@ class TestAdmissionController(TestAdmissionControllerBase):
|
||||
@CustomClusterTestSuite.with_args(
|
||||
impalad_args=impalad_admission_ctrl_flags(max_requests=10, max_queued=10,
|
||||
pool_max_mem=2 * 1024 * 1024, proc_mem_limit=20 * 1024 * 1024,
|
||||
queue_wait_timeout_ms=1000),
|
||||
queue_wait_timeout_ms=1000)
|
||||
+ " --enable_admission_service_mem_safeguard=false",
|
||||
statestored_args=_STATESTORED_ARGS)
|
||||
def test_timeout_reason_pool_memory(self):
|
||||
self.client.set_configuration_option('enable_trivial_query_for_admission', 'false')
|
||||
@@ -2297,6 +2299,23 @@ class TestAdmissionControllerWithACService(TestAdmissionController):
|
||||
client1.close()
|
||||
client2.close()
|
||||
|
||||
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
|
||||
@pytest.mark.execute_serially
|
||||
@CustomClusterTestSuite.with_args(
|
||||
impalad_args="--vmodule admission-controller=3 --mem_limit=10MB ")
|
||||
def test_admission_service_low_mem_limit(self):
|
||||
EXPECTED_REASON = "Admission rejected due to memory pressure"
|
||||
# Test whether it will fail for a normal query.
|
||||
failed_query_handle = self.client.execute_async(
|
||||
"select * from functional_parquet.alltypes limit 100")
|
||||
self.client.wait_for_impala_state(failed_query_handle, ERROR, 20)
|
||||
profile = self.client.get_runtime_profile(failed_query_handle)
|
||||
assert EXPECTED_REASON in profile, \
|
||||
"Expected reason '{0}' not found in profile: {1}".format(EXPECTED_REASON, profile)
|
||||
self.client.close_query(failed_query_handle)
|
||||
# Test it should pass all the trivial queries.
|
||||
self._test_trivial_queries_suc()
|
||||
|
||||
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
|
||||
@pytest.mark.execute_serially
|
||||
def test_retained_removed_coords_size(self):
|
||||
|
||||
Reference in New Issue
Block a user