Files
impala/common/protobuf/data_stream_service.proto
Riza Suminto 09d2f10f4d IMPALA-13040: Add waiting mechanism in UpdateFilterFromRemote
It is possible for an UpdateFilterFromRemote RPC to arrive at an impalad
executor before the QueryState of the destination query is created or
has completed initialization. This patch adds a wait mechanism in the
UpdateFilterFromRemote RPC endpoint to wait for a few milliseconds until
the QueryState exists and has completed initialization.

The wait time is fixed at 500ms, with an exponential sleep period in
between. If the wait time has passed and the QueryState is still not found
or initialized, the UpdateFilterFromRemote RPC is deemed failed and query
execution moves on without the complete filter.

Testing:
- Add BE tests in network-util-test.cc
- Add test_runtime_filter_aggregation.py::TestLateQueryStateInit
- Pass exhaustive runs of test_runtime_filter_aggregation.py,
  test_query_live.py, and test_query_log.py

Change-Id: I156d1f0c694b91ba34be70bc53ae9bacf924b3b9
Reviewed-on: http://gerrit.cloudera.org:8080/21383
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2024-05-10 02:08:07 +00:00

188 lines
6.2 KiB
Protocol Buffer

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// Schema uses proto2 syntax; every definition below lives in the 'impala' package.
syntax = "proto2";

