mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
This patch improves REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION error message by saying the specific configuration that must be adjusted such that the query can pass the Admission Control. New fields 'per_backend_mem_to_admit_source' and 'coord_backend_mem_to_admit_source' of type MemLimitSourcePB are added into QuerySchedulePB. These fields explain what limiting factor drives final numbers at 'per_backend_mem_to_admit' and 'coord_backend_mem_to_admit' respectively. In turn, Admission Control will use this information to compose a more informative error message that the user can act upon. The new error message pattern also explicitly mentions "Per Host Min Memory Reservation" as a place to look at to investigate memory reservations scheduled for each backend node. Updated documentation with examples of query rejection by Admission Control and how to read the error message. Testing: - Add BE tests at admission-controller-test.cc - Adjust and pass affected EE tests Change-Id: I1ef7fb7e7a194b2036c2948639a06c392590bf66 Reviewed-on: http://gerrit.cloudera.org:8080/21436 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
339 lines
13 KiB
Protocol Buffer
339 lines
13 KiB
Protocol Buffer
// Licensed to the Apache Software Foundation (ASF) under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing,
|
|
// software distributed under the License is distributed on an
|
|
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
// KIND, either express or implied. See the License for the
|
|
// specific language governing permissions and limitations
|
|
// under the License.
|
|
//
|
|
|
|
syntax = "proto2";
|
|
|
|
package impala;
|
|
|
|
import "common.proto";
|
|
import "control_service.proto";
|
|
import "statestore_service.proto";
|
|
|
|
// Execution parameters for a single fragment instance. Used to assemble the
// TPlanFragmentInstanceCtx/PlanFragmentInstanceCtxPB.
message FInstanceExecParamsPB {
  // Field numbers 3 and 4 are not assigned in this message. Reserve them so they cannot
  // be handed to a new field by accident. NOTE(review): confirm from the file's history
  // whether they were used by removed fields; reserving them is harmless either way.
  reserved 3, 4;

  // The fragment instance id.
  optional UniqueIdPB instance_id = 1;

  // Ordinal number of the corresponding fragment in the query, i.e. TPlanFragment.idx.
  optional int32 fragment_idx = 2;

  // Map from plan node id to a list of scan ranges.
  map<int32, ScanRangesPB> per_node_scan_ranges = 5;

  // 0-based ordinal number of this particular instance. This is within its fragment,
  // not query-wide, so e.g. there will be one instance '0' for each fragment.
  optional int32 per_fragment_instance_idx = 6;

  // In its role as a data sender, a fragment instance is assigned a "sender id" to
  // uniquely identify it to a receiver. -1 = invalid.
  optional int32 sender_id = 7 [default = -1];

  // List of input join build finstances for joins in this finstance.
  repeated JoinBuildInputPB join_build_inputs = 8;

  // If this is a join build fragment, the number of fragment instances that consume
  // the join build. -1 = invalid.
  optional int32 num_join_build_outputs = 9 [default = -1];
}
|
|
|
|
// Per-backend execution parameters. The Coordinator::BackendStates are built from
// these.
message BackendExecParamsPB {
  // Identifier of the backend these parameters describe.
  optional UniqueIdPB backend_id = 1;

  // Hostname and port of this backend's KRPC backend service.
  optional NetworkAddressPB address = 8;

  // IP address and port of this backend's KRPC backend service.
  optional NetworkAddressPB krpc_address = 9;

  // Fragment instance parameters assigned to this backend. Instances that belong to
  // the same fragment sit next to each other in this list. Only the coordinator
  // backend (i.e. when 'is_coord_backend' is true) may have an empty list.
  repeated FInstanceExecParamsPB instance_params = 2;

  // Minimum query-wide buffer reservation, in bytes, needed on this backend: the peak
  // of the minimum reservations of the operators that may execute concurrently at any
  // point during the query. Operators whose execution never overlaps can reuse each
  // other's reservations, so this may be smaller than
  // 'initial_mem_reservation_total_claims'.
  optional int64 min_mem_reservation_bytes = 3;

  // Sum, in bytes, of the initial buffer reservations expected to be claimed on this
  // backend, taken over every operator of every fragment instance in
  // 'instance_params'. Used for an optimization in InitialReservation.
  optional int64 initial_mem_reservation_total_claims = 4;

  // Thread reservation for the fragment instances placed on this backend: the peak
  // number of threads that the concurrently-executing fragment instances may need at
  // any point during the query.
  optional int64 thread_reservation = 5;

  // How many admission-control slots this query consumes on this backend. Computed as
  // the largest number of instances of any single fragment here: 1 without mt_dop,
  // and at most the mt_dop value with mt_dop (fewer if the query does not actually run
  // mt_dop instances on this node).
  optional int32 slots_to_use = 6;

  // True when this backend is the coordinator.
  optional bool is_coord_backend = 7;
}
|
|
|
|
// Describes the backends chosen as intermediate runtime filter aggregators
// ("preaggregators", which combine filter updates before the final aggregation in the
// coordinator) and which fragment instances send their filter updates to each of
// them. Populated by Scheduler::ComputeRandomKrpcForAggregation, only for fragments
// that have a partitioned join producing a bloom filter, and only if the number of
// executor backends (excluding the coordinator) is at least twice the number of
// aggregators (which defaults to 2).
message RuntimeFilterAggregatorInfoPB {
  // Number of aggregators.
  // NOTE(review): 'required' is a proto2 compatibility hazard (it can never be safely
  // removed). It is kept as-is because relaxing it would change parse validation.
  required int32 num_aggregators = 1;

  // Hostname and port of each designated aggregator.
  repeated NetworkAddressPB aggregator_krpc_addresses = 2;

  // IP address and port of each designated aggregator.
  repeated NetworkAddressPB aggregator_krpc_backends = 3;

  // For each designated aggregator, the number of executor backends that report to
  // it, plus one for the aggregator itself.
  repeated int32 num_reporter_per_aggregator = 4;

  // Aggregator index that each fragment instance reports to. Its size must equal the
  // size of FragmentExecParamsPB.instances().
  // NOTE(review): presumably the index refers to positions in the aggregator_krpc_*
  // lists of this message; confirm against the scheduler.
  repeated int32 aggregator_idx_to_report = 5;
}
|
|
|
|
// Execution parameters common to all instances of one fragment.
message FragmentExecParamsPB {
  // Index of the fragment these parameters belong to (i.e. TPlanFragment.idx).
  optional int32 fragment_idx = 1;

  // Where this fragment sends its output.
  repeated PlanFragmentDestinationPB destinations = 2;

  // Number of senders, keyed by plan node id. The node ids are expected to be those of
  // ExchangeNodes.
  map<int32, int32> per_exch_num_senders = 3;

  // Ids of every instance of this fragment.
  repeated UniqueIdPB instances = 4;

  // How many backends this fragment is scheduled on. This counts individual impalads,
  // not physical hosts.
  optional int32 num_hosts = 5;

  // Runtime filter aggregator assignment for this fragment. See
  // RuntimeFilterAggregatorInfoPB for the conditions under which it is populated.
  optional RuntimeFilterAggregatorInfoPB filter_agg_info = 6;
}
|
|
|
|
// Identifies which setting or estimate determined the value of
// QuerySchedulePB.per_backend_mem_to_admit or QuerySchedulePB.coord_backend_mem_to_admit.
// NOTE(review): the grouping comments below are derived from the value names; confirm
// the details against the admission controller.
enum MemLimitSourcePB {
  // Listed first, so this is also what an unset proto2 field of this type reads as.
  NO_LIMIT = 0;

