impala/common/protobuf/control_service.proto
Yida Wu 80a45014ea IMPALA-13703: Cancel running queries before shutdown deadline
Currently, when the graceful shutdown deadline is reached, the Impala
daemon exits immediately, leaving any running queries unfinished.
This approach is not quite graceful, as it may result in unreleased
resources, such as scratch files in remote storage.

This patch adds a new state in the graceful shutdown process.
Before reaching the shutdown deadline, the Impala daemon will try to
cancel any remaining running queries within a configurable time limit
set by the flag shutdown_query_cancel_period_s. If this time limit
exceeds 20% of the total shutdown deadline, it is automatically
capped at that value. The idea is to start canceling queries only
near the end of the graceful shutdown deadline; the 20% cap bounds
this more aggressive step while still ensuring a graceful shutdown.
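
The capping rule reduces to taking the minimum of the flag value and
20% of the deadline. A minimal sketch of the arithmetic in C++ (the
helper name is hypothetical, not Impala's actual code):

    #include <algorithm>
    #include <cstdint>

    // Effective cancellation window, in seconds: queries start being
    // canceled this long before the shutdown deadline expires.
    int64_t EffectiveCancelPeriodS(int64_t shutdown_deadline_s,
                                   int64_t shutdown_query_cancel_period_s) {
      // Cap at 20% of the total deadline so that cancellation only
      // kicks in near the end of the graceful shutdown window.
      const int64_t cap_s = shutdown_deadline_s / 5;
      return std::min(shutdown_query_cancel_period_s, cap_s);
    }

For example, with a 3600s deadline and
shutdown_query_cancel_period_s=1800, the window is capped at 720s:
cancellation begins 720 seconds before the daemon's exit deadline.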

If all queries are successfully canceled within this period, the
server shuts down immediately. Otherwise, it shuts down once the
deadline is reached, with queries still running.

Tests:
Passed core tests.
Added test cases test_shutdown_coordinator_cancel_query,
test_shutdown_executor_with_query_cancel_period, and
test_shutdown_coordinator_and_executor_cancel_query.
Manually tested shutting down a coordinator or an executor with
long-running queries; the queries were canceled.

Change-Id: I1cac2e100d329644e21fdceb0b23901b08079130
Reviewed-on: http://gerrit.cloudera.org:8080/22422
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Reviewed-by: Abhishek Rawat <arawat@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-02-07 10:12:54 +00:00


// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
syntax="proto2";
package impala;
import "common.proto";
import "planner.proto";
import "kudu/rpc/rpc_header.proto";
message ParquetDmlStatsPB {
// For each column, the on disk byte size
map<string, int64> per_column_size = 1;
}
message KuduDmlStatsPB {
// The number of reported per-row errors, i.e. this many rows were not modified.
// Note that this aggregate is less useful than a breakdown of the number of errors by
// error type, e.g. number of rows with duplicate key conflicts, number of rows
// violating nullability constraints, etc., but it isn't yet possible to differentiate
// all error types in the KuduTableSink.
optional int64 num_row_errors = 1;
}
// ReportExecStatus
// Per partition DML stats
// TODO: this should include the table stats that we update the metastore with.
message DmlStatsPB {
optional int64 bytes_written = 1;
optional ParquetDmlStatsPB parquet_stats = 2;
optional KuduDmlStatsPB kudu_stats = 3;
}
// Per-file statistics and metadata resulting from DML statements.
message DmlFileStatusPb {
required string final_path = 1;
required int64 num_rows = 2;
required int64 size = 3;
// Temporary path where the file was written by the executor.
// The coordinator needs to move the file from staging_path to final_path.
// If not set, the file was already written to its final path (e.g. in
// transactional tables).
// TODO: this could be merged with final_path by storing only the suffix
// of the path and appending it to a staging/final prefix from DmlPartitionStatusPB
optional string staging_path = 4;
// FlatBuffer-encoded Iceberg data file (see FbIcebergDataFile). Contains metadata and
// stats for the Iceberg file.
optional bytes iceberg_data_file_fb = 5;
}
// Per-partition statistics and metadata resulting from DML statements.
message DmlPartitionStatusPB {
// The id of the partition written to (may be -1 if the partition is created by this
// query). See THdfsTable.partitions.
optional int64 id = 1;
// The number of rows modified in this partition
optional int64 num_modified_rows = 2;
// The number of rows deleted in this partition
optional int64 num_deleted_rows = 3;
// Detailed statistics gathered by table writers for this partition
optional DmlStatsPB stats = 4;
// Fully qualified URI to the base directory for this partition.
optional string partition_base_dir = 5;
// The latest observed Kudu timestamp reported by the local KuduSession.
// This value is an unsigned int64.
optional int64 kudu_latest_observed_ts = 6;
// List of files created during the DML statement in this partition.
repeated DmlFileStatusPb created_files = 7;
// List of delete files created during the DML statement in this partition.
repeated DmlFileStatusPb created_delete_files = 8;
// Fully qualified URI to the staging directory for this partition.
optional string staging_dir_to_clean_up = 9;
}
// The results of a DML statement, sent to the coordinator as part of
// ReportExecStatusRequestPB
message DmlExecStatusPB {
// Per-partition details, used in finalization and reporting.
// The keys represent partitions to create, coded as k1=v1/k2=v2/k3=v3..., with the
// root's key in an unpartitioned table being ROOT_PARTITION_KEY.
// The target table name is recorded in the corresponding TQueryExecRequest
map<string, DmlPartitionStatusPB> per_partition_status = 1;
// In case of Iceberg modify statements it contains the data files referenced
// by position delete records.
repeated string data_files_referenced_by_position_deletes = 2;
}
// Error message exchange format
message ErrorLogEntryPB {
// Number of error messages reported with the associated error code (the key in the
// 'error_log' map of StatefulStatusPB)
optional int32 count = 1;
// Sample messages for the associated error code
repeated string messages = 2;
}
// Represents the states that a fragment instance goes through during its execution. The
// current state gets sent back to the coordinator and will be presented to users through
// the debug webpages. The states are listed in the order in which they occur.
// Not all states are necessarily transitioned through when there are errors.
enum FInstanceExecStatePB {
WAITING_FOR_EXEC = 0;
WAITING_FOR_PREPARE = 1;
WAITING_FOR_CODEGEN = 2;
WAITING_FOR_OPEN = 3;
WAITING_FOR_FIRST_BATCH = 4;
FIRST_BATCH_PRODUCED = 5;
PRODUCING_DATA = 6;
LAST_BATCH_SENT = 7;
FINISHED = 8;
}
// Represents any part of the status report that isn't idempotent. If the executor thinks
// the report failed, we'll retransmit these parts, and this allows us to keep them
// associated with their original sequence number so that if the coordinator actually did
// receive the original report it won't reapply them.
message StatefulStatusPB {
// Sequence number prevents out-of-order or duplicated updates from being applied.
// 'report_seq_no' will be <= the 'report_seq_no' in the FragmentInstanceExecStatusPB
// that contains this StatefulStatusPB.
optional int64 report_seq_no = 1;
// Map of TErrorCode to ErrorLogEntryPB; New errors that have not been reported to
// the coordinator by this fragment instance. Not idempotent.
map<int32, ErrorLogEntryPB> error_log = 2;
// Metadata associated with a failed fragment instance. Only set for failed fragment
// instances.
optional AuxErrorInfoPB aux_error_info = 3;
}
// Per-node stats required for the exec summary.
message ExecSummaryDataPB {
// Plan node ID, set if this is for a PlanNode.
optional int32 plan_node_id = 1;
// Data sink ID, set if this is for a DataSink.
optional int32 data_sink_id = 2;
// Rows returned from this node, if this is a PlanNode.
optional int64 rows_returned = 3;
// Peak memory usage in bytes of this PlanNode or DataSink.
optional int64 peak_mem_usage = 4;
// Local time in nanoseconds spent in this plan node.
optional int64 local_time_ns = 5;
}
// RPC error metadata that can be associated with an AuxErrorInfoPB object. Created if
// an RPC to another node failed.
message RPCErrorInfoPB {
// The address of the RPC's target node.
required NetworkAddressPB dest_node = 1;
// The posix error code of the failed RPC.
required int32 posix_error_code = 2;
}
// Error metadata that can be associated with a failed fragment instance. Used to store
// extra info about errors encountered during fragment execution. This information is
// used by the Coordinator to blacklist potentially unhealthy nodes.
message AuxErrorInfoPB {
// Set if the fragment instance failed because an RPC to another node failed. Only set
// if the RPC failed due to a network error.
optional RPCErrorInfoPB rpc_error_info = 1;
}
message FragmentInstanceExecStatusPB {
// Sequence number prevents out-of-order or duplicated updates from being applied.
optional int64 report_seq_no = 1;
// The ID of the fragment instance that this report is for
optional UniqueIdPB fragment_instance_id = 2;
// If true, fragment finished executing.
optional bool done = 3;
// The current state of this fragment instance's execution.
optional FInstanceExecStatePB current_state = 4;
// Cumulative structural changes made by the table sink of this fragment
// instance. This is sent only when 'done' above is true. Not idempotent.
optional DmlExecStatusPB dml_exec_status = 5;
// The non-idempotent parts of the report, and any prior reports that are not known to
// have been received by the coordinator.
repeated StatefulStatusPB stateful_report = 6;
// Per-node stats required for the exec summary.
repeated ExecSummaryDataPB exec_summary_data = 7;
}
message FragmentExecStatusPB {
// Ordinal number of fragment in the query.
optional int32 fragment_idx = 1;
// The index of the first instance included in this state report. The serialized
// profile (in a corresponding sidecar) includes the number of instances and the
// profiles for those instances. The instance indexes for a backend are sequential.
optional int32 min_per_fragment_instance_idx = 2;
}
message ReportExecStatusRequestPB {
// The query id which this report is for.
optional UniqueIdPB query_id = 1;
// Same as TExecQueryFInstancesParams.coord_state_idx
optional int32 coord_state_idx = 2;
repeated FragmentInstanceExecStatusPB instance_exec_status = 3;
// Sidecar index of the cumulative profiles in instance_exec_status and
// fragment_exec_status
optional int32 thrift_profiles_sidecar_idx = 4;
// Cumulative status for this backend.
// See QueryState::overall_status for details.
optional StatusPB overall_status = 5;
// The fragment instance id of the first failed fragment instance. This corresponds to
// the fragment which sets 'overall_status' above. Not set if 'overall_status' is a
// general error (e.g. failure to start fragment instances).
optional UniqueIdPB fragment_instance_id = 6;
// Peak memory usage for this query on this backend in bytes.
optional int64 peak_mem_consumption = 7;
// User CPU utilization for the query on this backend in ns.
optional int64 cpu_user_ns = 8;
// System CPU utilization for the query on this backend in ns.
optional int64 cpu_sys_ns = 9;
// Sum of BytesRead counters on this backend.
optional int64 bytes_read = 10;
// Total scan ranges completed on this backend.
optional int64 scan_ranges_complete = 11;
// Total bytes sent by instances that did not contain a scan node.
optional int64 exchange_bytes_sent = 12;
// Total bytes sent by instances that contained a scan node.
optional int64 scan_bytes_sent = 13;
// Aggregated status for all instances of a fragment on a backend. Used when
// TQueryCtx.gen_aggregated_profile is true.
repeated FragmentExecStatusPB fragment_exec_status = 14;
// Sequence number prevents out-of-order or duplicated updates from being applied.
// Incremented for each backend status report. 'fragment_exec_status' should be
// disregarded if the sequence number of this report is less than or equal to
// the previous report received.
optional int64 backend_report_seq_no = 15;
// For each join node, sum of RowsReturned counters on this backend.
map<int32, int64> per_join_rows_produced = 16;
// If true, the executor failed to execute query fragments due to a fatal local disk
// I/O error, e.g. the local storage devices used for spilling are corrupted.
optional bool local_disk_faulty = 17 [default = false];
}
message ReportExecStatusResponsePB {
optional StatusPB status = 1;
}
message CancelQueryFInstancesRequestPB {
// The query id of the query being cancelled.
optional UniqueIdPB query_id = 1;
}
message CancelQueryFInstancesResponsePB {
optional StatusPB status = 1;
}
message RemoteShutdownParamsPB {
// Deadline for the shutdown. After this deadline expires (starting at the time when
// this remote shutdown command is received), the Impala daemon exits immediately
// regardless of whether queries are still executing.
optional int64 deadline_s = 1;
}
// The current status of a shutdown operation.
message ShutdownStatusPB {
// Milliseconds remaining in startup grace period. 0 if the period has expired.
optional int64 grace_remaining_ms = 1;
// Milliseconds remaining in shutdown deadline. 0 if the deadline has expired.
optional int64 deadline_remaining_ms = 2;
// Number of fragment instances still executing.
optional int64 finstances_executing = 3;
// Number of client requests still registered with the Impala server that is being shut
// down.
optional int64 client_requests_registered = 4;
// Number of queries still executing on backend.
optional int64 backend_queries_executing = 5;
// Milliseconds remaining in the query cancellation period that precedes the shutdown
// deadline. 0 if the period has expired.
optional int64 cancel_deadline_remaining_ms = 6;
}
message RemoteShutdownResultPB {
// Success or failure of the operation.
optional StatusPB status = 1;
// If status is OK, additional info about the shutdown status.
optional ShutdownStatusPB shutdown_status = 2;
}
// Specification of one output destination of a plan fragment
message PlanFragmentDestinationPB {
// The globally unique fragment instance id.
optional UniqueIdPB fragment_instance_id = 1;
// Hostname + port of the KRPC backend service on the destination.
optional NetworkAddressPB address = 2;
// IP address + port of the KRPC backend service on the destination.
optional NetworkAddressPB krpc_backend = 3;
}
// Context to collect information that is shared among all instances of a particular plan
// fragment. Corresponds to a TPlanFragment with the same idx in the
// TExecPlanFragmentInfo.
message PlanFragmentCtxPB {
// Ordinal number of corresponding fragment in the query.
optional int32 fragment_idx = 1;
// Output destinations, one per output partition. The partitioning of the output is
// specified by TPlanFragment.output_sink.output_partition in the corresponding
// TPlanFragment. The number of output partitions is destinations.size().
repeated PlanFragmentDestinationPB destinations = 2;
}
// A scan range plus the parameters needed to execute that scan.
message ScanRangeParamsPB {
optional ScanRangePB scan_range = 1;
optional int32 volume_id = 2 [default = -1];
optional bool try_hdfs_cache = 3 [default = false];
optional bool is_remote = 4;
}
// List of ScanRangeParamsPB. This is needed so that per_node_scan_ranges in
// PlanFragmentInstanceCtxPB can be a map since protobuf doesn't support repeated map
// values.
message ScanRangesPB {
repeated ScanRangeParamsPB scan_ranges = 1;
}
// Information about the input fragment instance of a join node.
message JoinBuildInputPB {
// The join node id that will consume this join build.
optional int32 join_node_id = 1;
// Fragment instance id of the input fragment instance.
optional UniqueIdPB input_finstance_id = 2;
}
// Protobuf portion of the execution parameters of a single fragment instance. Every
// fragment instance will also have a corresponding TPlanFragmentInstanceCtx with the same
// fragment_idx.
message PlanFragmentInstanceCtxPB {
// Ordinal number of corresponding fragment in the query.
optional int32 fragment_idx = 1;
// Map from plan node id to initial scan ranges for each scan node in
// TPlanFragment.plan_tree
map<int32, ScanRangesPB> per_node_scan_ranges = 2;
// List of input join build finstances for joins in this finstance.
repeated JoinBuildInputPB join_build_inputs = 3;
}
// List of host addresses.
message FilepathToHostsListPB {
repeated NetworkAddressPB hosts = 1;
// True if the key in the map for this entry is a relative path, false if it is an
// absolute path.
required bool is_relative = 2;
}
// File path to a list of host addresses mapping.
message FilepathToHostsMapPB {
map<string, FilepathToHostsListPB> filepath_to_hosts = 1;
}
// ExecQueryFInstances
message ExecQueryFInstancesRequestPB {
// This backend's index into Coordinator::backend_states_, needed for subsequent RPCs to
// the coordinator.
optional int32 coord_state_idx = 1;
// Sidecar index of the TQueryCtx.
optional int32 query_ctx_sidecar_idx = 2;
// Sidecar index of the TExecPlanFragmentInfo.
optional int32 plan_fragment_info_sidecar_idx = 3;
// The minimum query-wide memory reservation (in bytes) required for the backend
// executing the instances in fragment_instance_ctxs. This is the peak minimum
// reservation that may be required by the concurrently-executing operators at any
// point in query execution. It may be less than the initial reservation total claims
// (below) if execution of some operators never overlaps, which allows reuse of
// reservations.
optional int64 min_mem_reservation_bytes = 4;
// Total of the initial buffer reservations that we expect to be claimed on this
// backend for all fragment instances in fragment_instance_ctxs. I.e. the sum over all
// operators in all fragment instances that execute on this backend. This is used for
// an optimization in InitialReservation. Measured in bytes.
optional int64 initial_mem_reservation_total_claims = 5;
// The backend memory limit (in bytes) as set by the admission controller. Used by the
// query mem tracker to enforce the memory limit.
optional int64 per_backend_mem_limit = 6;
// General execution parameters for different fragments. Corresponds to 'fragments' in
// the TExecPlanFragmentInfo sidecar.
repeated PlanFragmentCtxPB fragment_ctxs = 7;
// Execution parameters for specific fragment instances. Corresponds to
// 'fragment_instance_ctxs' in the TExecPlanFragmentInfo sidecar.
repeated PlanFragmentInstanceCtxPB fragment_instance_ctxs = 8;
// Mapping to store which data file is read on which host, grouped by scan node ID.
map<int32, FilepathToHostsMapPB> by_node_filepath_to_hosts = 9;
}
message ExecQueryFInstancesResponsePB {
// Success or failure of the operation.
optional StatusPB status = 1;
}
message KillQueryRequestPB {
// The query ids of the queries to kill.
repeated UniqueIdPB query_ids = 1;
// The effective user who submitted this request.
required string requesting_user = 2;
// True if the requesting_user is an admin.
required bool is_admin = 3;
}
message KillQueryResponsePB {
repeated StatusPB statuses = 1;
}
service ControlService {
// Override the default authorization method.
option (kudu.rpc.default_authz_method) = "Authorize";
// Called by the coordinator to start asynchronous execution of a query's fragment
// instances on a backend. Returns as soon as all incoming data streams have been set up.
rpc ExecQueryFInstances(ExecQueryFInstancesRequestPB)
returns (ExecQueryFInstancesResponsePB);
// Update the coordinator with the query status of the backend.
rpc ReportExecStatus(ReportExecStatusRequestPB) returns (ReportExecStatusResponsePB);
// Called by coordinator to cancel execution of a single query's fragment instances,
// which the coordinator initiated with a prior call to ExecQueryFInstances.
// Cancellation is asynchronous (in the sense that this call may return before the
// fragment instance has completely stopped executing).
rpc CancelQueryFInstances(CancelQueryFInstancesRequestPB)
returns (CancelQueryFInstancesResponsePB);
// Called to initiate shutdown of this backend.
rpc RemoteShutdown(RemoteShutdownParamsPB) returns (RemoteShutdownResultPB);
// Called by an impalad to ask another impalad to kill a query.
rpc KillQuery(KillQueryRequestPB) returns (KillQueryResponsePB);
}
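
For illustration, a client of this service might build the shutdown
request and interpret the ShutdownStatusPB fields, including the new
cancel_deadline_remaining_ms, along these lines. This is a hedged
sketch against the protoc-generated C++ API: the message and field
names come from this file, while the helper names are hypothetical
and the RPC plumbing is omitted.

    #include <cstdint>
    #include "control_service.pb.h"  // generated from this .proto

    // Build the RemoteShutdown request. Per RemoteShutdownParamsPB, the
    // deadline starts counting when the remote daemon receives the command.
    impala::RemoteShutdownParamsPB MakeShutdownParams(int64_t deadline_s) {
      impala::RemoteShutdownParamsPB params;
      params.set_deadline_s(deadline_s);
      return params;
    }

    // True while the daemon is still inside the window where it actively
    // cancels remaining queries (0 means the window has expired).
    bool InCancelWindow(const impala::ShutdownStatusPB& s) {
      return s.cancel_deadline_remaining_ms() > 0;
    }

    // True once nothing is left to drain: the startup grace period is
    // over and no fragment instances, registered client requests, or
    // backend queries remain on the daemon.
    bool IsQuiesced(const impala::ShutdownStatusPB& s) {
      return s.grace_remaining_ms() == 0 && s.finstances_executing() == 0 &&
             s.client_requests_registered() == 0 &&
             s.backend_queries_executing() == 0;
    }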