mirror of
https://github.com/apache/impala.git
synced 2026-02-03 00:00:40 -05:00
This moves away from the PipelinedPlanNodeSet approach of enumerating sets of concurrently-executing nodes because unions would force creating many overlapping sets of nodes.

The new approach computes the peak resources during Open() and the peak resources between Open() and Close() (i.e. while calling GetNext()) bottom-up for each plan node in a fragment. The fragment resources are then combined to produce the query resources.

The basic assumptions for the new resource estimates are:
* Resources are acquired during or after the first call to Open() and released in Close().
* Blocking nodes call Open() on their child before acquiring their own resources (this required some backend changes).
* Blocking nodes call Close() on their children before returning from Open().
* The peak resource consumption of the query is the sum of the independent fragments (except for the parallel join build plans where we can assume there will be synchronisation). This is conservative but we don't synchronise fragment Open() and Close() across exchanges so can't make stronger assumptions in general.

Also compute the sum of minimum reservations. This will be useful in the backend to determine exactly when all of the initial reservations have been claimed from a shared pool of initial reservations.

Testing:
* Updated planner tests to reflect behavioural changes.
* Added extra resource requirement planner tests for unions, subplans, pipelines of blocking operators, and bushy join plans.
* Added single-node plans to resource-requirements tests. These have more complex plan trees inside a single fragment, which is useful for testing the peak resource requirement logic.

