IMPALA-14263: Add broadcast_cost_scale_factor option

This commit enhances the distributed planner's costing model for
broadcast joins by introducing the `broadcast_cost_scale_factor` query
option. This option enables users to fine-tune the planner's decision
between broadcast and partitioned joins.

Key changes:
- The total broadcast cost is scaled by the new
  `broadcast_cost_scale_factor` query option, allowing users to favor or
  penalize broadcast joins as needed when setting query hint is not
  feasible.
- Updated the planner logic and test cases to reflect the new costing
  model and options.

This addresses scenarios where the default costing could lead to
suboptimal join distribution choices, particularly in a large-scale
cluster where the number of executors can increase broadcast cost, while
choosing a partitioned strategy can lead to data skew. Admin can set
`broadcast_cost_scale_factor` less than 1.0 to make DistributedPlanner
favor broadcast more than partitioned join (with possible downside of
higher memory usage per query and higher network transmission).

Existing query hints still take precedence over this option. Note that
this option is applied independent of `broadcast_to_partition_factor`
option (see IMPALA-10287). In MT_DOP>1 setup, it should be sufficient to
set `use_dop_for_costing=True` and tune `broadcast_to_partition_factor`
only.

Testing:
Added FE tests.

Change-Id: I475f8a26b2171e87952b69f66a5c18f77c2b3133
Reviewed-on: http://gerrit.cloudera.org:8080/23258
Reviewed-by: Wenzhe Zhou <wzhou@cloudera.com>
Reviewed-by: Aman Sinha <amsinha@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Riza Suminto
2025-08-05 14:40:10 -07:00
committed by Impala Public Jenkins
parent 09a6f0e6cd
commit cc1cbb559a
6 changed files with 454 additions and 3 deletions

View File

@@ -1398,6 +1398,13 @@ Status impala::SetQueryOption(TImpalaQueryOptions::type option, const string& va
query_options->__set_hide_analyzed_query(IsTrue(value));
break;
}
case TImpalaQueryOptions::BROADCAST_COST_SCALE_FACTOR: {
double double_val = 0.0f;
RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckInclusiveLowerBound<double>(
option, value, 0.0, &double_val));
query_options->__set_broadcast_cost_scale_factor(double_val);
break;
}
default:
string key = to_string(option);
if (IsRemovedQueryOption(key)) {

View File

@@ -51,7 +51,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
// plus one. Thus, the second argument to the DCHECK has to be updated every
// time we add or remove a query option to/from the enum TImpalaQueryOptions.
constexpr unsigned NUM_QUERY_OPTIONS =
TImpalaQueryOptions::HIDE_ANALYZED_QUERY + 1;
TImpalaQueryOptions::BROADCAST_COST_SCALE_FACTOR + 1;
#define QUERY_OPTS_TABLE \
DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(), NUM_QUERY_OPTIONS); \
REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
@@ -378,6 +378,8 @@ constexpr unsigned NUM_QUERY_OPTIONS =
TQueryOptionLevel::ADVANCED) \
QUERY_OPT_FN(json_binary_format, JSON_BINARY_FORMAT, TQueryOptionLevel::REGULAR) \
QUERY_OPT_FN(hide_analyzed_query, HIDE_ANALYZED_QUERY, TQueryOptionLevel::ADVANCED) \
QUERY_OPT_FN(broadcast_cost_scale_factor, BROADCAST_COST_SCALE_FACTOR, \
TQueryOptionLevel::ADVANCED) \
;
/// Enforce practical limits on some query options to avoid undesired query state.

View File

@@ -1037,6 +1037,13 @@ enum TImpalaQueryOptions {
// Hide analyzed query from runtime profile. This is useful if query is too large,
// such as INSERT INTO with hundreds of VALUES.
HIDE_ANALYZED_QUERY = 193
// A scale factor that is applied to the cost of a broadcast join.
// This is used to adjust DataDistribution strategy between broadcast joins vs
// partitioned joins. Setting to a value greater than 1.0 will favor partitioned joins,
// while setting to a value less than 1.0 will favor broadcast joins more.
// Default to 1.0, which means no adjustment is applied.
BROADCAST_COST_SCALE_FACTOR = 194
}
// The summary of a DML statement.