  // The MEM_LIMIT query option.
  QUERY_OPTION_MEM_LIMIT = 1;

  // Planner memory estimates, raw and adjusted, for executors and for a dedicated
  // coordinator.
  QUERY_PLAN_PER_HOST_MEM_ESTIMATE = 2;
  ADJUSTED_PER_HOST_MEM_ESTIMATE = 3;
  QUERY_PLAN_DEDICATED_COORDINATOR_MEM_ESTIMATE = 4;
  ADJUSTED_DEDICATED_COORDINATOR_MEM_ESTIMATE = 5;

  // Role-specific memory limit query options.
  QUERY_OPTION_MEM_LIMIT_EXECUTORS = 6;
  QUERY_OPTION_MEM_LIMIT_COORDINATORS = 7;

  // Presumably used when the query is scheduled only on the coordinator; confirm.
  COORDINATOR_ONLY_OPTIMIZATION = 8;

  // Lower and upper query memory limit bounds from the resource pool configuration.
  POOL_CONFIG_MIN_QUERY_MEM_LIMIT = 9;
  POOL_CONFIG_MAX_QUERY_MEM_LIMIT = 10;

  // The host's memory tracker limit.
  HOST_MEM_TRACKER_LIMIT = 11;
}
|
|
|
|
// Contains the output from scheduling and admission control that is used by the
// coordinator to start query execution.
message QuerySchedulePB {
  // Id of the query this schedule belongs to.
  optional UniqueIdPB query_id = 1;

  // The per-fragment execution parameters for this schedule.
  repeated FragmentExecParamsPB fragment_exec_params = 2;

  // The per-backend execution parameters for this schedule.
  repeated BackendExecParamsPB backend_exec_params = 3;

  // Total number of scan ranges of this query.
  optional int64 num_scan_ranges = 4;

  // The memory limit per executor that will be imposed on the query.
  // Set by the admission controller with a value that is only valid if it was admitted
  // successfully. -1 means no limit.
  optional int64 per_backend_mem_limit = 5;

  // The per executor memory used for admission accounting.
  // Set by the admission controller with a value that is only valid if it was admitted
  // successfully. Can be zero if the query is only scheduled to run on the coordinator.
  // 'per_backend_mem_to_admit_source' records what determined this value.
  optional int64 per_backend_mem_to_admit = 6;

  // The memory limit for the coordinator that will be imposed on the query. Used only
  // if the query has a coordinator fragment.
  // Set by the admission controller with a value that is only valid if it was admitted
  // successfully. -1 means no limit.
  optional int64 coord_backend_mem_limit = 7;

  // The coordinator memory used for admission accounting.
  // Set by the admission controller with a value that is only valid if it was admitted
  // successfully. 'coord_backend_mem_to_admit_source' records what determined this
  // value.
  optional int64 coord_backend_mem_to_admit = 8;

  // The cluster-wide estimated memory usage of this query.
  optional int64 cluster_mem_est = 9;

  // Mapping to store which data file is read on which hosts, grouped by scan node ID.
  map<int32, FilepathToHostsMapPB> by_node_filepath_to_hosts = 10;

  // Source of 'per_backend_mem_to_admit', i.e. which setting or estimate determined
  // its value. When unset, reads as NO_LIMIT (the first value of MemLimitSourcePB).
  optional MemLimitSourcePB per_backend_mem_to_admit_source = 11;

  // Source of 'coord_backend_mem_to_admit', i.e. which setting or estimate determined
  // its value. When unset, reads as NO_LIMIT (the first value of MemLimitSourcePB).
  optional MemLimitSourcePB coord_backend_mem_to_admit_source = 12;
}
|
|
|
|
// Request for AdmissionControlService.AdmitQuery().
message AdmitQueryRequestPB {
  // Id of the query to schedule and admit.
  optional UniqueIdPB query_id = 1;

  // BackendId of the coordinator responsible for this query.
  optional UniqueIdPB coord_id = 2;

  // Index of the sidecar that carries the TQueryExecRequest.
  optional int32 query_exec_request_sidecar_idx = 3;

  // Backends on which this query must not be scheduled.
  repeated NetworkAddressPB blacklisted_executor_addresses = 4;
}
|
|
|
|
// Response for AdmissionControlService.AdmitQuery().
message AdmitQueryResponsePB {
  // OK once the request has been handed off to the admission thread pool for
  // processing. This is not the admission decision itself; that is obtained with
  // GetQueryStatus().
  optional StatusPB status = 1;
}
|
|
|
|
// Request for AdmissionControlService.GetQueryStatus().
message GetQueryStatusRequestPB {
  // Id of the query whose admission status is requested.
  optional UniqueIdPB query_id = 1;
}
|
|
|
|
// Response for AdmissionControlService.GetQueryStatus().
message GetQueryStatusResponsePB {
  // Error if the query was rejected or retrieving the status failed.
  optional StatusPB status = 1;

  // The results of scheduling and admission control. Will only be set if admission was
  // successful and the query has not yet been released.
  optional QuerySchedulePB query_schedule = 2;

  // Index of the sidecar that carries the TRuntimeProfileTree (summary profile).
  optional int32 summary_profile_sidecar_idx = 3;

  // Start time of the query queuing, in Unix milliseconds.
  optional int64 wait_start_time_ms = 4;

  // End time of the query queuing, in Unix milliseconds.
  optional int64 wait_end_time_ms = 5;
}
|
|
|
|
// Request for AdmissionControlService.ReleaseQuery().
message ReleaseQueryRequestPB {
  // Field number 2 is not assigned in this message. Reserve it so it cannot be handed
  // to a new field by accident. NOTE(review): confirm from the file's history whether
  // it was used by a removed field; reserving it is harmless either way.
  reserved 2;

  // Id of the query to release.
  optional UniqueIdPB query_id = 1;

  // Corresponds to the 'peak_mem_consumption' parameter of
  // AdmissionController::ReleaseQuery()
  optional int64 peak_mem_consumption = 3;
}
|
|
|
|
// Response for AdmissionControlService.ReleaseQuery().
message ReleaseQueryResponsePB {
  // Outcome of the release request.
  optional StatusPB status = 1;
}
|
|
|
|
// Request for AdmissionControlService.ReleaseQueryBackends().
message ReleaseQueryBackendsRequestPB {
  // Id of the query whose backends are being released.
  optional UniqueIdPB query_id = 1;

  // Backends that have completed. This query's resources on each of them will be
  // released.
  repeated NetworkAddressPB host_addr = 2;
}
|
|
|
|
// Response for AdmissionControlService.ReleaseQueryBackends().
message ReleaseQueryBackendsResponsePB {
  // Outcome of the backend release request.
  optional StatusPB status = 1;
}
|
|
|
|
// Request for AdmissionControlService.CancelAdmission().
message CancelAdmissionRequestPB {
  // Id of the query whose scheduling should be cancelled.
  optional UniqueIdPB query_id = 1;
}
|
|
|
|
// Response for AdmissionControlService.CancelAdmission().
message CancelAdmissionResponsePB {
  // Outcome of the cancellation request.
  optional StatusPB status = 1;
}
|
|
|
|
// Request for AdmissionControlService.AdmissionHeartbeat().
message AdmissionHeartbeatRequestPB {
  // Backend id of the coordinator that sends this heartbeat.
  optional UniqueIdPB host_id = 1;

  // Version number of this heartbeat. It goes up each time a new heartbeat is sent.
  optional int64 version = 2;

  // Ids of every query currently registered at this coordinator.
  repeated UniqueIdPB query_ids = 3;
}
|
|
|
|
// Response for AdmissionControlService.AdmissionHeartbeat().
message AdmissionHeartbeatResponsePB {
  // Outcome of processing the heartbeat.
  optional StatusPB status = 1;
}
|
|
|
|
// RPC interface of the admission control service. Coordinators use it to submit
// queries for scheduling and admission, poll for the result, and release the
// resources a query holds.
service AdmissionControlService {
  // Called by the coordinator to start scheduling. The actual work is done on a thread
  // pool, so this call returns immediately. Idempotent - if the query has already been
  // submitted previously, returns OK without doing anything.
  // TODO: there are some situations where we can return the admission result quickly,
  // e.g. if the query is rejected. We should evaluate the benefits of saving a call to
  // GetQueryStatus() in those situations.
  rpc AdmitQuery(AdmitQueryRequestPB) returns (AdmitQueryResponsePB);

  // Called by the coordinator after AdmitQuery() to monitor the admission status of
  // the query. The call will block for a configurable amount of time before returning.
  // This call is idempotent and will return the schedule on each call between
  // successful admission and the query getting released.
  rpc GetQueryStatus(GetQueryStatusRequestPB) returns (GetQueryStatusResponsePB);

  // Called by the coordinator when the query has completely finished. Releases all
  // remaining resources.
  rpc ReleaseQuery(ReleaseQueryRequestPB) returns (ReleaseQueryResponsePB);

  // Called after individual backends have finished to release their resources while
  // other backends are still running. Due to the use of
  // Coordinator::BackendResourceState, this will be called a max of log(# of backends)
  // times per query.
  // TODO: we can save an rpc if we combine the release of the final batch of backends
  // with the call to ReleaseQuery.
  rpc ReleaseQueryBackends(ReleaseQueryBackendsRequestPB)
      returns (ReleaseQueryBackendsResponsePB);

  // Called by the coordinator to cancel scheduling of a query for which
  // GetQueryStatus has not yet returned a schedule.
  rpc CancelAdmission(CancelAdmissionRequestPB) returns (CancelAdmissionResponsePB);

  // Used to ensure that the admission service and coordinator have a consistent view
  // of what resources are being used even in the face of possible rpc failures.
  // Periodically called by each coordinator with a list of query ids for all queries
  // at that coordinator. If the admissiond has resources allocated to a query that is
  // not included in the list, it assumes the query has completed and releases its
  // remaining resources. Stale heartbeat messages are ignored.
  rpc AdmissionHeartbeat(AdmissionHeartbeatRequestPB)
      returns (AdmissionHeartbeatResponsePB);
}
|