impala/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
Tim Armstrong 2c2670e389 IMPALA-1305: streaming pre-aggregations
Aggregations are implemented as a distributed pre-aggregation, an
exchange, then a final aggregation that produces the final results. In
many cases the pre-aggregation significantly reduces the amount of
data to be exchanged. In other cases, however, the pre-aggregation
does not greatly reduce the amount of data exchanged, or it can use a
lot of memory and starve other operators that would benefit more from
the additional memory.

In these cases we would be better off "passing through" some input tuples
by transforming them into intermediate tuples without aggregating them.

This patch adds a streaming pre-aggregation mode to
PartitionedAggregationNode that tries to aggregate input rows with a
hash table, but can switch to passing through the input tuples (after
transforming them into the appropriate tuple format). It does this if
it hits a memory limit or if the aggregation is not sufficiently
reducing the node's output (specifically, if the number of aggregated
rows in the hash table is more than half the number of unaggregated rows
consumed by the pre-aggregation). Pre-aggregations never need to spill
because they can pass through rows when under memory pressure.
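
A rough sketch of that pass-through heuristic (hypothetical names, not
the actual PartitionedAggregationNode code):

  #include <cstdint>

  // Decide whether to stop aggregating new groups and start passing
  // input rows through, after converting them to the intermediate
  // tuple format.
  // ht_rows: aggregated rows accumulated in the hash tables so far.
  // input_rows: unaggregated input rows consumed so far.
  bool ShouldPassThrough(int64_t ht_rows, int64_t input_rows,
                         bool hit_mem_limit) {
    if (hit_mem_limit) return true;  // never spill; pass through instead
    // Aggregation is not reducing the output enough: more than half of
    // the consumed input rows became distinct groups.
    return 2 * ht_rows > input_rows;
  }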

This initial implementation is quite conservative: it retains the
partitioning of the previous implementation because switching to a
single partition proved to regress performance of some queries while
improving others. It also always keeps hash tables around and updates
them with matching input rows so that reduction statistics are updated
and early decisions to pass through data can be reversed.  Future work
could explore different approaches within the new framework to get
larger performance gains. Currently we see significant performance
benefits for queries with a very low reduction factor, e.g. group by on
a nearly unique column.
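
For instance (a hypothetical query in the spirit of the TPC-H tests
this patch adds), grouping on a unique column such as
functional.alltypes.id produces one group per input row, so the
pre-aggregation reduces nothing and quickly switches to passing rows
through:

  select id, count(*) from functional.alltypes group by id;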

Includes codegen support for the streaming passthrough path.

Adds a query option, disable_streaming_preaggregations, in case a user
wants to revert to the old behaviour.
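
For example, in impala-shell:

  set disable_streaming_preaggregations=true;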

Adds TPC-H tests to exercise the new passthrough code path and updates
planner tests to include the new [STREAMING] detail added by the planner.

Change-Id: Ia40525340cba89a8c4e70164ae11447e96494664
Reviewed-on: http://gerrit.cloudera.org:8080/1698
Tested-by: Internal Jenkins
Reviewed-by: Dan Hecht <dhecht@cloudera.com>
2016-02-11 19:03:51 +00:00

# insert into an unpartitioned table
insert into table functional.alltypesnopart
select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col
from functional.alltypes
where year=2009 and month=05
---- PLAN
WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]
| partitions=1
|
00:SCAN HDFS [functional.alltypes]
partitions=1/24 files=1 size=20.36KB
---- SCANRANGELOCATIONS
NODE 0:
HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypes/year=2009/month=5/090501.txt 0:20853
---- DISTRIBUTEDPLAN
WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]
| partitions=1
|
00:SCAN HDFS [functional.alltypes]
partitions=1/24 files=1 size=20.36KB
====
# insert into a static partition
insert into table functional.alltypessmall
partition (year=2009, month=04)
select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col
from functional.alltypes
where year=2009 and month=05
---- PLAN
WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(2009,4)]
| partitions=1
|
00:SCAN HDFS [functional.alltypes]
partitions=1/24 files=1 size=20.36KB
---- SCANRANGELOCATIONS
NODE 0:
HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypes/year=2009/month=5/090501.txt 0:20853
---- DISTRIBUTEDPLAN
WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(2009,4)]
| partitions=1
|
00:SCAN HDFS [functional.alltypes]
partitions=1/24 files=1 size=20.36KB
====
# overwrite a static partition
insert overwrite table functional.alltypessmall
partition (year=2009, month=04)
select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col
from functional.alltypes
where year=2009 and month=05
---- PLAN
WRITE TO HDFS [functional.alltypessmall, OVERWRITE=true, PARTITION-KEYS=(2009,4)]
| partitions=1
|
00:SCAN HDFS [functional.alltypes]
partitions=1/24 files=1 size=20.36KB
---- SCANRANGELOCATIONS
NODE 0:
HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypes/year=2009/month=5/090501.txt 0:20853
---- DISTRIBUTEDPLAN
WRITE TO HDFS [functional.alltypessmall, OVERWRITE=true, PARTITION-KEYS=(2009,4)]
| partitions=1
|
00:SCAN HDFS [functional.alltypes]
partitions=1/24 files=1 size=20.36KB
====
# insert into fully dynamic partitions
insert into table functional.alltypessmall
partition (year, month)
select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col, year, month
from functional.alltypes
where year=2009 and month>10
---- PLAN
WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(year,month)]
| partitions=24
|
00:SCAN HDFS [functional.alltypes]
partitions=2/24 files=2 size=40.07KB
---- SCANRANGELOCATIONS
NODE 0:
HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypes/year=2009/month=11/091101.txt 0:20179
HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypes/year=2009/month=12/091201.txt 0:20853
---- DISTRIBUTEDPLAN
WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(year,month)]
| partitions=24
|
01:EXCHANGE [HASH(year,month)]
|
00:SCAN HDFS [functional.alltypes]
partitions=2/24 files=2 size=40.07KB
====
# insert into fully dynamic partitions. The source table has no stats and the insert
# statement has a partition clause, so hash partition before the sink.
insert into table functional.alltypessmall
partition (year, month)
select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col, int_col, int_col
from functional_seq_snap.alltypes
where year=2009 and month>10
---- PLAN
WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(int_col,int_col)]
| partitions=unavailable
|
00:SCAN HDFS [functional_seq_snap.alltypes]
partitions=2/24 files=2 size=11.34KB
---- DISTRIBUTEDPLAN
WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(int_col,int_col)]
| partitions=unavailable
|
01:EXCHANGE [HASH(int_col,int_col)]
|
00:SCAN HDFS [functional_seq_snap.alltypes]
partitions=2/24 files=2 size=11.34KB
====
# insert into fully dynamic partitions;
# partitioned output doesn't require repartitioning
insert into table functional.alltypessmall
partition (year, month)
select min(id), min(bool_col), min(tinyint_col), min(smallint_col), min(int_col),
min(bigint_col), min(float_col), min(double_col), min(date_string_col), min(string_col),
min(timestamp_col), year, month
from functional.alltypes
where year=2009 and month>10
group by year, month
---- PLAN
WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(year,month)]
| partitions=24
|
01:AGGREGATE [FINALIZE]
| output: min(id), min(bool_col), min(tinyint_col), min(smallint_col), min(int_col), min(bigint_col), min(float_col), min(double_col), min(date_string_col), min(string_col), min(timestamp_col)
| group by: year, month
|
00:SCAN HDFS [functional.alltypes]
partitions=2/24 files=2 size=40.07KB
---- SCANRANGELOCATIONS
NODE 0:
HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypes/year=2009/month=11/091101.txt 0:20179
HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypes/year=2009/month=12/091201.txt 0:20853
---- DISTRIBUTEDPLAN
WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(year,month)]
| partitions=24
|
03:AGGREGATE [FINALIZE]
| output: min:merge(id), min:merge(bool_col), min:merge(tinyint_col), min:merge(smallint_col), min:merge(int_col), min:merge(bigint_col), min:merge(float_col), min:merge(double_col), min:merge(date_string_col), min:merge(string_col), min:merge(timestamp_col)
| group by: year, month
|
02:EXCHANGE [HASH(year,month)]
|
01:AGGREGATE [STREAMING]
| output: min(id), min(bool_col), min(tinyint_col), min(smallint_col), min(int_col), min(bigint_col), min(float_col), min(double_col), min(date_string_col), min(string_col), min(timestamp_col)
| group by: year, month
|
00:SCAN HDFS [functional.alltypes]
partitions=2/24 files=2 size=40.07KB
====
# insert into a partially dynamic partition
insert into table functional.alltypessmall
partition (year=2009, month)
select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col, month
from functional.alltypes
where year=2009 and month>10
---- PLAN
WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(2009,month)]
| partitions=12
|
00:SCAN HDFS [functional.alltypes]
partitions=2/24 files=2 size=40.07KB
---- SCANRANGELOCATIONS
NODE 0:
HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypes/year=2009/month=11/091101.txt 0:20179
HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypes/year=2009/month=12/091201.txt 0:20853
---- DISTRIBUTEDPLAN
WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(2009,month)]
| partitions=12
|
01:EXCHANGE [HASH(month)]
|
00:SCAN HDFS [functional.alltypes]
partitions=2/24 files=2 size=40.07KB
====
# insert into a partially dynamic partition
# partitioned output doesn't require repartitioning
insert into table functional.alltypessmall
partition (year=2009, month)
select min(id), min(bool_col), min(tinyint_col), min(smallint_col), min(int_col),
min(bigint_col), min(float_col), min(double_col), min(date_string_col), min(string_col),
min(timestamp_col), month
from functional.alltypes
where year=2009 and month>10
group by month
---- PLAN
WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(2009,month)]
| partitions=12
|
01:AGGREGATE [FINALIZE]
| output: min(id), min(bool_col), min(tinyint_col), min(smallint_col), min(int_col), min(bigint_col), min(float_col), min(double_col), min(date_string_col), min(string_col), min(timestamp_col)
| group by: month
|
00:SCAN HDFS [functional.alltypes]
partitions=2/24 files=2 size=40.07KB
---- SCANRANGELOCATIONS
NODE 0:
HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypes/year=2009/month=11/091101.txt 0:20179
HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypes/year=2009/month=12/091201.txt 0:20853
---- DISTRIBUTEDPLAN
WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(2009,month)]
| partitions=12
|
03:AGGREGATE [FINALIZE]
| output: min:merge(id), min:merge(bool_col), min:merge(tinyint_col), min:merge(smallint_col), min:merge(int_col), min:merge(bigint_col), min:merge(float_col), min:merge(double_col), min:merge(date_string_col), min:merge(string_col), min:merge(timestamp_col)
| group by: month
|
02:EXCHANGE [HASH(month)]
|
01:AGGREGATE [STREAMING]
| output: min(id), min(bool_col), min(tinyint_col), min(smallint_col), min(int_col), min(bigint_col), min(float_col), min(double_col), min(date_string_col), min(string_col), min(timestamp_col)
| group by: month
|
00:SCAN HDFS [functional.alltypes]
partitions=2/24 files=2 size=40.07KB
====
# insert into a partially dynamic partition
insert into table functional.alltypessmall
partition (year, month=4)
select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col, year
from functional.alltypes
where year>2009 and month=4
---- PLAN
WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(year,4)]
| partitions=2
|
00:SCAN HDFS [functional.alltypes]
partitions=1/24 files=1 size=19.71KB
---- SCANRANGELOCATIONS
NODE 0:
HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypes/year=2010/month=4/100401.txt 0:20179
---- DISTRIBUTEDPLAN
WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(year,4)]
| partitions=2
|
00:SCAN HDFS [functional.alltypes]
partitions=1/24 files=1 size=19.71KB
====
# insert with limit from partitioned table.
insert into table functional.alltypesnopart
select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col
from functional.alltypes where year=2009 and month=1 limit 10
---- PLAN
WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]
| partitions=1
|
00:SCAN HDFS [functional.alltypes]
partitions=1/24 files=1 size=19.95KB
limit: 10
---- SCANRANGELOCATIONS
NODE 0:
HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypes/year=2009/month=1/090101.txt 0:20433
---- DISTRIBUTEDPLAN
WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false]
| partitions=1
|
01:EXCHANGE [UNPARTITIONED]
| limit: 10
|
00:SCAN HDFS [functional.alltypes]
partitions=1/24 files=1 size=19.95KB
limit: 10
====
# static partition insert from a constant select
insert into table functional.alltypessmall
partition (year=2010, month=4)
select 100, false, 1, 1, 1, 10,
10.0, 10.0, "02/01/09", "1", cast("2009-02-01 00:01:00" as timestamp)
---- PLAN
WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(2010,4)]
| partitions=1
|
00:UNION
constant-operands=1
---- DISTRIBUTEDPLAN
WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(2010,4)]
| partitions=1
|
00:UNION
constant-operands=1
====
# dynamic partition insert from a constant select
insert into table functional.alltypessmall
partition (year, month)
select 100, false, 1, 1, 1, 10,
10.0, 10.0, "02/01/09", "1", cast("2009-02-01 00:01:00" as timestamp), 2010, 4
---- PLAN
WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(2010,4)]
| partitions=1
|
00:UNION
constant-operands=1
---- DISTRIBUTEDPLAN
WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(2010,4)]
| partitions=1
|
00:UNION
constant-operands=1
====
# static partition insert from values statement
insert into table functional.alltypessmall
partition (year=2010, month=4) values
(100, false, 1, 1, 1, 10, 10.0, 10.0, "02/01/09", "1", cast("2009-02-01 00:01:00" as timestamp)),
(200, true, 2, 2, 2, 20, 20.0, 20.0, "02/02/09", "2", cast("2009-02-02 00:01:00" as timestamp)),
(300, false, 3, 3, 3, 30, 30.0, 30.0, "02/03/09", "3", cast("2009-02-03 00:01:00" as timestamp))
---- PLAN
WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(2010,4)]
| partitions=1
|
00:UNION
constant-operands=3
---- DISTRIBUTEDPLAN
WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(2010,4)]
| partitions=1
|
00:UNION
constant-operands=3
====
# dynamic partition insert from values statement
insert into table functional.alltypessmall
partition (year, month) values
(100, false, 1, 1, 1, 10, 10.0, 10.0, "02/01/09", "1", cast("2009-02-01 00:01:00" as timestamp), 2010, 4),
(200, true, 2, 2, 2, 20, 20.0, 20.0, "02/02/09", "2", cast("2009-02-02 00:01:00" as timestamp), 2010, 5),
(300, false, 3, 3, 3, 30, 30.0, 30.0, "02/03/09", "3", cast("2009-02-03 00:01:00" as timestamp), 2010, 6)
---- PLAN
WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(2010,4)]
| partitions=9
|
00:UNION
constant-operands=3
---- DISTRIBUTEDPLAN
WRITE TO HDFS [functional.alltypessmall, OVERWRITE=false, PARTITION-KEYS=(2010,4)]
| partitions=9
|
00:UNION
constant-operands=3
====
# test static partition insert from a query with grouped aggregation
# we expect the insert fragment to be partitioned by the grouping exprs of the query stmt
# and not by the partition exprs of the insert stmt
insert into functional.alltypes(bigint_col, string_col) partition (year=2010, month=10)
select count(int_col), string_col from functional.alltypes
group by string_col
---- PLAN
WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(2010,10)]
| partitions=1
|
01:AGGREGATE [FINALIZE]
| output: count(int_col)
| group by: string_col
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
---- DISTRIBUTEDPLAN
WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(2010,10)]
| partitions=1
|
03:AGGREGATE [FINALIZE]
| output: count:merge(int_col)
| group by: string_col
|
02:EXCHANGE [HASH(string_col)]
|
01:AGGREGATE [STREAMING]
| output: count(int_col)
| group by: string_col
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
====
# test static partition insert from a query with distinct grouped aggregation
# we expect the insert fragment to be partitioned by the grouping exprs of the query stmt
# and not by the partition exprs of the insert stmt
insert into functional.alltypes(bigint_col, string_col) partition (year=2010, month=10)
select count(distinct int_col), string_col from functional.alltypes
group by string_col
---- PLAN
WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(2010,10)]
| partitions=1
|
02:AGGREGATE [FINALIZE]
| output: count(int_col)
| group by: string_col
|
01:AGGREGATE
| group by: string_col, int_col
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
---- DISTRIBUTEDPLAN
WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(2010,10)]
| partitions=1
|
02:AGGREGATE [FINALIZE]
| output: count(int_col)
| group by: string_col
|
04:AGGREGATE
| group by: string_col, int_col
|
03:EXCHANGE [HASH(string_col)]
|
01:AGGREGATE [STREAMING]
| group by: string_col, int_col
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
====
# test that the planner chooses to repartition before the table sink
# alltypes has column stats and based on the product of the NDVs of year and month
# the planner should choose to repartition before the table sink
insert into table functional.alltypes partition(year, month)
select * from functional.alltypes
---- DISTRIBUTEDPLAN
WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(functional.alltypes.year,functional.alltypes.month)]
| partitions=24
|
01:EXCHANGE [HASH(functional.alltypes.year,functional.alltypes.month)]
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
====
# test noshuffle hint to prevent repartitioning (same query as above with hint)
insert into table functional.alltypes partition(year, month) [noshuffle]
select * from functional.alltypes
---- DISTRIBUTEDPLAN
WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(functional.alltypes.year,functional.alltypes.month)]
| partitions=24
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
====
# same as above but with traditional commented hint
insert into table functional.alltypes partition(year, month) /* +noshuffle */
select * from functional.alltypes
---- DISTRIBUTEDPLAN
WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(functional.alltypes.year,functional.alltypes.month)]
| partitions=24
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
====
# same as above but with end-of-line commented hint
insert into table functional.alltypes partition(year, month)
-- +noshuffle
select * from functional.alltypes
---- DISTRIBUTEDPLAN
WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(functional.alltypes.year,functional.alltypes.month)]
| partitions=24
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
====
# test that the planner does not repartition before the table sink
# alltypes has column stats and since year only has 2 distinct values the planner
# should choose not to repartition before the table sink
insert into table functional.alltypes partition(year, month=1)
select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col, year
from functional.alltypes
---- DISTRIBUTEDPLAN
WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,1)]
| partitions=2
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
====
# test shuffle hint to force repartitioning (same query as above with hint)
insert into table functional.alltypes partition(year, month=1) [shuffle]
select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col, year
from functional.alltypes
---- DISTRIBUTEDPLAN
WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,1)]
| partitions=2
|
01:EXCHANGE [HASH(year)]
|
00:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
====
# test insert/select stmt that contains an analytic function (IMPALA-1400)
insert into table functional.alltypestiny partition(year=2009, month=1)
select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col,
lag(timestamp_col, 1) over (partition by id order by id) as timestamp_col
from functional.alltypestiny
---- PLAN
WRITE TO HDFS [functional.alltypestiny, OVERWRITE=false, PARTITION-KEYS=(2009,1)]
| partitions=1
|
02:ANALYTIC
| functions: lag(timestamp_col, 1, NULL)
| partition by: id
| order by: id ASC
| window: ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING
|
01:SORT
| order by: id ASC NULLS FIRST, id ASC
|
00:SCAN HDFS [functional.alltypestiny]
partitions=4/4 files=4 size=460B
====