View File

@@ -785,6 +785,9 @@ struct TQueryOptions {
// See comment in ImpalaService.thrift
194: optional bool hide_analyzed_query = false
// See comment in ImpalaService.thrift
195: optional double broadcast_cost_scale_factor = 1.0
}
// Impala currently has three types of sessions: Beeswax, HiveServer2 and external

View File

@@ -603,11 +603,14 @@ public class DistributedPlanner {
hashTblBuildCost *= (long) (ctx_.getQueryOptions().broadcast_to_partition_factor
* Math.max(1.0, Math.sqrt(actual_dop)));
}
broadcastCost = dataPayload + hashTblBuildCost;
broadcastCost = (long) ((dataPayload + hashTblBuildCost)
* ctx_.getQueryOptions().broadcast_cost_scale_factor);
}
}
if (LOG.isTraceEnabled()) {
LOG.trace("broadcast: cost=" + Long.toString(broadcastCost));
LOG.trace("broadcast: cost=" + Long.toString(broadcastCost)
+ " broadcast_cost_scale_factor="
+ ctx_.getQueryOptions().broadcast_cost_scale_factor);
LOG.trace("card=" + Long.toString(rhsTree.getCardinality()) + " row_size="
+ Float.toString(rhsTree.getAvgRowSize()) + " #nodes="
+ Integer.toString(leftChildNodes));

View File

@@ -536,3 +536,432 @@ PLAN-ROOT SINK
runtime filters: RF001 -> tpcds.catalog_sales.cs_bill_customer_sk, RF002 -> tpcds.catalog_sales.cs_item_sk, RF006 -> cs_sold_date_sk
row-size=36B cardinality=1.44M
====
# The same Q78.
# broadcast_cost_scale=0.25 will reduce the broadcast cost from
# broadcast_to_partition_factor=4.0 multiple.
# 03:HASH JOIN turn back to broadcast join.
with ws as
(select d_year AS ws_sold_year, ws_item_sk,
ws_bill_customer_sk ws_customer_sk,
sum(ws_quantity) ws_qty,
sum(ws_wholesale_cost) ws_wc,
sum(ws_sales_price) ws_sp
from web_sales
left join web_returns on wr_order_number=ws_order_number and ws_item_sk=wr_item_sk
join date_dim on ws_sold_date_sk = d_date_sk
where wr_order_number is null
group by d_year, ws_item_sk, ws_bill_customer_sk
),
cs as
(select d_year AS cs_sold_year, cs_item_sk,
cs_bill_customer_sk cs_customer_sk,
sum(cs_quantity) cs_qty,
sum(cs_wholesale_cost) cs_wc,
sum(cs_sales_price) cs_sp
from catalog_sales
left join catalog_returns on cr_order_number=cs_order_number and cs_item_sk=cr_item_sk
join date_dim on cs_sold_date_sk = d_date_sk
where cr_order_number is null
group by d_year, cs_item_sk, cs_bill_customer_sk
),
ss as
(select d_year AS ss_sold_year, ss_item_sk,
ss_customer_sk,
sum(ss_quantity) ss_qty,
sum(ss_wholesale_cost) ss_wc,
sum(ss_sales_price) ss_sp
from store_sales
left join store_returns on sr_ticket_number=ss_ticket_number and ss_item_sk=sr_item_sk
join date_dim on ss_sold_date_sk = d_date_sk
where sr_ticket_number is null
group by d_year, ss_item_sk, ss_customer_sk
)
select
ss_sold_year, ss_item_sk, ss_customer_sk,
round(ss_qty/(coalesce(ws_qty,0)+coalesce(cs_qty,0)),2) ratio,
ss_qty store_qty, ss_wc store_wholesale_cost, ss_sp store_sales_price,
coalesce(ws_qty,0)+coalesce(cs_qty,0) other_chan_qty,
coalesce(ws_wc,0)+coalesce(cs_wc,0) other_chan_wholesale_cost,
coalesce(ws_sp,0)+coalesce(cs_sp,0) other_chan_sales_price
from ss
left join ws on (ws_sold_year=ss_sold_year and ws_item_sk=ss_item_sk and ws_customer_sk=ss_customer_sk)
left join cs on (cs_sold_year=ss_sold_year and cs_item_sk=ss_item_sk and cs_customer_sk=ss_customer_sk)
where (coalesce(ws_qty,0)>0 or coalesce(cs_qty, 0)>0) and ss_sold_year=2002
order by
ss_sold_year, ss_item_sk, ss_customer_sk,
ss_qty desc, ss_wc desc, ss_sp desc,
other_chan_qty,
other_chan_wholesale_cost,
other_chan_sales_price,
round(ss_qty/(coalesce(ws_qty+cs_qty,1)),2)
limit 100
---- QUERYOPTIONS
mt_dop=12
use_dop_for_costing=true
broadcast_to_partition_factor=4.0
broadcast_cost_scale_factor=0.25
---- DISTRIBUTEDPLAN
PLAN-ROOT SINK
|
35:MERGING-EXCHANGE [UNPARTITIONED]
| order by: ss_sold_year ASC, ss_item_sk ASC, ss_customer_sk ASC, ss_qty DESC, ss_wc DESC, ss_sp DESC, coalesce(ws_qty, 0) + coalesce(cs_qty, 0) ASC, coalesce(ws_wc, 0) + coalesce(cs_wc, 0) ASC, coalesce(ws_sp, 0) + coalesce(cs_sp, 0) ASC, round(ss_qty / (coalesce(ws_qty + cs_qty, 1)), 2) ASC
| limit: 100
|
20:TOP-N [LIMIT=100]
| order by: ss_sold_year ASC, ss_item_sk ASC, ss_customer_sk ASC, ss_qty DESC, ss_wc DESC, ss_sp DESC, coalesce(ws_qty, 0) + coalesce(cs_qty, 0) ASC, coalesce(ws_wc, 0) + coalesce(cs_wc, 0) ASC, coalesce(ws_sp, 0) + coalesce(cs_sp, 0) ASC, round(ss_qty / (coalesce(ws_qty + cs_qty, 1)), 2) ASC
| row-size=104B cardinality=100
|
19:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
| hash predicates: d_year = d_year, cs_bill_customer_sk = ss_customer_sk, cs_item_sk = ss_item_sk
| other predicates: (coalesce(sum(ws_quantity), 0) > 0 OR coalesce(sum(cs_quantity), 0) > 0)
| row-size=168B cardinality=3.00K
|
|--JOIN BUILD
| | join-table-id=00 plan-id=01 cohort-id=01
| | build expressions: d_year, ss_customer_sk, ss_item_sk
| | runtime filters: RF000 <- d_year, RF001 <- ss_customer_sk, RF002 <- ss_item_sk
| |
| 34:EXCHANGE [HASH(d_year,ss_item_sk,ss_customer_sk)]
| |
| 18:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
| | hash predicates: d_year = d_year, ws_bill_customer_sk = ss_customer_sk, ws_item_sk = ss_item_sk
| | row-size=112B cardinality=3.00K
| |
| |--JOIN BUILD
| | | join-table-id=01 plan-id=02 cohort-id=02
| | | build expressions: d_year, ss_customer_sk, ss_item_sk
| | | runtime filters: RF008 <- d_year, RF009 <- ss_customer_sk, RF010 <- ss_item_sk
| | |
| | 33:EXCHANGE [HASH(d_year,ss_item_sk,ss_customer_sk)]
| | |
| | 32:AGGREGATE [FINALIZE]
| | | output: sum:merge(ss_quantity), sum:merge(ss_wholesale_cost), sum:merge(ss_sales_price)
| | | group by: d_year, ss_item_sk, ss_customer_sk
| | | having: d_year = 2002
| | | row-size=56B cardinality=3.00K
| | |
| | 31:EXCHANGE [HASH(d_year,ss_item_sk,ss_customer_sk)]
| | |
| | 05:AGGREGATE [STREAMING]
| | | output: sum(ss_quantity), sum(ss_wholesale_cost), sum(ss_sales_price)
| | | group by: d_year, ss_item_sk, ss_customer_sk
| | | row-size=56B cardinality=589.03K
| | |
| | 04:HASH JOIN [INNER JOIN, BROADCAST]
| | | hash predicates: ss_sold_date_sk = d_date_sk
| | | row-size=60B cardinality=589.03K
| | |
| | |--JOIN BUILD
| | | | join-table-id=02 plan-id=03 cohort-id=03
| | | | build expressions: d_date_sk
| | | | runtime filters: RF016 <- d_date_sk
| | | |
| | | 30:EXCHANGE [BROADCAST]
| | | |
| | | 02:SCAN HDFS [tpcds.date_dim]
| | | HDFS partitions=1/1 files=1 size=9.84MB
| | | predicates: tpcds.date_dim.d_year = 2002
| | | row-size=8B cardinality=373
| | |
| | 03:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
| | | hash predicates: ss_item_sk = sr_item_sk, ss_ticket_number = sr_ticket_number
| | | other predicates: sr_ticket_number IS NULL
| | | row-size=52B cardinality=589.03K(filtered from 2.88M)
| | |
| | |--JOIN BUILD
| | | | join-table-id=03 plan-id=04 cohort-id=03
| | | | build expressions: sr_item_sk, sr_ticket_number
| | | |
| | | 29:EXCHANGE [BROADCAST]
| | | |
| | | 01:SCAN HDFS [tpcds.store_returns]
| | | HDFS partitions=1/1 files=1 size=31.19MB
| | | row-size=16B cardinality=287.51K
| | |
| | 00:SCAN HDFS [tpcds.store_sales]
| | HDFS partitions=1824/1824 files=1824 size=346.60MB
| | runtime filters: RF016 -> ss_sold_date_sk
| | row-size=36B cardinality=589.03K(filtered from 2.88M)
| |
| 28:AGGREGATE [FINALIZE]
| | output: sum:merge(ws_quantity), sum:merge(ws_wholesale_cost), sum:merge(ws_sales_price)
| | group by: d_year, ws_item_sk, ws_bill_customer_sk
| | row-size=56B cardinality=148.00K
| |
| 27:EXCHANGE [HASH(d_year,ws_item_sk,ws_bill_customer_sk)]
| |
| 11:AGGREGATE [STREAMING]
| | output: sum(ws_quantity), sum(ws_wholesale_cost), sum(ws_sales_price)
| | group by: d_year, ws_item_sk, ws_bill_customer_sk
| | row-size=56B cardinality=148.00K
| |
| 10:HASH JOIN [INNER JOIN, BROADCAST]
| | hash predicates: ws_sold_date_sk = d_date_sk
| | row-size=60B cardinality=148.00K
| |
| |--JOIN BUILD
| | | join-table-id=04 plan-id=05 cohort-id=02
| | | build expressions: d_date_sk
| | | runtime filters: RF014 <- d_date_sk
| | |
| | 26:EXCHANGE [BROADCAST]
| | |
| | 08:SCAN HDFS [tpcds.date_dim]
| | HDFS partitions=1/1 files=1 size=9.84MB
| | predicates: tpcds.date_dim.d_year = 2002
| | runtime filters: RF008 -> tpcds.date_dim.d_year
| | row-size=8B cardinality=373
| |
| 09:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
| | hash predicates: ws_item_sk = wr_item_sk, ws_order_number = wr_order_number
| | other predicates: wr_order_number IS NULL
| | row-size=52B cardinality=719.38K
| |
| |--JOIN BUILD
| | | join-table-id=05 plan-id=06 cohort-id=02
| | | build expressions: wr_item_sk, wr_order_number
| | |
| | 25:EXCHANGE [BROADCAST]
| | |
| | 07:SCAN HDFS [tpcds.web_returns]
| | HDFS partitions=1/1 files=1 size=9.35MB
| | runtime filters: RF010 -> tpcds.web_returns.wr_item_sk
| | row-size=16B cardinality=71.76K
| |
| 06:SCAN HDFS [tpcds.web_sales]
| HDFS partitions=1/1 files=1 size=140.07MB
| runtime filters: RF009 -> tpcds.web_sales.ws_bill_customer_sk, RF010 -> tpcds.web_sales.ws_item_sk, RF014 -> ws_sold_date_sk
| row-size=36B cardinality=719.38K
|
24:AGGREGATE [FINALIZE]
| output: sum:merge(cs_quantity), sum:merge(cs_wholesale_cost), sum:merge(cs_sales_price)
| group by: d_year, cs_item_sk, cs_bill_customer_sk
| row-size=56B cardinality=294.63K
|
23:EXCHANGE [HASH(d_year,cs_item_sk,cs_bill_customer_sk)]
|
17:AGGREGATE [STREAMING]
| output: sum(cs_quantity), sum(cs_wholesale_cost), sum(cs_sales_price)
| group by: d_year, cs_item_sk, cs_bill_customer_sk
| row-size=56B cardinality=294.63K
|
16:HASH JOIN [INNER JOIN, BROADCAST]
| hash predicates: cs_sold_date_sk = d_date_sk
| row-size=60B cardinality=294.63K
|
|--JOIN BUILD
| | join-table-id=06 plan-id=07 cohort-id=01
| | build expressions: d_date_sk
| | runtime filters: RF006 <- d_date_sk
| |
| 22:EXCHANGE [BROADCAST]
| |
| 14:SCAN HDFS [tpcds.date_dim]
| HDFS partitions=1/1 files=1 size=9.84MB
| predicates: tpcds.date_dim.d_year = 2002
| runtime filters: RF000 -> tpcds.date_dim.d_year
| row-size=8B cardinality=373
|
15:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
| hash predicates: cs_item_sk = cr_item_sk, cs_order_number = cr_order_number
| other predicates: cr_order_number IS NULL
| row-size=52B cardinality=1.44M
|
|--JOIN BUILD
| | join-table-id=07 plan-id=08 cohort-id=01
| | build expressions: cr_item_sk, cr_order_number
| |
| 21:EXCHANGE [BROADCAST]
| |
| 13:SCAN HDFS [tpcds.catalog_returns]
| HDFS partitions=1/1 files=1 size=20.39MB
| runtime filters: RF002 -> tpcds.catalog_returns.cr_item_sk
| row-size=16B cardinality=144.07K
|
12:SCAN HDFS [tpcds.catalog_sales]
HDFS partitions=1/1 files=1 size=282.20MB
runtime filters: RF001 -> tpcds.catalog_sales.cs_bill_customer_sk, RF002 -> tpcds.catalog_sales.cs_item_sk, RF006 -> cs_sold_date_sk
row-size=36B cardinality=1.44M
---- PARALLELPLANS
PLAN-ROOT SINK
|
35:MERGING-EXCHANGE [UNPARTITIONED]
| order by: ss_sold_year ASC, ss_item_sk ASC, ss_customer_sk ASC, ss_qty DESC, ss_wc DESC, ss_sp DESC, coalesce(ws_qty, 0) + coalesce(cs_qty, 0) ASC, coalesce(ws_wc, 0) + coalesce(cs_wc, 0) ASC, coalesce(ws_sp, 0) + coalesce(cs_sp, 0) ASC, round(ss_qty / (coalesce(ws_qty + cs_qty, 1)), 2) ASC
| limit: 100
|
20:TOP-N [LIMIT=100]
| order by: ss_sold_year ASC, ss_item_sk ASC, ss_customer_sk ASC, ss_qty DESC, ss_wc DESC, ss_sp DESC, coalesce(ws_qty, 0) + coalesce(cs_qty, 0) ASC, coalesce(ws_wc, 0) + coalesce(cs_wc, 0) ASC, coalesce(ws_sp, 0) + coalesce(cs_sp, 0) ASC, round(ss_qty / (coalesce(ws_qty + cs_qty, 1)), 2) ASC
| row-size=104B cardinality=100
|
19:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
| hash predicates: d_year = d_year, cs_bill_customer_sk = ss_customer_sk, cs_item_sk = ss_item_sk
| other predicates: (coalesce(sum(ws_quantity), 0) > 0 OR coalesce(sum(cs_quantity), 0) > 0)
| row-size=168B cardinality=3.00K
|
|--JOIN BUILD
| | join-table-id=00 plan-id=01 cohort-id=01
| | build expressions: d_year, ss_customer_sk, ss_item_sk
| | runtime filters: RF000 <- d_year, RF001 <- ss_customer_sk, RF002 <- ss_item_sk
| |
| 34:EXCHANGE [HASH(d_year,ss_item_sk,ss_customer_sk)]
| |
| 18:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
| | hash predicates: d_year = d_year, ws_bill_customer_sk = ss_customer_sk, ws_item_sk = ss_item_sk
| | row-size=112B cardinality=3.00K
| |
| |--JOIN BUILD
| | | join-table-id=01 plan-id=02 cohort-id=02
| | | build expressions: d_year, ss_customer_sk, ss_item_sk
| | | runtime filters: RF008 <- d_year, RF009 <- ss_customer_sk, RF010 <- ss_item_sk
| | |
| | 33:EXCHANGE [HASH(d_year,ss_item_sk,ss_customer_sk)]
| | |
| | 32:AGGREGATE [FINALIZE]
| | | output: sum:merge(ss_quantity), sum:merge(ss_wholesale_cost), sum:merge(ss_sales_price)
| | | group by: d_year, ss_item_sk, ss_customer_sk
| | | having: d_year = 2002
| | | row-size=56B cardinality=3.00K
| | |
| | 31:EXCHANGE [HASH(d_year,ss_item_sk,ss_customer_sk)]
| | |
| | 05:AGGREGATE [STREAMING]
| | | output: sum(ss_quantity), sum(ss_wholesale_cost), sum(ss_sales_price)
| | | group by: d_year, ss_item_sk, ss_customer_sk
| | | row-size=56B cardinality=589.03K
| | |
| | 04:HASH JOIN [INNER JOIN, BROADCAST]
| | | hash predicates: ss_sold_date_sk = d_date_sk
| | | row-size=60B cardinality=589.03K
| | |
| | |--JOIN BUILD
| | | | join-table-id=02 plan-id=03 cohort-id=03
| | | | build expressions: d_date_sk
| | | | runtime filters: RF016 <- d_date_sk
| | | |
| | | 30:EXCHANGE [BROADCAST]
| | | |
| | | 02:SCAN HDFS [tpcds.date_dim]
| | | HDFS partitions=1/1 files=1 size=9.84MB
| | | predicates: tpcds.date_dim.d_year = 2002
| | | row-size=8B cardinality=373
| | |
| | 03:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
| | | hash predicates: ss_item_sk = sr_item_sk, ss_ticket_number = sr_ticket_number
| | | other predicates: sr_ticket_number IS NULL
| | | row-size=52B cardinality=589.03K(filtered from 2.88M)
| | |
| | |--JOIN BUILD
| | | | join-table-id=03 plan-id=04 cohort-id=03
| | | | build expressions: sr_item_sk, sr_ticket_number
| | | |
| | | 29:EXCHANGE [BROADCAST]
| | | |
| | | 01:SCAN HDFS [tpcds.store_returns]
| | | HDFS partitions=1/1 files=1 size=31.19MB
| | | row-size=16B cardinality=287.51K
| | |
| | 00:SCAN HDFS [tpcds.store_sales]
| | HDFS partitions=1824/1824 files=1824 size=346.60MB
| | runtime filters: RF016 -> ss_sold_date_sk
| | row-size=36B cardinality=589.03K(filtered from 2.88M)
| |
| 28:AGGREGATE [FINALIZE]
| | output: sum:merge(ws_quantity), sum:merge(ws_wholesale_cost), sum:merge(ws_sales_price)
| | group by: d_year, ws_item_sk, ws_bill_customer_sk
| | row-size=56B cardinality=148.00K
| |
| 27:EXCHANGE [HASH(d_year,ws_item_sk,ws_bill_customer_sk)]
| |
| 11:AGGREGATE [STREAMING]
| | output: sum(ws_quantity), sum(ws_wholesale_cost), sum(ws_sales_price)
| | group by: d_year, ws_item_sk, ws_bill_customer_sk
| | row-size=56B cardinality=148.00K
| |
| 10:HASH JOIN [INNER JOIN, BROADCAST]
| | hash predicates: ws_sold_date_sk = d_date_sk
| | row-size=60B cardinality=148.00K
| |
| |--JOIN BUILD
| | | join-table-id=04 plan-id=05 cohort-id=02
| | | build expressions: d_date_sk
| | | runtime filters: RF014 <- d_date_sk
| | |
| | 26:EXCHANGE [BROADCAST]
| | |
| | 08:SCAN HDFS [tpcds.date_dim]
| | HDFS partitions=1/1 files=1 size=9.84MB
| | predicates: tpcds.date_dim.d_year = 2002
| | runtime filters: RF008 -> tpcds.date_dim.d_year
| | row-size=8B cardinality=373
| |
| 09:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
| | hash predicates: ws_item_sk = wr_item_sk, ws_order_number = wr_order_number
| | other predicates: wr_order_number IS NULL
| | row-size=52B cardinality=719.38K
| |
| |--JOIN BUILD
| | | join-table-id=05 plan-id=06 cohort-id=02
| | | build expressions: wr_item_sk, wr_order_number
| | |
| | 25:EXCHANGE [BROADCAST]
| | |
| | 07:SCAN HDFS [tpcds.web_returns]
| | HDFS partitions=1/1 files=1 size=9.35MB
| | runtime filters: RF010 -> tpcds.web_returns.wr_item_sk
| | row-size=16B cardinality=71.76K
| |
| 06:SCAN HDFS [tpcds.web_sales]
| HDFS partitions=1/1 files=1 size=140.07MB
| runtime filters: RF009 -> tpcds.web_sales.ws_bill_customer_sk, RF010 -> tpcds.web_sales.ws_item_sk, RF014 -> ws_sold_date_sk
| row-size=36B cardinality=719.38K
|
24:AGGREGATE [FINALIZE]
| output: sum:merge(cs_quantity), sum:merge(cs_wholesale_cost), sum:merge(cs_sales_price)
| group by: d_year, cs_item_sk, cs_bill_customer_sk
| row-size=56B cardinality=294.63K
|
23:EXCHANGE [HASH(d_year,cs_item_sk,cs_bill_customer_sk)]
|
17:AGGREGATE [STREAMING]
| output: sum(cs_quantity), sum(cs_wholesale_cost), sum(cs_sales_price)
| group by: d_year, cs_item_sk, cs_bill_customer_sk
| row-size=56B cardinality=294.63K
|
16:HASH JOIN [INNER JOIN, BROADCAST]
| hash predicates: cs_sold_date_sk = d_date_sk
| row-size=60B cardinality=294.63K
|
|--JOIN BUILD
| | join-table-id=06 plan-id=07 cohort-id=01
| | build expressions: d_date_sk
| | runtime filters: RF006 <- d_date_sk
| |
| 22:EXCHANGE [BROADCAST]
| |
| 14:SCAN HDFS [tpcds.date_dim]
| HDFS partitions=1/1 files=1 size=9.84MB
| predicates: tpcds.date_dim.d_year = 2002
| runtime filters: RF000 -> tpcds.date_dim.d_year
| row-size=8B cardinality=373
|
15:HASH JOIN [LEFT OUTER JOIN, BROADCAST]
| hash predicates: cs_item_sk = cr_item_sk, cs_order_number = cr_order_number
| other predicates: cr_order_number IS NULL
| row-size=52B cardinality=1.44M
|
|--JOIN BUILD
| | join-table-id=07 plan-id=08 cohort-id=01
| | build expressions: cr_item_sk, cr_order_number
| |
| 21:EXCHANGE [BROADCAST]
| |
| 13:SCAN HDFS [tpcds.catalog_returns]
| HDFS partitions=1/1 files=1 size=20.39MB
| runtime filters: RF002 -> tpcds.catalog_returns.cr_item_sk
| row-size=16B cardinality=144.07K
|
12:SCAN HDFS [tpcds.catalog_sales]
HDFS partitions=1/1 files=1 size=282.20MB
runtime filters: RF001 -> tpcds.catalog_sales.cs_bill_customer_sk, RF002 -> tpcds.catalog_sales.cs_item_sk, RF006 -> cs_sold_date_sk
row-size=36B cardinality=1.44M
====