Files
impala/testdata/workloads/functional-planner/queries/PlannerTest/inline-view-limit.test
Henry Robinson 9f61397fc4 IMPALA-2905: Handle coordinator fragment lifecycle like all others
The plan-root fragment instance that runs on the coordinator should be
handled like all others: started via RPC and run asynchronously. Without
this, the fragment requires special-case code throughout the
coordinator, and does not show up in system metrics etc.

This patch adds a new sink type, PlanRootSink, to the root fragment
instance so that the coordinator can pull row batches that are pushed by
the root instance. The coordinator signals completion to the fragment
instance via closing the consumer side of the sink, whereupon the
instance is free to complete.

Since the root instance now runs asynchronously with respect to the coordinator,
we add several coordination methods to allow the coordinator to wait for
a point in the instance's execution to be hit - e.g. to wait until the
instance has been opened.

Done in this patch:

* Add PlanRootSink
* Add coordination to PFE to allow coordinator to observe lifecycle
* Make FragmentMgr a singleton
* Removed dead code from Coordinator::Wait() and elsewhere.
* Moved result output exprs out of QES and into PlanRootSink.
* Remove special-case limit-based teardown of coordinator fragment, and
  supporting functions in PlanFragmentExecutor.
* Simplified lifecycle of PlanFragmentExecutor by separating Open() into
  Open() and Exec(), the latter of which drives the sink by reading
  rows from the plan tree.
* Add child profile to PlanFragmentExecutor to measure time spent in
  each lifecycle phase.
* Removed dependency between InitExecProfiles() and starting root
  fragment.
* Removed mostly dead-code handling of LIMIT 0 queries.
* Ensured that SET returns a result set in all cases.
* Fix test_get_log() HS2 test. Errors are only guaranteed to be visible
  after fetch calls return EOS, but test was assuming this would happen
  after first fetch.

Change-Id: Ibb0064ec2f085fa3a5598ea80894fb489a01e4df
Reviewed-on: http://gerrit.cloudera.org:8080/4402
Tested-by: Internal Jenkins
Reviewed-by: Henry Robinson <henry@cloudera.com>
2016-10-16 15:55:29 +00:00

687 lines
17 KiB
Plaintext

