mirror of
https://github.com/apache/impala.git
synced 2025-12-25 02:03:09 -05:00
IMPALA-3586: Implement union passthrough
The union node acts as pass through operator and forwards row batches from it's children without materializing. This is done in the case when the child's tuple layout is identical to union node tuple layout and no functions need to be applied to the child row batches. Removed operand reordering in the FE because it's simpler and safer to handle all passthrough children before non-passthrough children in the BE. The recent improvements to memory management allowed us to remove this requirement. Testing: - Added new planner and end to end tests that cover the new functionality. - Updated existing tests to reflect the new behavior. Perf: Ran a benchmark on a local 10 GB tpcds dataset. I used an unpartitioned version of the store_sales table. There was over a 2x performance improvement for the following query: SELECT COUNT(ss_sold_time_sk), COUNT(ss_item_sk), COUNT(ss_customer_sk), COUNT(ss_cdemo_sk), COUNT(ss_hdemo_sk), COUNT(ss_addr_sk), COUNT(ss_store_sk), COUNT(ss_promo_sk), COUNT(ss_ticket_number), COUNT(ss_quantity), COUNT(ss_wholesale_cost), COUNT(ss_list_price), COUNT(ss_sales_price), COUNT(ss_ext_discount_amt), COUNT(ss_ext_sales_price), COUNT(ss_ext_wholesale_cost), COUNT(ss_ext_list_price), COUNT(ss_ext_tax), COUNT(ss_coupon_amt), COUNT(ss_net_paid), COUNT(ss_net_paid_inc_tax), COUNT(ss_net_profit), COUNT(ss_sold_date_sk) FROM ( select * from tpcds_10_parquet.store_sales_unpartitioned union all select * from tpcds_10_parquet.store_sales_unpartitioned union all select * from tpcds_10_parquet.store_sales_unpartitioned union all select * from tpcds_10_parquet.store_sales_unpartitioned union all select * from tpcds_10_parquet.store_sales_unpartitioned union all select * from tpcds_10_parquet.store_sales_unpartitioned union all select * from tpcds_10_parquet.store_sales_unpartitioned union all select * from tpcds_10_parquet.store_sales_unpartitioned union all select * from tpcds_10_parquet.store_sales_unpartitioned union all select * from tpcds_10_parquet.store_sales_unpartitioned ) t Before: Total Time: 43s164ms Summary: Operator #Hosts Avg Time Max Time #Rows Est. #Rows Peak Mem Est. Peak Mem Detail ------------------------------------------------------------------------------------------------------------------------------ 13:AGGREGATE 1 224.721us 224.721us 1 1 28.00 KB -1.00 B FINALIZE 12:EXCHANGE 1 24.578us 24.578us 3 1 0 -1.00 B UNPARTITIONED 11:AGGREGATE 3 2s402ms 3s060ms 3 1 119.00 KB 10.00 MB 00:UNION 3 35s380ms 37s846ms 288.01M 288.01M 3.08 MB 0 |--02:SCAN HDFS 3 184.197ms 219.931ms 28.80M 28.80M 535.03 MB 1.88 GB store_sales_unpartitioned |--03:SCAN HDFS 3 131.956ms 153.401ms 28.80M 28.80M 534.98 MB 1.88 GB store_sales_unpartitioned |--04:SCAN HDFS 3 178.456ms 247.721ms 28.80M 28.80M 534.98 MB 1.88 GB store_sales_unpartitioned |--05:SCAN HDFS 3 189.398ms 242.251ms 28.80M 28.80M 535.01 MB 1.88 GB store_sales_unpartitioned |--06:SCAN HDFS 3 122.786ms 156.528ms 28.80M 28.80M 534.98 MB 1.88 GB store_sales_unpartitioned |--07:SCAN HDFS 3 147.467ms 183.391ms 28.80M 28.80M 535.13 MB 1.88 GB store_sales_unpartitioned |--08:SCAN HDFS 3 147.502ms 186.273ms 28.80M 28.80M 535.01 MB 1.88 GB store_sales_unpartitioned |--09:SCAN HDFS 3 130.086ms 154.682ms 28.80M 28.80M 535.04 MB 1.88 GB store_sales_unpartitioned |--10:SCAN HDFS 3 122.701ms 161.056ms 28.80M 28.80M 534.89 MB 1.88 GB store_sales_unpartitioned 01:SCAN HDFS 3 287.863ms 330.436ms 28.80M 28.80M 534.98 MB 1.88 GB store_sales_unpartitioned After: Total Time: 19s139ms Summary: Operator #Hosts Avg Time Max Time #Rows Est. #Rows Peak Mem Est. Peak Mem Detail ------------------------------------------------------------------------------------------------------------------------------ 13:AGGREGATE 1 166.241us 166.241us 1 1 28.00 KB -1.00 B FINALIZE 12:EXCHANGE 1 71.695us 71.695us 3 1 0 -1.00 B UNPARTITIONED 11:AGGREGATE 3 2s971ms 3s809ms 3 1 3.08 MB 10.00 MB 00:UNION 3 207.956ms 222.846ms 288.01M 288.01M 0 0 |--02:SCAN HDFS 3 1s533ms 1s535ms 28.80M 28.80M 532.28 MB 1.88 GB store_sales_unpartitioned |--03:SCAN HDFS 3 1s554ms 1s669ms 28.80M 28.80M 525.73 MB 1.88 GB store_sales_unpartitioned |--04:SCAN HDFS 3 1s568ms 1s716ms 28.80M 28.80M 525.03 MB 1.88 GB store_sales_unpartitioned |--05:SCAN HDFS 3 1s503ms 1s617ms 28.80M 28.80M 527.43 MB 1.88 GB store_sales_unpartitioned |--06:SCAN HDFS 3 1s560ms 1s634ms 28.80M 28.80M 528.52 MB 1.88 GB store_sales_unpartitioned |--07:SCAN HDFS 3 1s489ms 1s643ms 28.80M 28.80M 534.81 MB 1.88 GB store_sales_unpartitioned |--08:SCAN HDFS 3 1s534ms 1s581ms 28.80M 28.80M 528.10 MB 1.88 GB store_sales_unpartitioned |--09:SCAN HDFS 3 1s558ms 1s674ms 28.80M 28.80M 526.77 MB 1.88 GB store_sales_unpartitioned |--10:SCAN HDFS 3 1s504ms 1s692ms 28.80M 28.80M 527.83 MB 1.88 GB store_sales_unpartitioned 01:SCAN HDFS 3 1s682ms 1s911ms 28.80M 28.80M 526.14 MB 1.88 GB store_sales_unpartitioned Change-Id: Ia8f6d5062724ba5b78174c3227a7a796d10d8416 Reviewed-on: http://gerrit.cloudera.org:8080/5816 Reviewed-by: Dan Hecht <dhecht@cloudera.com> Tested-by: Impala Public Jenkins
This commit is contained in:
committed by
Impala Public Jenkins
parent
5deec604de
commit
a50c344077
@@ -75,6 +75,19 @@ class TestQueries(ImpalaTestSuite):
|
||||
|
||||
def test_union(self, vector):
|
||||
self.run_test_case('QueryTest/union', vector)
|
||||
# IMPALA-3586: The passthrough and materialized children are interleaved. The batch
|
||||
# size is small to test the transition between materialized and passthrough children.
|
||||
query_string = ("select count(c) from ( "
|
||||
"select bigint_col + 1 as c from functional.alltypes limit 15 "
|
||||
"union all "
|
||||
"select bigint_col as c from functional.alltypes limit 15 "
|
||||
"union all "
|
||||
"select bigint_col + 1 as c from functional.alltypes limit 15 "
|
||||
"union all "
|
||||
"(select bigint_col as c from functional.alltypes limit 15)) t")
|
||||
vector.get_value('exec_option')['batch_size'] = 10
|
||||
result = self.execute_query(query_string, vector.get_value('exec_option'))
|
||||
assert result.data[0] == '60'
|
||||
|
||||
def test_sort(self, vector):
|
||||
if vector.get_value('table_format').file_format == 'hbase':
|
||||
|
||||
Reference in New Issue
Block a user