mirror of
https://github.com/apache/impala.git
synced 2025-12-25 11:04:13 -05:00
QueryState periodically collects runtime profiles from all of its fragment
instances and sends them to the coordinator. Previously, each time this
happens, if the rpc fails, QueryState will retry twice after a configurable
timeout and then cancel the fragment instances under the assumption that the
coordinator no longer exists.

We've found in real clusters that this logic is too sensitive to failed rpcs
and can result in fragment instances being cancelled even in cases where the
coordinator is still running.

This patch makes a few improvements to this logic:
- When a report fails to send, instead of retrying the same report quickly
  (after waiting report_status_retry_interval_ms), we wait the regular
  reporting interval (status_report_interval_ms), regenerate any stale
  portions of the report, and then retry.
- A new flag, --status_report_max_retries, is introduced, which controls the
  number of failed reports that are allowed before the query is cancelled.
  --report_status_retry_interval_ms is removed.
- Backoff is used for repeated failed attempts, such that for a period between
  retries of 't', on try 'n' the actual timeout will be t * n.

Testing:
- Added a test which results in a large number of failed intermediate status
  reports but still succeeds.

Change-Id: Ib6007013fc2c9e8eeba11b752ee58fb3038da971
Reviewed-on: http://gerrit.cloudera.org:8080/12049
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
198 lines
7.3 KiB
Protocol Buffer
198 lines
7.3 KiB
Protocol Buffer
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
|
|
|
|
syntax = "proto2";

package impala;

