mirror of
https://github.com/apache/impala.git
synced 2026-01-07 09:02:19 -05:00
This has two related changes. IMPALA-6679: defer scanner reservation increases ------------------------------------------------ When starting each scan range, check to see how big the initial scan range is (the full thing for row-based formats, the footer for Parquet) and determine whether more reservation would be useful. For Parquet, base the ideal reservation on the actual column layout of each file. This avoids reserving memory that we won't use for the actual files that we're scanning. This also avoids the need to estimate ideal reservation in the planner. We also release scanner thread reservations above the minimum as soon as threads complete, so that resources can be released slightly earlier. IMPALA-6678: estimate Parquet column size for reservation --------------------------------------------------------- This change also reduces reservation computed by the planner in certain cases by estimating the on-disk size of column data based on stats. It also reduces the default per-column reservation to 4MB since it appears that < 8MB columns are generally common in practice and the method for estimating column size is biased towards over-estimating. There are two main cases to consider for the performance implications: * Memory is available to improve query perf - if we underestimate, we can increase the reservation so we can do "efficient" 8MB I/Os for large columns. * The ideal reservation is not available - query performance is affected because we can't overlap I/O and compute as much and may do smaller (probably 4MB) I/Os. However, we should avoid pathological behaviour like tiny I/Os. When stats are not available, we just default to reserving 4MB per column, which typically is more memory than required. When stats are available, the memory required can be reduced below 4MB when some heuristics tell us with high confidence that the column data for most or all files is smaller than 4MB. 
The stats-based heuristic could reduce scan performance if both the conservative heuristics significantly underestimate the column size and memory is constrained such that we can't increase the scan reservation at runtime (in which case the memory might be used by a different operator or scanner thread). Observability: Added counters to track when threads were not spawned due to reservation and to track when reservation increases are requested and denied. These allow determining if performance may have been affected by memory availability. Testing: Updated test_mem_usage_scaling.py memory requirements and added steps to regenerate the requirements. Looped the test for a while to flush out flakiness. Added targeted planner and query tests for reservation calculations and increases. Change-Id: Ifc80e05118a9eef72cac8e2308418122e3ee0842 Reviewed-on: http://gerrit.cloudera.org:8080/9757 Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
233 lines
5.7 KiB
Plaintext
233 lines
5.7 KiB
Plaintext
# Rows per node is < 3000: codegen should be disabled.
|
|
select count(*) from functional.alltypes
|
|
---- DISTRIBUTEDPLAN
|
|
Max Per-Host Resource Reservation: Memory=32.00KB
|
|
Per-Host Resource Estimates: Memory=148.00MB
|
|
Codegen disabled by planner
|
|
|
|
PLAN-ROOT SINK
|
|
|
|
|
03:AGGREGATE [FINALIZE]
|
|
| output: count:merge(*)
|
|
|
|
|
02:EXCHANGE [UNPARTITIONED]
|
|
|
|
|
01:AGGREGATE
|
|
| output: count(*)
|
|
|
|
|
00:SCAN HDFS [functional.alltypes]
|
|
partitions=24/24 files=24 size=478.45KB
|
|
====
|
|
# Rows per node is > 3000: codegen should be enabled.
|
|
select count(*) from functional.alltypesagg
|
|
---- DISTRIBUTEDPLAN
|
|
Max Per-Host Resource Reservation: Memory=128.00KB
|
|
Per-Host Resource Estimates: Memory=100.00MB
|
|
|
|
PLAN-ROOT SINK
|
|
|
|
|
03:AGGREGATE [FINALIZE]
|
|
| output: count:merge(*)
|
|
|
|
|
02:EXCHANGE [UNPARTITIONED]
|
|
|
|
|
01:AGGREGATE
|
|
| output: count(*)
|
|
|
|
|
00:SCAN HDFS [functional.alltypesagg]
|
|
partitions=11/11 files=11 size=814.73KB
|
|
====
|
|
# No stats on functional_parquet: codegen should be disabled.
|
|
select count(*) from functional_parquet.alltypes
|
|
---- DISTRIBUTEDPLAN
|
|
Max Per-Host Resource Reservation: Memory=16.00KB
|
|
Per-Host Resource Estimates: Memory=21.00MB
|
|
WARNING: The following tables are missing relevant table and/or column statistics.
|
|
functional_parquet.alltypes
|
|
|
|
PLAN-ROOT SINK
|
|
|
|
|
03:AGGREGATE [FINALIZE]
|
|
| output: count:merge(*)
|
|
|
|
|
02:EXCHANGE [UNPARTITIONED]
|
|
|
|
|
01:AGGREGATE
|
|
| output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows)
|
|
|
|
|
00:SCAN HDFS [functional_parquet.alltypes]
|
|
partitions=24/24 files=24 size=188.29KB
|
|
====
|
|
# > 3000 rows returned to coordinator: codegen should be enabled
|
|
select * from functional_parquet.alltypes
|
|
---- DISTRIBUTEDPLAN
|
|
Max Per-Host Resource Reservation: Memory=88.00KB
|
|
Per-Host Resource Estimates: Memory=128.00MB
|
|
WARNING: The following tables are missing relevant table and/or column statistics.
|
|
functional_parquet.alltypes
|
|
|
|
PLAN-ROOT SINK
|
|
|
|
|
01:EXCHANGE [UNPARTITIONED]
|
|
|
|
|
00:SCAN HDFS [functional_parquet.alltypes]
|
|
partitions=24/24 files=24 size=188.29KB
|
|
====
|
|
# Optimisation is enabled for join producing < 3000 rows
|
|
select count(*)
|
|
from functional.alltypes t1
|
|
join functional.alltypestiny t2 on t1.id = t2.id
|
|
---- DISTRIBUTEDPLAN
|
|
Max Per-Host Resource Reservation: Memory=2.98MB
|
|
Per-Host Resource Estimates: Memory=182.94MB
|
|
Codegen disabled by planner
|
|
|
|
PLAN-ROOT SINK
|
|
|
|
|
06:AGGREGATE [FINALIZE]
|
|
| output: count:merge(*)
|
|
|
|
|
05:EXCHANGE [UNPARTITIONED]
|
|
|
|
|
03:AGGREGATE
|
|
| output: count(*)
|
|
|
|
|
02:HASH JOIN [INNER JOIN, BROADCAST]
|
|
| hash predicates: t1.id = t2.id
|
|
| runtime filters: RF000 <- t2.id
|
|
|
|
|
|--04:EXCHANGE [BROADCAST]
|
|
| |
|
|
| 01:SCAN HDFS [functional.alltypestiny t2]
|
|
| partitions=4/4 files=4 size=460B
|
|
|
|
|
00:SCAN HDFS [functional.alltypes t1]
|
|
partitions=24/24 files=24 size=478.45KB
|
|
runtime filters: RF000 -> t1.id
|
|
====
|
|
# Optimisation is disabled by cross join producing > 3000 rows
|
|
select count(*) from functional.alltypes t1, functional.alltypes t2
|
|
---- DISTRIBUTEDPLAN
|
|
Max Per-Host Resource Reservation: Memory=64.00KB
|
|
Per-Host Resource Estimates: Memory=276.00MB
|
|
|
|
PLAN-ROOT SINK
|
|
|
|
|
06:AGGREGATE [FINALIZE]
|
|
| output: count:merge(*)
|
|
|
|
|
05:EXCHANGE [UNPARTITIONED]
|
|
|
|
|
03:AGGREGATE
|
|
| output: count(*)
|
|
|
|
|
02:NESTED LOOP JOIN [CROSS JOIN, BROADCAST]
|
|
|
|
|
|--04:EXCHANGE [BROADCAST]
|
|
| |
|
|
| 01:SCAN HDFS [functional.alltypes t2]
|
|
| partitions=24/24 files=24 size=478.45KB
|
|
|
|
|
00:SCAN HDFS [functional.alltypes t1]
|
|
partitions=24/24 files=24 size=478.45KB
|
|
====
|
|
# Optimisation is enabled for union producing < 3000 rows
|
|
select count(*) from (
|
|
select * from functional.alltypes
|
|
union all
|
|
select * from functional.alltypestiny) v
|
|
---- DISTRIBUTEDPLAN
|
|
Max Per-Host Resource Reservation: Memory=32.00KB
|
|
Per-Host Resource Estimates: Memory=148.00MB
|
|
Codegen disabled by planner
|
|
|
|
PLAN-ROOT SINK
|
|
|
|
|
05:AGGREGATE [FINALIZE]
|
|
| output: count:merge(*)
|
|
|
|
|
04:EXCHANGE [UNPARTITIONED]
|
|
|
|
|
03:AGGREGATE
|
|
| output: count(*)
|
|
|
|
|
00:UNION
|
|
| pass-through-operands: all
|
|
|
|
|
|--02:SCAN HDFS [functional.alltypestiny]
|
|
| partitions=4/4 files=4 size=460B
|
|
|
|
|
01:SCAN HDFS [functional.alltypes]
|
|
partitions=24/24 files=24 size=478.45KB
|
|
====
|
|
# Optimisation is disabled by union producing > 3000 rows
|
|
select count(*) from (
|
|
select * from functional.alltypes
|
|
union all
|
|
select * from functional.alltypes) v
|
|
---- DISTRIBUTEDPLAN
|
|
Max Per-Host Resource Reservation: Memory=32.00KB
|
|
Per-Host Resource Estimates: Memory=148.00MB
|
|
|
|
PLAN-ROOT SINK
|
|
|
|
|
05:AGGREGATE [FINALIZE]
|
|
| output: count:merge(*)
|
|
|
|
|
04:EXCHANGE [UNPARTITIONED]
|
|
|
|
|
03:AGGREGATE
|
|
| output: count(*)
|
|
|
|
|
00:UNION
|
|
| pass-through-operands: all
|
|
|
|
|
|--02:SCAN HDFS [functional.alltypes]
|
|
| partitions=24/24 files=24 size=478.45KB
|
|
|
|
|
01:SCAN HDFS [functional.alltypes]
|
|
partitions=24/24 files=24 size=478.45KB
|
|
====
|
|
# Scan with limit on large table: the number of rows scanned is bounded,
|
|
# codegen should be disabled
|
|
select sum(l_discount)
|
|
from (select * from tpch.lineitem limit 1000) v
|
|
---- DISTRIBUTEDPLAN
|
|
Max Per-Host Resource Reservation: Memory=8.00MB
|
|
Per-Host Resource Estimates: Memory=274.00MB
|
|
Codegen disabled by planner
|
|
|
|
PLAN-ROOT SINK
|
|
|
|
|
01:AGGREGATE [FINALIZE]
|
|
| output: sum(tpch.lineitem.l_discount)
|
|
|
|
|
02:EXCHANGE [UNPARTITIONED]
|
|
| limit: 1000
|
|
|
|
|
00:SCAN HDFS [tpch.lineitem]
|
|
partitions=1/1 files=1 size=718.94MB
|
|
limit: 1000
|
|
====
|
|
# Scan with limit and predicates on large table: any number of rows could be scanned:
|
|
# codegen should be enabled
|
|
select sum(l_discount)
|
|
from (select * from tpch.lineitem where l_orderkey > 100 limit 1000) v
|
|
---- DISTRIBUTEDPLAN
|
|
Max Per-Host Resource Reservation: Memory=8.00MB
|
|
Per-Host Resource Estimates: Memory=274.00MB
|
|
|
|
PLAN-ROOT SINK
|
|
|
|
|
01:AGGREGATE [FINALIZE]
|
|
| output: sum(tpch.lineitem.l_discount)
|
|
|
|
|
02:EXCHANGE [UNPARTITIONED]
|
|
| limit: 1000
|
|
|
|
|
00:SCAN HDFS [tpch.lineitem]
|
|
partitions=1/1 files=1 size=718.94MB
|
|
predicates: l_orderkey > 100
|
|
limit: 1000
|
|
====
|