diff --git a/be/src/scheduling/admission-control-service.cc b/be/src/scheduling/admission-control-service.cc
index 43d3b7f30..dcdf4e816 100644
--- a/be/src/scheduling/admission-control-service.cc
+++ b/be/src/scheduling/admission-control-service.cc
@@ -217,7 +217,7 @@ void AdmissionControlService::ReleaseQuery(const ReleaseQueryRequestPB* req,
     lock_guard<mutex> l(admission_state->lock);
     if (!admission_state->released) {
       AdmissiondEnv::GetInstance()->admission_controller()->ReleaseQuery(
-          req->query_id(), req->peak_mem_consumption());
+          req->query_id(), admission_state->coord_id, req->peak_mem_consumption());
       admission_state->released = true;
     } else {
       LOG(WARNING) << "Query " << req->query_id() << " was already released.";
@@ -252,7 +252,7 @@ void AdmissionControlService::ReleaseQueryBackends(
     }
 
     AdmissiondEnv::GetInstance()->admission_controller()->ReleaseQueryBackends(
-        req->query_id(), host_addrs);
+        req->query_id(), admission_state->coord_id, host_addrs);
   }
 
   RespondAndReleaseRpc(Status::OK(), resp, rpc_context);
@@ -267,6 +267,27 @@ void AdmissionControlService::CancelAdmission(const CancelAdmissionRequestPB* re
   RespondAndReleaseRpc(Status::OK(), resp, rpc_context);
 }
 
+void AdmissionControlService::AdmissionHeartbeat(const AdmissionHeartbeatRequestPB* req,
+    AdmissionHeartbeatResponsePB* resp, kudu::rpc::RpcContext* rpc_context) {
+  VLOG(2) << "AdmissionHeartbeat: host_id=" << req->host_id();
+
+  std::unordered_set<UniqueIdPB> query_ids;
+  for (const UniqueIdPB& query_id : req->query_ids()) {
+    query_ids.insert(query_id);
+  }
+  vector<UniqueIdPB> cleaned_up =
+      AdmissiondEnv::GetInstance()->admission_controller()->CleanupQueriesForHost(
+          req->host_id(), query_ids);
+
+  for (const UniqueIdPB& query_id : cleaned_up) {
+    // ShardedQueryMap::Delete will log an error already if anything goes wrong, so just
+    // ignore the return value.
+    discard_result(admission_state_map_.Delete(query_id));
+  }
+
+  RespondAndReleaseRpc(Status::OK(), resp, rpc_context);
+}
+
 void AdmissionControlService::AdmitFromThreadPool(UniqueIdPB query_id) {
   shared_ptr<AdmissionState> admission_state;
   Status s = admission_state_map_.Get(query_id, &admission_state);
diff --git a/be/src/scheduling/admission-control-service.h b/be/src/scheduling/admission-control-service.h
index 23ebce656..93d8bd47b 100644
--- a/be/src/scheduling/admission-control-service.h
+++ b/be/src/scheduling/admission-control-service.h
@@ -63,6 +63,8 @@ class AdmissionControlService : public AdmissionControlServiceIf,
       ReleaseQueryBackendsResponsePB* resp, kudu::rpc::RpcContext* context) override;
   virtual void CancelAdmission(const CancelAdmissionRequestPB* req,
       CancelAdmissionResponsePB* resp, kudu::rpc::RpcContext* context) override;
+  virtual void AdmissionHeartbeat(const AdmissionHeartbeatRequestPB* req,
+      AdmissionHeartbeatResponsePB* resp, kudu::rpc::RpcContext* context) override;
 
   /// Gets a AdmissionControlService proxy to the configured admission control service.
   /// The newly created proxy is returned in 'proxy'. Returns error status on failure.
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 7c53ecc4d..208d9d2d8 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -683,11 +683,14 @@ void AdmissionController::PoolStats::ReleaseQuery(int64_t peak_mem_consumption)
   DCHECK_GE(agg_num_running_, 0);
 
   // Update the 'peak_mem_histogram' based on the given peak memory consumption of the
-  // query.
-  int64_t histogram_bucket =
-      BitUtil::RoundUp(peak_mem_consumption, HISTOGRAM_BIN_SIZE) / HISTOGRAM_BIN_SIZE;
-  histogram_bucket = std::max(std::min(histogram_bucket, HISTOGRAM_NUM_OF_BINS), 1L) - 1;
-  peak_mem_histogram_[histogram_bucket] = ++(peak_mem_histogram_[histogram_bucket]);
+  // query, if provided.
+  if (peak_mem_consumption != -1) {
+    int64_t histogram_bucket =
+        BitUtil::RoundUp(peak_mem_consumption, HISTOGRAM_BIN_SIZE) / HISTOGRAM_BIN_SIZE;
+    histogram_bucket =
+        std::max(std::min(histogram_bucket, HISTOGRAM_NUM_OF_BINS), 1L) - 1;
+    peak_mem_histogram_[histogram_bucket] = ++(peak_mem_histogram_[histogram_bucket]);
+  }
 }
 
 void AdmissionController::PoolStats::ReleaseMem(int64_t mem_to_release) {
@@ -1210,7 +1213,7 @@ Status AdmissionController::SubmitForAdmission(const AdmissionRequest& request,
       return Status::CANCELLED;
     }
     VLOG_QUERY << "Admitting query id=" << PrintId(request.query_id);
-    AdmitQuery(queue_node->admitted_schedule.get(), false);
+    AdmitQuery(queue_node, false);
     stats->UpdateWaitTime(0);
     VLOG_RPC << "Final: " << stats->DebugString();
     *schedule_result = move(queue_node->admitted_schedule->query_schedule_pb());
@@ -1343,14 +1346,24 @@ Status AdmissionController::WaitOnQueued(const UniqueIdPB& query_id,
   }
 }
 
-void AdmissionController::ReleaseQuery(
-    const UniqueIdPB& query_id, int64_t peak_mem_consumption) {
+void AdmissionController::ReleaseQuery(const UniqueIdPB& query_id,
+    const UniqueIdPB& coord_id, int64_t peak_mem_consumption) {
   {
     lock_guard<mutex> lock(admission_ctrl_lock_);
-    auto it = running_queries_.find(query_id);
-    if (it == running_queries_.end()) {
-      LOG(DFATAL) << "Unable to find resources to release for query "
-                  << PrintId(query_id);
+    auto host_it = running_queries_.find(coord_id);
+    if (host_it == running_queries_.end()) {
+      LOG(DFATAL) << "Unable to find host " << PrintId(coord_id)
+                  << " to get resources to release for query " << PrintId(query_id)
+                  << ", may have already been released.";
+      return;
+    }
+    auto it = host_it->second.find(query_id);
+    if (it == host_it->second.end()) {
+      // In the context of the admission control service, this may happen, e.g. if a
+      // ReleaseQuery rpc is reported as failed to the coordinator but actually ends up
+      // arriving much later, so only log at WARNING level.
+      LOG(WARNING) << "Unable to find resources to release for query "
+                   << PrintId(query_id) << ", may have already been released.";
       return;
     }
 
@@ -1365,19 +1378,26 @@
     UpdateExecGroupMetric(running_query.executor_group, -1);
     VLOG_RPC << "Released query id=" << PrintId(query_id) << " " << stats->DebugString();
     pending_dequeue_ = true;
-    running_queries_.erase(it);
+    host_it->second.erase(it);
   }
   dequeue_cv_.NotifyOne();
 }
 
-void AdmissionController::ReleaseQueryBackends(
-    const UniqueIdPB& query_id, const vector<NetworkAddressPB>& host_addrs) {
+void AdmissionController::ReleaseQueryBackends(const UniqueIdPB& query_id,
+    const UniqueIdPB& coord_id, const vector<NetworkAddressPB>& host_addrs) {
   {
     lock_guard<mutex> lock(admission_ctrl_lock_);
-    auto it = running_queries_.find(query_id);
-    if (it == running_queries_.end()) {
-      LOG(DFATAL) << "Unable to find resources to release for query backends "
-                  << PrintId(query_id);
+    auto host_it = running_queries_.find(coord_id);
+    if (host_it == running_queries_.end()) {
+      LOG(DFATAL) << "Unable to find host " << PrintId(coord_id)
+                  << " to get resources to release backends for query "
+                  << PrintId(query_id) << ", may have already been released.";
+      return;
+    }
+    auto it = host_it->second.find(query_id);
+    if (it == host_it->second.end()) {
+      LOG(DFATAL) << "Unable to find resources to release backends for query "
+                  << PrintId(query_id) << ", may have already been released.";
       return;
     }
 
@@ -1408,6 +1428,37 @@
   dequeue_cv_.NotifyOne();
 }
 
+vector<UniqueIdPB> AdmissionController::CleanupQueriesForHost(
+    const UniqueIdPB& coord_id, const std::unordered_set<UniqueIdPB> query_ids) {
+  vector<UniqueIdPB> to_clean_up;
+  {
+    lock_guard<mutex> lock(admission_ctrl_lock_);
+    auto host_it = running_queries_.find(coord_id);
+    if (host_it == running_queries_.end()) {
+      // This is expected if a coordinator has not submitted any queries yet, e.g. at
+      // startup, so we log at a higher verbosity level to avoid log spam.
+      VLOG(3) << "Unable to find host " << PrintId(coord_id)
+              << " to clean up queries for.";
+      return to_clean_up;
+    }
+    for (auto entry : host_it->second) {
+      const UniqueIdPB& query_id = entry.first;
+      auto it = query_ids.find(query_id);
+      if (it == query_ids.end()) {
+        to_clean_up.push_back(query_id);
+      }
+    }
+  }
+
+  for (const UniqueIdPB& query_id : to_clean_up) {
+    LOG(INFO) << "Releasing resources for query " << PrintId(query_id)
+              << " as its coordinator " << PrintId(coord_id)
+              << " reports that it is no longer registered.";
+    ReleaseQuery(query_id, coord_id, -1);
+  }
+  return to_clean_up;
+}
+
 Status AdmissionController::ResolvePoolAndGetConfig(
     const TQueryCtx& query_ctx, string* pool_name, TPoolConfig* pool_config) {
   RETURN_IF_ERROR(request_pool_service_->ResolveRequestPool(query_ctx, pool_name));
@@ -1877,7 +1928,7 @@ void AdmissionController::DequeueLoop() {
         DCHECK(!is_cancelled);
         DCHECK(!is_rejected);
         DCHECK(queue_node->admitted_schedule != nullptr);
-        AdmitQuery(queue_node->admitted_schedule.get(), true);
+        AdmitQuery(queue_node, true);
       }
       pools_for_updates_.insert(pool_name);
     }
@@ -1949,7 +2000,8 @@ AdmissionController::PoolStats* AdmissionController::GetPoolStats(
   return &it->second;
 }
 
-void AdmissionController::AdmitQuery(ScheduleState* state, bool was_queued) {
+void AdmissionController::AdmitQuery(QueueNode* node, bool was_queued) {
+  ScheduleState* state = node->admitted_schedule.get();
   VLOG_RPC << "For Query " << PrintId(state->query_id())
            << " per_backend_mem_limit set to: "
            << PrintBytes(state->per_backend_mem_limit())
@@ -1987,8 +2039,16 @@ void AdmissionController::AdmitQuery(ScheduleState* state, bool was_queued) {
   num_released_backends_[state->query_id()] =
       state->per_backend_schedule_states().size();
 
   // Store info about the admitted resources so that we can release them.
-  DCHECK(running_queries_.find(state->query_id()) == running_queries_.end());
-  RunningQuery& running_query = running_queries_[state->query_id()];
+  auto it = running_queries_.find(node->admission_request.coord_id);
+  if (it == running_queries_.end()) {
+    auto insert_result =
+        running_queries_.insert(make_pair(node->admission_request.coord_id,
+            std::unordered_map<UniqueIdPB, RunningQuery>()));
+    DCHECK(insert_result.second);
+    it = insert_result.first;
+  }
+  DCHECK(it->second.find(state->query_id()) == it->second.end());
+  RunningQuery& running_query = it->second[state->query_id()];
   running_query.request_pool = state->request_pool();
   running_query.executor_group = state->executor_group();
   for (const auto& entry : state->per_backend_schedule_states()) {
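
The heart of the admission-controller.cc change above is the switch to per-coordinator
bookkeeping: 'running_queries_' becomes a map keyed by coordinator id whose values map
query ids to their admitted resources, and CleanupQueriesForHost() releases whatever the
coordinator no longer reports as live. A minimal standalone sketch of that set-difference
reconciliation follows; QueryId, CoordId, the trimmed RunningQuery, and the demo main are
simplified stand-ins for illustration, not the patch's actual types.

// Sketch: resources are tracked per coordinator, then per query, so a heartbeat from
// one coordinator can reconcile exactly that coordinator's queries.
#include <iostream>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

using QueryId = std::string;  // stand-in for UniqueIdPB
using CoordId = std::string;  // stand-in for UniqueIdPB

struct RunningQuery { std::string request_pool; };

// coordinator id -> (query id -> resources), mirroring 'running_queries_'.
std::unordered_map<CoordId, std::unordered_map<QueryId, RunningQuery>> running_queries;

// Returns the queries tracked for 'coord' that are absent from 'live_ids' and drops
// their entries, mirroring CleanupQueriesForHost().
std::vector<QueryId> CleanupQueriesForHost(
    const CoordId& coord, const std::unordered_set<QueryId>& live_ids) {
  std::vector<QueryId> cleaned;
  auto host_it = running_queries.find(coord);
  if (host_it == running_queries.end()) return cleaned;  // unknown coordinator: no-op
  for (const auto& entry : host_it->second) {
    if (live_ids.count(entry.first) == 0) cleaned.push_back(entry.first);
  }
  for (const QueryId& qid : cleaned) host_it->second.erase(qid);
  return cleaned;
}

int main() {
  running_queries["coord-1"] = {{"q1", {"default"}}, {"q2", {"default"}}};
  // The heartbeat says only q2 is still registered, so q1's resources are reclaimed.
  for (const QueryId& qid : CleanupQueriesForHost("coord-1", {"q2"})) {
    std::cout << "released " << qid << "\n";  // prints: released q1
  }
}

Keying by coordinator first is what keeps the heartbeat cheap: a single lookup bounds the
reconciliation to that coordinator's own queries.
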
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index b2817304d..ed3f3bafa 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -20,6 +20,7 @@
 #include
 #include
+#include <unordered_set>
 #include
 #include
 
@@ -168,11 +169,30 @@ enum class AdmissionOutcome {
 /// When queries complete they must be explicitly released from the admission controller
 /// using the methods 'ReleaseQuery' and 'ReleaseQueryBackends'. These methods release
 /// the admitted memory and decrement the number of admitted queries for the resource
-/// pool. All Backends for a query must be released via 'ReleaseQueryBackends' before the
-/// query is released using 'ReleaseQuery'. Releasing Backends releases the admitted
-/// memory used by that Backend and decrements the number of running queries on the host
-/// running that Backend. Releasing a query does not release any admitted memory, it only
-/// decrements the number of running queries in the resource pool.
+/// pool.
+///
+/// In the traditional distributed admission control mode, all backends for a query must
+/// be released via 'ReleaseQueryBackends' before the query is released using
+/// 'ReleaseQuery'. This is possible to guarantee since the coordinator and
+/// AdmissionController are running in the same process.
+///
+/// In the admission control service mode, more flexibility is allowed to maintain fault
+/// tolerance in the case of rpc failures between coordinators and the admissiond. In
+/// this case, proper resource accounting is ensured with two invariants: 1) the
+/// aggregate values of resources in use always match the contents of 'running_queries_'
+/// 2) any query will always eventually be removed from 'running_queries_' and have all
+/// of its resources released, regardless of any failures.
+/// There are a few failure cases to consider:
+/// - ReleaseQuery rpc fails: coordinators periodically send a list of registered query
+///   ids via a heartbeat rpc, allowing the admission controller to clean up any queries
+///   that are not in that list.
+/// - TODO(IMPALA-10594): handle the case of coordinators failing
+/// - TODO(IMPALA-10591): handle the case of a ReleaseQueryBackends rpc failing
+///
+/// Releasing Backends releases the admitted memory used by that Backend and decrements
+/// the number of running queries on the host running that Backend. Releasing a query
+/// does not release any admitted memory, it only decrements the number of running
+/// queries in the resource pool.
 ///
 /// Executor Groups:
 /// Executors in a cluster can be assigned to executor groups. Each executor can only be
@@ -358,19 +378,28 @@ class AdmissionController {
 
   /// Updates the pool statistics when a query completes (either successfully,
   /// is cancelled or failed). This should be called for all requests that have
-  /// been submitted via AdmitQuery(). 'query_id' is the completed query and
-  /// 'peak_mem_consumption' is the peak memory consumption of the query.
+  /// been submitted via AdmitQuery(). 'query_id' is the completed query, 'coord_id' is
+  /// the backend id of the coordinator for the query, and 'peak_mem_consumption' is the
+  /// peak memory consumption of the query, which may be -1 if unavailable.
   /// This does not block.
-  void ReleaseQuery(const UniqueIdPB& query_id, int64_t peak_mem_consumption);
+  void ReleaseQuery(const UniqueIdPB& query_id, const UniqueIdPB& coord_id,
+      int64_t peak_mem_consumption);
 
   /// Updates the pool statistics when a Backend running a query completes (either
   /// successfully, is cancelled or failed). This should be called for all Backends part
   /// of a query for all queries that have been submitted via AdmitQuery().
-  /// 'query_id' is the associated query and the vector of NetworkAddressPBs identify the
-  /// completed Backends.
+  /// 'query_id' is the associated query, 'coord_id' is the backend id of the coordinator
+  /// for the query, and the vector of NetworkAddressPBs identify the completed Backends.
   /// This does not block.
-  void ReleaseQueryBackends(
-      const UniqueIdPB& query_id, const vector<NetworkAddressPB>& host_addr);
+  void ReleaseQueryBackends(const UniqueIdPB& query_id, const UniqueIdPB& coord_id,
+      const vector<NetworkAddressPB>& host_addr);
+
+  /// Releases the resources for any queries that were scheduled for the coordinator
+  /// 'coord_id' that are not in the list 'query_ids'. Only used in the context of the
+  /// admission control service. Returns a list of the queries that had their resources
+  /// released.
+  std::vector<UniqueIdPB> CleanupQueriesForHost(
+      const UniqueIdPB& coord_id, const std::unordered_set<UniqueIdPB> query_ids);
 
   /// Registers the request queue topic with the statestore, starts up the dequeue thread
   /// and registers a callback with the cluster membership manager to receive updates for
@@ -822,11 +851,12 @@ class AdmissionController {
     std::unordered_map<NetworkAddressPB, BackendAllocation> per_backend_resources;
   };
 
-  /// Map from query id of currently running queries to information about the resources
-  /// that were allocated to them. Used to properly account for resources when releasing
-  /// queries.
+  /// Map from host id to a map from query id of currently running queries to information
+  /// about the resources that were allocated to them. Used to properly account for
+  /// resources when releasing queries.
   /// Protected by admission_ctrl_lock_.
-  std::unordered_map<UniqueIdPB, RunningQuery> running_queries_;
+  std::unordered_map<UniqueIdPB, std::unordered_map<UniqueIdPB, RunningQuery>>
+      running_queries_;
 
   /// Map of pool names to the pool configs returned by request_pool_service_. Stored so
   /// that the dequeue thread does not need to access the configs via the request pool
@@ -1020,7 +1050,7 @@
   /// Sets the per host mem limit and mem admitted in the schedule and does the necessary
   /// accounting and logging on successful submission.
   /// Caller must hold 'admission_ctrl_lock_'.
-  void AdmitQuery(ScheduleState* state, bool was_queued);
+  void AdmitQuery(QueueNode* node, bool was_queued);
 
   /// Same as PoolToJson() but requires 'admission_ctrl_lock_' to be held by the caller.
   /// Is a helper method used by both PoolToJson() and AllPoolsToJson()
diff --git a/be/src/scheduling/local-admission-control-client.cc b/be/src/scheduling/local-admission-control-client.cc
index da62ee14d..faf47a582 100644
--- a/be/src/scheduling/local-admission-control-client.cc
+++ b/be/src/scheduling/local-admission-control-client.cc
@@ -50,13 +50,13 @@ Status LocalAdmissionControlClient::SubmitForAdmission(
 
 void LocalAdmissionControlClient::ReleaseQuery(int64_t peak_mem_consumption) {
   ExecEnv::GetInstance()->admission_controller()->ReleaseQuery(
-      query_id_, peak_mem_consumption);
+      query_id_, ExecEnv::GetInstance()->backend_id(), peak_mem_consumption);
 }
 
 void LocalAdmissionControlClient::ReleaseQueryBackends(
     const vector<NetworkAddressPB>& host_addrs) {
   ExecEnv::GetInstance()->admission_controller()->ReleaseQueryBackends(
-      query_id_, host_addrs);
+      query_id_, ExecEnv::GetInstance()->backend_id(), host_addrs);
 }
 
 void LocalAdmissionControlClient::CancelAdmission() {
diff --git a/be/src/scheduling/remote-admission-control-client.h b/be/src/scheduling/remote-admission-control-client.h
index 92ddd6575..1cc3b5fdf 100644
--- a/be/src/scheduling/remote-admission-control-client.h
+++ b/be/src/scheduling/remote-admission-control-client.h
@@ -36,7 +36,17 @@ namespace impala {
 class AdmissionControlServiceProxy;
 
 /// Implementation of AdmissionControlClient used to submit queries for admission to an
-/// AdmissionController running locally on the coordinator.
+/// AdmissionController running remotely in an admissiond.
+///
+/// Handles retrying of rpcs for fault tolerance:
+/// - For the AdmitQuery() rpc, retries with jitter and backoff for a configurable amount
+///   of time, then fails the query if unsuccessful. The default retry time was chosen to
+///   be relatively long (60 seconds) to minimize the number of failed queries when the
+///   admissiond is restarted or temporarily unavailable.
+/// - For the ReleaseQuery(), ReleaseQueryBackends(), and CancelAdmission() rpcs, retries
+///   just 3 times before giving up. Failures of these rpcs do not fail the overall
+///   query, and there are other mechanisms in place to ensure resources are eventually
+///   released regardless of failures of these rpcs, e.g. AdmissionHeartbeat.
class RemoteAdmissionControlClient : public AdmissionControlClient {
 public:
  RemoteAdmissionControlClient(const TQueryCtx& query_ctx);
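
The retry policy described in the header comment above can be sketched independently of
the KRPC plumbing. This is an illustrative sketch only: the 100 ms initial backoff, the
5 s cap, and the try_rpc callback are assumptions for the example, not the actual
constants or flags used by RemoteAdmissionControlClient; only the 60 s overall deadline
mirrors the comment above.

// Retry an rpc attempt with exponential backoff and jitter until a deadline passes.
#include <algorithm>
#include <chrono>
#include <functional>
#include <random>
#include <thread>

bool RetryWithBackoff(const std::function<bool()>& try_rpc,
    std::chrono::milliseconds total_timeout = std::chrono::seconds(60)) {
  using Clock = std::chrono::steady_clock;
  const auto deadline = Clock::now() + total_timeout;
  std::mt19937 rng(std::random_device{}());
  std::chrono::milliseconds backoff(100);  // assumed initial backoff, doubled per failure
  while (Clock::now() < deadline) {
    if (try_rpc()) return true;
    // Sleep for the backoff plus up to 50% random jitter, so many coordinators
    // retrying against a restarted admissiond don't stampede it in lockstep.
    std::uniform_int_distribution<int64_t> jitter(0, backoff.count() / 2);
    std::this_thread::sleep_for(backoff + std::chrono::milliseconds(jitter(rng)));
    backoff = std::min(backoff * 2, std::chrono::milliseconds(5000));  // assumed cap
  }
  return false;  // deadline exceeded; for AdmitQuery() the query would fail here
}

For ReleaseQuery()-style rpcs the same loop would simply be bounded by an attempt count
instead of a deadline, since the heartbeat provides the safety net.
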
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index b460d2518..b5626e2fc 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -57,7 +57,9 @@
 #include "exec/external-data-source-executor.h"
 #include "exprs/timezone_db.h"
 #include "gen-cpp/CatalogService_constants.h"
+#include "gen-cpp/admission_control_service.proxy.h"
 #include "kudu/rpc/rpc_context.h"
+#include "kudu/rpc/rpc_controller.h"
 #include "kudu/util/random_util.h"
 #include "rpc/authentication.h"
 #include "rpc/rpc-mgr.h"
@@ -72,6 +74,7 @@
 #include "runtime/timestamp-value.h"
 #include "runtime/timestamp-value.inline.h"
 #include "runtime/tmp-file-mgr.h"
+#include "scheduling/admission-control-service.h"
 #include "scheduling/admission-controller.h"
 #include "service/cancellation-work.h"
 #include "service/client-request-state.h"
@@ -335,6 +338,11 @@ DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false,
     "be converted from UTC to local time. Writes are unaffected. "
     "Can be overriden with the query option with the same name.");
 
+DEFINE_int32(admission_heartbeat_frequency_ms, 1000,
+    "(Advanced) The time in milliseconds to wait between sending heartbeats to the "
+    "admission service, if enabled. Heartbeats are used to ensure resources are properly "
+    "accounted for even if rpcs to the admission service occasionally fail.");
+
 namespace {
 using namespace impala;
@@ -536,6 +544,11 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
         bind(&ImpalaServer::UnresponsiveBackendThread, this),
         &unresponsive_backend_thread_));
   }
+  if (exec_env_->AdmissionServiceEnabled()) {
+    ABORT_IF_ERROR(Thread::Create("impala-server", "admission-heartbeat-thread",
+        bind(&ImpalaServer::AdmissionHeartbeatThread, this),
+        &admission_heartbeat_thread_));
+  }
 
   is_coordinator_ = FLAGS_is_coordinator;
   is_executor_ = FLAGS_is_executor;
@@ -2723,6 +2736,41 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
   }
 }
 
+[[noreturn]] void ImpalaServer::AdmissionHeartbeatThread() {
+  while (true) {
+    SleepForMs(FLAGS_admission_heartbeat_frequency_ms);
+    std::unique_ptr<AdmissionControlServiceProxy> proxy;
+    Status get_proxy_status = AdmissionControlService::GetProxy(&proxy);
+    if (!get_proxy_status.ok()) {
+      LOG(ERROR) << "Admission heartbeat thread was unable to get an "
+                    "AdmissionControlService proxy: "
+                 << get_proxy_status;
+      continue;
+    }
+
+    AdmissionHeartbeatRequestPB request;
+    AdmissionHeartbeatResponsePB response;
+    *request.mutable_host_id() = exec_env_->backend_id();
+    query_driver_map_.DoFuncForAllEntries(
+        [&](const std::shared_ptr<QueryDriver>& query_driver) {
+          ClientRequestState* request_state = query_driver->GetActiveClientRequestState();
+          TUniqueIdToUniqueIdPB(request_state->query_id(), request.add_query_ids());
+        });
+
+    kudu::rpc::RpcController rpc_controller;
+    kudu::Status rpc_status =
+        proxy->AdmissionHeartbeat(request, &response, &rpc_controller);
+    if (!rpc_status.ok()) {
+      LOG(ERROR) << "Admission heartbeat rpc failed: " << rpc_status.ToString();
+      continue;
+    }
+    Status heartbeat_status(response.status());
+    if (!heartbeat_status.ok()) {
+      LOG(ERROR) << "Admission heartbeat failed: " << heartbeat_status;
+    }
+  }
+}
+
 Status ImpalaServer::CheckResourceLimits(ClientRequestState* crs) {
   Coordinator* coord = crs->GetCoordinator();
   // Coordinator may be null if query has not started executing, check again later.
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index a98c2b42e..4b74d4b34 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -1153,6 +1153,10 @@ class ImpalaServer : public ImpalaServiceIf,
   /// status report in greater than GetMaxReportRetryMs().
   [[noreturn]] void UnresponsiveBackendThread();
 
+  /// If the admission control service is enabled, periodically sends a list of all
+  /// current query ids to the admissiond.
+  [[noreturn]] void AdmissionHeartbeatThread();
+
   /// Called from ExpireQueries() to check query resource limits for 'crs'. If the query
   /// exceeded a resource limit, returns a non-OK status with information about what
   /// limit was exceeded. Returns OK if the query will continue running and expiration
@@ -1254,6 +1258,9 @@ class ImpalaServer : public ImpalaServiceIf,
   /// Thread that runs UnresponsiveBackendThread().
   std::unique_ptr<Thread> unresponsive_backend_thread_;
 
+  /// Thread that runs AdmissionHeartbeatThread().
+  std::unique_ptr<Thread> admission_heartbeat_thread_;
+
   /// The QueryDriverMap maps query ids to QueryDrivers. The QueryDrivers are owned by the
   /// ImpalaServer and QueryDriverMap references them using shared_ptr to allow
   /// asynchronous deletion.
diff --git a/common/protobuf/admission_control_service.proto b/common/protobuf/admission_control_service.proto
index 140a47ea4..798f74c40 100644
--- a/common/protobuf/admission_control_service.proto
+++ b/common/protobuf/admission_control_service.proto
@@ -220,6 +220,18 @@ message CancelAdmissionResponsePB {
   optional StatusPB status = 1;
 }
 
+message AdmissionHeartbeatRequestPB {
+  // The backend id for the coordinator sending this heartbeat.
+  optional UniqueIdPB host_id = 1;
+
+  // A list of all queries registered at this coordinator.
+  repeated UniqueIdPB query_ids = 2;
+}
+
+message AdmissionHeartbeatResponsePB {
+  optional StatusPB status = 1;
+}
+
 service AdmissionControlService {
   /// Called by the coordinator to start scheduling. The actual work is done on a thread
   /// pool, so this call returns immediately. Idempotent - if the query has already been
@@ -250,4 +262,13 @@
   /// Called by the coordinator to cancel scheduling of a query for which GetQueryStatus
   /// has not yet returned a schedule.
   rpc CancelAdmission(CancelAdmissionRequestPB) returns (CancelAdmissionResponsePB);
+
+  /// Used to ensure that the admission service and coordinator have a consistent view of
+  /// what resources are being used even in the face of possible rpc failures.
+  /// Periodically called by each coordinator with a list of query ids for all queries at
+  /// that coordinator. If the admissiond has resources allocated to a query that is not
+  /// included in the list, it assumes the query has completed and releases its remaining
+  /// resources.
+  rpc AdmissionHeartbeat(AdmissionHeartbeatRequestPB)
+      returns (AdmissionHeartbeatResponsePB);
 }
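
One subtlety of the protocol defined above: the heartbeat carries a full snapshot of the
coordinator's registered query ids rather than a delta, because the admissiond treats
absence from the list as permission to release a query's resources. The sketch below
shows the coordinator-side message construction using the standard generated protobuf
setters; the include path is an assumed generated-header name, and BuildAdmissionHeartbeat
is an illustrative helper, not a function in the patch.

// Builds the heartbeat request for one coordinator. Omitting a still-live query here
// would cause the admissiond to release its resources while it is running, so callers
// must snapshot the full registered-query set (as AdmissionHeartbeatThread() does).
#include <vector>
#include "gen-cpp/admission_control_service.pb.h"  // assumed generated header path

namespace impala {
void BuildAdmissionHeartbeat(const UniqueIdPB& coordinator_backend_id,
    const std::vector<UniqueIdPB>& registered_query_ids,
    AdmissionHeartbeatRequestPB* request) {
  *request->mutable_host_id() = coordinator_backend_id;
  for (const UniqueIdPB& query_id : registered_query_ids) {
    *request->add_query_ids() = query_id;
  }
}
}  // namespace impala
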
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index f012ae23c..c445e0fa3 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -1103,10 +1103,10 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     # Setup to queue a query.
     sleep_query_handle = self.client.execute_async("select sleep(10000)")
     self.client.wait_for_admission_control(sleep_query_handle)
-    self.__wait_for_change_to_profile(sleep_query_handle,
+    self._wait_for_change_to_profile(sleep_query_handle,
         "Admission result: Admitted immediately")
     queued_query_handle = self.client.execute_async("select 2")
-    self.__wait_for_change_to_profile(queued_query_handle, "Admission result: Queued")
+    self._wait_for_change_to_profile(queued_query_handle, "Admission result: Queued")
 
     # Change config to be invalid.
     llama_site_path = os.path.join(RESOURCES_DIR, "copy-mem-limit-test-llama-site.xml")
@@ -1130,7 +1130,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     self.client.wait_for_admission_control(sleep_query_handle)
     queued_query_handle = self.client.execute_async(
         "select * from functional_parquet.alltypes limit 1")
-    self.__wait_for_change_to_profile(queued_query_handle, "Admission result: Queued")
+    self._wait_for_change_to_profile(queued_query_handle, "Admission result: Queued")
     # Change config to something less than the what is required to accommodate the
     # largest min_reservation (which in this case is 32.09 MB.
     config.set_config_value(pool_name, config_str, 25 * 1024 * 1024)
@@ -1141,7 +1141,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     self.wait_for_state(queued_query_handle, QueryState.EXCEPTION, 20),
     self.close_query(queued_query_handle)
 
-  def __wait_for_change_to_profile(self, query_handle, search_string, timeout=20):
+  def _wait_for_change_to_profile(self, query_handle, search_string, timeout=20):
     for _ in range(timeout * 10):
       profile = self.client.get_runtime_profile(query_handle)
       if search_string in profile:
@@ -1395,6 +1395,36 @@ class TestAdmissionControllerWithACService(TestAdmissionController):
       except ImpalaBeeswaxException as e:
         assert "Failed to admit query after waiting " in str(e)
 
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--vmodule admission-controller=3 --default_pool_max_requests=1 "
+      "--debug_actions=IMPALA_SERVICE_POOL:127.0.0.1:29500:ReleaseQuery:FAIL@1.0")
+  def test_release_query_failed(self):
+    """Tests that if the ReleaseQuery rpc fails, the query's resources will eventually be
+    cleaned up. Uses the --debug_actions flag to simulate rpc failures, and sets max
+    requests for the default pool to 1, as the number of requests per pool is only
+    decremented when the entire query is released."""
+    # Query designed to run for a few minutes.
+    query = "select count(*) from functional.alltypes where int_col = sleep(10000)"
+    handle1 = self.execute_query_async(query)
+    timeout_s = 10
+    # Make sure the first query has been admitted.
+    self.wait_for_state(
+        handle1, self.client.QUERY_STATES['RUNNING'], timeout_s)
+
+    # Run another query. This query should be queued because only 1 query is allowed in
+    # the default pool.
+    handle2 = self.execute_query_async(query)
+    self._wait_for_change_to_profile(handle2, "Admission result: Queued")
+
+    # Cancel the first query. Its resources should be released and the second query
+    # should be admitted.
+    self.client.cancel(handle1)
+    self.client.close_query(handle1)
+    self.wait_for_state(
+        handle2, self.client.QUERY_STATES['RUNNING'], timeout_s)
+
 
 class TestAdmissionControllerStress(TestAdmissionControllerBase):
   """Submits a number of queries (parameterized) with some delay between submissions
   (parameterized) and the ability to submit to one impalad or many in a round-robin