impala/common/protobuf/data_stream_service.proto
Tim Armstrong f38da0df8e IMPALA-4400: aggregate runtime filters locally
Move RuntimeFilterBank to QueryState. Implement fine-grained
locking for each filter to mitigate any increased lock
contention from the change.

Make RuntimeFilterBank handle multiple producers of the
same filter, e.g. multiple instances of a partitioned
join. It computes the expected number of filters upfront
then sends the filter to the coordinator once all the
local instances have been merged together. The merging
can be done in parallel locally to improve latency of
filter propagation.
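
The local aggregation described above can be sketched roughly as follows.
This is a minimal illustration, not Impala's RuntimeFilterBank code:
LocalFilterAggregator is a hypothetical name, a Python set stands in for
the filter payload, and a callback stands in for the UpdateFilter RPC to
the coordinator. For simplicity the merge is serialized under the
per-filter lock, whereas the change above allows merging in parallel.

```python
import threading


class LocalFilterAggregator:
    """Hypothetical per-filter state that merges updates from local producers."""

    def __init__(self, expected_producers, send_to_coordinator):
        self._lock = threading.Lock()        # one lock per filter
        self._pending = expected_producers   # computed upfront
        self._merged = set()                 # stand-in for a real filter payload
        self._send = send_to_coordinator     # stand-in for the UpdateFilter RPC

    def update(self, partial):
        """Merge one producer's filter; send once the last producer has arrived."""
        with self._lock:
            if self._pending == 0:
                raise RuntimeError("more updates than expected producers")
            self._merged |= partial
            self._pending -= 1
            done = self._pending == 0
        if done:
            # Only one caller observes the count reaching zero, so the merged
            # filter is sent exactly once per backend.
            self._send(frozenset(self._merged))
        return done


# Three local instances of the same join each produce a partial filter.
sent = []
agg = LocalFilterAggregator(expected_producers=3, send_to_coordinator=sent.append)
threads = [
    threading.Thread(target=agg.update, args=({i, i + 10},)) for i in range(3)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Whatever order the three updates arrive in, the coordinator callback
fires once, with the union of all three partial filters.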

Add Or() methods to MinMaxFilter and BloomFilter, since
we now need to merge those, not just the thrift versions.
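
As an illustration of what Or-style merging means for the two filter
types, here is a minimal sketch. It is not the MinMaxFilter::Or() or
BloomFilter::Or() implementation: the MinMax class and both function
names are hypothetical, values are plain integers, and the Bloom filter
directory is modelled as a byte string of equal length on both sides.
The always_true / always_false handling is an assumption that follows
the flag descriptions in MinMaxFilterPB.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class MinMax:
    """Hypothetical stand-in for a min/max filter over integers."""
    always_true: bool = False
    always_false: bool = True          # a filter with no values rejects everything
    min_value: Optional[int] = None
    max_value: Optional[int] = None


def or_min_max(a: MinMax, b: MinMax) -> MinMax:
    """Return a filter that passes every value passed by a or by b."""
    if a.always_true or b.always_true:
        return MinMax(always_true=True, always_false=False)
    if a.always_false:
        return b
    if b.always_false:
        return a
    return MinMax(False, False,
                  min(a.min_value, b.min_value),
                  max(a.max_value, b.max_value))


def or_bloom_directory(a: bytes, b: bytes) -> bytes:
    """Bitwise OR of two equally sized Bloom filter directories."""
    if len(a) != len(b):
        raise ValueError("directories must have the same size")
    return bytes(x | y for x, y in zip(a, b))
```

The merged min/max range is the smallest range that covers both inputs,
and a bit is set in the merged directory if it is set in either input,
so the merged filter never rejects a value that one of its inputs
would have accepted.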

Update coordinator filter routing to expect only one
instance of a filter from each producer backend and
to only send one instance to each consumer backend
(instead of sending one per fragment).

Update memory reservations and estimates to be lower
to account for sharing of filters between fragment
instances. mt_dop plans are modified to show these
shared and non-shared resources separately.

Enable waiting for runtime filters for the Kudu scanner
with mt_dop.

Make min/max filters const-correct.

Testing
* Added unit tests for Or() methods.
* Added some additional e2e test coverage for mt_dop queries.
* Updated planner tests with new estimates and reservation.
* Ran a single-node 3-impalad stress test with TPC-H kudu and
  TPC-DS parquet.
* Ran exhaustive tests.
* Ran core tests with ASAN.

Perf
* Did a single-node perf run on TPC-H with default settings. No perf change.
* Single-node perf run with mt_dop=8 showed significant speedups:

+----------+-----------------------+---------+------------+------------+----------------+
| Workload | File Format           | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
+----------+-----------------------+---------+------------+------------+----------------+
| TPCH(30) | parquet / none / none | 10.14   | -7.29%     | 5.05       | -11.68%        |
+----------+-----------------------+---------+------------+------------+----------------+

+----------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------+----------------+---------+---------+
| Workload | Query    | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%) | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | Tval    |
+----------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------+----------------+---------+---------+
| TPCH(30) | TPCH-Q7  | parquet / none / none | 38.87  | 38.44       |   +1.13%   |   7.17%   | * 10.92% *     | 20    |   +0.72%       | 0.72    | 0.39    |
| TPCH(30) | TPCH-Q1  | parquet / none / none | 4.28   | 4.26        |   +0.50%   |   1.92%   |   1.09%        | 20    |   +0.03%       | 0.31    | 1.01    |
| TPCH(30) | TPCH-Q22 | parquet / none / none | 2.32   | 2.32        |   +0.05%   |   2.01%   |   1.89%        | 20    |   -0.03%       | -0.36   | 0.08    |
| TPCH(30) | TPCH-Q15 | parquet / none / none | 3.73   | 3.75        |   -0.42%   |   0.84%   |   1.05%        | 20    |   -0.25%       | -0.77   | -1.40   |
| TPCH(30) | TPCH-Q13 | parquet / none / none | 9.80   | 9.83        |   -0.38%   |   0.51%   |   0.80%        | 20    |   -0.32%       | -1.30   | -1.81   |
| TPCH(30) | TPCH-Q2  | parquet / none / none | 1.98   | 2.00        |   -1.32%   |   1.74%   |   2.81%        | 20    |   -0.64%       | -1.71   | -1.79   |
| TPCH(30) | TPCH-Q6  | parquet / none / none | 1.22   | 1.25        |   -2.14%   |   2.66%   |   4.15%        | 20    |   -0.96%       | -2.00   | -1.95   |
| TPCH(30) | TPCH-Q19 | parquet / none / none | 5.13   | 5.22        |   -1.65%   |   1.20%   |   1.40%        | 20    |   -1.76%       | -3.34   | -4.02   |
| TPCH(30) | TPCH-Q16 | parquet / none / none | 2.46   | 2.56        |   -4.13%   |   2.49%   |   1.99%        | 20    |   -4.31%       | -4.04   | -5.94   |
| TPCH(30) | TPCH-Q9  | parquet / none / none | 81.63  | 85.07       |   -4.05%   |   4.94%   |   3.06%        | 20    |   -5.46%       | -3.28   | -3.21   |
| TPCH(30) | TPCH-Q10 | parquet / none / none | 5.07   | 5.50        | I -7.92%   |   0.96%   |   1.33%        | 20    | I -8.51%       | -5.27   | -22.14  |
| TPCH(30) | TPCH-Q21 | parquet / none / none | 24.00  | 26.24       | I -8.57%   |   0.46%   |   0.38%        | 20    | I -9.34%       | -5.27   | -67.47  |
| TPCH(30) | TPCH-Q18 | parquet / none / none | 8.66   | 9.50        | I -8.86%   |   0.62%   |   0.44%        | 20    | I -9.75%       | -5.27   | -55.17  |
| TPCH(30) | TPCH-Q3  | parquet / none / none | 6.01   | 6.70        | I -10.19%  |   1.01%   |   0.90%        | 20    | I -11.25%      | -5.27   | -35.76  |
| TPCH(30) | TPCH-Q12 | parquet / none / none | 2.98   | 3.39        | I -12.23%  |   1.48%   |   1.48%        | 20    | I -13.56%      | -5.27   | -27.75  |
| TPCH(30) | TPCH-Q11 | parquet / none / none | 1.69   | 2.00        | I -15.55%  |   1.63%   |   1.47%        | 20    | I -18.09%      | -5.27   | -34.60  |
| TPCH(30) | TPCH-Q4  | parquet / none / none | 2.42   | 2.87        | I -15.69%  |   1.48%   |   1.26%        | 20    | I -18.61%      | -5.27   | -39.50  |
| TPCH(30) | TPCH-Q14 | parquet / none / none | 4.64   | 6.27        | I -26.02%  |   1.35%   |   0.73%        | 20    | I -35.37%      | -5.27   | -94.07  |
| TPCH(30) | TPCH-Q20 | parquet / none / none | 3.19   | 4.37        | I -27.01%  |   1.54%   |   0.99%        | 20    | I -36.85%      | -5.27   | -80.74  |
| TPCH(30) | TPCH-Q5  | parquet / none / none | 4.57   | 6.39        | I -28.36%  |   1.04%   |   0.75%        | 20    | I -39.56%      | -5.27   | -120.02 |
| TPCH(30) | TPCH-Q17 | parquet / none / none | 3.15   | 4.71        | I -33.06%  |   1.59%   |   1.31%        | 20    | I -49.43%      | -5.27   | -87.64  |
| TPCH(30) | TPCH-Q8  | parquet / none / none | 5.25   | 7.95        | I -33.95%  |   0.95%   |   0.53%        | 20    | I -51.11%      | -5.27   | -185.02 |
+----------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------+----------------+---------+---------+

Change-Id: Iabeeab5eec869ff2197250ad41c1eb5551704acc
Reviewed-on: http://gerrit.cloudera.org:8080/14538
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2020-01-29 00:58:24 +00:00

170 lines
5.6 KiB
Protocol Buffer

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
syntax="proto2";
package impala;
import "common.proto";
import "row_batch.proto";
import "kudu/rpc/rpc_header.proto";
// All fields are required in V1.
message TransmitDataRequestPB {
  // The fragment instance id of the receiver.
  optional UniqueIdPB dest_fragment_instance_id = 1;

  // Sender instance id, unique within a fragment.
  optional int32 sender_id = 2;

  // PlanNodeId of the exchange node which owns the receiver.
  optional int32 dest_node_id = 3;

  // The header which contains the meta-data of the row batch.
  optional RowBatchHeaderPB row_batch_header = 4;

  // The sidecar index of tuple offsets' buffer which is an array of int32 containing the
  // offsets of tuples into the buffer pointed to by tuple data's sidecar below. There are
  // num_rows * num_tuples_per_row offsets in total. An offset of -1 records a NULL.
  optional int32 tuple_offsets_sidecar_idx = 5;

  // The sidecar index of the tuple's data which is a (compressed) row batch.
  // The details of the row batch (e.g. # of rows) are in 'row_batch_header' above.
  optional int32 tuple_data_sidecar_idx = 6;
}

// All fields are required in V1.
message TransmitDataResponsePB {
  // Status::OK() on success; Error status on failure.
  optional StatusPB status = 1;

  // Latency for response in the receiving daemon in nanoseconds.
  optional int64 receiver_latency_ns = 2;
}

// All fields are required in V1.
message EndDataStreamRequestPB {
  // The fragment instance id of the receiver.
  optional UniqueIdPB dest_fragment_instance_id = 1;

  // Sender instance id, unique within a fragment.
  optional int32 sender_id = 2;

  // PlanNodeId of the exchange node which owns the receiver.
  optional int32 dest_node_id = 3;
}

// All fields are required in V1.
message EndDataStreamResponsePB {
  optional StatusPB status = 1;

  // Latency for response in the receiving daemon in nanoseconds.
  optional int64 receiver_latency_ns = 2;
}

message BloomFilterPB {
  // Log_2 of the bufferpool space required for this filter.
  // See BloomFilter::BloomFilter() for details.
  optional int32 log_bufferpool_space = 1;

  // If always_true or always_false is true, 'directory' and 'log_bufferpool_space' are
  // not meaningful.
  optional bool always_true = 2;
  optional bool always_false = 3;

  // The sidecar index associated with the directory of a Bloom filter.
  // A directory is a list of buckets representing the Bloom Filter contents,
  // laid out contiguously in one string for efficiency of (de)serialisation.
  // See BloomFilter::Bucket and BloomFilter::directory_.
  optional int32 directory_sidecar_idx = 4;
}

message MinMaxFilterPB {
  // If true, filter allows all elements to pass and 'min'/'max' will not be set.
  optional bool always_true = 1;

  // If true, filter doesn't allow any elements to pass and 'min'/'max' will not be set.
  optional bool always_false = 2;

  optional ColumnValuePB min = 3;
  optional ColumnValuePB max = 4;
}

message UpdateFilterParamsPB {
  // Filter ID, unique within a query.
  optional int32 filter_id = 1;

  // Query that this filter is for.
  optional UniqueIdPB query_id = 2;

  optional BloomFilterPB bloom_filter = 3;

  optional MinMaxFilterPB min_max_filter = 4;
}

message UpdateFilterResultPB {
  optional StatusPB status = 1;

  // Latency for response in the receiving daemon in nanoseconds.
  optional int64 receiver_latency_ns = 2;
}

message PublishFilterParamsPB {
  // Filter ID, unique within a query.
  optional int32 filter_id = 1;

  // Query that this filter is for.
  optional UniqueIdPB dst_query_id = 2;

  // Actual bloom_filter payload.
  optional BloomFilterPB bloom_filter = 3;

  // Actual min_max_filter payload.
  optional MinMaxFilterPB min_max_filter = 4;
}

message PublishFilterResultPB {
  optional StatusPB status = 1;

  // Latency for response in the receiving daemon in nanoseconds.
  optional int64 receiver_latency_ns = 2;
}

// Handles data transmission between fragment instances.
service DataStreamService {
  // Override the default authorization method.
  option (kudu.rpc.default_authz_method) = "Authorize";

  // Called by sender to transmit a single row batch. Returns an error indication
  // if dest_fragment_instance_id or dest_node_id is unknown or if the data couldn't
  // be read.
  rpc TransmitData(TransmitDataRequestPB) returns (TransmitDataResponsePB);

  // Called by a sender to close the channel between fragment instances.
  rpc EndDataStream(EndDataStreamRequestPB) returns (EndDataStreamResponsePB);

  // Called by fragment instances that produce local runtime filters to deliver them to
  // the coordinator for aggregation and broadcast.
  rpc UpdateFilter(UpdateFilterParamsPB) returns (UpdateFilterResultPB);

  // Called by the coordinator to deliver global runtime filters to fragments for
  // application at plan nodes.
  rpc PublishFilter(PublishFilterParamsPB) returns (PublishFilterResultPB);
}
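
The tuple offsets sidecar described in TransmitDataRequestPB (an int32
array of num_rows * num_tuples_per_row entries, with -1 recording a
NULL) could be decoded along the following lines. This is only a sketch:
the function name is hypothetical, and both the little-endian byte order
and the row-major layout are assumptions that the file above does not
state.

```python
import struct


def decode_tuple_offsets(buf: bytes, num_rows: int, num_tuples_per_row: int):
    """Split a raw int32 offsets buffer into per-row lists (None means NULL)."""
    count = num_rows * num_tuples_per_row
    if len(buf) != 4 * count:
        raise ValueError("buffer size does not match num_rows * num_tuples_per_row")
    # Assumption: offsets are little-endian int32 values.
    flat = struct.unpack(f"<{count}i", buf)
    rows = []
    for r in range(num_rows):
        # Assumption: offsets are laid out row by row.
        row = flat[r * num_tuples_per_row:(r + 1) * num_tuples_per_row]
        rows.append([None if off == -1 else off for off in row])
    return rows


# Two rows of two tuples each; the second tuple of the first row is NULL.
example = struct.pack("<4i", 0, -1, 16, 32)
decoded = decode_tuple_offsets(example, num_rows=2, num_tuples_per_row=2)
```

Each non-NULL entry is a byte offset into the (decompressed) tuple data
buffer carried in the sidecar named by tuple_data_sidecar_idx.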