# predicate pushdown
select * from (select * from functional.alltypessmall) a where id < 5
---- PLAN
PLAN-ROOT SINK
|
00:SCAN HDFS [functional.alltypessmall]
partitions=4/4 files=4 size=6.32KB
predicates: functional.alltypessmall.id < 5
---- DISTRIBUTEDPLAN
PLAN-ROOT SINK
|
01:EXCHANGE [UNPARTITIONED]
|
00:SCAN HDFS [functional.alltypessmall]
partitions=4/4 files=4 size=6.32KB
predicates: functional.alltypessmall.id < 5
====
# predicate pushdown is prevented in presence of limit clause
select * from (select * from functional.alltypessmall limit 10) a where id < 5 limit 5
---- PLAN
PLAN-ROOT SINK
|
01:SELECT
| predicates: functional.alltypessmall.id < 5
| limit: 5
|
00:SCAN HDFS [functional.alltypessmall]
partitions=4/4 files=4 size=6.32KB
limit: 10
---- DISTRIBUTEDPLAN
PLAN-ROOT SINK
|
01:SELECT
| predicates: functional.alltypessmall.id < 5
| limit: 5
|
02:EXCHANGE [UNPARTITIONED]
| limit: 10
|
00:SCAN HDFS [functional.alltypessmall]
partitions=4/4 files=4 size=6.32KB
limit: 10
====
# predicate pushdown is prevented in presence of order by/limit clause;
# top-n is distributed
select *
from (select * from functional.alltypessmall order by id limit 10) a
where id < 5 limit 5
---- PLAN
PLAN-ROOT SINK
|
02:SELECT
| predicates: id < 5
| limit: 5
|
01:TOP-N [LIMIT=10]
| order by: id ASC
|
00:SCAN HDFS [functional.alltypessmall]
partitions=4/4 files=4 size=6.32KB
---- DISTRIBUTEDPLAN
PLAN-ROOT SINK
|
02:SELECT
| predicates: id < 5
| limit: 5
|
03:MERGING-EXCHANGE [UNPARTITIONED]
| order by: id ASC
| limit: 10
|
01:TOP-N [LIMIT=10]
| order by: id ASC
|
00:SCAN HDFS [functional.alltypessmall]
partitions=4/4 files=4 size=6.32KB
====
# top-n is not distributed because it depends on the output of the aggregation
select *
from functional.alltypes
join (
select id, count(*)
from functional.alltypes
group by 1 order by 2 limit 5) a using (id)
where a.id < 5 limit 5
---- PLAN
PLAN-ROOT SINK
|
04:HASH JOIN [INNER JOIN]
| hash predicates: functional.alltypes.id = id
| runtime filters: RF000 <- id
| limit: 5
|
|--03:TOP-N [LIMIT=5]
| | order by: count(*) ASC
| |
| 02:AGGREGATE [FINALIZE]
| | output: count(*)
| | group by: id
| |
| 01:SCAN HDFS [functional.alltypes]
| partitions=24/24 files=24 size=478.45KB
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
predicates: functional.alltypes.id < 5
runtime filters: RF000 -> functional.alltypes.id
---- DISTRIBUTEDPLAN
PLAN-ROOT SINK
|
09:EXCHANGE [UNPARTITIONED]
| limit: 5
|
04:HASH JOIN [INNER JOIN, BROADCAST]
| hash predicates: functional.alltypes.id = id
| runtime filters: RF000 <- id
| limit: 5
|
|--08:EXCHANGE [BROADCAST]
| |
| 07:MERGING-EXCHANGE [UNPARTITIONED]
| | order by: count(*) ASC
| | limit: 5
| |
| 03:TOP-N [LIMIT=5]
| | order by: count(*) ASC
| |
| 06:AGGREGATE [FINALIZE]
| | output: count:merge(*)
| | group by: id
| |
| 05:EXCHANGE [HASH(id)]
| |
| 02:AGGREGATE [STREAMING]
| | output: count(*)
| | group by: id
| |
| 01:SCAN HDFS [functional.alltypes]
| partitions=24/24 files=24 size=478.45KB
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
predicates: functional.alltypes.id < 5
runtime filters: RF000 -> functional.alltypes.id
====
# predicate pushdown is prevented in presence of limit clause; variant w/ join
select *
from (
select a.*
from functional.alltypessmall a join functional.alltypessmall using (id)
limit 10) a
where id < 5 limit 5
---- PLAN
PLAN-ROOT SINK
|
03:SELECT
| predicates: a.id < 5
| limit: 5
|
02:HASH JOIN [INNER JOIN]
| hash predicates: a.id = functional.alltypessmall.id
| runtime filters: RF000 <- functional.alltypessmall.id
| limit: 10
|
|--01:SCAN HDFS [functional.alltypessmall]
| partitions=4/4 files=4 size=6.32KB
|
00:SCAN HDFS [functional.alltypessmall a]
partitions=4/4 files=4 size=6.32KB
runtime filters: RF000 -> a.id
---- DISTRIBUTEDPLAN
PLAN-ROOT SINK
|
03:SELECT
| predicates: a.id < 5
| limit: 5
|
05:EXCHANGE [UNPARTITIONED]
| limit: 10
|
02:HASH JOIN [INNER JOIN, BROADCAST]
| hash predicates: a.id = functional.alltypessmall.id
| runtime filters: RF000 <- functional.alltypessmall.id
| limit: 10
|
|--04:EXCHANGE [BROADCAST]
| |
| 01:SCAN HDFS [functional.alltypessmall]
| partitions=4/4 files=4 size=6.32KB
|
00:SCAN HDFS [functional.alltypessmall a]
partitions=4/4 files=4 size=6.32KB
runtime filters: RF000 -> a.id
====
# predicate pushdown is prevented in presence of order by/limit clause
select *
from (select * from functional.alltypessmall limit 10) a
where id < 5
order by id
limit 5
---- PLAN
PLAN-ROOT SINK
|
02:TOP-N [LIMIT=5]
| order by: id ASC
|
01:SELECT
| predicates: functional.alltypessmall.id < 5
|
00:SCAN HDFS [functional.alltypessmall]
partitions=4/4 files=4 size=6.32KB
limit: 10
---- DISTRIBUTEDPLAN
PLAN-ROOT SINK
|
02:TOP-N [LIMIT=5]
| order by: id ASC
|
01:SELECT
| predicates: functional.alltypessmall.id < 5
|
03:EXCHANGE [UNPARTITIONED]
| limit: 10
|
00:SCAN HDFS [functional.alltypessmall]
partitions=4/4 files=4 size=6.32KB
limit: 10
====
# predicate pushdown is prevented in presence of order by/limit clause; variant w/ join
select *
from (
select a.*
from functional.alltypessmall a
join functional.alltypessmall using (id)
limit 10) a
where id < 5
order by id
limit 5
---- PLAN
PLAN-ROOT SINK
|
04:TOP-N [LIMIT=5]
| order by: id ASC
|
03:SELECT
| predicates: a.id < 5
|
02:HASH JOIN [INNER JOIN]
| hash predicates: a.id = functional.alltypessmall.id
| runtime filters: RF000 <- functional.alltypessmall.id
| limit: 10
|
|--01:SCAN HDFS [functional.alltypessmall]
| partitions=4/4 files=4 size=6.32KB
|
00:SCAN HDFS [functional.alltypessmall a]
partitions=4/4 files=4 size=6.32KB
runtime filters: RF000 -> a.id
---- DISTRIBUTEDPLAN
PLAN-ROOT SINK
|
04:TOP-N [LIMIT=5]
| order by: id ASC
|
03:SELECT
| predicates: a.id < 5
|
06:EXCHANGE [UNPARTITIONED]
| limit: 10
|
02:HASH JOIN [INNER JOIN, BROADCAST]
| hash predicates: a.id = functional.alltypessmall.id
| runtime filters: RF000 <- functional.alltypessmall.id
| limit: 10
|
|--05:EXCHANGE [BROADCAST]
| |
| 01:SCAN HDFS [functional.alltypessmall]
| partitions=4/4 files=4 size=6.32KB
|
00:SCAN HDFS [functional.alltypessmall a]
partitions=4/4 files=4 size=6.32KB
runtime filters: RF000 -> a.id
====
# join against subquery with limit creates a merge fragment that applies the limit
select *
from functional.alltypes
join (select id from functional.alltypessmall limit 10) a using (id)
---- PLAN
PLAN-ROOT SINK
|
02:HASH JOIN [INNER JOIN]
| hash predicates: functional.alltypes.id = id
| runtime filters: RF000 <- id
|
|--01:SCAN HDFS [functional.alltypessmall]
| partitions=4/4 files=4 size=6.32KB
| limit: 10
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
runtime filters: RF000 -> functional.alltypes.id
---- DISTRIBUTEDPLAN
PLAN-ROOT SINK
|
05:EXCHANGE [UNPARTITIONED]
|
02:HASH JOIN [INNER JOIN, BROADCAST]
| hash predicates: functional.alltypes.id = id
| runtime filters: RF000 <- id
|
|--04:EXCHANGE [BROADCAST]
| |
| 03:EXCHANGE [UNPARTITIONED]
| | limit: 10
| |
| 01:SCAN HDFS [functional.alltypessmall]
| partitions=4/4 files=4 size=6.32KB
| limit: 10
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
runtime filters: RF000 -> functional.alltypes.id
====
# join against subquery with limit creates a merge fragment that applies the limit;
# top-n is distributed
select *
from functional.alltypes
join (select id from functional.alltypessmall order by id limit 10) a using (id)
---- PLAN
PLAN-ROOT SINK
|
03:HASH JOIN [INNER JOIN]
| hash predicates: functional.alltypes.id = id
| runtime filters: RF000 <- id
|
|--02:TOP-N [LIMIT=10]
| | order by: id ASC
| |
| 01:SCAN HDFS [functional.alltypessmall]
| partitions=4/4 files=4 size=6.32KB
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
runtime filters: RF000 -> functional.alltypes.id
---- DISTRIBUTEDPLAN
PLAN-ROOT SINK
|
06:EXCHANGE [UNPARTITIONED]
|
03:HASH JOIN [INNER JOIN, BROADCAST]
| hash predicates: functional.alltypes.id = id
| runtime filters: RF000 <- id
|
|--05:EXCHANGE [BROADCAST]
| |
| 04:MERGING-EXCHANGE [UNPARTITIONED]
| | order by: id ASC
| | limit: 10
| |
| 02:TOP-N [LIMIT=10]
| | order by: id ASC
| |
| 01:SCAN HDFS [functional.alltypessmall]
| partitions=4/4 files=4 size=6.32KB
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
runtime filters: RF000 -> functional.alltypes.id
====
# join against subquery with limit;
# predicate pushdown is prevented in presence of order by/limit clause; variant w/ join
select *
from functional.alltypes
join (
select a.id
from functional.alltypessmall a join functional.alltypestiny using (id)
limit 10) a using (id)
where a.id < 5
order by a.id
limit 5
---- PLAN
PLAN-ROOT SINK
|
05:TOP-N [LIMIT=5]
| order by: id ASC
|
04:HASH JOIN [INNER JOIN]
| hash predicates: functional.alltypes.id = a.id
| runtime filters: RF000 <- a.id
|
|--03:HASH JOIN [INNER JOIN]
| | hash predicates: a.id = functional.alltypestiny.id
| | runtime filters: RF001 <- functional.alltypestiny.id
| | limit: 10
| |
| |--02:SCAN HDFS [functional.alltypestiny]
| | partitions=4/4 files=4 size=460B
| |
| 01:SCAN HDFS [functional.alltypessmall a]
| partitions=4/4 files=4 size=6.32KB
| runtime filters: RF001 -> a.id
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
predicates: functional.alltypes.id < 5
runtime filters: RF000 -> functional.alltypes.id
---- DISTRIBUTEDPLAN
PLAN-ROOT SINK
|
09:MERGING-EXCHANGE [UNPARTITIONED]
| order by: id ASC
| limit: 5
|
05:TOP-N [LIMIT=5]
| order by: id ASC
|
04:HASH JOIN [INNER JOIN, BROADCAST]
| hash predicates: functional.alltypes.id = a.id
| runtime filters: RF000 <- a.id
|
|--08:EXCHANGE [BROADCAST]
| |
| 07:EXCHANGE [UNPARTITIONED]
| | limit: 10
| |
| 03:HASH JOIN [INNER JOIN, BROADCAST]
| | hash predicates: a.id = functional.alltypestiny.id
| | runtime filters: RF001 <- functional.alltypestiny.id
| | limit: 10
| |
| |--06:EXCHANGE [BROADCAST]
| | |
| | 02:SCAN HDFS [functional.alltypestiny]
| | partitions=4/4 files=4 size=460B
| |
| 01:SCAN HDFS [functional.alltypessmall a]
| partitions=4/4 files=4 size=6.32KB
| runtime filters: RF001 -> a.id
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
predicates: functional.alltypes.id < 5
runtime filters: RF000 -> functional.alltypes.id
====
# join against subquery with order by/limit;
# predicate pushdown is prevented in presence of order by/limit clause; variant w/ join
select *
from functional.alltypes
join (
select a.id
from functional.alltypessmall a join functional.alltypestiny using (id)
order by a.int_col
limit 10) a using (id)
where a.id < 5
order by a.id
limit 5
---- PLAN
PLAN-ROOT SINK
|
06:TOP-N [LIMIT=5]
| order by: id ASC
|
05:HASH JOIN [INNER JOIN]
| hash predicates: functional.alltypes.id = id
| runtime filters: RF000 <- id
|
|--04:TOP-N [LIMIT=10]
| | order by: int_col ASC
| |
| 03:HASH JOIN [INNER JOIN]
| | hash predicates: a.id = functional.alltypestiny.id
| | runtime filters: RF001 <- functional.alltypestiny.id
| |
| |--02:SCAN HDFS [functional.alltypestiny]
| | partitions=4/4 files=4 size=460B
| |
| 01:SCAN HDFS [functional.alltypessmall a]
| partitions=4/4 files=4 size=6.32KB
| runtime filters: RF001 -> a.id
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
predicates: functional.alltypes.id < 5
runtime filters: RF000 -> functional.alltypes.id
---- DISTRIBUTEDPLAN
PLAN-ROOT SINK
|
10:MERGING-EXCHANGE [UNPARTITIONED]
| order by: id ASC
| limit: 5
|
06:TOP-N [LIMIT=5]
| order by: id ASC
|
05:HASH JOIN [INNER JOIN, BROADCAST]
| hash predicates: functional.alltypes.id = id
| runtime filters: RF000 <- id
|
|--09:EXCHANGE [BROADCAST]
| |
| 08:MERGING-EXCHANGE [UNPARTITIONED]
| | order by: int_col ASC
| | limit: 10
| |
| 04:TOP-N [LIMIT=10]
| | order by: int_col ASC
| |
| 03:HASH JOIN [INNER JOIN, BROADCAST]
| | hash predicates: a.id = functional.alltypestiny.id
| | runtime filters: RF001 <- functional.alltypestiny.id
| |
| |--07:EXCHANGE [BROADCAST]
| | |
| | 02:SCAN HDFS [functional.alltypestiny]
| | partitions=4/4 files=4 size=460B
| |
| 01:SCAN HDFS [functional.alltypessmall a]
| partitions=4/4 files=4 size=6.32KB
| runtime filters: RF001 -> a.id
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
predicates: functional.alltypes.id < 5
runtime filters: RF000 -> functional.alltypes.id
====
# Subquery containing limit and offset
select x.id from (
select id from functional.alltypesagg order by id limit 5 offset 5) x
order by x.id
limit 100 offset 4
---- PLAN
PLAN-ROOT SINK
|
02:TOP-N [LIMIT=100 OFFSET=4]
| order by: id ASC
|
01:TOP-N [LIMIT=5 OFFSET=5]
| order by: id ASC
|
00:SCAN HDFS [functional.alltypesagg]
partitions=11/11 files=11 size=814.73KB
---- DISTRIBUTEDPLAN
PLAN-ROOT SINK
|
02:TOP-N [LIMIT=100 OFFSET=4]
| order by: id ASC
|
03:MERGING-EXCHANGE [UNPARTITIONED]
| offset: 5
| order by: id ASC
| limit: 5
|
01:TOP-N [LIMIT=10]
| order by: id ASC
|
00:SCAN HDFS [functional.alltypesagg]
partitions=11/11 files=11 size=814.73KB
====
# Test value transfers for outer-joined inline views with a limit.
# Value transfer a.id->b.id is illegal due to the limit in b.
# Value transfer b.id->a.id is illegal due to the left outer join.
select * from
(select id from functional.alltypes where id != 1) a
left outer join
(select id from functional.alltypessmall where id != 2 limit 10) b
on (a.id = b.id)
where a.id > 10 and b.id > 20
---- PLAN
PLAN-ROOT SINK
|
02:HASH JOIN [LEFT OUTER JOIN]
| hash predicates: id = id
| other predicates: id > 20
|
|--01:SCAN HDFS [functional.alltypessmall]
| partitions=4/4 files=4 size=6.32KB
| predicates: id != 2
| limit: 10
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
predicates: functional.alltypes.id > 10, id != 1
====
# Test value transfers for outer-joined inline views with a limit.
# Value transfer a.id->b.id is legal.
# Value transfer b.id->a.id is illegal due to the left outer join.
select * from
(select id from functional.alltypes where id != 1 limit 10) a
left outer join
(select id from functional.alltypessmall where id != 2) b
on (a.id = b.id)
where a.id > 10 and b.id > 20
---- PLAN
PLAN-ROOT SINK
|
03:HASH JOIN [RIGHT OUTER JOIN]
| hash predicates: id = id
| other predicates: id > 20
| runtime filters: RF000 <- id
|
|--01:SELECT
| | predicates: id > 10
| |
| 00:SCAN HDFS [functional.alltypes]
| partitions=24/24 files=24 size=478.45KB
| predicates: id != 1
| limit: 10
|
02:SCAN HDFS [functional.alltypessmall]
partitions=4/4 files=4 size=6.32KB
predicates: functional.alltypessmall.id != 1, functional.alltypessmall.id > 10, functional.alltypessmall.id > 20, id != 2
runtime filters: RF000 -> id
====
# Test value transfers for outer-joined inline views with a limit.
# Value transfer b.id->a.id is illegal due to the limit in a.
# Value transfer a.id->b.id is illegal due to the right outer join.
select * from
(select id from functional.alltypes where id != 1 limit 10) a
right outer join
(select id from functional.alltypessmall where id != 2) b
on (a.id = b.id)
where a.id > 10 and b.id > 20
---- PLAN
PLAN-ROOT SINK
|
02:HASH JOIN [RIGHT OUTER JOIN]
| hash predicates: id = id
| other predicates: id > 10
| runtime filters: RF000 <- id
|
|--01:SCAN HDFS [functional.alltypessmall]
| partitions=4/4 files=4 size=6.32KB
| predicates: functional.alltypessmall.id > 20, id != 2
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
predicates: id != 1
runtime filters: RF000 -> id
limit: 10
====
# Test value transfers for outer-joined inline views with a limit.
# Value transfer b.id->a.id is legal.
# Value transfer a.id->b.id is illegal due to the right outer join.
select * from
(select id from functional.alltypes where id != 1) a
right outer join
(select id from functional.alltypessmall where id != 2 limit 10) b
on (a.id = b.id)
where a.id > 10 and b.id > 20
---- PLAN
PLAN-ROOT SINK
|
03:HASH JOIN [RIGHT OUTER JOIN]
| hash predicates: id = id
| other predicates: id > 10
| runtime filters: RF000 <- id
|
|--02:SELECT
| | predicates: id > 20
| |
| 01:SCAN HDFS [functional.alltypessmall]
| partitions=4/4 files=4 size=6.32KB
| predicates: id != 2
| limit: 10
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
predicates: functional.alltypes.id != 2, functional.alltypes.id > 10, functional.alltypes.id > 20, id != 1
runtime filters: RF000 -> id
====
# IMPALA-3450: limits on select nodes are reflected in cardinality estimates. The test for
# this is embedded in PlannerTestBase.java and is not visible in these plans, as they only
# have explain_level=1
select * from (select * from functional.alltypes limit 100) v where id < 10 limit 1
---- PLAN
PLAN-ROOT SINK
|
01:SELECT
| predicates: functional.alltypes.id < 10
| limit: 1
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
limit: 100
====