mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
IMPALA-14006: Bound max_instances in CreateInputCollocatedInstances
IMPALA-11604 (part 2) changes how many instances to create in Scheduler::CreateInputCollocatedInstances. This works when the left child fragment of a parent fragment is distributed across nodes. However, if the left child fragment instance is limited to only 1 node (the case of UNPARTITIONED fragment), the scheduler might over-parallelize the parent fragment by scheduling too many instances in a single node. This patch attempts to mitigate the issue in two ways. First, it adds bounding logic in PlanFragment.traverseEffectiveParallelism() to lower parallelism further if the left (probe) side of the child fragment is not well distributed across nodes. Second, it adds TQueryExecRequest.max_parallelism_per_node to relay information from Analyzer.getMaxParallelismPerNode() to the scheduler. With this information, the scheduler can do additional sanity checks to prevent Scheduler::CreateInputCollocatedInstances from over-parallelizing a fragment. Note that this sanity check can also cap MAX_FS_WRITERS option under a similar scenario. Added ScalingVerdict enum and TRACE log it to show the scaling decision steps. Testing: - Add planner test and e2e test that exercise the corner case under COMPUTE_PROCESSING_COST=1 option. - Manually comment the bounding logic in traverseEffectiveParallelism() and confirm that the scheduler's sanity check still enforces the bounding. Change-Id: I65223b820c9fd6e4267d57297b1466d4e56829b3 Reviewed-on: http://gerrit.cloudera.org:8080/22840 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
committed by
Impala Public Jenkins
parent
c0c6cc9df4
commit
3210ec58c5
@@ -457,9 +457,10 @@ Status Scheduler::CheckEffectiveInstanceCount(
|
||||
return Status(Substitute(
|
||||
"$0 scheduled instance count ($1) is higher than maximum instances per node"
|
||||
" ($2), indicating a planner bug. Consider running the query with"
|
||||
" COMPUTE_PROCESSING_COST=false.",
|
||||
" COMPUTE_PROCESSING_COST=false. Scheduler see $3 hosts for this fragment"
|
||||
" with at most $4 fragment instance assignment at one host.",
|
||||
fragment_state->fragment.display_name, largest_inst_per_host,
|
||||
qc.MAX_FRAGMENT_INSTANCES_PER_NODE));
|
||||
qc.MAX_FRAGMENT_INSTANCES_PER_NODE, num_host, largest_inst_per_host));
|
||||
}
|
||||
|
||||
int planned_inst_per_host = ceil((float)effective_instance_count / num_host);
|
||||
@@ -925,6 +926,18 @@ void Scheduler::CreateInputCollocatedInstances(
|
||||
input_fragment_state.instance_states) {
|
||||
all_hosts.insert({input_instance_state.host, input_instance_state.krpc_host});
|
||||
}
|
||||
// Sanity check max_instances with information from Planner, if set.
|
||||
if (state->request().__isset.max_parallelism_per_node) {
|
||||
int max_global = all_hosts.size() * state->request().max_parallelism_per_node;
|
||||
if (max_instances > max_global) {
|
||||
LOG(WARNING) << Substitute(
|
||||
"Fragment $0 lowered max_instance from $1 to $2 due to num_host=$3 and "
|
||||
"max_parallelism_per_node=$4",
|
||||
fragment.display_name, max_instances, max_global, all_hosts.size(),
|
||||
state->request().max_parallelism_per_node);
|
||||
max_instances = max_global;
|
||||
}
|
||||
}
|
||||
// This implementation creates the desired number of instances while balancing them
|
||||
// across hosts and ensuring that instances on the same host get consecutive instance
|
||||
// indexes.
|
||||
|
||||
@@ -1118,5 +1118,9 @@ struct TQueryExecRequest {
|
||||
// The unbounded version of cores_required. Used by Frontend to do executor group-set
|
||||
// assignment for the query. Should either be unset or set with positive value.
|
||||
18: optional i32 cores_required_unbounded
|
||||
|
||||
// Propagated value from Analyzer.getMaxParallelismPerNode().
|
||||
// Used by scheduler.cc as sanity check during scheduling.
|
||||
19: optional i32 max_parallelism_per_node
|
||||
}
|
||||
|
||||
|
||||
@@ -196,7 +196,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
|
||||
setFragmentInPlanTree(planRoot_);
|
||||
coordinatorOnly_ = coordinatorOnly;
|
||||
|
||||
// Coordinator-only fragments must be unpartitined as there is only one instance of
|
||||
// Coordinator-only fragments must be unpartitioned as there is only one instance of
|
||||
// them.
|
||||
Preconditions.checkState(!coordinatorOnly ||
|
||||
dataPartition_.equals(DataPartition.UNPARTITIONED));
|
||||
@@ -1101,10 +1101,10 @@ public class PlanFragment extends TreeNode<PlanFragment> {
|
||||
|
||||
// step 1: Set initial parallelism to the maximum possible.
|
||||
// Subsequent steps after this will not exceed maximum parallelism sets here.
|
||||
boolean canTryLower = adjustToMaxParallelism(
|
||||
ScalingVerdict verdict = adjustToMaxParallelism(
|
||||
minThreadPerNode, maxThreadPerNode, parentFragment, nodeStepCount, queryOptions);
|
||||
|
||||
if (canTryLower) {
|
||||
if (verdict == ScalingVerdict.CAN_LOWER && getAdjustedInstanceCount() > 1) {
|
||||
// step 2: Try lower parallelism by comparing output ProcessingCost of the input
|
||||
// child fragment against this fragment's segment costs.
|
||||
Preconditions.checkState(getChildCount() > 0);
|
||||
@@ -1117,10 +1117,31 @@ public class PlanFragment extends TreeNode<PlanFragment> {
|
||||
nodeStepCount, minParallelism, maxParallelism);
|
||||
setAdjustedInstanceCount(effectiveParallelism);
|
||||
if (LOG.isTraceEnabled() && effectiveParallelism != maxParallelism) {
|
||||
logCountAdjustmentTrace(maxParallelism, effectiveParallelism,
|
||||
logCountAdjustmentTrace(maxParallelism, effectiveParallelism, verdict,
|
||||
"Lower parallelism based on load and produce-consume rate ratio.");
|
||||
}
|
||||
}
|
||||
// If this is probe traversal from Planner.computeEffectiveParallelism()
|
||||
// (parentFragment == null), check possibility of left-child node (probe) being
|
||||
// underparallelized.
|
||||
if (parentFragment == null && hasChild(0)
|
||||
&& verdict != ScalingVerdict.FIXED_BY_PLAN_NODE
|
||||
&& verdict != ScalingVerdict.FIXED_BY_PARTITIONED_JOIN_BUILD) {
|
||||
// Cap max parallelism at left child max.
|
||||
// This is to prevent Scheduler::CreateInputCollocatedInstances to overparallelize.
|
||||
// It is safe to do if verdict is neither of FIXED_BY_PLAN_NODE nor
|
||||
// FIXED_BY_PARTITIONED_JOIN_BUILD.
|
||||
PlanFragment lc = getChild(0);
|
||||
int lcNumNode = lc.getNumNodes();
|
||||
int lcMaxParallelism = IntMath.saturatedMultiply(maxThreadPerNode, lcNumNode);
|
||||
if (lcMaxParallelism < getAdjustedInstanceCount()) {
|
||||
LOG.warn("Reducing instance count of {} from {} to {} to follow left-child node "
|
||||
+ "{} (num_nodes={}, num_instance={}). Scaling verdict was {}.",
|
||||
getId(), getAdjustedInstanceCount(), lcMaxParallelism, lc.getId(),
|
||||
lc.getNumNodes(), lc.getAdjustedInstanceCount(), verdict);
|
||||
setAdjustedInstanceCount(lcMaxParallelism);
|
||||
}
|
||||
}
|
||||
validateProcessingCosts();
|
||||
|
||||
// step 3: Compute the parallelism of join build fragment.
|
||||
@@ -1192,6 +1213,15 @@ public class PlanFragment extends TreeNode<PlanFragment> {
|
||||
return maxParallelism;
|
||||
}
|
||||
|
||||
private enum ScalingVerdict {
|
||||
CAN_LOWER,
|
||||
FIXED_BY_PLAN_NODE,
|
||||
FIXED_BY_PARTITIONED_JOIN_BUILD,
|
||||
UNION_FRAGMENT_BOUNDED,
|
||||
SCAN_FRAGMENT_BOUNDED,
|
||||
MIN_GLOBAL_PARALLELISM
|
||||
}
|
||||
|
||||
/**
|
||||
* Adjust parallelism of this fragment to the maximum allowed.
|
||||
* This method initialize maxParallelism_ and adjustedInstanceCount_.
|
||||
@@ -1204,33 +1234,38 @@ public class PlanFragment extends TreeNode<PlanFragment> {
|
||||
* @param parentFragment Parent fragment of this fragment.
|
||||
* @param nodeStepCount The step count used to increase this fragment's parallelism.
|
||||
* Usually equal to number of nodes or just 1.
|
||||
* @return True if it is possible to lower this fragment's parallelism through
|
||||
* ProcessingCost comparison. False if the parallelism should not be changed anymore.
|
||||
* @return a ScalingVerdict. If CAN_LOWER, it is possible to lower this fragment's
|
||||
* parallelism through ProcessingCost comparison.
|
||||
*/
|
||||
private boolean adjustToMaxParallelism(final int minThreadPerNode,
|
||||
private ScalingVerdict adjustToMaxParallelism(final int minThreadPerNode,
|
||||
final int maxThreadPerNode, final @Nullable PlanFragment parentFragment,
|
||||
final int nodeStepCount, TQueryOptions queryOptions) {
|
||||
int maxThreadAllowed = IntMath.saturatedMultiply(maxThreadPerNode, getNumNodes());
|
||||
boolean canTryLower = true;
|
||||
ScalingVerdict verdict = ScalingVerdict.CAN_LOWER;
|
||||
|
||||
// Compute selectedParallelism as the maximum allowed parallelism.
|
||||
int selectedParallelism = getNumInstances();
|
||||
if (isFixedParallelism_) {
|
||||
selectedParallelism = getAdjustedInstanceCount();
|
||||
maxParallelism_ = selectedParallelism;
|
||||
canTryLower = false;
|
||||
verdict = ScalingVerdict.FIXED_BY_PLAN_NODE;
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("{} instance count fixed to {}. verdict={}", getId(), maxParallelism_,
|
||||
verdict);
|
||||
}
|
||||
} else if (isPartitionedJoinBuildFragment()) {
|
||||
// This is a non-shared (PARTITIONED) join build fragment.
|
||||
// Parallelism of this fragment is equal to its parent parallelism.
|
||||
Preconditions.checkNotNull(parentFragment);
|
||||
final int parentParallelism = parentFragment.getAdjustedInstanceCount();
|
||||
if (LOG.isTraceEnabled() && selectedParallelism != parentParallelism) {
|
||||
logCountAdjustmentTrace(selectedParallelism, parentParallelism,
|
||||
"Partitioned join build fragment follow parent's parallelism.");
|
||||
}
|
||||
selectedParallelism = parentParallelism;
|
||||
maxParallelism_ = parentFragment.getMaxParallelism();
|
||||
canTryLower = false; // no need to compute effective parallelism anymore.
|
||||
// no need to compute effective parallelism anymore.
|
||||
verdict = ScalingVerdict.FIXED_BY_PARTITIONED_JOIN_BUILD;
|
||||
if (LOG.isTraceEnabled()) {
|
||||
logCountAdjustmentTrace(selectedParallelism, parentParallelism, verdict,
|
||||
"Partitioned join build fragment follow parent's parallelism.");
|
||||
}
|
||||
} else {
|
||||
UnionNode unionNode = getUnionNode();
|
||||
|
||||
@@ -1242,17 +1277,17 @@ public class PlanFragment extends TreeNode<PlanFragment> {
|
||||
if (maxParallelism_ > maxThreadAllowed) {
|
||||
selectedParallelism = maxThreadAllowed;
|
||||
if (LOG.isTraceEnabled()) {
|
||||
logCountAdjustmentTrace(
|
||||
getNumInstances(), selectedParallelism, "Follow maxThreadPerNode.");
|
||||
logCountAdjustmentTrace(getNumInstances(), selectedParallelism, verdict,
|
||||
"Follow maxThreadPerNode.");
|
||||
}
|
||||
} else {
|
||||
selectedParallelism = maxParallelism_;
|
||||
if (LOG.isTraceEnabled()) {
|
||||
logCountAdjustmentTrace(getNumInstances(), selectedParallelism,
|
||||
logCountAdjustmentTrace(getNumInstances(), selectedParallelism, verdict,
|
||||
"Follow minimum work per thread or max child count.");
|
||||
}
|
||||
}
|
||||
canTryLower = false;
|
||||
verdict = ScalingVerdict.UNION_FRAGMENT_BOUNDED;
|
||||
} else {
|
||||
// This is an interior fragment or fragment with single scan node.
|
||||
// We calculate maxParallelism_, minParallelism, and selectedParallelism across
|
||||
@@ -1286,7 +1321,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
|
||||
|
||||
// Prevent caller from lowering parallelism if fragment has ScanNode
|
||||
// because there is no child fragment to compare with.
|
||||
canTryLower = false;
|
||||
verdict = ScalingVerdict.SCAN_FRAGMENT_BOUNDED;
|
||||
}
|
||||
|
||||
int minParallelism = Math.min(
|
||||
@@ -1297,20 +1332,20 @@ public class PlanFragment extends TreeNode<PlanFragment> {
|
||||
if (boundedParallelism > maxThreadAllowed) {
|
||||
selectedParallelism = maxThreadAllowed;
|
||||
if (LOG.isTraceEnabled()) {
|
||||
logCountAdjustmentTrace(
|
||||
getNumInstances(), selectedParallelism, "Follow maxThreadPerNode.");
|
||||
logCountAdjustmentTrace(getNumInstances(), selectedParallelism, verdict,
|
||||
"Follow maxThreadPerNode.");
|
||||
}
|
||||
} else {
|
||||
if (boundedParallelism < minParallelism && minParallelism < maxScannerThreads) {
|
||||
boundedParallelism = minParallelism;
|
||||
canTryLower = false;
|
||||
verdict = ScalingVerdict.MIN_GLOBAL_PARALLELISM;
|
||||
if (LOG.isTraceEnabled()) {
|
||||
logCountAdjustmentTrace(
|
||||
getNumInstances(), boundedParallelism, "Follow minThreadPerNode.");
|
||||
logCountAdjustmentTrace(getNumInstances(), boundedParallelism, verdict,
|
||||
"Follow minThreadPerNode.");
|
||||
}
|
||||
} else if (LOG.isTraceEnabled()) {
|
||||
logCountAdjustmentTrace(
|
||||
getNumInstances(), boundedParallelism, "Follow minimum work per thread.");
|
||||
logCountAdjustmentTrace(getNumInstances(), boundedParallelism, verdict,
|
||||
"Follow minimum work per thread.");
|
||||
}
|
||||
selectedParallelism = boundedParallelism;
|
||||
}
|
||||
@@ -1325,7 +1360,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
|
||||
|
||||
// Initialize this fragment's parallelism to the selectedParallelism.
|
||||
setAdjustedInstanceCount(selectedParallelism);
|
||||
return canTryLower && selectedParallelism > 1;
|
||||
return verdict;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -1392,8 +1427,8 @@ public class PlanFragment extends TreeNode<PlanFragment> {
|
||||
|
||||
int adjustedCount = getAdjustedInstanceCount();
|
||||
if (LOG.isTraceEnabled() && originalInstanceCount_ != adjustedCount) {
|
||||
logCountAdjustmentTrace(
|
||||
originalInstanceCount_, adjustedCount, "Finalize effective parallelism.");
|
||||
LOG.trace("{} finalize instance count from {} to {}.", getId(),
|
||||
originalInstanceCount_, adjustedCount);
|
||||
}
|
||||
|
||||
for (PlanNode node : collectPlanNodes()) { node.numInstances_ = adjustedCount; }
|
||||
@@ -1403,9 +1438,10 @@ public class PlanFragment extends TreeNode<PlanFragment> {
|
||||
}
|
||||
}
|
||||
|
||||
private void logCountAdjustmentTrace(int oldCount, int newCount, String reason) {
|
||||
LOG.trace("{} adjust instance count from {} to {}. {}", getId(), oldCount, newCount,
|
||||
reason);
|
||||
private void logCountAdjustmentTrace(
|
||||
int oldCount, int newCount, ScalingVerdict verdict, String reason) {
|
||||
LOG.trace("{} adjust instance count from {} to {}. verdict={} reason={}", getId(),
|
||||
oldCount, newCount, verdict, reason);
|
||||
}
|
||||
|
||||
private static boolean isBlockingNode(PlanNode node) {
|
||||
|
||||
@@ -619,6 +619,7 @@ public class Planner {
|
||||
CoreCount unboundedCores = computeBlockingAwareCores(postOrderFragments, true);
|
||||
int coresRequiredUnbounded = Math.max(1, unboundedCores.totalWithoutCoordinator());
|
||||
request.setCores_required_unbounded(coresRequiredUnbounded);
|
||||
request.setMax_parallelism_per_node(rootAnalyzer.getMaxParallelismPerNode());
|
||||
LOG.info("CoreCountUnbounded=" + unboundedCores
|
||||
+ ", coresRequiredUnbounded=" + coresRequiredUnbounded);
|
||||
}
|
||||
|
||||
@@ -1504,7 +1504,7 @@ public class PlannerTest extends PlannerTestBase {
|
||||
TQueryOptions options = tpcdsParquetQueryOptions();
|
||||
options.setCompute_processing_cost(true);
|
||||
options.setProcessing_cost_min_threads(2);
|
||||
options.setMax_fragment_instances_per_node(16);
|
||||
// MAX_FRAGMENT_INSTANCES_PER_NODE option is set at test file.
|
||||
runPlannerTestFile("tpcds-processing-cost", "tpcds_partitioned_parquet_snap", options,
|
||||
tpcdsParquetTestOptions());
|
||||
}
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
# Regression test for IMPALA-12510: select star on empty table
|
||||
select * from functional.emptytable;
|
||||
---- QUERYOPTIONS
|
||||
MAX_FRAGMENT_INSTANCES_PER_NODE=16
|
||||
---- PARALLELPLANS
|
||||
Max Per-Host Resource Reservation: Memory=4.00MB Threads=2
|
||||
Per-Host Resource Estimates: Memory=10MB
|
||||
@@ -47,6 +49,8 @@ group by rollup (
|
||||
ss_net_paid,
|
||||
ss_ext_sales_price)
|
||||
limit 100
|
||||
---- QUERYOPTIONS
|
||||
MAX_FRAGMENT_INSTANCES_PER_NODE=16
|
||||
---- PARALLELPLANS
|
||||
Max Per-Host Resource Reservation: Memory=1.81GB Threads=11
|
||||
Per-Host Resource Estimates: Memory=2.38GB
|
||||
@@ -140,6 +144,8 @@ max-parallelism=21 segment-costs=[89349287, 198453333]
|
||||
in pipelines: 00(GETNEXT)
|
||||
====
|
||||
select * from income_band;
|
||||
---- QUERYOPTIONS
|
||||
MAX_FRAGMENT_INSTANCES_PER_NODE=16
|
||||
---- PARALLELPLANS
|
||||
Max Per-Host Resource Reservation: Memory=4.02MB Threads=2
|
||||
Per-Host Resource Estimates: Memory=20MB
|
||||
@@ -170,6 +176,8 @@ max-parallelism=1 segment-costs=[28]
|
||||
====
|
||||
# Scan cost should be exactly the same as select star without limit.
|
||||
select * from income_band limit 1000000000;
|
||||
---- QUERYOPTIONS
|
||||
MAX_FRAGMENT_INSTANCES_PER_NODE=16
|
||||
---- PARALLELPLANS
|
||||
Max Per-Host Resource Reservation: Memory=4.02MB Threads=2
|
||||
Per-Host Resource Estimates: Memory=20MB
|
||||
@@ -209,6 +217,8 @@ select
|
||||
from functional.emptytable
|
||||
) b
|
||||
where rk = 1;
|
||||
---- QUERYOPTIONS
|
||||
MAX_FRAGMENT_INSTANCES_PER_NODE=16
|
||||
---- PARALLELPLANS
|
||||
Max Per-Host Resource Reservation: Memory=48.00MB Threads=4
|
||||
Per-Host Resource Estimates: Memory=48MB
|
||||
@@ -278,3 +288,118 @@ max-parallelism=1 segment-costs=[0, 0]
|
||||
tuple-ids=0 row-size=16B cardinality=0 cost=0
|
||||
in pipelines: 00(GETNEXT)
|
||||
====
|
||||
# IMPALA-14006: Regression test for UNPARTITIONED fragment in probe side.
|
||||
# F03 should schedule 2 instances because F01 is only 1 node and 1 instance.
|
||||
select
|
||||
timestamp_col,
|
||||
rank
|
||||
from
|
||||
functional.alltypessmall alts
|
||||
left outer join (
|
||||
select id, dense_rank() over(order by id) as rank, int_col from functional.alltypes
|
||||
) rank_view on (rank_view.id = alts.id)
|
||||
where rank < 10;
|
||||
---- QUERYOPTIONS
|
||||
MAX_FRAGMENT_INSTANCES_PER_NODE=2
|
||||
---- PARALLELPLANS
|
||||
Max Per-Host Resource Reservation: Memory=15.98MB Threads=6
|
||||
Per-Host Resource Estimates: Memory=49MB
|
||||
F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
|
||||
| Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
|
||||
| max-parallelism=1 segment-costs=[233] cpu-comparison-result=8 [max(5 (self) vs 8 (sum children))]
|
||||
PLAN-ROOT SINK
|
||||
| output exprs: timestamp_col, dense_rank()
|
||||
| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0 cost=200
|
||||
|
|
||||
09:EXCHANGE [UNPARTITIONED]
|
||||
| mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
|
||||
| tuple-ids=7N,6N,0 row-size=32B cardinality=100 cost=33
|
||||
| in pipelines: 02(GETNEXT)
|
||||
|
|
||||
F03:PLAN FRAGMENT [HASH(alts.id)] hosts=3 instances=2 (adjusted from 6)
|
||||
Per-Instance Resources: mem-estimate=193.11KB mem-reservation=0B thread-reservation=1
|
||||
max-parallelism=3 segment-costs=[713]
|
||||
05:HASH JOIN [RIGHT OUTER JOIN, PARTITIONED]
|
||||
| hash-table-id=00
|
||||
| hash predicates: id = alts.id
|
||||
| fk/pk conjuncts: id = alts.id
|
||||
| other predicates: dense_rank() < CAST(10 AS BIGINT)
|
||||
| mem-estimate=0B mem-reservation=0B spill-buffer=64.00KB thread-reservation=0
|
||||
| tuple-ids=7N,6N,0 row-size=32B cardinality=100 cost=205
|
||||
| in pipelines: 02(GETNEXT), 00(OPEN)
|
||||
|
|
||||
|--F05:PLAN FRAGMENT [HASH(alts.id)] hosts=3 instances=2 (adjusted from 6)
|
||||
| | Per-Instance Resources: mem-estimate=1.95MB mem-reservation=1.94MB thread-reservation=1
|
||||
| | max-parallelism=3 segment-costs=[120]
|
||||
| JOIN BUILD
|
||||
| | join-table-id=00 plan-id=01 cohort-id=01
|
||||
| | build expressions: alts.id
|
||||
| | mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0 cost=100
|
||||
| |
|
||||
| 08:EXCHANGE [HASH(alts.id)]
|
||||
| | mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
|
||||
| | tuple-ids=0 row-size=20B cardinality=100 cost=20
|
||||
| | in pipelines: 00(GETNEXT)
|
||||
| |
|
||||
| F02:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 (adjusted from 4)
|
||||
| Per-Instance Resources: mem-estimate=16.19MB mem-reservation=8.00KB thread-reservation=1
|
||||
| max-parallelism=3 segment-costs=[409]
|
||||
| 00:SCAN HDFS [functional.alltypessmall alts, RANDOM]
|
||||
| HDFS partitions=4/4 files=4 size=6.32KB
|
||||
| stored statistics:
|
||||
| table: rows=100 size=6.32KB
|
||||
| partitions: 4/4 rows=100
|
||||
| columns: all
|
||||
| extrapolated-rows=disabled max-scan-range-rows=25
|
||||
| mem-estimate=16.00MB mem-reservation=8.00KB thread-reservation=0
|
||||
| tuple-ids=0 row-size=20B cardinality=100 cost=229
|
||||
| in pipelines: 00(GETNEXT)
|
||||
|
|
||||
07:EXCHANGE [HASH(id)]
|
||||
| mem-estimate=17.11KB mem-reservation=0B thread-reservation=0
|
||||
| tuple-ids=7,6 row-size=12B cardinality=730 cost=148
|
||||
| in pipelines: 02(GETNEXT)
|
||||
|
|
||||
F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
|
||||
Per-Instance Resources: mem-estimate=4.19MB mem-reservation=4.00MB thread-reservation=1
|
||||
max-parallelism=1 segment-costs=[17820]
|
||||
04:SELECT
|
||||
| predicates: dense_rank() < CAST(10 AS BIGINT)
|
||||
| mem-estimate=0B mem-reservation=0B thread-reservation=0
|
||||
| tuple-ids=7,6 row-size=12B cardinality=730 cost=7300
|
||||
| in pipelines: 02(GETNEXT)
|
||||
|
|
||||
03:ANALYTIC
|
||||
| functions: dense_rank()
|
||||
| order by: id ASC
|
||||
| window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
|
||||
| mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
|
||||
| tuple-ids=7,6 row-size=12B cardinality=7.30K cost=7300
|
||||
| in pipelines: 02(GETNEXT)
|
||||
|
|
||||
06:MERGING-EXCHANGE [UNPARTITIONED]
|
||||
| order by: id ASC
|
||||
| mem-estimate=33.50KB mem-reservation=0B thread-reservation=0
|
||||
| tuple-ids=7 row-size=4B cardinality=7.30K cost=1904
|
||||
| in pipelines: 02(GETNEXT)
|
||||
|
|
||||
F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 (adjusted from 6)
|
||||
Per-Instance Resources: mem-estimate=22.00MB mem-reservation=6.03MB thread-reservation=1
|
||||
max-parallelism=3 segment-costs=[20162, 3059]
|
||||
02:SORT
|
||||
| order by: id ASC
|
||||
| mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB thread-reservation=0
|
||||
| tuple-ids=7 row-size=4B cardinality=7.30K cost=2819
|
||||
| in pipelines: 02(GETNEXT), 01(OPEN)
|
||||
|
|
||||
01:SCAN HDFS [functional.alltypes, RANDOM]
|
||||
HDFS partitions=24/24 files=24 size=478.45KB
|
||||
stored statistics:
|
||||
table: rows=7.30K size=478.45KB
|
||||
partitions: 24/24 rows=7.30K
|
||||
columns: all
|
||||
extrapolated-rows=disabled max-scan-range-rows=310
|
||||
mem-estimate=16.00MB mem-reservation=32.00KB thread-reservation=0
|
||||
tuple-ids=1 row-size=4B cardinality=7.30K cost=17343
|
||||
in pipelines: 01(GETNEXT)
|
||||
====
|
||||
|
||||
30
testdata/workloads/tpcds/queries/unpartitioned-probe.test
vendored
Normal file
30
testdata/workloads/tpcds/queries/unpartitioned-probe.test
vendored
Normal file
@@ -0,0 +1,30 @@
|
||||
====
|
||||
---- QUERY: UNPARTITIONED_PROBE
|
||||
# Validate num nodes and num instances from ExecSummary.
|
||||
select
|
||||
timestamp_col,
|
||||
rank
|
||||
from
|
||||
functional.alltypessmall alts
|
||||
left outer join (
|
||||
select id, dense_rank() over(order by id) as rank, int_col from functional.alltypes
|
||||
) rank_view on (rank_view.id = alts.id)
|
||||
where rank < 10;
|
||||
---- RESULTS: VERIFY_IS_EQUAL_SORTED
|
||||
2009-01-01 00:00:00,1
|
||||
2009-01-01 00:01:00,2
|
||||
2009-01-01 00:02:00.100000000,3
|
||||
2009-01-01 00:03:00.300000000,4
|
||||
2009-01-01 00:04:00.600000000,5
|
||||
2009-01-01 00:05:00.100000000,6
|
||||
2009-01-01 00:06:00.150000000,7
|
||||
2009-01-01 00:07:00.210000000,8
|
||||
2009-01-01 00:08:00.280000000,9
|
||||
---- TYPES
|
||||
TIMESTAMP, BIGINT
|
||||
---- RUNTIME_PROFILE
|
||||
row_regex: F03:EXCHANGE SENDER .* 1 .* 2 .*
|
||||
row_regex: 05:HASH JOIN .* 1 .* 2 .*
|
||||
row_regex: F01:EXCHANGE SENDER .* 1 .* 1 .*
|
||||
row_regex: 04:SELECT .* 1 .* 1 .*
|
||||
====
|
||||
@@ -782,6 +782,12 @@ class TestTpcdsQueryWithProcessingCost(TestTpcdsQuery):
|
||||
new_vector.get_value('exec_option')['max_fragment_instances_per_node'] = 2
|
||||
self.run_test_case(self.get_workload() + '-q67a', new_vector)
|
||||
|
||||
def test_unpartitioned_probe(self, vector):
|
||||
"""Set max_fragment_instances_per_node to 2 to contrast against num_nodes."""
|
||||
new_vector = deepcopy(vector)
|
||||
new_vector.get_value('exec_option')['max_fragment_instances_per_node'] = 2
|
||||
self.run_test_case('unpartitioned-probe', new_vector)
|
||||
|
||||
|
||||
@SkipIfBuildType.dev_build
|
||||
@SkipIfDockerizedCluster.insufficient_mem_limit
|
||||
|
||||
Reference in New Issue
Block a user