import "common.proto";
import "kudu/rpc/rpc_header.proto";
|
|
|
|
// DML statistics for Parquet output; embedded in DmlStatsPB as 'parquet_stats'.
message ParquetDmlStatsPB {
  // On-disk size in bytes, recorded per column.
  // NOTE(review): the string key is presumably the column name -- confirm
  // against the code that fills this map.
  map<string, int64> per_column_size = 1;
}
|
|
|
|
// DML statistics for Kudu output; embedded in DmlStatsPB as 'kudu_stats'.
message KuduDmlStatsPB {
  // How many per-row errors were reported, i.e. how many rows were left
  // unmodified. A breakdown by error type (rows with duplicate key conflicts,
  // rows violating nullability constraints, etc.) would be more useful than
  // this single aggregate, but the KuduTableSink cannot yet differentiate all
  // error types.
  optional int64 num_row_errors = 1;
}
|
|
|
|
// ReportExecStatus
|
|
|
|
// DML statistics collected for a single partition.
// TODO: the table stats that we update the metastore with should be included
// here as well.
message DmlStatsPB {
  // Number of bytes written.
  optional int64 bytes_written = 1;

  // Parquet-specific statistics; see ParquetDmlStatsPB.
  optional ParquetDmlStatsPB parquet_stats = 2;

  // Kudu-specific statistics; see KuduDmlStatsPB.
  optional KuduDmlStatsPB kudu_stats = 3;
}
|
|
|
|
// Statistics and metadata that a DML statement produces for one partition.
message DmlPartitionStatusPB {
  // Id of the partition that was written to. May be -1 when the partition is
  // created by this query. See THdfsTable.partitions.
  optional int64 id = 1;

  // Count of rows modified in this partition.
  optional int64 num_modified_rows = 2;

  // Detailed statistics that the table writers gathered for this partition.
  optional DmlStatsPB stats = 3;

  // Fully qualified URI of this partition's base directory.
  optional string partition_base_dir = 4;

  // Most recent Kudu timestamp observed and reported by the local KuduSession.
  // The value is an unsigned int64 even though the field is declared int64.
  // NOTE(review): readers presumably have to reinterpret it as unsigned --
  // confirm at the use sites.
  optional int64 kudu_latest_observed_ts = 5;
}
|
|
|
|
// Outcome of a DML statement. It is delivered to the coordinator inside a
// ReportExecStatusRequestPB.
message DmlExecStatusPB {
  // Maps each temporary absolute file path to its final absolute destination.
  // The coordinator carries out these moves once the query has completed.
  map<string, string> files_to_move = 1;

  // Details for each partition, used for finalization and for reporting.
  // Each key names a partition to create and is encoded as
  // k1=v1/k2=v2/k3=v3...; for an unpartitioned table the root is stored under
  // ROOT_PARTITION_KEY. The name of the target table is recorded in the
  // corresponding TQueryExecRequest.
  map<string, DmlPartitionStatusPB> per_partition_status = 2;
}
|
|
|
|
// Exchange format for error messages. An entry is stored under an identifier
// supplied by the containing structure (StatefulStatusPB.error_log keys its
// entries by TErrorCode).
message ErrorLogEntryPB {
  // How many error messages were reported under that identifier.
  optional int32 count = 1;

  // Sample messages for that error code.
  repeated string messages = 2;
}
|
|
|
|
// The states a fragment instance passes through while it executes. The current
// state is sent back to the coordinator and shown to users on the debug
// webpages. Values are listed in the order in which the states are entered;
// when errors occur, an instance does not necessarily pass through all of them.
enum FInstanceExecStatePB {
  WAITING_FOR_EXEC = 0;
  WAITING_FOR_PREPARE = 1;
  WAITING_FOR_CODEGEN = 2;
  WAITING_FOR_OPEN = 3;
  WAITING_FOR_FIRST_BATCH = 4;
  FIRST_BATCH_PRODUCED = 5;
  PRODUCING_DATA = 6;
  LAST_BATCH_SENT = 7;
  FINISHED = 8;
}
|
|
|
|
// Holds the portions of a status report that are not idempotent. When the
// executor believes a report failed, these portions are retransmitted. Keeping
// them tied to their original sequence number means that a coordinator which
// did in fact receive the original report will not apply them a second time.
message StatefulStatusPB {
  // Sequence number that keeps out-of-order or duplicated updates from being
  // applied. It is always <= the 'report_seq_no' of the
  // FragmentInstanceExecStatusPB that contains this StatefulStatusPB.
  optional int64 report_seq_no = 1;

  // New errors that this fragment instance has not yet reported to the
  // coordinator, as a map from TErrorCode to ErrorLogEntryPB. Not idempotent.
  map<int32, ErrorLogEntryPB> error_log = 2;
}
|
|
|
|
// Execution status of one fragment instance. Entries of this type are carried
// in ReportExecStatusRequestPB.instance_exec_status.
message FragmentInstanceExecStatusPB {
  // Sequence number that keeps out-of-order or duplicated updates from being
  // applied.
  optional int64 report_seq_no = 1;

  // Id of the fragment instance that this report describes.
  optional UniqueIdPB fragment_instance_id = 2;

  // True once the fragment has finished executing.
  optional bool done = 3;

  // Execution state that this fragment instance is currently in.
  optional FInstanceExecStatePB current_state = 4;

  // Cumulative structural changes made by this fragment instance's table sink.
  // Only sent when 'done' is true. Not idempotent.
  optional DmlExecStatusPB dml_exec_status = 5;

  // The non-idempotent parts of this report, together with those of any
  // earlier reports that the coordinator is not known to have received.
  repeated StatefulStatusPB stateful_report = 6;
}
|
|
|
|
// Request message of the ControlService.ReportExecStatus rpc.
message ReportExecStatusRequestPB {
  // Id of the query that this report belongs to.
  optional UniqueIdPB query_id = 1;

  // Same as TExecQueryFInstancesParams.coord_state_idx.
  optional int32 coord_state_idx = 2;

  // Per-fragment-instance status entries. 'thrift_profiles_sidecar_idx' refers
  // to the instances listed here.
  repeated FragmentInstanceExecStatusPB instance_exec_status = 3;

  // Index of the sidecar that holds the cumulative profiles of all fragment
  // instances in 'instance_exec_status'.
  optional int32 thrift_profiles_sidecar_idx = 4;

  // Cumulative status for this backend. QueryState::overall_status has the
  // details.
  optional StatusPB overall_status = 5;

  // Id of the first fragment instance that failed, i.e. the one that set
  // 'overall_status'. Left unset when 'overall_status' is a general error
  // (e.g. failure to start fragment instances).
  optional UniqueIdPB fragment_instance_id = 6;
}
|
|
|
|
// Response message of the ControlService.ReportExecStatus rpc.
message ReportExecStatusResponsePB {
  // Status returned to the sender of the report.
  optional StatusPB status = 1;
}
|
|
|
|
// Request message of the ControlService.CancelQueryFInstances rpc.
message CancelQueryFInstancesRequestPB {
  // Id of the query that is being cancelled.
  optional UniqueIdPB query_id = 1;
}
|
|
|
|
// Response message of the ControlService.CancelQueryFInstances rpc.
message CancelQueryFInstancesResponsePB {
  // Status returned to the caller of the cancellation rpc.
  optional StatusPB status = 1;
}
|
|
|
|
// RPC service for query control traffic: status reports delivered to the
// coordinator and cancellation requests issued by the coordinator.
service ControlService {
  // Replace the default authorization method with "Authorize".
  option (kudu.rpc.default_authz_method) = "Authorize";

  // Updates the coordinator with the backend's query status.
  rpc ReportExecStatus(ReportExecStatusRequestPB)
      returns (ReportExecStatusResponsePB);

  // Invoked by the coordinator to cancel execution of the fragment instances
  // of a single query, which the coordinator started with an earlier call to
  // ExecQueryFInstances. Cancellation is asynchronous: this call may return
  // before the fragment instance has completely stopped executing.
  rpc CancelQueryFInstances(CancelQueryFInstancesRequestPB)
      returns (CancelQueryFInstancesResponsePB);
}