package impala;
import "common.proto";
import "row_batch.proto";
import "kudu/rpc/rpc_header.proto";
// Request of the TransmitData() RPC, which carries a single row batch to a receiver.
// All fields are required in V1.
message TransmitDataRequestPB {
  // Fragment instance id of the receiving side.
  optional UniqueIdPB dest_fragment_instance_id = 1;

  // Id of the sending instance; unique within a fragment.
  optional int32 sender_id = 2;

  // PlanNodeId of the exchange node that owns the receiver.
  optional int32 dest_node_id = 3;

  // Header carrying the meta-data of the row batch.
  optional RowBatchHeaderPB row_batch_header = 4;

  // Sidecar index of the tuple offsets' buffer: an array of int32 holding the offsets
  // of tuples into the buffer referenced by the tuple data sidecar below. The array
  // has num_rows * num_tuples_per_row entries in total; an offset of -1 marks a NULL.
  optional int32 tuple_offsets_sidecar_idx = 5;

  // Sidecar index of the tuple data, which is a (compressed) row batch. Details of
  // the row batch (e.g. # of rows) are kept in 'row_batch_header' above.
  optional int32 tuple_data_sidecar_idx = 6;
}
// Response of the TransmitData() RPC.
// All fields are required in V1.
message TransmitDataResponsePB {
  // Status::OK() when the call succeeded; an error status otherwise.
  optional StatusPB status = 1;

  // Response latency measured in the receiving daemon, in nanoseconds.
  optional int64 receiver_latency_ns = 2;
}
// Request of the EndDataStream() RPC, which a sender uses to close the channel
// between fragment instances.
// All fields are required in V1.
message EndDataStreamRequestPB {
  // Fragment instance id of the receiving side.
  optional UniqueIdPB dest_fragment_instance_id = 1;

  // Id of the sending instance; unique within a fragment.
  optional int32 sender_id = 2;

  // PlanNodeId of the exchange node that owns the receiver.
  optional int32 dest_node_id = 3;
}
// Response of the EndDataStream() RPC.
// All fields are required in V1.
message EndDataStreamResponsePB {
  // Outcome of the call.
  // NOTE(review): presumably Status::OK() on success and an error status on failure,
  // as documented for TransmitDataResponsePB -- confirm.
  optional StatusPB status = 1;

  // Response latency measured in the receiving daemon, in nanoseconds.
  optional int64 receiver_latency_ns = 2;
}
// Describes a Bloom filter exchanged through the runtime filter RPCs. The filter
// contents (its directory) are referenced through a sidecar index, not stored inline.
message BloomFilterPB {
  // Log_2 of the bufferpool space this filter requires.
  // See BloomFilter::BloomFilter() for details.
  optional int32 log_bufferpool_space = 1;

  // When either always_true or always_false is true, 'directory' and
  // 'log_bufferpool_space' carry no meaning.
  optional bool always_true = 2;
  optional bool always_false = 3;

  // Sidecar index associated with the directory of the Bloom filter.
  // The directory is a list of buckets representing the Bloom filter contents, laid
  // out contiguously in one string for efficiency of (de)serialisation.
  // See BloomFilter::Bucket and BloomFilter::directory_.
  optional int32 directory_sidecar_idx = 4;
}
// Describes a min-max filter exchanged through the runtime filter RPCs.
message MinMaxFilterPB {
  // True when the filter lets every element pass; 'min'/'max' are then left unset.
  optional bool always_true = 1;

  // True when the filter lets no element pass; 'min'/'max' are then left unset.
  optional bool always_false = 2;

  // NOTE(review): presumably the lower and upper bounds of the filter -- the schema
  // does not say whether the bounds are inclusive; confirm against the filter code.
  optional ColumnValuePB min = 3;
  optional ColumnValuePB max = 4;
}
// Describes an in-list filter exchanged through the runtime filter RPCs.
// NOTE(review): the fields below are undocumented in this schema; the remarks are
// assumptions drawn from the field names and the sibling filter messages -- confirm.
message InListFilterPB {
  // Presumably true when the filter lets every element pass -- TODO confirm.
  optional bool always_true = 1;

  // Presumably true when NULL is one of the values in the list -- TODO confirm.
  optional bool contains_null = 2;

  // Presumably the values held by the in-list filter -- TODO confirm.
  repeated ColumnValuePB value = 3;
}
// Request of the UpdateFilter() and UpdateFilterFromRemote() RPCs.
message UpdateFilterParamsPB {
  // Id of the filter; unique within a query.
  optional int32 filter_id = 1;

  // Id of the query this filter belongs to.
  optional UniqueIdPB query_id = 2;

  // Filter payloads, one field per filter kind.
  // NOTE(review): presumably only the field matching the filter's kind is set --
  // the schema does not enforce this; confirm against the callers.
  optional BloomFilterPB bloom_filter = 3;
  optional MinMaxFilterPB min_max_filter = 4;
  optional InListFilterPB in_list_filter = 5;

  // Remaining filter wait time, in milliseconds, as understood by the sender.
  optional int32 remaining_filter_wait_time_ms = 6;
}
// Response of the UpdateFilter() and UpdateFilterFromRemote() RPCs.
message UpdateFilterResultPB {
  // Outcome of the call.
  optional StatusPB status = 1;

  // Response latency measured in the receiving daemon, in nanoseconds.
  optional int64 receiver_latency_ns = 2;
}
// Request of the PublishFilter() RPC.
message PublishFilterParamsPB {
  // Id of the filter; unique within a query.
  optional int32 filter_id = 1;

  // Id of the query this filter is destined for.
  optional UniqueIdPB dst_query_id = 2;

  // The bloom_filter payload itself.
  optional BloomFilterPB bloom_filter = 3;

  // The min_max_filter payload itself.
  optional MinMaxFilterPB min_max_filter = 4;

  // The in_list_filter payload itself.
  optional InListFilterPB in_list_filter = 5;
}
// Response of the PublishFilter() RPC.
message PublishFilterResultPB {
  // Outcome of the call.
  optional StatusPB status = 1;

  // Response latency measured in the receiving daemon, in nanoseconds.
  optional int64 receiver_latency_ns = 2;
}
// RPC service responsible for data transmission between fragment instances.
service DataStreamService {
  // Override the default authorization method.
  option (kudu.rpc.default_authz_method) = "Authorize";

  // Invoked by a sender to transmit a single row batch. An error indication is
  // returned if the destination fragment instance or destination node given in the
  // request ('dest_fragment_instance_id' / 'dest_node_id') is unknown, or if the data
  // couldn't be read.
  rpc TransmitData(TransmitDataRequestPB) returns (TransmitDataResponsePB);

  // Invoked by a sender to close the channel between fragment instances.
  rpc EndDataStream(EndDataStreamRequestPB) returns (EndDataStreamResponsePB);

  // Invoked by fragment instances that produce local runtime filters, to hand them to
  // the coordinator for aggregation and broadcast.
  rpc UpdateFilter(UpdateFilterParamsPB) returns (UpdateFilterResultPB);

  // Invoked by fragment instances that produce local runtime filters, to hand them to
  // the aggregator backend for intermediate aggregation.
  rpc UpdateFilterFromRemote(UpdateFilterParamsPB) returns (UpdateFilterResultPB);

  // Invoked by the coordinator to hand global runtime filters to fragments, where
  // they are applied at plan nodes.
  rpc PublishFilter(PublishFilterParamsPB) returns (PublishFilterResultPB);
}