// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
syntax = "proto2";

package impala;

import "common.proto";
import "control_service.proto";
import "planner.proto";
import "statestore_service.proto";

// Execution parameters for a single fragment instance. Used to assemble the
// TPlanFragmentInstanceCtx/PlanFragmentInstanceCtxPB.
message FInstanceExecParamsPB {
  // The fragment instance id.
  optional UniqueIdPB instance_id = 1;

  // Ordinal number of the corresponding fragment in the query, i.e. TPlanFragment.idx.
  optional int32 fragment_idx = 2;

  // Map from plan node id to a list of scan ranges.
  map<int32, ScanRangesPB> per_node_scan_ranges = 5;

  // 0-based ordinal number of this particular instance. This is within its fragment,
  // not query-wide, so e.g. there will be one instance '0' for each fragment.
  optional int32 per_fragment_instance_idx = 6;

  // In its role as a data sender, a fragment instance is assigned a "sender id" to
  // uniquely identify it to a receiver. -1 = invalid.
  optional int32 sender_id = 7 [default = -1];

  // List of input join build finstances for joins in this finstance.
  repeated JoinBuildInputPB join_build_inputs = 8;

  // If this is a join build fragment, the number of fragment instances that consume
  // the join build. -1 = invalid.
  optional int32 num_join_build_outputs = 9 [default = -1];
}

// Execution parameters for a single backend. Used to construct the
// Coordinator::BackendStates.
message BackendExecParamsPB {
  // The id of this backend.
  optional UniqueIdPB backend_id = 1;

  // The hostname + port of the KRPC backend service on this backend.
  optional NetworkAddressPB address = 8;

  // The IP address + port of the KRPC backend service on this backend.
  optional NetworkAddressPB krpc_address = 9;

  // The fragment instance params assigned to this backend. All instances of a
  // particular fragment are contiguous in this list. This can be empty only for the
  // coordinator backend, that is, if 'is_coord_backend' is true.
  repeated FInstanceExecParamsPB instance_params = 2;

  // The minimum query-wide buffer reservation size (in bytes) required for this
  // backend. This is the peak minimum reservation that may be required by the
  // concurrently-executing operators at any point in query execution. It may be less
  // than the initial reservation total claims (below) if execution of some operators
  // never overlaps, which allows reuse of reservations.
  optional int64 min_mem_reservation_bytes = 3;

  // Total of the initial buffer reservations that we expect to be claimed on this
  // backend for all fragment instances in instance_params, i.e. the sum over all
  // operators in all fragment instances that execute on this backend. This is used
  // for an optimization in InitialReservation. Measured in bytes.
  optional int64 initial_mem_reservation_total_claims = 4;

  // Total thread reservation for fragment instances scheduled on this backend. This
  // is the peak number of threads that may be required by the concurrently-executing
  // fragment instances at any point in query execution.
  optional int64 thread_reservation = 5;

  // Number of slots that this query should count for in admission control. This is
  // calculated as the maximum # of instances of any fragment on this backend, i.e. 1
  // if mt_dop is not used and at most the mt_dop value if mt_dop is specified (but
  // less if the query is not actually running with mt_dop instances on this node).
  optional int32 slots_to_use = 6;

  // Indicates whether this backend is the coordinator.
  optional bool is_coord_backend = 7;
}
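
// Illustrative sketch (not part of this service's API): deriving 'slots_to_use' for
// one backend from its assigned instances, following the field comment above (the
// maximum number of instances of any single fragment on this backend). Only the
// generated protobuf accessors are real; 'ComputeSlotsToUse' is a hypothetical helper,
// and the minimum of one slot for an instance-less coordinator backend is an
// assumption made for illustration.
//
//   #include <algorithm>
//   #include <cstdint>
//   #include <map>
//   #include "admission_control_service.pb.h"  // generated from this file
//
//   int32_t ComputeSlotsToUse(const impala::BackendExecParamsPB& be) {
//     // Count instances per fragment. All instances of a fragment are contiguous in
//     // 'instance_params', but a map keeps the sketch independent of that ordering.
//     std::map<int32_t, int32_t> per_fragment;
//     for (const auto& fi : be.instance_params()) ++per_fragment[fi.fragment_idx()];
//     int32_t slots = 1;  // assume a coordinator-only backend still occupies a slot
//     for (const auto& [idx, count] : per_fragment) slots = std::max(slots, count);
//     return slots;
//   }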
// Information about the backends selected as runtime filter pre-aggregators (one
// aggregation level before the final aggregation in the coordinator), and about which
// fragment instances must send their filter updates to them. This is populated by
// Scheduler::ComputeRandomKrpcForAggregation, only for fragments having a partitioned
// join that produces a bloom filter, and only if the number of backend executors
// (excluding the coordinator) is at least 2x the number of aggregators (which
// defaults to 2).
message RuntimeFilterAggregatorInfoPB {
  // Number of aggregators.
  required int32 num_aggregators = 1;

  // hostname:port of the designated aggregators.
  repeated NetworkAddressPB aggregator_krpc_addresses = 2;

  // ip:port of the designated aggregators.
  repeated NetworkAddressPB aggregator_krpc_backends = 3;

  // Number of backend executors that report to each designated aggregator, plus 1
  // (the aggregator also reports to itself).
  repeated int32 num_reporter_per_aggregator = 4;

  // Index of the aggregator that each fragment instance reports to. Size must be
  // equal to the size of FragmentExecParamsPB.instances().
  repeated int32 aggregator_idx_to_report = 5;
}

// Execution parameters shared between fragment instances.
message FragmentExecParamsPB {
  // Ordinal number of the corresponding fragment in the query, i.e. TPlanFragment.idx.
  optional int32 fragment_idx = 1;

  // Output destinations of this fragment.
  repeated PlanFragmentDestinationPB destinations = 2;

  // Map from node id to the number of senders (node id expected to be for an
  // ExchangeNode).
  map<int32, int32> per_exch_num_senders = 3;

  // List of fragment instance ids for all instances of this fragment.
  repeated UniqueIdPB instances = 4;

  // Total number of backends this fragment is scheduled on. Note that this represents
  // the number of individual impalads, not the number of physical hosts.
  optional int32 num_hosts = 5;

  optional RuntimeFilterAggregatorInfoPB filter_agg_info = 6;
}

// Enum describing the source of the values of 'per_backend_mem_to_admit' and
// 'coord_backend_mem_to_admit' in QuerySchedulePB.
enum MemLimitSourcePB {
  NO_LIMIT = 0;
  QUERY_OPTION_MEM_LIMIT = 1;
  QUERY_PLAN_PER_HOST_MEM_ESTIMATE = 2;
  ADJUSTED_PER_HOST_MEM_ESTIMATE = 3;
  QUERY_PLAN_DEDICATED_COORDINATOR_MEM_ESTIMATE = 4;
  ADJUSTED_DEDICATED_COORDINATOR_MEM_ESTIMATE = 5;
  QUERY_OPTION_MEM_LIMIT_EXECUTORS = 6;
  QUERY_OPTION_MEM_LIMIT_COORDINATORS = 7;
  COORDINATOR_ONLY_OPTIMIZATION = 8;
  POOL_CONFIG_MIN_QUERY_MEM_LIMIT = 9;
  POOL_CONFIG_MAX_QUERY_MEM_LIMIT = 10;
  HOST_MEM_TRACKER_LIMIT = 11;
}

// Contains the output from scheduling and admission control that is used by the
// coordinator to start query execution.
message QuerySchedulePB {
  optional UniqueIdPB query_id = 1;

  // The per-fragment execution parameters for this schedule.
  repeated FragmentExecParamsPB fragment_exec_params = 2;

  // The per-backend execution parameters for this schedule.
  repeated BackendExecParamsPB backend_exec_params = 3;

  // Total number of scan ranges of this query.
  optional int64 num_scan_ranges = 4;

  // The memory limit per executor that will be imposed on the query. Set by the
  // admission controller with a value that is only valid if it was admitted
  // successfully. -1 means no limit.
  optional int64 per_backend_mem_limit = 5;

  // The per-executor memory used for admission accounting. Set by the admission
  // controller with a value that is only valid if it was admitted successfully. Can
  // be zero if the query is only scheduled to run on the coordinator.
  optional int64 per_backend_mem_to_admit = 6;

  // The memory limit for the coordinator that will be imposed on the query. Used only
  // if the query has a coordinator fragment. Set by the admission controller with a
  // value that is only valid if it was admitted successfully. -1 means no limit.
  optional int64 coord_backend_mem_limit = 7;

  // The coordinator memory used for admission accounting. Set by the admission
  // controller with a value that is only valid if it was admitted successfully.
  optional int64 coord_backend_mem_to_admit = 8;

  // The cluster-wide estimated memory usage of this query.
  optional int64 cluster_mem_est = 9;

  // Mapping to store which data file is read on which hosts, grouped by scan node ID.
  map<int32, FilepathToHostsMapPB> by_node_filepath_to_hosts = 10;

  // Source of per_backend_mem_to_admit.
  optional MemLimitSourcePB per_backend_mem_to_admit_source = 11;

  // Source of coord_backend_mem_to_admit.
  optional MemLimitSourcePB coord_backend_mem_to_admit_source = 12;
}
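
// Illustrative sketch (not part of this service's API): one simplified way an
// admission controller could pick 'per_backend_mem_to_admit' and record its origin in
// 'per_backend_mem_to_admit_source'. The precedence shown (query option overrides the
// plan estimate, then pool config clamps the result) and the helper name are
// assumptions for illustration; the real AdmissionController logic also covers the
// coordinator-specific and adjusted-estimate cases in MemLimitSourcePB.
//
//   #include <cstdint>
//   #include "admission_control_service.pb.h"  // generated from this file
//
//   void ChooseMemToAdmit(int64_t query_option_mem_limit,    // <= 0 means unset
//                         int64_t per_host_mem_estimate,
//                         int64_t pool_min_query_mem_limit,  // <= 0 means unset
//                         int64_t pool_max_query_mem_limit,  // <= 0 means unset
//                         impala::QuerySchedulePB* schedule) {
//     int64_t mem = per_host_mem_estimate;
//     impala::MemLimitSourcePB src = impala::QUERY_PLAN_PER_HOST_MEM_ESTIMATE;
//     if (query_option_mem_limit > 0) {
//       mem = query_option_mem_limit;
//       src = impala::QUERY_OPTION_MEM_LIMIT;
//     }
//     if (pool_min_query_mem_limit > 0 && mem < pool_min_query_mem_limit) {
//       mem = pool_min_query_mem_limit;
//       src = impala::POOL_CONFIG_MIN_QUERY_MEM_LIMIT;
//     }
//     if (pool_max_query_mem_limit > 0 && mem > pool_max_query_mem_limit) {
//       mem = pool_max_query_mem_limit;
//       src = impala::POOL_CONFIG_MAX_QUERY_MEM_LIMIT;
//     }
//     schedule->set_per_backend_mem_to_admit(mem);
//     schedule->set_per_backend_mem_to_admit_source(src);
//   }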
message AdmitQueryRequestPB {
  optional UniqueIdPB query_id = 1;

  // The BackendId of the coordinator for this query.
  optional UniqueIdPB coord_id = 2;

  // Idx of the TQueryExecRequest sidecar.
  optional int32 query_exec_request_sidecar_idx = 3;

  // List of backends this query should not be scheduled on.
  repeated NetworkAddressPB blacklisted_executor_addresses = 4;
}

message AdmitQueryResponsePB {
  // Ok if the request was successfully handed off to the admission thread pool for
  // processing.
  optional StatusPB status = 1;
}

message GetQueryStatusRequestPB {
  optional UniqueIdPB query_id = 1;
}

message GetQueryStatusResponsePB {
  // Error if the query was rejected or retrieving the status failed.
  optional StatusPB status = 1;

  // The results of scheduling and admission control. Will only be set if admission
  // was successful and the query has not yet been released.
  optional QuerySchedulePB query_schedule = 2;

  // Idx of the TRuntimeProfileTree sidecar.
  optional int32 summary_profile_sidecar_idx = 3;

  // Start time of the query queuing, in Unix milliseconds.
  optional int64 wait_start_time_ms = 4;

  // End time of the query queuing, in Unix milliseconds.
  optional int64 wait_end_time_ms = 5;
}

message ReleaseQueryRequestPB {
  optional UniqueIdPB query_id = 1;

  // Corresponds to the 'peak_mem_consumption' parameter of
  // AdmissionController::ReleaseQuery().
  optional int64 peak_mem_consumption = 3;
}

message ReleaseQueryResponsePB {
  optional StatusPB status = 1;
}

message ReleaseQueryBackendsRequestPB {
  optional UniqueIdPB query_id = 1;

  // List of backends that have completed. The resources for this query on these
  // backends will be released.
  repeated NetworkAddressPB host_addr = 2;
}
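
// Illustrative sketch (not part of this service's API): assembling a
// ReleaseQueryBackendsRequestPB for a batch of backends that have finished. Batching
// completed backends into a single request is what keeps the number of
// ReleaseQueryBackends() calls low (see the service comment below); 'completed' is a
// hypothetical container of completed-backend addresses.
//
//   #include <vector>
//   #include "admission_control_service.pb.h"  // generated from this file
//
//   impala::ReleaseQueryBackendsRequestPB MakeReleaseBackendsRequest(
//       const impala::UniqueIdPB& query_id,
//       const std::vector<impala::NetworkAddressPB>& completed) {
//     impala::ReleaseQueryBackendsRequestPB req;
//     *req.mutable_query_id() = query_id;
//     for (const auto& addr : completed) *req.add_host_addr() = addr;
//     return req;
//   }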
message ReleaseQueryBackendsResponsePB {
  optional StatusPB status = 1;
}

message CancelAdmissionRequestPB {
  optional UniqueIdPB query_id = 1;
}

message CancelAdmissionResponsePB {
  optional StatusPB status = 1;
}

message AdmissionHeartbeatRequestPB {
  // The backend id of the coordinator sending this heartbeat.
  optional UniqueIdPB host_id = 1;

  // The version number of this heartbeat. Incremented every time a new heartbeat is
  // sent.
  optional int64 version = 2;

  // A list of all queries registered at this coordinator.
  repeated UniqueIdPB query_ids = 3;
}

message AdmissionHeartbeatResponsePB {
  optional StatusPB status = 1;
}

service AdmissionControlService {
  /// Called by the coordinator to start scheduling. The actual work is done on a
  /// thread pool, so this call returns immediately. Idempotent - if the query has
  /// already been submitted previously, returns OK without doing anything.
  /// TODO: there are some situations where we can return the admission result
  /// quickly, e.g. if the query is rejected. We should evaluate the benefits of
  /// saving a call to GetQueryStatus() in those situations.
  rpc AdmitQuery(AdmitQueryRequestPB) returns (AdmitQueryResponsePB);

  /// Called by the coordinator after AdmitQuery() to monitor the admission status of
  /// the query. The call will block for a configurable amount of time before
  /// returning. This call is idempotent and will return the schedule on each call
  /// between successful admission and the query getting released.
  rpc GetQueryStatus(GetQueryStatusRequestPB) returns (GetQueryStatusResponsePB);

  /// Called by the coordinator when the query has completely finished; releases all
  /// remaining resources.
  rpc ReleaseQuery(ReleaseQueryRequestPB) returns (ReleaseQueryResponsePB);

  /// Called after individual backends have finished, to release their resources while
  /// other backends are still running. Due to the use of
  /// Coordinator::BackendResourceState, this will be called a max of log(# of
  /// backends) times per query.
  /// TODO: we can save an rpc if we combine the release of the final batch of
  /// backends with the call to ReleaseQuery.
  rpc ReleaseQueryBackends(ReleaseQueryBackendsRequestPB)
      returns (ReleaseQueryBackendsResponsePB);

  /// Called by the coordinator to cancel scheduling of a query for which
  /// GetQueryStatus has not yet returned a schedule.
  rpc CancelAdmission(CancelAdmissionRequestPB) returns (CancelAdmissionResponsePB);

  /// Used to ensure that the admission service and coordinator have a consistent view
  /// of what resources are being used, even in the face of possible rpc failures.
  /// Periodically called by each coordinator with a list of query ids for all queries
  /// at that coordinator. If the admissiond has resources allocated to a query that
  /// is not included in the list, it assumes the query has completed and releases its
  /// remaining resources. Stale heartbeat messages are ignored.
  rpc AdmissionHeartbeat(AdmissionHeartbeatRequestPB)
      returns (AdmissionHeartbeatResponsePB);
}
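
// Illustrative sketch (not part of this service's API): the coordinator-side call
// sequence implied by the comments above, written against a hypothetical synchronous
// 'proxy' object; the real client is the generated asynchronous KRPC proxy, and
// 'IsOk' is a hypothetical status check. AdmitQuery() hands the query to the
// admission thread pool, the coordinator polls GetQueryStatus() until a schedule or
// an error comes back, releases per-backend resources in batches as backends finish,
// and calls ReleaseQuery() exactly once at the end.
//
//   impala::AdmitQueryRequestPB admit_req;      // query_id, coord_id, sidecar idx set
//   impala::AdmitQueryResponsePB admit_resp;
//   proxy.AdmitQuery(admit_req, &admit_resp);   // returns immediately
//
//   impala::GetQueryStatusRequestPB status_req; // query_id set
//   impala::GetQueryStatusResponsePB status_resp;
//   do {
//     proxy.GetQueryStatus(status_req, &status_resp);  // blocks up to a timeout
//   } while (!status_resp.has_query_schedule() && IsOk(status_resp.status()));
//
//   // ... execute; call ReleaseQueryBackends() as batches of backends complete ...
//
//   impala::ReleaseQueryRequestPB release_req;  // query_id, peak_mem_consumption set
//   impala::ReleaseQueryResponsePB release_resp;
//   proxy.ReleaseQuery(release_req, &release_resp);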