Change-Id: I492cf5052bb27e4e335395e2a8f8a3b07248ec9d
Reviewed-on: http://gerrit.cloudera.org:8080/7223
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins
233 lines
5.6 KiB
Plaintext
233 lines
5.6 KiB
Plaintext
# Rows per node is < 3000: codegen should be disabled.
|
|
select count(*) from functional.alltypes
|
|
---- DISTRIBUTEDPLAN
|
|
Per-Host Resource Reservation: Memory=0B
|
|
Per-Host Resource Estimates: Memory=148.00MB
|
|
Codegen disabled by planner
|
|
|
|
PLAN-ROOT SINK
|
|
|
|
|
03:AGGREGATE [FINALIZE]
|
|
| output: count:merge(*)
|
|
|
|
|
02:EXCHANGE [UNPARTITIONED]
|
|
|
|
|
01:AGGREGATE
|
|
| output: count(*)
|
|
|
|
|
00:SCAN HDFS [functional.alltypes]
|
|
partitions=24/24 files=24 size=478.45KB
|
|
====
|
|
# Rows per node is > 3000: codegen should be enabled.
|
|
select count(*) from functional.alltypesagg
|
|
---- DISTRIBUTEDPLAN
|
|
Per-Host Resource Reservation: Memory=0B
|
|
Per-Host Resource Estimates: Memory=100.00MB
|
|
|
|
PLAN-ROOT SINK
|
|
|
|
|
03:AGGREGATE [FINALIZE]
|
|
| output: count:merge(*)
|
|
|
|
|
02:EXCHANGE [UNPARTITIONED]
|
|
|
|
|
01:AGGREGATE
|
|
| output: count(*)
|
|
|
|
|
00:SCAN HDFS [functional.alltypesagg]
|
|
partitions=11/11 files=11 size=814.73KB
|
|
====
|
|
# No stats on functional_parquet: codegen should be disabled.
|
|
select count(*) from functional_parquet.alltypes
|
|
---- DISTRIBUTEDPLAN
|
|
Per-Host Resource Reservation: Memory=0B
|
|
Per-Host Resource Estimates: Memory=36.00MB
|
|
WARNING: The following tables are missing relevant table and/or column statistics.
|
|
functional_parquet.alltypes
|
|
|
|
PLAN-ROOT SINK
|
|
|
|
|
03:AGGREGATE [FINALIZE]
|
|
| output: count:merge(*)
|
|
|
|
|
02:EXCHANGE [UNPARTITIONED]
|
|
|
|
|
01:AGGREGATE
|
|
| output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
|
|
|
|
|
00:SCAN HDFS [functional_parquet.alltypes]
|
|
partitions=24/24 files=24 size=178.13KB
|
|
====
|
|
# > 3000 rows returned to coordinator: codegen should be enabled
|
|
select * from functional_parquet.alltypes
|
|
---- DISTRIBUTEDPLAN
|
|
Per-Host Resource Reservation: Memory=0B
|
|
Per-Host Resource Estimates: Memory=128.00MB
|
|
WARNING: The following tables are missing relevant table and/or column statistics.
|
|
functional_parquet.alltypes
|
|
|
|
PLAN-ROOT SINK
|
|
|
|
|
01:EXCHANGE [UNPARTITIONED]
|
|
|
|
|
00:SCAN HDFS [functional_parquet.alltypes]
|
|
partitions=24/24 files=24 size=178.13KB
|
|
====
|
|
# Optimisation is enabled for join producing < 3000 rows: codegen should be disabled.
|
|
select count(*)
|
|
from functional.alltypes t1
|
|
join functional.alltypestiny t2 on t1.id = t2.id
|
|
---- DISTRIBUTEDPLAN
|
|
Per-Host Resource Reservation: Memory=136.00MB
|
|
Per-Host Resource Estimates: Memory=180.00MB
|
|
Codegen disabled by planner
|
|
|
|
PLAN-ROOT SINK
|
|
|
|
|
06:AGGREGATE [FINALIZE]
|
|
| output: count:merge(*)
|
|
|
|
|
05:EXCHANGE [UNPARTITIONED]
|
|
|
|
|
03:AGGREGATE
|
|
| output: count(*)
|
|
|
|
|
02:HASH JOIN [INNER JOIN, BROADCAST]
|
|
| hash predicates: t1.id = t2.id
|
|
| runtime filters: RF000 <- t2.id
|
|
|
|
|
|--04:EXCHANGE [BROADCAST]
|
|
| |
|
|
| 01:SCAN HDFS [functional.alltypestiny t2]
|
|
| partitions=4/4 files=4 size=460B
|
|
|
|
|
00:SCAN HDFS [functional.alltypes t1]
|
|
partitions=24/24 files=24 size=478.45KB
|
|
runtime filters: RF000 -> t1.id
|
|
====
|
|
# Optimisation is disabled by cross join producing > 3000 rows: codegen should be enabled.
|
|
select count(*) from functional.alltypes t1, functional.alltypes t2
|
|
---- DISTRIBUTEDPLAN
|
|
Per-Host Resource Reservation: Memory=0B
|
|
Per-Host Resource Estimates: Memory=276.00MB
|
|
|
|
PLAN-ROOT SINK
|
|
|
|
|
06:AGGREGATE [FINALIZE]
|
|
| output: count:merge(*)
|
|
|
|
|
05:EXCHANGE [UNPARTITIONED]
|
|
|
|
|
03:AGGREGATE
|
|
| output: count(*)
|
|
|
|
|
02:NESTED LOOP JOIN [CROSS JOIN, BROADCAST]
|
|
|
|
|
|--04:EXCHANGE [BROADCAST]
|
|
| |
|
|
| 01:SCAN HDFS [functional.alltypes t2]
|
|
| partitions=24/24 files=24 size=478.45KB
|
|
|
|
|
00:SCAN HDFS [functional.alltypes t1]
|
|
partitions=24/24 files=24 size=478.45KB
|
|
====
|
|
# Optimisation is enabled for union producing < 3000 rows: codegen should be disabled.
|
|
select count(*) from (
|
|
select * from functional.alltypes
|
|
union all
|
|
select * from functional.alltypestiny) v
|
|
---- DISTRIBUTEDPLAN
|
|
Per-Host Resource Reservation: Memory=0B
|
|
Per-Host Resource Estimates: Memory=148.00MB
|
|
Codegen disabled by planner
|
|
|
|
PLAN-ROOT SINK
|
|
|
|
|
05:AGGREGATE [FINALIZE]
|
|
| output: count:merge(*)
|
|
|
|
|
04:EXCHANGE [UNPARTITIONED]
|
|
|
|
|
03:AGGREGATE
|
|
| output: count(*)
|
|
|
|
|
00:UNION
|
|
| pass-through-operands: all
|
|
|
|
|
|--02:SCAN HDFS [functional.alltypestiny]
|
|
| partitions=4/4 files=4 size=460B
|
|
|
|
|
01:SCAN HDFS [functional.alltypes]
|
|
partitions=24/24 files=24 size=478.45KB
|
|
====
|
|
# Optimisation is disabled by union producing > 3000 rows: codegen should be enabled.
|
|
select count(*) from (
|
|
select * from functional.alltypes
|
|
union all
|
|
select * from functional.alltypes) v
|
|
---- DISTRIBUTEDPLAN
|
|
Per-Host Resource Reservation: Memory=0B
|
|
Per-Host Resource Estimates: Memory=148.00MB
|
|
|
|
PLAN-ROOT SINK
|
|
|
|
|
05:AGGREGATE [FINALIZE]
|
|
| output: count:merge(*)
|
|
|
|
|
04:EXCHANGE [UNPARTITIONED]
|
|
|
|
|
03:AGGREGATE
|
|
| output: count(*)
|
|
|
|
|
00:UNION
|
|
| pass-through-operands: all
|
|
|
|
|
|--02:SCAN HDFS [functional.alltypes]
|
|
| partitions=24/24 files=24 size=478.45KB
|
|
|
|
|
01:SCAN HDFS [functional.alltypes]
|
|
partitions=24/24 files=24 size=478.45KB
|
|
====
|
|
# Scan with limit on large table: the number of rows scanned is bounded,
|
|
# codegen should be disabled
|
|
select sum(l_discount)
|
|
from (select * from tpch.lineitem limit 1000) v
|
|
---- DISTRIBUTEDPLAN
|
|
Per-Host Resource Reservation: Memory=0B
|
|
Per-Host Resource Estimates: Memory=274.00MB
|
|
Codegen disabled by planner
|
|
|
|
PLAN-ROOT SINK
|
|
|
|
|
01:AGGREGATE [FINALIZE]
|
|
| output: sum(tpch.lineitem.l_discount)
|
|
|
|
|
02:EXCHANGE [UNPARTITIONED]
|
|
| limit: 1000
|
|
|
|
|
00:SCAN HDFS [tpch.lineitem]
|
|
partitions=1/1 files=1 size=718.94MB
|
|
limit: 1000
|
|
====
|
|
# Scan with limit and predicates on large table: any number of rows could be scanned,
|
|
# codegen should be enabled
|
|
select sum(l_discount)
|
|
from (select * from tpch.lineitem where l_orderkey > 100 limit 1000) v
|
|
---- DISTRIBUTEDPLAN
|
|
Per-Host Resource Reservation: Memory=0B
|
|
Per-Host Resource Estimates: Memory=274.00MB
|
|
|
|
PLAN-ROOT SINK
|
|
|
|
|
01:AGGREGATE [FINALIZE]
|
|
| output: sum(tpch.lineitem.l_discount)
|
|
|
|
|
02:EXCHANGE [UNPARTITIONED]
|
|
| limit: 1000
|
|
|
|
|
00:SCAN HDFS [tpch.lineitem]
|
|
partitions=1/1 files=1 size=718.94MB
|
|
predicates: l_orderkey > 100
|
|
limit: 1000
|
|
====
|