diff --git a/be/src/exec/aggregation-node-base.cc b/be/src/exec/aggregation-node-base.cc
index 0cad9a80f..33e285a78 100644
--- a/be/src/exec/aggregation-node-base.cc
+++ b/be/src/exec/aggregation-node-base.cc
@@ -82,11 +82,13 @@ AggregationNodeBase::AggregationNodeBase(
           static_cast<const GroupingAggregatorConfig*>(agg);
       DCHECK(grouping_config != nullptr);
       node.reset(new GroupingAggregator(this, pool_, *grouping_config,
-          pnode.tnode_->agg_node.estimated_input_cardinality));
+          pnode.tnode_->agg_node.estimated_input_cardinality,
+          pnode.tnode_->agg_node.fast_limit_check));
     }
     aggs_.push_back(std::move(node));
     runtime_profile_->AddChild(aggs_[i]->runtime_profile());
   }
+  fast_limit_check_ = pnode.tnode_->agg_node.fast_limit_check;
 }
 
 Status AggregationNodeBase::Prepare(RuntimeState* state) {
diff --git a/be/src/exec/aggregation-node-base.h b/be/src/exec/aggregation-node-base.h
index 6d148b98d..8bcecfcbb 100644
--- a/be/src/exec/aggregation-node-base.h
+++ b/be/src/exec/aggregation-node-base.h
@@ -67,6 +67,9 @@ class AggregationNodeBase : public ExecNode {
   /// END: Members that must be Reset()
   /////////////////////////////////////////
 
+  /// If true, the aggregation can complete early without consuming all of its input.
+  bool fast_limit_check_ = false;
+
   /// Splits the rows of 'batch' up according to which tuple of the row is non-null such
   /// that a row with tuple 'i' non-null is copied into the batch 'mini_batches[i]'.
   /// It is expected that all rows of 'batch' have exactly 1 non-null tuple.
diff --git a/be/src/exec/aggregation-node.cc b/be/src/exec/aggregation-node.cc
index 8db79259a..603442e77 100644
--- a/be/src/exec/aggregation-node.cc
+++ b/be/src/exec/aggregation-node.cc
@@ -69,6 +69,18 @@ Status AggregationNode::Open(RuntimeState* state) {
     if (num_aggs == 1) {
       RETURN_IF_ERROR(aggs_[0]->AddBatch(state, &child_batch));
       child_batch.Reset();
+      if (fast_limit_check_) {
+        DCHECK(limit() > -1);
+        if (aggs_[0]->GetNumKeys() >= limit()) {
+          eos = true;
+          runtime_profile_->AddInfoString("FastLimitCheckExceededRows",
+              SimpleItoa(aggs_[0]->GetNumKeys() - limit()));
+          VLOG_QUERY << Substitute("the number of rows ($0) returned from the "
+              "aggregation node has exceeded the limit of $1", aggs_[0]->GetNumKeys(),
+              limit());
+          break;
+        }
+      }
       continue;
     }
 
diff --git a/be/src/exec/aggregator.h b/be/src/exec/aggregator.h
index 693716e3f..6c3cae25b 100644
--- a/be/src/exec/aggregator.h
+++ b/be/src/exec/aggregator.h
@@ -172,6 +172,9 @@ class Aggregator {
 
   static const char* LLVM_CLASS_NAME;
 
+  /// Returns the number of unique keys that have been aggregated so far.
+  virtual int64_t GetNumKeys() const = 0;
+
  protected:
   /// The id of the ExecNode this Aggregator corresponds to.
   const int id_;
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index f77f0df4e..1b8ebd104 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -134,7 +134,8 @@ static const int STREAMING_HT_MIN_REDUCTION_SIZE =
     sizeof(STREAMING_HT_MIN_REDUCTION) / sizeof(STREAMING_HT_MIN_REDUCTION[0]);
 
 GroupingAggregator::GroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
-    const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality)
+    const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality,
+    bool needUnsetLimit)
   : Aggregator(
       exec_node, pool, config, Substitute("GroupingAggregator $0", config.agg_idx_)),
     hash_table_config_(*config.hash_table_config_),
@@ -152,6 +153,9 @@ GroupingAggregator::GroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
     estimated_input_cardinality_(estimated_input_cardinality),
     partition_pool_(new ObjectPool()) {
   DCHECK_EQ(PARTITION_FANOUT, 1 << NUM_PARTITIONING_BITS);
+  if (needUnsetLimit) {
+    UnsetLimit();
+  }
 }
 
 Status GroupingAggregator::Prepare(RuntimeState* state) {
@@ -1317,4 +1321,23 @@ Status GroupingAggregatorConfig::CodegenAddBatchStreamingImpl(
 // Instantiate required templates.
 template Status GroupingAggregator::AppendSpilledRow<false>(Partition*, TupleRow*);
 template Status GroupingAggregator::AppendSpilledRow<true>(Partition*, TupleRow*);
+
+int64_t GroupingAggregator::GetNumKeys() const {
+  int64_t num_keys = 0;
+  for (int i = 0; i < hash_partitions_.size(); ++i) {
+    Partition* partition = hash_partitions_[i];
+    if (partition == nullptr) continue;
+    // We might be dealing with a rebuilt spilled partition, where all partitions are
+    // pointing to a single in-memory partition, so make sure we only proceed for the
+    // right partition.
+    if (i != partition->idx) continue;
+    if (!partition->is_spilled()) {
+      if (partition->hash_tbl == nullptr) {
+        continue;
+      }
+      num_keys += partition->hash_tbl->size();
+    }
+  }
+  return num_keys;
+}
 } // namespace impala
diff --git a/be/src/exec/grouping-aggregator.h b/be/src/exec/grouping-aggregator.h
index f9f7d0d28..b71ba9885 100644
--- a/be/src/exec/grouping-aggregator.h
+++ b/be/src/exec/grouping-aggregator.h
@@ -192,7 +192,8 @@ class GroupingAggregatorConfig : public AggregatorConfig {
 class GroupingAggregator : public Aggregator {
  public:
   GroupingAggregator(ExecNode* exec_node, ObjectPool* pool,
-      const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality);
+      const GroupingAggregatorConfig& config, int64_t estimated_input_cardinality,
+      bool needUnsetLimit);
 
   virtual Status Prepare(RuntimeState* state) override;
   virtual Status Open(RuntimeState* state) override;
@@ -212,6 +213,10 @@ class GroupingAggregator : public Aggregator {
   virtual std::string DebugString(int indentation_level = 0) const override;
   virtual void DebugString(int indentation_level, std::stringstream* out) const override;
 
+  virtual int64_t GetNumKeys() const override;
+
+  void UnsetLimit() { limit_ = -1; }
+
  private:
   struct Partition;
 
diff --git a/be/src/exec/non-grouping-aggregator.h b/be/src/exec/non-grouping-aggregator.h
index e50f60986..47b461f7a 100644
--- a/be/src/exec/non-grouping-aggregator.h
+++ b/be/src/exec/non-grouping-aggregator.h
@@ -97,6 +97,8 @@ class NonGroupingAggregator : public Aggregator {
   virtual std::string DebugString(int indentation_level = 0) const override;
   virtual void DebugString(int indentation_level, std::stringstream* out) const override;
 
+  virtual int64_t GetNumKeys() const override { return 1; }
+
  private:
   /// MemPool used to allocate memory for 'singleton_output_tuple_'. The ownership of the
   /// pool's memory is transferred to the output batch on eos. The pool should not be
diff --git a/be/src/exec/streaming-aggregation-node.cc b/be/src/exec/streaming-aggregation-node.cc
index ef34a0b1d..88bd2431f 100644
--- a/be/src/exec/streaming-aggregation-node.cc
+++ b/be/src/exec/streaming-aggregation-node.cc
@@ -36,7 +36,8 @@ StreamingAggregationNode::StreamingAggregationNode(
     ObjectPool* pool, const AggregationPlanNode& pnode, const DescriptorTbl& descs)
   : AggregationNodeBase(pool, pnode, descs) {
   DCHECK(pnode.tnode_->conjuncts.empty()) << "Preaggs have no conjuncts";
-  DCHECK(limit_ == -1) << "Preaggs have no limits";
+  DCHECK(limit_ == -1 || (limit_ != -1 && fast_limit_check_))
+      << "Preaggs have no limits";
   for (auto& t_agg : pnode.tnode_->agg_node.aggregators) {
     DCHECK(t_agg.use_streaming_preaggregation);
   }
@@ -62,7 +63,7 @@ Status StreamingAggregationNode::GetNext(
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
   RETURN_IF_CANCELLED(state);
 
-  if (ReachedLimit()) {
+  if (!fast_limit_check_ && ReachedLimit()) {
     *eos = true;
     return Status::OK();
   }
@@ -124,6 +125,20 @@ Status StreamingAggregationNode::GetRowsStreaming(
     if (child_batch_processed_) {
       child_batch_->Reset();
     }
+    if (fast_limit_check_) {
+      DCHECK(limit() > -1);
+      if (aggs_[0]->GetNumKeys() >= limit()) {
+        child_eos_ = true;
+        child_batch_processed_ = true;
+        child_batch_->Reset();
+        runtime_profile_->AddInfoString("FastLimitCheckExceededRows",
+            SimpleItoa(aggs_[0]->GetNumKeys() - limit()));
+        VLOG_QUERY << Substitute("the number of rows ($0) returned from the streaming "
+            "aggregation node has exceeded the limit of $1", aggs_[0]->GetNumKeys(),
+            limit());
+        break;
+      }
+    }
     continue;
   }
 
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 502a6fc1d..3c48fe8c8 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -475,6 +475,9 @@ struct TAggregationNode {
   // If true, this is the first AggregationNode in a aggregation plan with multiple
   // Aggregators and the entire input to this node should be passed to each Aggregator.
   3: required bool replicate_input
+
+  // Set to true if this aggregation can complete early
+  4: required bool fast_limit_check
 }
 
 struct TSortInfo {
diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
index 0a7e59e88..b9741ff7b 100644
--- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
@@ -429,6 +429,7 @@ public class AggregationNode extends PlanNode {
     boolean replicateInput = aggPhase_ == AggPhase.FIRST && aggInfos_.size() > 1;
     msg.agg_node.setReplicate_input(replicateInput);
     msg.agg_node.setEstimated_input_cardinality(getChild(0).getCardinality());
+    msg.agg_node.setFast_limit_check(canCompleteEarly());
     for (int i = 0; i < aggInfos_.size(); ++i) {
       AggregateInfo aggInfo = aggInfos_.get(i);
       List aggregateFunctions = new ArrayList<>();
@@ -641,4 +642,13 @@ public class AggregationNode extends PlanNode {
   public boolean isNonCorrelatedScalarSubquery() {
     return isNonCorrelatedScalarSubquery_;
   }
+
+  // Aggregation can complete early when all of the conditions below are true:
+  // a) the aggregation has a limit, grouping, and a single aggregation class
+  // b) the aggregation has no aggregate functions and no predicates
+  // E.g. SELECT DISTINCT f1,f2,...fn FROM t LIMIT n
+  public boolean canCompleteEarly() {
+    return isSingleClassAgg() && hasLimit() && hasGrouping()
+        && !multiAggInfo_.hasAggregateExprs() && getConjuncts().isEmpty();
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
index 7fbd3500d..67a376a7e 100644
--- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
@@ -931,7 +931,9 @@ public class DistributedPlanner {
     // if there is a limit, we need to transfer it from the pre-aggregation
     // node in the child fragment to the merge aggregation node in the parent
     long limit = node.getLimit();
-    node.unsetLimit();
+    if (node.getMultiAggInfo().hasAggregateExprs() || !node.getConjuncts().isEmpty()) {
+      node.unsetLimit();
+    }
     node.unsetNeedsFinalize();
 
     // place a merge aggregation step in a new fragment
diff --git a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
index 29ce4f62a..c3856956f 100644
--- a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
@@ -82,7 +82,10 @@ public class ExchangeNode extends PlanNode {
     offset_ = 0;
     children_.add(input);
     // Only apply the limit at the receiver if there are multiple senders.
-    if (input.getFragment().isPartitioned()) limit_ = input.limit_;
+    if (input.getFragment().isPartitioned() &&
+        !(input instanceof AggregationNode && !input.isBlockingNode())) {
+      limit_ = input.limit_;
+    }
     computeTupleIds();
   }
 
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
index 4be80f6e5..fd42e1f27 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
@@ -6182,8 +6182,8 @@ PLAN-ROOT SINK
    tuple-ids=0 row-size=231B cardinality=6.00M
    in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=112.00MB Threads=4
-Per-Host Resource Estimates: Memory=905MB
+Max Per-Host Resource Reservation: Memory=79.94MB Threads=4
+Per-Host Resource Estimates: Memory=441MB
 Analyzed query: SELECT DISTINCT * FROM tpch_parquet.lineitem LIMIT CAST(5 AS
 TINYINT)
 
@@ -6200,25 +6200,26 @@ PLAN-ROOT SINK
 |  in pipelines: 03(GETNEXT)
 |
 F01:PLAN FRAGMENT [HASH(tpch_parquet.lineitem.l_orderkey,tpch_parquet.lineitem.l_partkey,tpch_parquet.lineitem.l_suppkey,tpch_parquet.lineitem.l_linenumber,tpch_parquet.lineitem.l_quantity,tpch_parquet.lineitem.l_extendedprice,tpch_parquet.lineitem.l_discount,tpch_parquet.lineitem.l_tax,tpch_parquet.lineitem.l_returnflag,tpch_parquet.lineitem.l_linestatus,tpch_parquet.lineitem.l_shipdate,tpch_parquet.lineitem.l_commitdate,tpch_parquet.lineitem.l_receiptdate,tpch_parquet.lineitem.l_shipinstruct,tpch_parquet.lineitem.l_shipmode,tpch_parquet.lineitem.l_comment)] hosts=3 instances=3
-Per-Host Resources: mem-estimate=473.84MB mem-reservation=34.00MB thread-reservation=1
+Per-Host Resources: mem-estimate=10.02MB mem-reservation=1.94MB thread-reservation=1
 03:AGGREGATE [FINALIZE]
 |  group by: tpch_parquet.lineitem.l_orderkey, tpch_parquet.lineitem.l_partkey, tpch_parquet.lineitem.l_suppkey, tpch_parquet.lineitem.l_linenumber, tpch_parquet.lineitem.l_quantity, tpch_parquet.lineitem.l_extendedprice, tpch_parquet.lineitem.l_discount, tpch_parquet.lineitem.l_tax, tpch_parquet.lineitem.l_returnflag, tpch_parquet.lineitem.l_linestatus, tpch_parquet.lineitem.l_shipdate, tpch_parquet.lineitem.l_commitdate, tpch_parquet.lineitem.l_receiptdate, tpch_parquet.lineitem.l_shipinstruct, tpch_parquet.lineitem.l_shipmode, tpch_parquet.lineitem.l_comment
 |  limit: 5
-|  mem-estimate=463.16MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
 |  tuple-ids=1 row-size=231B cardinality=5
 |  in pipelines: 03(GETNEXT), 00(OPEN)
 |
 02:EXCHANGE [HASH(tpch_parquet.lineitem.l_orderkey,tpch_parquet.lineitem.l_partkey,tpch_parquet.lineitem.l_suppkey,tpch_parquet.lineitem.l_linenumber,tpch_parquet.lineitem.l_quantity,tpch_parquet.lineitem.l_extendedprice,tpch_parquet.lineitem.l_discount,tpch_parquet.lineitem.l_tax,tpch_parquet.lineitem.l_returnflag,tpch_parquet.lineitem.l_linestatus,tpch_parquet.lineitem.l_shipdate,tpch_parquet.lineitem.l_commitdate,tpch_parquet.lineitem.l_receiptdate,tpch_parquet.lineitem.l_shipinstruct,tpch_parquet.lineitem.l_shipmode,tpch_parquet.lineitem.l_comment)]
-|  mem-estimate=10.69MB mem-reservation=0B thread-reservation=0
-|  tuple-ids=1 row-size=231B cardinality=6.00M
+|  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=1 row-size=231B cardinality=5
 |  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 Per-Host Resources: mem-estimate=427.37MB mem-reservation=74.00MB thread-reservation=2
 01:AGGREGATE [STREAMING]
 |  group by: tpch_parquet.lineitem.l_orderkey, tpch_parquet.lineitem.l_partkey, tpch_parquet.lineitem.l_suppkey, tpch_parquet.lineitem.l_linenumber, tpch_parquet.lineitem.l_quantity, tpch_parquet.lineitem.l_extendedprice, tpch_parquet.lineitem.l_discount, tpch_parquet.lineitem.l_tax, tpch_parquet.lineitem.l_returnflag, tpch_parquet.lineitem.l_linestatus, tpch_parquet.lineitem.l_shipdate, tpch_parquet.lineitem.l_commitdate, tpch_parquet.lineitem.l_receiptdate, tpch_parquet.lineitem.l_shipinstruct, tpch_parquet.lineitem.l_shipmode, tpch_parquet.lineitem.l_comment
+|  limit: 5
 |  mem-estimate=347.37MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
-|  tuple-ids=1 row-size=231B cardinality=6.00M
+|  tuple-ids=1 row-size=231B cardinality=5
 |  in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/setoperation-rewrite.test b/testdata/workloads/functional-planner/queries/PlannerTest/setoperation-rewrite.test
index 90ae3cf37..3e699318d 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/setoperation-rewrite.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/setoperation-rewrite.test
@@ -734,7 +734,8 @@ PLAN-ROOT SINK
 | |
 | 02:AGGREGATE [STREAMING]
 | |  group by: functional.alltypestiny.id, functional.alltypestiny.bool_col, functional.alltypestiny.tinyint_col, functional.alltypestiny.smallint_col, functional.alltypestiny.int_col, functional.alltypestiny.bigint_col, functional.alltypestiny.float_col, functional.alltypestiny.double_col, functional.alltypestiny.date_string_col, functional.alltypestiny.string_col, functional.alltypestiny.timestamp_col, functional.alltypestiny.year, functional.alltypestiny.month
-| |  row-size=89B cardinality=2
+| |  limit: 1
+| |  row-size=89B cardinality=1
 | |
 | 01:SCAN HDFS [functional.alltypestiny]
 |    partition predicates: `year` = 2009, `month` = 2
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test b/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
index 0381f6f10..8e83a774e 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
@@ -2978,7 +2978,8 @@ PLAN-ROOT SINK
 | |
 | 04:AGGREGATE [STREAMING]
 | |  group by: i
-| |  row-size=8B cardinality=20
+| |  limit: 2
+| |  row-size=8B cardinality=2
 | |
 | 01:UNION
 | |  pass-through-operands: 02
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q06.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q06.test
index e12c46531..b1252f681 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q06.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q06.test
@@ -308,15 +308,16 @@ Per-Host Resources: mem-estimate=70.86MB mem-reservation=29.06MB thread-reservat
 | |
 | 21:EXCHANGE [HASH((d_month_seq))]
 | |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
-| |  tuple-ids=6 row-size=4B cardinality=108
+| |  tuple-ids=6 row-size=4B cardinality=1
 | |  in pipelines: 05(GETNEXT)
 | |
 | F05:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
 | Per-Host Resources: mem-estimate=58.00MB mem-reservation=2.25MB thread-reservation=2
 | 06:AGGREGATE [STREAMING]
 | |  group by: (d_month_seq)
+| |  limit: 1
 | |  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB thread-reservation=0
-| |  tuple-ids=6 row-size=4B cardinality=108
+| |  tuple-ids=6 row-size=4B cardinality=1
 | |  in pipelines: 05(GETNEXT)
 | |
 | 05:SCAN HDFS [tpcds_parquet.date_dim, RANDOM]
@@ -586,15 +587,16 @@ Per-Instance Resources: mem-estimate=26.00MB mem-reservation=3.00MB thread-reser
 | |
 | 21:EXCHANGE [HASH((d_month_seq))]
 | |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
-| |  tuple-ids=6 row-size=4B cardinality=108
+| |  tuple-ids=6 row-size=4B cardinality=1
 | |  in pipelines: 05(GETNEXT)
 | |
 | F05:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
 | Per-Instance Resources: mem-estimate=26.00MB mem-reservation=2.25MB thread-reservation=1
 | 06:AGGREGATE [STREAMING]
 | |  group by: (d_month_seq)
+| |  limit: 1
 | |  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB thread-reservation=0
-| |  tuple-ids=6 row-size=4B cardinality=108
+| |  tuple-ids=6 row-size=4B cardinality=1
 | |  in pipelines: 05(GETNEXT)
 | |
 | 05:SCAN HDFS [tpcds_parquet.date_dim, RANDOM]
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q54.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q54.test
index a5fe87725..ff936a0e6 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q54.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q54.test
@@ -421,15 +421,16 @@ Per-Host Resources: mem-estimate=13.66MB mem-reservation=4.94MB thread-reservati
 | |
 | 45:EXCHANGE [HASH(d_month_seq + 3)]
 | |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
-| |  tuple-ids=17 row-size=8B cardinality=108
+| |  tuple-ids=17 row-size=8B cardinality=2
 | |  in pipelines: 17(GETNEXT)
 | |
 | F17:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
 | Per-Host Resources: mem-estimate=58.00MB mem-reservation=2.25MB thread-reservation=2
 | 18:AGGREGATE [STREAMING]
 | |  group by: CAST(d_month_seq AS BIGINT) + CAST(3 AS BIGINT)
+| |  limit: 2
 | |  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB thread-reservation=0
-| |  tuple-ids=17 row-size=8B cardinality=108
+| |  tuple-ids=17 row-size=8B cardinality=2
 | |  in pipelines: 17(GETNEXT)
 | |
 | 17:SCAN HDFS [tpcds_parquet.date_dim, RANDOM]
@@ -481,15 +482,16 @@ Per-Host Resources: mem-estimate=13.66MB mem-reservation=4.94MB thread-reservati
 | |
 | 41:EXCHANGE [HASH(d_month_seq + 1)]
 | |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
-| |  tuple-ids=14 row-size=8B cardinality=108
+| |  tuple-ids=14 row-size=8B cardinality=2
 | |  in pipelines: 14(GETNEXT)
 | |
 | F14:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
 | Per-Host Resources: mem-estimate=58.00MB mem-reservation=2.25MB thread-reservation=2
 | 15:AGGREGATE [STREAMING]
 | |  group by: CAST(d_month_seq AS BIGINT) + CAST(1 AS BIGINT)
+| |  limit: 2
 | |  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB thread-reservation=0
-| |  tuple-ids=14 row-size=8B cardinality=108
+| |  tuple-ids=14 row-size=8B cardinality=2
 | |  in pipelines: 14(GETNEXT)
 | |
 | 14:SCAN HDFS [tpcds_parquet.date_dim, RANDOM]
@@ -866,15 +868,16 @@ Per-Instance Resources: mem-estimate=10.24MB mem-reservation=2.00MB thread-reser
 | |
 | 45:EXCHANGE [HASH(d_month_seq + 3)]
 | |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
-| |  tuple-ids=17 row-size=8B cardinality=108
+| |  tuple-ids=17 row-size=8B cardinality=2
 | |  in pipelines: 17(GETNEXT)
 | |
 | F17:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
 | Per-Instance Resources: mem-estimate=26.00MB mem-reservation=2.25MB thread-reservation=1
 | 18:AGGREGATE [STREAMING]
 | |  group by: CAST(d_month_seq AS BIGINT) + CAST(3 AS BIGINT)
+| |  limit: 2
 | |  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB thread-reservation=0
-| |  tuple-ids=17 row-size=8B cardinality=108
+| |  tuple-ids=17 row-size=8B cardinality=2
 | |  in pipelines: 17(GETNEXT)
 | |
 | 17:SCAN HDFS [tpcds_parquet.date_dim, RANDOM]
@@ -933,15 +936,16 @@ Per-Instance Resources: mem-estimate=10.24MB mem-reservation=2.00MB thread-reser
 | |
 | 41:EXCHANGE [HASH(d_month_seq + 1)]
 | |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
-| |  tuple-ids=14 row-size=8B cardinality=108
+| |  tuple-ids=14 row-size=8B cardinality=2
 | |  in pipelines: 14(GETNEXT)
 | |
 | F14:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
 | Per-Instance Resources: mem-estimate=26.00MB mem-reservation=2.25MB thread-reservation=1
 | 15:AGGREGATE [STREAMING]
 | |  group by: CAST(d_month_seq AS BIGINT) + CAST(1 AS BIGINT)
+| |  limit: 2
 | |  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB thread-reservation=0
-| |  tuple-ids=14 row-size=8B cardinality=108
+| |  tuple-ids=14 row-size=8B cardinality=2
 | |  in pipelines: 14(GETNEXT)
 | |
 | 14:SCAN HDFS [tpcds_parquet.date_dim, RANDOM]
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling.test b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
index f797f7fc7..8a4a36b58 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
@@ -288,7 +288,7 @@ set num_nodes=1;
 select l_orderkey, l_partkey, l_suppkey, l_linenumber, l_comment
 from tpch_parquet.lineitem
 group by 1, 2, 3, 4, 5, random()
-limit 5
+limit 50000000
 ---- RUNTIME_PROFILE
 row_regex: .*Query State: FINISHED.*
 row_regex: .*Query Status: OK.*
@@ -432,3 +432,18 @@ STRING,DECIMAL,DECIMAL,DECIMAL,DECIMAL,BIGINT
 row_regex: .*InMemoryHeapsEvicted: .* \([1-9][0-9]*\)
 #row_regex: .*SpilledRuns: .* \([1-9][0-9]*\)
 ====
+---- QUERY
+# Test spilling an agg with a LIMIT; see IMPALA-2581
+set buffer_pool_limit=136m;
+select c.c2 from
+(select distinct (a.id*10000 + b.id) c1, a.int_col c2
+from functional.alltypes a, functional.alltypes b limit 3500000) c join /* +SHUFFLE */ functional.alltypes d on c.c2 = d.int_col
+group by c.c2
+limit 5
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+# Verify that spilling was activated.
+row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\)
+row_regex: .*FastLimitCheckExceededRows: [0-9]+
+====
diff --git a/testdata/workloads/targeted-perf/queries/aggregation.test b/testdata/workloads/targeted-perf/queries/aggregation.test
index e0e98d794..68b9b07db 100644
--- a/testdata/workloads/targeted-perf/queries/aggregation.test
+++ b/testdata/workloads/targeted-perf/queries/aggregation.test
@@ -2722,3 +2722,14 @@ AGG1,AGG2,G
 ----TYPES
 BIGINT,BIGINT,STRING
 ====
+---- QUERY: PERF_AGG-Q11
+-- IMPALA-2581: LIMIT can be used to speed up aggregations
+select distinct l_orderkey from lineitem limit 10;
+---- RUNTIME_PROFILE
+row_regex: .*FastLimitCheckExceededRows: [1-9][0-9]*
+====
+---- QUERY: PERF_AGG-Q12
+select l_orderkey from lineitem group by 1 limit 10;
+---- RUNTIME_PROFILE
+row_regex: .*FastLimitCheckExceededRows: [1-9][0-9]*
+====
\ No newline at end of file