mirror of
https://github.com/apache/impala.git
synced 2025-12-25 02:03:09 -05:00
Previously the aggregation and propagation of a runtime filter in Impala was implemented using Thrift RPC, which suffers from the disadvantage that the number of connections in a cluster grows with both the number of queries and cluster size. This patch ports the functions that implement the aggregation and propagation of a runtime filter, i.e., UpdateFilter() and PublishFilter(), respectively, to KRPC, which requires only one connection per direction between every pair of hosts, thus reducing the number of connections in a cluster. In addition, this patch also incorporates KRPC sidecar when the runtime filter is a Bloom filter. KRPC sidecar eliminates the need for an extra copy of the Bloom filter contents when a Bloom filter is serialized to be transmitted and hence reduces the serialization overhead. Due to the incorporation of KRPC sidecar, a SpinLock is also added to prevent a BloomFilter from being deallocated before its associated KRPC call finishes. Two related BE tests bloom-filter-test.cc and bloom-filter-benchmark.cc are also modified accordingly because of the changes to the signatures of some functions in BloomFilter. Testing: This patch has passed the exhaustive tests. Change-Id: I11a2f92a91750c2470fba082c30f97529524b9c8 Reviewed-on: http://gerrit.cloudera.org:8080/13882 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Reviewed-on: http://gerrit.cloudera.org:8080/14974 Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com> Tested-by: Tim Armstrong <tarmstrong@cloudera.com>
173 lines
5.7 KiB
Protocol Buffer
173 lines
5.7 KiB
Protocol Buffer
// Licensed to the Apache Software Foundation (ASF) under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing,
|
|
// software distributed under the License is distributed on an
|
|
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
// KIND, either express or implied. See the License for the
|
|
// specific language governing permissions and limitations
|
|
// under the License.
|
|
//
|
|
|
|
// This file is written in proto2 syntax.
syntax = "proto2";

