From 39cc4b6bf45a172c3fdcd6a9cc42eaadfcf3ae71 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Tue, 31 Aug 2021 14:14:18 +0800 Subject: [PATCH] IMPALA-2581: LIMIT can be propagated down into some aggregations This patch contains 2 parts: 1. When both conditions below are true, push down limit to pre-aggregation a) aggregation node has no aggregate function b) aggregation node has no predicate 2. finish aggregation when number of unique keys of hash table has exceeded the limit. Sample queries: SELECT DISTINCT f FROM t LIMIT n Can pass the LIMIT all the way down to the pre-aggregation, which leads to a nearly unbounded speedup on these queries in large tables when n is low. Testing: Add test targeted-perf/queries/aggregation.test Pass core test Change-Id: I930a6cb203615acfc03f23118d1bc1f0ea360995 Reviewed-on: http://gerrit.cloudera.org:8080/17821 Reviewed-by: Qifan Chen Tested-by: Impala Public Jenkins --- be/src/exec/aggregation-node-base.cc | 4 ++- be/src/exec/aggregation-node-base.h | 3 +++ be/src/exec/aggregation-node.cc | 12 +++++++++ be/src/exec/aggregator.h | 3 +++ be/src/exec/grouping-aggregator.cc | 25 ++++++++++++++++++- be/src/exec/grouping-aggregator.h | 7 +++++- be/src/exec/non-grouping-aggregator.h | 2 ++ be/src/exec/streaming-aggregation-node.cc | 19 ++++++++++++-- common/thrift/PlanNodes.thrift | 3 +++ .../impala/planner/AggregationNode.java | 10 ++++++++ .../impala/planner/DistributedPlanner.java | 4 ++- .../apache/impala/planner/ExchangeNode.java | 5 +++- .../PlannerTest/resource-requirements.test | 15 +++++------ .../PlannerTest/setoperation-rewrite.test | 3 ++- .../queries/PlannerTest/subquery-rewrite.test | 3 ++- .../queries/PlannerTest/tpcds/tpcds-q06.test | 10 +++++--- .../queries/PlannerTest/tpcds/tpcds-q54.test | 20 +++++++++------ .../queries/QueryTest/spilling.test | 17 ++++++++++++- .../targeted-perf/queries/aggregation.test | 11 ++++++++ 19 files changed, 147 insertions(+), 29 deletions(-) diff --git a/be/src/exec/aggregation-node-base.cc b/be/src/exec/aggregation-node-base.cc index 0cad9a80f..33e285a78 100644 --- a/be/src/exec/aggregation-node-base.cc +++ b/be/src/exec/aggregation-node-base.cc @@ -82,11 +82,13 @@ AggregationNodeBase::AggregationNodeBase( static_cast(agg); DCHECK(grouping_config != nullptr); node.reset(new GroupingAggregator(this, pool_, *grouping_config, - pnode.tnode_->agg_node.estimated_input_cardinality)); + pnode.tnode_->agg_node.estimated_input_cardinality, + pnode.tnode_->agg_node.fast_limit_check)); } aggs_.push_back(std::move(node)); runtime_profile_->AddChild(aggs_[i]->runtime_profile()); } + fast_limit_check_ = pnode.tnode_->agg_node.fast_limit_check; } Status AggregationNodeBase::Prepare(RuntimeState* state) { diff --git a/be/src/exec/aggregation-node-base.h b/be/src/exec/aggregation-node-base.h index 6d148b98d..8bcecfcbb 100644 --- a/be/src/exec/aggregation-node-base.h +++ b/be/src/exec/aggregation-node-base.h @@ -67,6 +67,9 @@ class AggregationNodeBase : public ExecNode { /// END: Members that must be Reset() ///////////////////////////////////////// + /// If true, aggregation can be done ahead of time without computing all the input data + bool fast_limit_check_ = false; + /// Splits the rows of 'batch' up according to which tuple of the row is non-null such /// that a row with tuple 'i' non-null is copied into the batch 'mini_batches[i]'. /// It is expected that all rows of 'batch' have exactly 1 non-null tuple. diff --git a/be/src/exec/aggregation-node.cc b/be/src/exec/aggregation-node.cc index 8db79259a..603442e77 100644 --- a/be/src/exec/aggregation-node.cc +++ b/be/src/exec/aggregation-node.cc @@ -69,6 +69,18 @@ Status AggregationNode::Open(RuntimeState* state) { if (num_aggs == 1) { RETURN_IF_ERROR(aggs_[0]->AddBatch(state, &child_batch)); child_batch.Reset(); + if (fast_limit_check_) { + DCHECK(limit() > -1); + if (aggs_[0]->GetNumKeys() >= limit()) { + eos = true; + runtime_profile_->AddInfoString("FastLimitCheckExceededRows", + SimpleItoa(aggs_[0]->GetNumKeys() - limit())); + VLOG_QUERY << Substitute("the number of rows ($0) returned from the " + "aggregation node has exceeded the limit of $1", aggs_[0]->GetNumKeys(), + limit()); + break; + } + } continue; } diff --git a/be/src/exec/aggregator.h b/be/src/exec/aggregator.h index 693716e3f..6c3cae25b 100644 --- a/be/src/exec/aggregator.h +++ b/be/src/exec/aggregator.h @@ -172,6 +172,9 @@ class Aggregator { static const char* LLVM_CLASS_NAME; + // The number of unique values that have been aggregated + virtual int64_t GetNumKeys() const = 0; + protected: /// The id of the ExecNode this Aggregator corresponds to. const int id_; diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc index f77f0df4e..1b8ebd104 100644 --- a/be/src/exec/grouping-aggregator.cc +++ b/be/src/exec/grouping-aggregator.cc @@ -134,7 +134,8 @@ static const int STREAMING_HT_MIN_REDUCTION_SIZE = sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(STREAMING_HT_MIN_REDUCTION[0]); GroupingAggregator::GroupingAggregator(ExecNode* exec_node, ObjectPool* pool, - const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality) + const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality, + bool needUnsetLimit) : Aggregator( exec_node, pool, config, Substitute("GroupingAggregator $0", config.agg_idx_)), hash_table_config_(*config.hash_table_config_), @@ -152,6 +153,9 @@ GroupingAggregator::GroupingAggregator(ExecNode* exec_node, ObjectPool* pool, estimated_input_cardinality_(estimated_input_cardinality), partition_pool_(new ObjectPool()) { DCHECK_EQ(PARTITION_FANOUT, 1 << NUM_PARTITIONING_BITS); + if (needUnsetLimit) { + UnsetLimit(); + } } Status GroupingAggregator::Prepare(RuntimeState* state) { @@ -1317,4 +1321,23 @@ Status GroupingAggregatorConfig::CodegenAddBatchStreamingImpl( // Instantiate required templates. template Status GroupingAggregator::AppendSpilledRow(Partition*, TupleRow*); template Status GroupingAggregator::AppendSpilledRow(Partition*, TupleRow*); + +int64_t GroupingAggregator::GetNumKeys() const { + int64_t num_keys = 0; + for (int i = 0; i < hash_partitions_.size(); ++i) { + Partition* partition = hash_partitions_[i]; + if (partition == nullptr) continue; + // We might be dealing with a rebuilt spilled partition, where all partitions are + // pointing to a single in-memory partition, so make sure we only proceed for the + // right partition. + if (i != partition->idx) continue; + if (!partition->is_spilled()) { + if (partition->hash_tbl == nullptr) { + continue; + } + num_keys += partition->hash_tbl->size(); + } + } + return num_keys; +} } // namespace impala diff --git a/be/src/exec/grouping-aggregator.h b/be/src/exec/grouping-aggregator.h index f9f7d0d28..b71ba9885 100644 --- a/be/src/exec/grouping-aggregator.h +++ b/be/src/exec/grouping-aggregator.h @@ -192,7 +192,8 @@ class GroupingAggregatorConfig : public AggregatorConfig { class GroupingAggregator : public Aggregator { public: GroupingAggregator(ExecNode* exec_node, ObjectPool* pool, - const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality); + const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality, + bool needUnsetLimit); virtual Status Prepare(RuntimeState* state) override; virtual Status Open(RuntimeState* state) override; @@ -212,6 +213,10 @@ class GroupingAggregator : public Aggregator { virtual std::string DebugString(int indentation_level = 0) const override; virtual void DebugString(int indentation_level, std::stringstream* out) const override; + virtual int64_t GetNumKeys() const override; + + void UnsetLimit() { limit_ = -1; } + private: struct Partition; diff --git a/be/src/exec/non-grouping-aggregator.h b/be/src/exec/non-grouping-aggregator.h index e50f60986..47b461f7a 100644 --- a/be/src/exec/non-grouping-aggregator.h +++ b/be/src/exec/non-grouping-aggregator.h @@ -97,6 +97,8 @@ class NonGroupingAggregator : public Aggregator { virtual std::string DebugString(int indentation_level = 0) const override; virtual void DebugString(int indentation_level, std::stringstream* out) const override; + virtual int64_t GetNumKeys() const override { return 1; } + private: /// MemPool used to allocate memory for 'singleton_output_tuple_'. The ownership of the /// pool's memory is transferred to the output batch on eos. The pool should not be diff --git a/be/src/exec/streaming-aggregation-node.cc b/be/src/exec/streaming-aggregation-node.cc index ef34a0b1d..88bd2431f 100644 --- a/be/src/exec/streaming-aggregation-node.cc +++ b/be/src/exec/streaming-aggregation-node.cc @@ -36,7 +36,8 @@ StreamingAggregationNode::StreamingAggregationNode( ObjectPool* pool, const AggregationPlanNode& pnode, const DescriptorTbl& descs) : AggregationNodeBase(pool, pnode, descs) { DCHECK(pnode.tnode_->conjuncts.empty()) << "Preaggs have no conjuncts"; - DCHECK(limit_ == -1) << "Preaggs have no limits"; + DCHECK(limit_ == -1 || (limit_ != -1 && fast_limit_check_)) + << "Preaggs have no limits"; for (auto& t_agg : pnode.tnode_->agg_node.aggregators) { DCHECK(t_agg.use_streaming_preaggregation); } @@ -62,7 +63,7 @@ Status StreamingAggregationNode::GetNext( RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state)); RETURN_IF_CANCELLED(state); - if (ReachedLimit()) { + if (!fast_limit_check_ && ReachedLimit()) { *eos = true; return Status::OK(); } @@ -124,6 +125,20 @@ Status StreamingAggregationNode::GetRowsStreaming( if (child_batch_processed_) { child_batch_->Reset(); } + if (fast_limit_check_) { + DCHECK(limit() > -1); + if (aggs_[0]->GetNumKeys() >= limit()) { + child_eos_ = true; + child_batch_processed_ = true; + child_batch_->Reset(); + runtime_profile_->AddInfoString("FastLimitCheckExceededRows", + SimpleItoa(aggs_[0]->GetNumKeys() - limit())); + VLOG_QUERY << Substitute("the number of rows ($0) returned from the streaming " + "aggregation node has exceeded the limit of $1",aggs_[0]->GetNumKeys(), + limit()); + break; + } + } continue; } diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift index 502a6fc1d..3c48fe8c8 100644 --- a/common/thrift/PlanNodes.thrift +++ b/common/thrift/PlanNodes.thrift @@ -475,6 +475,9 @@ struct TAggregationNode { // If true, this is the first AggregationNode in a aggregation plan with multiple // Aggregators and the entire input to this node should be passed to each Aggregator. 3: required bool replicate_input + + // Set to true if this aggregation can complete early + 4: required bool fast_limit_check } struct TSortInfo { diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java index 0a7e59e88..b9741ff7b 100644 --- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java +++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java @@ -429,6 +429,7 @@ public class AggregationNode extends PlanNode { boolean replicateInput = aggPhase_ == AggPhase.FIRST && aggInfos_.size() > 1; msg.agg_node.setReplicate_input(replicateInput); msg.agg_node.setEstimated_input_cardinality(getChild(0).getCardinality()); + msg.agg_node.setFast_limit_check(canCompleteEarly()); for (int i = 0; i < aggInfos_.size(); ++i) { AggregateInfo aggInfo = aggInfos_.get(i); List aggregateFunctions = new ArrayList<>(); @@ -641,4 +642,13 @@ public class AggregationNode extends PlanNode { public boolean isNonCorrelatedScalarSubquery() { return isNonCorrelatedScalarSubquery_; } + + // When both conditions below are true, aggregation can complete early + // a) aggregation node has no aggregate function + // b) aggregation node has no predicate + // E.g. SELECT DISTINCT f1,f2,...fn FROM t LIMIT n + public boolean canCompleteEarly() { + return isSingleClassAgg() && hasLimit() && hasGrouping() + && !multiAggInfo_.hasAggregateExprs() && getConjuncts().isEmpty(); + } } diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java index 7fbd3500d..67a376a7e 100644 --- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java @@ -931,7 +931,9 @@ public class DistributedPlanner { // if there is a limit, we need to transfer it from the pre-aggregation // node in the child fragment to the merge aggregation node in the parent long limit = node.getLimit(); - node.unsetLimit(); + if (node.getMultiAggInfo().hasAggregateExprs() || !node.getConjuncts().isEmpty()) { + node.unsetLimit(); + } node.unsetNeedsFinalize(); // place a merge aggregation step in a new fragment diff --git a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java index 29ce4f62a..c3856956f 100644 --- a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java +++ b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java @@ -82,7 +82,10 @@ public class ExchangeNode extends PlanNode { offset_ = 0; children_.add(input); // Only apply the limit at the receiver if there are multiple senders. - if (input.getFragment().isPartitioned()) limit_ = input.limit_; + if (input.getFragment().isPartitioned() && + !(input instanceof AggregationNode && !input.isBlockingNode())) { + limit_ = input.limit_; + } computeTupleIds(); } diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test index 4be80f6e5..fd42e1f27 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test @@ -6182,8 +6182,8 @@ PLAN-ROOT SINK tuple-ids=0 row-size=231B cardinality=6.00M in pipelines: 00(GETNEXT) ---- DISTRIBUTEDPLAN -Max Per-Host Resource Reservation: Memory=112.00MB Threads=4 -Per-Host Resource Estimates: Memory=905MB +Max Per-Host Resource Reservation: Memory=79.94MB Threads=4 +Per-Host Resource Estimates: Memory=441MB Analyzed query: SELECT DISTINCT * FROM tpch_parquet.lineitem LIMIT CAST(5 AS TINYINT) @@ -6200,25 +6200,26 @@ PLAN-ROOT SINK | in pipelines: 03(GETNEXT) | F01:PLAN FRAGMENT [HASH(tpch_parquet.lineitem.l_orderkey,tpch_parquet.lineitem.l_partkey,tpch_parquet.lineitem.l_suppkey,tpch_parquet.lineitem.l_linenumber,tpch_parquet.lineitem.l_quantity,tpch_parquet.lineitem.l_extendedprice,tpch_parquet.lineitem.l_discount,tpch_parquet.lineitem.l_tax,tpch_parquet.lineitem.l_returnflag,tpch_parquet.lineitem.l_linestatus,tpch_parquet.lineitem.l_shipdate,tpch_parquet.lineitem.l_commitdate,tpch_parquet.lineitem.l_receiptdate,tpch_parquet.lineitem.l_shipinstruct,tpch_parquet.lineitem.l_shipmode,tpch_parquet.lineitem.l_comment)] hosts=3 instances=3 -Per-Host Resources: mem-estimate=473.84MB mem-reservation=34.00MB thread-reservation=1 +Per-Host Resources: mem-estimate=10.02MB mem-reservation=1.94MB thread-reservation=1 03:AGGREGATE [FINALIZE] | group by: tpch_parquet.lineitem.l_orderkey, tpch_parquet.lineitem.l_partkey, tpch_parquet.lineitem.l_suppkey, tpch_parquet.lineitem.l_linenumber, tpch_parquet.lineitem.l_quantity, tpch_parquet.lineitem.l_extendedprice, tpch_parquet.lineitem.l_discount, tpch_parquet.lineitem.l_tax, tpch_parquet.lineitem.l_returnflag, tpch_parquet.lineitem.l_linestatus, tpch_parquet.lineitem.l_shipdate, tpch_parquet.lineitem.l_commitdate, tpch_parquet.lineitem.l_receiptdate, tpch_parquet.lineitem.l_shipinstruct, tpch_parquet.lineitem.l_shipmode, tpch_parquet.lineitem.l_comment | limit: 5 -| mem-estimate=463.16MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0 +| mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0 | tuple-ids=1 row-size=231B cardinality=5 | in pipelines: 03(GETNEXT), 00(OPEN) | 02:EXCHANGE [HASH(tpch_parquet.lineitem.l_orderkey,tpch_parquet.lineitem.l_partkey,tpch_parquet.lineitem.l_suppkey,tpch_parquet.lineitem.l_linenumber,tpch_parquet.lineitem.l_quantity,tpch_parquet.lineitem.l_extendedprice,tpch_parquet.lineitem.l_discount,tpch_parquet.lineitem.l_tax,tpch_parquet.lineitem.l_returnflag,tpch_parquet.lineitem.l_linestatus,tpch_parquet.lineitem.l_shipdate,tpch_parquet.lineitem.l_commitdate,tpch_parquet.lineitem.l_receiptdate,tpch_parquet.lineitem.l_shipinstruct,tpch_parquet.lineitem.l_shipmode,tpch_parquet.lineitem.l_comment)] -| mem-estimate=10.69MB mem-reservation=0B thread-reservation=0 -| tuple-ids=1 row-size=231B cardinality=6.00M +| mem-estimate=16.00KB mem-reservation=0B thread-reservation=0 +| tuple-ids=1 row-size=231B cardinality=5 | in pipelines: 00(GETNEXT) | F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 Per-Host Resources: mem-estimate=427.37MB mem-reservation=74.00MB thread-reservation=2 01:AGGREGATE [STREAMING] | group by: tpch_parquet.lineitem.l_orderkey, tpch_parquet.lineitem.l_partkey, tpch_parquet.lineitem.l_suppkey, tpch_parquet.lineitem.l_linenumber, tpch_parquet.lineitem.l_quantity, tpch_parquet.lineitem.l_extendedprice, tpch_parquet.lineitem.l_discount, tpch_parquet.lineitem.l_tax, tpch_parquet.lineitem.l_returnflag, tpch_parquet.lineitem.l_linestatus, tpch_parquet.lineitem.l_shipdate, tpch_parquet.lineitem.l_commitdate, tpch_parquet.lineitem.l_receiptdate, tpch_parquet.lineitem.l_shipinstruct, tpch_parquet.lineitem.l_shipmode, tpch_parquet.lineitem.l_comment +| limit: 5 | mem-estimate=347.37MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0 -| tuple-ids=1 row-size=231B cardinality=6.00M +| tuple-ids=1 row-size=231B cardinality=5 | in pipelines: 00(GETNEXT) | 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM] diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/setoperation-rewrite.test b/testdata/workloads/functional-planner/queries/PlannerTest/setoperation-rewrite.test index 90ae3cf37..3e699318d 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/setoperation-rewrite.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/setoperation-rewrite.test @@ -734,7 +734,8 @@ PLAN-ROOT SINK | | | 02:AGGREGATE [STREAMING] | | group by: functional.alltypestiny.id, functional.alltypestiny.bool_col, functional.alltypestiny.tinyint_col, functional.alltypestiny.smallint_col, functional.alltypestiny.int_col, functional.alltypestiny.bigint_col, functional.alltypestiny.float_col, functional.alltypestiny.double_col, functional.alltypestiny.date_string_col, functional.alltypestiny.string_col, functional.alltypestiny.timestamp_col, functional.alltypestiny.year, functional.alltypestiny.month -| | row-size=89B cardinality=2 +| | limit: 1 +| | row-size=89B cardinality=1 | | | 01:SCAN HDFS [functional.alltypestiny] | partition predicates: `year` = 2009, `month` = 2 diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test b/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test index 0381f6f10..8e83a774e 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test @@ -2978,7 +2978,8 @@ PLAN-ROOT SINK | | | 04:AGGREGATE [STREAMING] | | group by: i -| | row-size=8B cardinality=20 +| | limit: 2 +| | row-size=8B cardinality=2 | | | 01:UNION | | pass-through-operands: 02 diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q06.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q06.test index e12c46531..b1252f681 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q06.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q06.test @@ -308,15 +308,16 @@ Per-Host Resources: mem-estimate=70.86MB mem-reservation=29.06MB thread-reservat | | | 21:EXCHANGE [HASH((d_month_seq))] | | mem-estimate=16.00KB mem-reservation=0B thread-reservation=0 -| | tuple-ids=6 row-size=4B cardinality=108 +| | tuple-ids=6 row-size=4B cardinality=1 | | in pipelines: 05(GETNEXT) | | | F05:PLAN FRAGMENT [RANDOM] hosts=1 instances=1 | Per-Host Resources: mem-estimate=58.00MB mem-reservation=2.25MB thread-reservation=2 | 06:AGGREGATE [STREAMING] | | group by: (d_month_seq) +| | limit: 1 | | mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB thread-reservation=0 -| | tuple-ids=6 row-size=4B cardinality=108 +| | tuple-ids=6 row-size=4B cardinality=1 | | in pipelines: 05(GETNEXT) | | | 05:SCAN HDFS [tpcds_parquet.date_dim, RANDOM] @@ -586,15 +587,16 @@ Per-Instance Resources: mem-estimate=26.00MB mem-reservation=3.00MB thread-reser | | | 21:EXCHANGE [HASH((d_month_seq))] | | mem-estimate=16.00KB mem-reservation=0B thread-reservation=0 -| | tuple-ids=6 row-size=4B cardinality=108 +| | tuple-ids=6 row-size=4B cardinality=1 | | in pipelines: 05(GETNEXT) | | | F05:PLAN FRAGMENT [RANDOM] hosts=1 instances=1 | Per-Instance Resources: mem-estimate=26.00MB mem-reservation=2.25MB thread-reservation=1 | 06:AGGREGATE [STREAMING] | | group by: (d_month_seq) +| | limit: 1 | | mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB thread-reservation=0 -| | tuple-ids=6 row-size=4B cardinality=108 +| | tuple-ids=6 row-size=4B cardinality=1 | | in pipelines: 05(GETNEXT) | | | 05:SCAN HDFS [tpcds_parquet.date_dim, RANDOM] diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q54.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q54.test index a5fe87725..ff936a0e6 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q54.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q54.test @@ -421,15 +421,16 @@ Per-Host Resources: mem-estimate=13.66MB mem-reservation=4.94MB thread-reservati | | | 45:EXCHANGE [HASH(d_month_seq + 3)] | | mem-estimate=16.00KB mem-reservation=0B thread-reservation=0 -| | tuple-ids=17 row-size=8B cardinality=108 +| | tuple-ids=17 row-size=8B cardinality=2 | | in pipelines: 17(GETNEXT) | | | F17:PLAN FRAGMENT [RANDOM] hosts=1 instances=1 | Per-Host Resources: mem-estimate=58.00MB mem-reservation=2.25MB thread-reservation=2 | 18:AGGREGATE [STREAMING] | | group by: CAST(d_month_seq AS BIGINT) + CAST(3 AS BIGINT) +| | limit: 2 | | mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB thread-reservation=0 -| | tuple-ids=17 row-size=8B cardinality=108 +| | tuple-ids=17 row-size=8B cardinality=2 | | in pipelines: 17(GETNEXT) | | | 17:SCAN HDFS [tpcds_parquet.date_dim, RANDOM] @@ -481,15 +482,16 @@ Per-Host Resources: mem-estimate=13.66MB mem-reservation=4.94MB thread-reservati | | | 41:EXCHANGE [HASH(d_month_seq + 1)] | | mem-estimate=16.00KB mem-reservation=0B thread-reservation=0 -| | tuple-ids=14 row-size=8B cardinality=108 +| | tuple-ids=14 row-size=8B cardinality=2 | | in pipelines: 14(GETNEXT) | | | F14:PLAN FRAGMENT [RANDOM] hosts=1 instances=1 | Per-Host Resources: mem-estimate=58.00MB mem-reservation=2.25MB thread-reservation=2 | 15:AGGREGATE [STREAMING] | | group by: CAST(d_month_seq AS BIGINT) + CAST(1 AS BIGINT) +| | limit: 2 | | mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB thread-reservation=0 -| | tuple-ids=14 row-size=8B cardinality=108 +| | tuple-ids=14 row-size=8B cardinality=2 | | in pipelines: 14(GETNEXT) | | | 14:SCAN HDFS [tpcds_parquet.date_dim, RANDOM] @@ -866,15 +868,16 @@ Per-Instance Resources: mem-estimate=10.24MB mem-reservation=2.00MB thread-reser | | | 45:EXCHANGE [HASH(d_month_seq + 3)] | | mem-estimate=16.00KB mem-reservation=0B thread-reservation=0 -| | tuple-ids=17 row-size=8B cardinality=108 +| | tuple-ids=17 row-size=8B cardinality=2 | | in pipelines: 17(GETNEXT) | | | F17:PLAN FRAGMENT [RANDOM] hosts=1 instances=1 | Per-Instance Resources: mem-estimate=26.00MB mem-reservation=2.25MB thread-reservation=1 | 18:AGGREGATE [STREAMING] | | group by: CAST(d_month_seq AS BIGINT) + CAST(3 AS BIGINT) +| | limit: 2 | | mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB thread-reservation=0 -| | tuple-ids=17 row-size=8B cardinality=108 +| | tuple-ids=17 row-size=8B cardinality=2 | | in pipelines: 17(GETNEXT) | | | 17:SCAN HDFS [tpcds_parquet.date_dim, RANDOM] @@ -933,15 +936,16 @@ Per-Instance Resources: mem-estimate=10.24MB mem-reservation=2.00MB thread-reser | | | 41:EXCHANGE [HASH(d_month_seq + 1)] | | mem-estimate=16.00KB mem-reservation=0B thread-reservation=0 -| | tuple-ids=14 row-size=8B cardinality=108 +| | tuple-ids=14 row-size=8B cardinality=2 | | in pipelines: 14(GETNEXT) | | | F14:PLAN FRAGMENT [RANDOM] hosts=1 instances=1 | Per-Instance Resources: mem-estimate=26.00MB mem-reservation=2.25MB thread-reservation=1 | 15:AGGREGATE [STREAMING] | | group by: CAST(d_month_seq AS BIGINT) + CAST(1 AS BIGINT) +| | limit: 2 | | mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB thread-reservation=0 -| | tuple-ids=14 row-size=8B cardinality=108 +| | tuple-ids=14 row-size=8B cardinality=2 | | in pipelines: 14(GETNEXT) | | | 14:SCAN HDFS [tpcds_parquet.date_dim, RANDOM] diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling.test b/testdata/workloads/functional-query/queries/QueryTest/spilling.test index f797f7fc7..8a4a36b58 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/spilling.test +++ b/testdata/workloads/functional-query/queries/QueryTest/spilling.test @@ -288,7 +288,7 @@ set num_nodes=1; select l_orderkey, l_partkey, l_suppkey, l_linenumber, l_comment from tpch_parquet.lineitem group by 1, 2, 3, 4, 5, random() -limit 5 +limit 50000000 ---- RUNTIME_PROFILE row_regex: .*Query State: FINISHED.* row_regex: .*Query Status: OK.* @@ -432,3 +432,18 @@ STRING,DECIMAL,DECIMAL,DECIMAL,DECIMAL,BIGINT row_regex: .*InMemoryHeapsEvicted: .* \([1-9][0-9]*\) #row_regex: .*SpilledRuns: .* \([1-9][0-9]*\) ==== +---- QUERY +# Test spilling an agg with a LIMIT; see IMPALA-2581 +set buffer_pool_limit=136m; +select c.c2 from +(select distinct (a.id*10000 + b.id) c1, a.int_col c2 +from functional.alltypes a, functional.alltypes b limit 3500000) c join /* +SHUFFLE */ functional.alltypes d on c.c2 = d.int_col +group by c.c2 +limit 5 +---- TYPES +BIGINT +---- RUNTIME_PROFILE +# Verify that spilling was activated. +row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\) +row_regex: .*FastLimitCheckExceededRows: [0-9]+ +==== diff --git a/testdata/workloads/targeted-perf/queries/aggregation.test b/testdata/workloads/targeted-perf/queries/aggregation.test index e0e98d794..68b9b07db 100644 --- a/testdata/workloads/targeted-perf/queries/aggregation.test +++ b/testdata/workloads/targeted-perf/queries/aggregation.test @@ -2722,3 +2722,14 @@ AGG1,AGG2,G ----TYPES BIGINT,BIGINT,STRING ==== +---- QUERY: PERF_AGG-Q11 +-- IMPALA-2581: LIMIT can be used to speed up aggregations +select distinct l_orderkey from lineitem limit 10; +---- RUNTIME_PROFILE +row_regex: .*FastLimitCheckExceededRows: [1-9][0-9]* +==== +---- QUERY: PERF_AGG-Q12 +select l_orderkey from lineitem group by 1 limit 10; +---- RUNTIME_PROFILE +row_regex: .*FastLimitCheckExceededRows: [1-9][0-9]* +==== \ No newline at end of file