// All definitions in this file belong to the 'impala' package.
package impala;
|
|
|
|
import "common.proto";
|
|
import "row_batch.proto";
|
|
|
|
import "kudu/rpc/rpc_header.proto";
|
|
|
|
// Request of DataStreamService.TransmitData(). Although every field is
// declared 'optional', all of them are required in V1.
message TransmitDataRequestPB {
  // Fragment instance id of the receiver.
  optional UniqueIdPB dest_fragment_instance_id = 1;

  // Id of the sender instance; unique within a fragment.
  optional int32 sender_id = 2;

  // PlanNodeId of the exchange node that owns the receiver.
  optional int32 dest_node_id = 3;

  // Header holding the meta-data of the row batch.
  optional RowBatchHeaderPB row_batch_header = 4;

  // Sidecar index of the tuple offsets buffer: an array of int32 holding the
  // offsets of tuples into the buffer referenced by 'tuple_data_sidecar_idx'.
  // The array has num_rows * num_tuples_per_row entries in total; an offset
  // of -1 records a NULL.
  optional int32 tuple_offsets_sidecar_idx = 5;

  // Sidecar index of the tuple data, which is a (compressed) row batch.
  // Details of the row batch (e.g. the number of rows) are carried in
  // 'row_batch_header'.
  optional int32 tuple_data_sidecar_idx = 6;
}
|
|
|
|
// Response of DataStreamService.TransmitData(). Although every field is
// declared 'optional', all of them are required in V1.
message TransmitDataResponsePB {
  // Status::OK() if the call succeeded; an error status otherwise.
  optional StatusPB status = 1;

  // Time, in nanoseconds, the receiving daemon took to respond.
  optional int64 receiver_latency_ns = 2;
}
|
|
|
|
// Request of DataStreamService.EndDataStream(). Although every field is
// declared 'optional', all of them are required in V1.
message EndDataStreamRequestPB {
  // Fragment instance id of the receiver.
  optional UniqueIdPB dest_fragment_instance_id = 1;

  // Id of the sender instance; unique within a fragment.
  optional int32 sender_id = 2;

  // PlanNodeId of the exchange node that owns the receiver.
  optional int32 dest_node_id = 3;
}
|
|
|
|
// Response of DataStreamService.EndDataStream(). Although every field is
// declared 'optional', all of them are required in V1.
message EndDataStreamResponsePB {
  // Status returned for the EndDataStream() request.
  optional StatusPB status = 1;

  // Time, in nanoseconds, the receiving daemon took to respond.
  optional int64 receiver_latency_ns = 2;
}
|
|
|
|
// Description of a Bloom filter. The filter contents themselves are referenced
// through a sidecar index instead of being embedded in this message.
message BloomFilterPB {
  // Log_2 of the bufferpool space this filter requires; see
  // BloomFilter::BloomFilter() for details.
  optional int32 log_bufferpool_space = 1;

  // When either always_true or always_false is true, 'directory' and
  // 'log_bufferpool_space' carry no meaning.
  optional bool always_true = 2;
  optional bool always_false = 3;

  // Sidecar index of the Bloom filter's directory. A directory is the list of
  // buckets that represents the filter contents, laid out contiguously in a
  // single string so that (de)serialisation is efficient. See
  // BloomFilter::Bucket and BloomFilter::directory_.
  optional int32 directory_sidecar_idx = 4;
}
|
|
|
|
// Description of a min-max filter.
message MinMaxFilterPB {
  // True when the filter lets every element pass; 'min'/'max' are then unset.
  optional bool always_true = 1;

  // True when the filter lets no element pass; 'min'/'max' are then unset.
  optional bool always_false = 2;

  // The 'min' and 'max' values of the filter. Not set when always_true or
  // always_false is true.
  optional ColumnValuePB min = 3;
  optional ColumnValuePB max = 4;
}
|
|
|
|
// Request of DataStreamService.UpdateFilter().
message UpdateFilterParamsPB {
  // Id of the filter; unique within a query.
  optional int32 filter_id = 1;

  // Id of the query this filter is for.
  optional UniqueIdPB query_id = 2;

  // The Bloom filter being delivered, if the filter is a Bloom filter.
  optional BloomFilterPB bloom_filter = 3;

  // The min-max filter being delivered, if the filter is a min-max filter.
  optional MinMaxFilterPB min_max_filter = 4;
}
|
|
|
|
// Response of DataStreamService.UpdateFilter().
message UpdateFilterResultPB {
  // Status returned for the UpdateFilter() request.
  optional StatusPB status = 1;

  // Time, in nanoseconds, the receiving daemon took to respond.
  optional int64 receiver_latency_ns = 2;
}
|
|
|
|
// Request of DataStreamService.PublishFilter().
message PublishFilterParamsPB {
  // Id of the filter; unique within a query.
  optional int32 filter_id = 1;

  // Id of the query this filter is for.
  optional UniqueIdPB dst_query_id = 2;

  // Index of the fragment that is to receive this filter.
  optional int32 dst_fragment_idx = 3;

  // The actual bloom_filter payload.
  optional BloomFilterPB bloom_filter = 4;

  // The actual min_max_filter payload.
  optional MinMaxFilterPB min_max_filter = 5;
}
|
|
|
|
// Response of DataStreamService.PublishFilter().
message PublishFilterResultPB {
  // Status returned for the PublishFilter() request.
  optional StatusPB status = 1;

  // Time, in nanoseconds, the receiving daemon took to respond.
  optional int64 receiver_latency_ns = 2;
}
|
|
|
|
// Service handling data transmission between fragment instances.
service DataStreamService {
  // Use "Authorize" in place of the default authorization method.
  option (kudu.rpc.default_authz_method) = "Authorize";

  // Invoked by a sender to transmit a single row batch. An error indication is
  // returned if the destination fragment instance or exchange node named in
  // the request ('dest_fragment_instance_id' / 'dest_node_id') is unknown, or
  // if the data couldn't be read.
  rpc TransmitData(TransmitDataRequestPB) returns (TransmitDataResponsePB);

  // Invoked by a sender to close the channel between fragment instances.
  rpc EndDataStream(EndDataStreamRequestPB) returns (EndDataStreamResponsePB);

  // Invoked by fragment instances that produce local runtime filters, to hand
  // them to the coordinator for aggregation and broadcast.
  rpc UpdateFilter(UpdateFilterParamsPB) returns (UpdateFilterResultPB);

  // Invoked by the coordinator to hand global runtime filters to fragments,
  // where they are applied at plan nodes.
  rpc PublishFilter(PublishFilterParamsPB) returns (PublishFilterResultPB);
}
|