IMPALA-12091: Control scan parallelism by its processing cost

Before this patch, Impala relied on the MT_DOP option to decide the degree
of parallelism of the scan fragment when a query runs with
COMPUTE_PROCESSING_COST=1. This patch adds the scan node's processing cost
as another consideration for raising scan parallelism beyond MT_DOP.

The scan node's cost is now adjusted to also account for the number of
effective scan ranges. Each scan range is given a weight of
(0.5% * min_processing_per_thread), which roughly means that one scan node
instance can handle at most 200 scan ranges.

The query option MAX_FRAGMENT_INSTANCES_PER_NODE is added as an upper
bound on scan parallelism when COMPUTE_PROCESSING_COST=true. If the number
of scan ranges is fewer than the maximum parallelism allowed by the scan
node's processing cost, that processing cost is clamped down to
(min_processing_per_thread * number of scan ranges). Lowering
MAX_FRAGMENT_INSTANCES_PER_NODE clamps down the scan processing cost in a
similar way. For interior fragments, a combination of
MAX_FRAGMENT_INSTANCES_PER_NODE, PROCESSING_COST_MIN_THREADS, and the
number of available cores per node is taken into account to determine the
maximum fragment parallelism per node. For scan fragments, only the first
two are considered, to encourage the Frontend to choose a larger executor
group as needed.
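
A minimal sketch of how those caps interact, assuming the cost cap is
min_processing_per_thread times the smaller limit (the real combination in
the Planner may differ):

def clamp_scan_cost(scan_cost, num_scan_ranges, max_instances_per_node,
                    min_processing_per_thread):
  # Scan parallelism is roughly ceil(scan_cost / min_processing_per_thread),
  # so capping the cost also caps the parallelism at the smaller of the
  # number of scan ranges and MAX_FRAGMENT_INSTANCES_PER_NODE.
  cap = min(num_scan_ranges, max_instances_per_node) * min_processing_per_thread
  return min(scan_cost, cap)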

Two new pieces of static state are added to exec-node.h: is_mt_fragment_
and num_instances_per_node_. Backend code that used to refer to the MT_DOP
option now refers to either is_mt_fragment_ or num_instances_per_node_.

Two new criteria are added to the effective parallelism calculation in
PlanFragment.adjustToMaxParallelism() (sketched below):
- If a fragment has a UnionNode, its parallelism is the maximum of its
  input fragments' parallelism and its collocated ScanNode's expected
  parallelism.
- If a fragment has only a single ScanNode (and no UnionNode), its
  parallelism is calculated in the same fashion as for an interior
  fragment, but it is not lowered afterwards since there is no child
  fragment to compare against.
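
Sketch of the two rules above (illustrative Python only; the fragment
accessors and precomputed values are hypothetical, not the actual
PlanFragment API):

def adjust_to_max_parallelism(fragment, input_parallelism, scan_parallelism,
                              interior_parallelism):
  # 'fragment' is a hypothetical object; the three ints are the precomputed
  # parallelism of the input fragments, the collocated ScanNode, and the
  # interior-fragment formula, respectively.
  if fragment.has_union_node:
    # Rule 1: maximum of input fragment and collocated scan parallelism.
    fragment.parallelism = max(input_parallelism, scan_parallelism)
  elif fragment.has_single_scan_node:
    # Rule 2: same computation as an interior fragment, but never lowered
    # afterwards because there is no child fragment to compare against.
    fragment.parallelism = interior_parallelism
  return fragment.parallelism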

Admission control slots remain unchanged. This may cause a query to fail
admission if the Planner selects a scan parallelism higher than the
configured admission control slots value. Setting
MAX_FRAGMENT_INSTANCES_PER_NODE equal to or lower than the configured
admission control slots value can help lower scan parallelism and pass
admission control.
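
For example, on an executor group started with -admission_control_slots=8,
keeping the new option at or below that value avoids the rejection (the
values here are illustrative, not a recommendation):

  SET COMPUTE_PROCESSING_COST=true;
  SET MAX_FRAGMENT_INSTANCES_PER_NODE=8;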

The previous workaround to control scan parallelism from IMPALA-12029 is
now removed. This patch also disables the IMPALA-10287 optimization when
COMPUTE_PROCESSING_COST=true, because IMPALA-10287 relies on a fixed
number of fragment instances in DistributedPlanner.java, while the
effective parallelism calculation happens much later and may change the
final number of instances of the hash join fragment, rendering the
DistributionMode selected by IMPALA-10287 inaccurate.

This patch was benchmarked using single_node_perf_run.py with the
following parameters:
args="-gen_experimental_profile=true -default_query_options="
args+="mt_dop=4,compute_processing_cost=1,processing_cost_min_threads=1 "
./bin/single_node_perf_run.py --num_impalads=3 --scale=10 \
--workloads=tpcds --iterations=5 --table_formats=parquet/none/none \
--impalad_args="$args" \
--query_names=TPCDS-Q3,TPCDS-Q14-1,TPCDS-Q14-2,TPCDS-Q23-1,TPCDS-Q23-2,TPCDS-Q49,TPCDS-Q76,TPCDS-Q78,TPCDS-Q80A \
"IMPALA-12091~1" IMPALA-12091

The benchmark result is as follows:
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| Workload | Query | File Format | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%) | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | Tval |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| TPCDS(10) | TPCDS-Q23-1 | parquet / none / none | 4.62 | 4.54 | +1.92% | 0.23% | 1.59% | 5 | +2.32% | 1.15 | 2.67 |
| TPCDS(10) | TPCDS-Q14-1 | parquet / none / none | 5.82 | 5.76 | +1.08% | 5.27% | 3.89% | 5 | +2.04% | 0.00 | 0.37 |
| TPCDS(10) | TPCDS-Q23-2 | parquet / none / none | 4.65 | 4.58 | +1.38% | 1.97% | 0.48% | 5 | +0.81% | 0.87 | 1.51 |
| TPCDS(10) | TPCDS-Q49 | parquet / none / none | 1.49 | 1.48 | +0.46% | * 36.02% * | * 34.95% * | 5 | +1.26% | 0.58 | 0.02 |
| TPCDS(10) | TPCDS-Q14-2 | parquet / none / none | 3.76 | 3.75 | +0.39% | 1.67% | 0.58% | 5 | -0.03% | -0.58 | 0.49 |
| TPCDS(10) | TPCDS-Q78 | parquet / none / none | 2.80 | 2.80 | -0.04% | 1.32% | 1.33% | 5 | -0.42% | -0.29 | -0.05 |
| TPCDS(10) | TPCDS-Q80A | parquet / none / none | 2.87 | 2.89 | -0.51% | 1.33% | 0.40% | 5 | -0.01% | -0.29 | -0.82 |
| TPCDS(10) | TPCDS-Q3 | parquet / none / none | 0.18 | 0.19 | -1.29% | * 15.26% * | * 15.87% * | 5 | -0.54% | -0.87 | -0.13 |
| TPCDS(10) | TPCDS-Q76 | parquet / none / none | 1.08 | 1.11 | -2.98% | 0.92% | 1.70% | 5 | -3.99% | -2.02 | -3.47 |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+

Testing:
- Pass PlannerTest.testProcessingCost
- Pass test_executor_groups.py
- Reenable test_tpcds_q51a in TestTpcdsQueryWithProcessingCost with
MAX_FRAGMENT_INSTANCES_PER_NODE set to 5
- Pass test_tpcds_queries.py::TestTpcdsQueryWithProcessingCost
- Pass core tests

Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Reviewed-on: http://gerrit.cloudera.org:8080/19807
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>

Committed-by: Impala Public Jenkins
Parent: e54c636385
Commit: 1d0b111bcf

tests/custom_cluster/test_executor_groups.py

@@ -22,11 +22,11 @@ from builtins import range
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.util.concurrent_workload import ConcurrentWorkload

import copy
import json
import logging
import os
import pytest
import re
from time import sleep

LOG = logging.getLogger("test_auto_scaling")
@@ -44,9 +44,6 @@ GROUPING_TEST_QUERY = ("select ss_item_sk from tpcds_parquet.store_sales"
# A query to test behavior of child queries.
COMPUTE_STATS_QUERY = "COMPUTE STATS tpcds_parquet.store_sales"

# Default query option to use for testing CPU requirement.
CPU_DOP_OPTIONS = {'MT_DOP': '2', 'COMPUTE_PROCESSING_COST': 'true'}

DEFAULT_RESOURCE_POOL = "default-pool"
@@ -837,12 +834,13 @@ class TestExecutorGroups(CustomClusterTestSuite):
# max-query-cpu-core-per-node-limit and max-query-cpu-core-coordinator-limit
# properties of the three sets:
# tiny: [0, 64MB, 4, 4]
# small: [0, 70MB, 8, 8]
# small: [0, 90MB, 8, 8]
# large: [64MB+1Byte, 8PB, 64, 64]
llama_site_path = os.path.join(RESOURCES_DIR, "llama-site-3-groups.xml")

# extra args template to start coordinator
extra_args_template = ("-vmodule admission-controller=3 "
"-admission_control_slots=8 "
"-expected_executor_group_sets=root.tiny:1,root.small:2,root.large:3 "
"-fair_scheduler_allocation_path %s "
"-llama_site_path %s "
@@ -856,14 +854,14 @@ class TestExecutorGroups(CustomClusterTestSuite):

# Create fresh client
self.create_impala_clients()
# Add an exec group with a 2 admission slot and 1 executors.
self._add_executor_group("group", 1, admission_control_slots=2,
# Add an exec group with 8 admission slots and 1 executors.
self._add_executor_group("group", 1, admission_control_slots=8,
resource_pool="root.tiny", extra_args="-mem_limit=2g")
# Add an exec group with a 2 admission slot and 2 executors.
self._add_executor_group("group", 2, admission_control_slots=2,
# Add an exec group with 8 admission slots and 2 executors.
self._add_executor_group("group", 2, admission_control_slots=8,
resource_pool="root.small", extra_args="-mem_limit=2g")
# Add another exec group with 2 admission slot and 3 executors.
self._add_executor_group("group", 3, admission_control_slots=2,
# Add another exec group with 8 admission slots and 3 executors.
self._add_executor_group("group", 3, admission_control_slots=8,
resource_pool="root.large", extra_args="-mem_limit=2g")
assert self._get_num_executor_groups(only_healthy=True) == 3
assert self._get_num_executor_groups(only_healthy=True,
@@ -873,14 +871,16 @@ class TestExecutorGroups(CustomClusterTestSuite):
assert self._get_num_executor_groups(only_healthy=True,
exec_group_set_prefix="root.large") == 1

def _run_query_and_verify_profile(self, query, query_options,
expected_strings_in_profile, not_expected_in_profile=[]):
"""Run 'query' with given 'query_options'. Assert existence of
'expected_strings_in_profile' and nonexistence of 'not_expected_in_profile'
in query profile.
Caller is reponsible to close self.client at the end of test."""
def _set_query_options(self, query_options):
"""Set query options"""
for k, v in query_options.items():
self.execute_query_expect_success(self.client, "SET {}='{}';".format(k, v))

def _run_query_and_verify_profile(self, query,
expected_strings_in_profile, not_expected_in_profile=[]):
"""Run 'query' and assert existence of 'expected_strings_in_profile' and
nonexistence of 'not_expected_in_profile' in query profile.
Caller is reponsible to close self.client at the end of test."""
result = self.execute_query_expect_success(self.client, query)
for expected_profile in expected_strings_in_profile:
assert expected_profile in str(result.runtime_profile)
@@ -892,131 +892,200 @@ class TestExecutorGroups(CustomClusterTestSuite):
# Expect to run the query on the small group by default.
coordinator_test_args = ""
self._setup_three_exec_group_cluster(coordinator_test_args)
self._run_query_and_verify_profile(CPU_TEST_QUERY, CPU_DOP_OPTIONS,
["Executor Group: root.small-group", "EffectiveParallelism: 5",
self._set_query_options({'COMPUTE_PROCESSING_COST': 'true'})
self._run_query_and_verify_profile(CPU_TEST_QUERY,
["Executor Group: root.small-group", "EffectiveParallelism: 11",
"ExecutorGroupsConsidered: 2"])

# Test disabling COMPUTE_PROCESING_COST and not setting REQUEST_POOL
options = copy.deepcopy(CPU_DOP_OPTIONS)
options['COMPUTE_PROCESSING_COST'] = 'false'
self._run_query_and_verify_profile(CPU_TEST_QUERY, options,
# Test disabling COMPUTE_PROCESING_COST. This will produce non-MT plan.
self._set_query_options({'COMPUTE_PROCESSING_COST': 'false'})
self._run_query_and_verify_profile(CPU_TEST_QUERY,
["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
"Verdict: Match"],
["EffectiveParallelism:", "CpuAsk:"])

# Test COMPUTE_PROCESING_COST=false and MT_DOP=2.
self._set_query_options({'MT_DOP': '2'})
self._run_query_and_verify_profile(CPU_TEST_QUERY,
["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
"Verdict: Match"],
["EffectiveParallelism:", "CpuAsk:"])

# Test COMPUTE_PROCESING_COST=true and MT_DOP=2.
# COMPUTE_PROCESING_COST should override MT_DOP.
self._set_query_options({'COMPUTE_PROCESSING_COST': 'true'})
self._run_query_and_verify_profile(CPU_TEST_QUERY,
["Executor Group: root.small-group", "EffectiveParallelism: 11",
"ExecutorGroupsConsidered: 2"])

# Test that REQUEST_POOL will override executor group selection
options['COMPUTE_PROCESSING_COST'] = 'true'
options['REQUEST_POOL'] = 'root.large'
self._run_query_and_verify_profile(CPU_TEST_QUERY, options,
self._set_query_options({
'MT_DOP': '0',
'REQUEST_POOL': 'root.large'})
self._run_query_and_verify_profile(CPU_TEST_QUERY,
["Executor Group: root.large-group",
("Verdict: query option REQUEST_POOL=root.large is set. "
"Memory and cpu limit checking is skipped."),
"EffectiveParallelism: 7", "ExecutorGroupsConsidered: 1"])
"EffectiveParallelism: 13", "ExecutorGroupsConsidered: 1"])

# Test that child queries follow REQUEST_POOL that was set by client.
# Two child queries should all run in root.large.
self._verify_total_admitted_queries("root.large", 1)
self._run_query_and_verify_profile(COMPUTE_STATS_QUERY, options,
self._verify_total_admitted_queries("root.large", 2)
self._run_query_and_verify_profile(COMPUTE_STATS_QUERY,
["ExecutorGroupsConsidered: 1",
"Verdict: Assign to first group because query is not auto-scalable"],
["Executor Group:"])
self._verify_total_admitted_queries("root.large", 3)
self._verify_total_admitted_queries("root.large", 4)

# Test setting REQUEST_POOL and disabling COMPUTE_PROCESSING_COST
options['COMPUTE_PROCESSING_COST'] = 'false'
options['REQUEST_POOL'] = 'root.large'
self._run_query_and_verify_profile(CPU_TEST_QUERY, options,
self._set_query_options({
'COMPUTE_PROCESSING_COST': 'false',
'REQUEST_POOL': 'root.large'})
self._run_query_and_verify_profile(CPU_TEST_QUERY,
["Executor Group: root.large-group",
("Verdict: query option REQUEST_POOL=root.large is set. "
"Memory and cpu limit checking is skipped."),
"ExecutorGroupsConsidered: 1"],
["EffectiveParallelism:", "CpuAsk:"])

# Unset REQUEST_POOL.
self.execute_query_expect_success(self.client, "SET REQUEST_POOL='';")
# Unset REQUEST_POOL and restore COMPUTE_PROCESSING_COST.
self._set_query_options({
'REQUEST_POOL': '',
'COMPUTE_PROCESSING_COST': 'true'})

# Test that child queries unset REQUEST_POOL that was set by Frontend planner for
# parent query. One child queries should run in root.small, and another one in
# root.large.
self._verify_total_admitted_queries("root.small", 1)
self._verify_total_admitted_queries("root.large", 4)
self._run_query_and_verify_profile(COMPUTE_STATS_QUERY, CPU_DOP_OPTIONS,
self._verify_total_admitted_queries("root.small", 2)
self._verify_total_admitted_queries("root.large", 5)
self._run_query_and_verify_profile(COMPUTE_STATS_QUERY,
["ExecutorGroupsConsidered: 1",
"Verdict: Assign to first group because query is not auto-scalable"],
["Executor Group:"])
self._verify_total_admitted_queries("root.small", 2)
self._verify_total_admitted_queries("root.large", 5)
self._verify_total_admitted_queries("root.small", 3)
self._verify_total_admitted_queries("root.large", 6)

# Test that GROUPING_TEST_QUERY will get assigned to the small group.
self._run_query_and_verify_profile(GROUPING_TEST_QUERY, CPU_DOP_OPTIONS,
["Executor Group: root.small-group", "ExecutorGroupsConsidered: 2",
"Verdict: Match", "CpuAsk: 4", "CpuAskUnbounded: 1"])
# Test that GROUPING_TEST_QUERY will get assigned to the large group.
self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
"Verdict: Match", "CpuAsk: 12"])

# ENABLE_REPLAN=false should force query to run in tiny group.
self.execute_query_expect_success(self.client, "SET ENABLE_REPLAN=false;")
self._run_query_and_verify_profile(CPU_TEST_QUERY, CPU_DOP_OPTIONS,
# ENABLE_REPLAN=false should force query to run in tiny group, but high scan
# parallelism will cause it to exceed the admission control slots.
self._set_query_options({'ENABLE_REPLAN': 'false'})
result = self.execute_query_expect_failure(self.client, CPU_TEST_QUERY)
status = ("Rejected query from pool root.tiny: number of admission control slots "
r"needed \(10\) on backend '.*' is greater than total slots available 8. "
"Reduce mt_dop to less than 8 to ensure that the query can execute.")
assert re.search(status, str(result))

# ENABLE_REPLAN=false and MAX_FRAGMENT_INSTANCES_PER_NODE=4 should allow query to run
# in tiny group.
self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '4'})
self._run_query_and_verify_profile(CPU_TEST_QUERY,
["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
"Verdict: Assign to first group because query option ENABLE_REPLAN=false"])
self.execute_query_expect_success(self.client, "SET ENABLE_REPLAN='';")

# Unset both ENABLE_REPLAN and MAX_FRAGMENT_INSTANCES_PER_NODE
self._set_query_options({
'ENABLE_REPLAN': '',
'MAX_FRAGMENT_INSTANCES_PER_NODE': ''})

# Trivial query should be assigned to tiny group by Frontend.
# Backend may decide to run it in coordinator only.
self._run_query_and_verify_profile("SELECT 1", CPU_DOP_OPTIONS,
self._run_query_and_verify_profile("SELECT 1",
["Executor Group: empty group (using coordinator only)",
"ExecutorGroupsConsidered: 1",
"Verdict: Assign to first group because the number of nodes is 1"])

# CREATE/DROP database should work and assigned to tiny group.
self._run_query_and_verify_profile(
"CREATE DATABASE test_non_scalable_query;", CPU_DOP_OPTIONS,
"CREATE DATABASE test_non_scalable_query;",
["ExecutorGroupsConsidered: 1",
"Verdict: Assign to first group because query is not auto-scalable"],
["Executor Group:"])
self._run_query_and_verify_profile(
"DROP DATABASE test_non_scalable_query;", CPU_DOP_OPTIONS,
"DROP DATABASE test_non_scalable_query;",
["ExecutorGroupsConsidered: 1",
"Verdict: Assign to first group because query is not auto-scalable"],
["Executor Group:"])

# Test combination of PROCESSING_COST_MIN_THREADS and MAX_FRAGMENT_INSTANCES_PER_NODE.
self._set_query_options({
'PROCESSING_COST_MIN_THREADS': '1',
'MAX_FRAGMENT_INSTANCES_PER_NODE': '3'})
self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
["Executor Group: root.large-group", "EffectiveParallelism: 9",
"ExecutorGroupsConsidered: 3"])
self._set_query_options({
'MAX_FRAGMENT_INSTANCES_PER_NODE': '4'})
self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
["Executor Group: root.large-group", "EffectiveParallelism: 12",
"ExecutorGroupsConsidered: 3"])
self._set_query_options({
'PROCESSING_COST_MIN_THREADS': '3',
'MAX_FRAGMENT_INSTANCES_PER_NODE': '1'})
self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
["Executor Group: root.large-group", "EffectiveParallelism: 9",
"ExecutorGroupsConsidered: 3"])
self._set_query_options({
'PROCESSING_COST_MIN_THREADS': '2',
'MAX_FRAGMENT_INSTANCES_PER_NODE': '2'})
self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
["Executor Group: root.small-group", "EffectiveParallelism: 2",
"ExecutorGroupsConsidered: 2"])
# Unset PROCESSING_COST_MIN_THREADS and MAX_FRAGMENT_INSTANCES_PER_NODE.
self._set_query_options({
'PROCESSING_COST_MIN_THREADS': '',
'MAX_FRAGMENT_INSTANCES_PER_NODE': ''})

# Check resource pools on the Web queries site and admission site
self._verify_query_num_for_resource_pool("root.small", 3)
self._verify_query_num_for_resource_pool("root.tiny", 3)
self._verify_query_num_for_resource_pool("root.large", 5)
self._verify_total_admitted_queries("root.small", 3)
self._verify_query_num_for_resource_pool("root.small", 4)
self._verify_query_num_for_resource_pool("root.tiny", 4)
self._verify_query_num_for_resource_pool("root.large", 10)
self._verify_total_admitted_queries("root.small", 4)
self._verify_total_admitted_queries("root.tiny", 3)
self._verify_total_admitted_queries("root.large", 5)
self._verify_total_admitted_queries("root.large", 10)
self.client.close()

@pytest.mark.execute_serially
def test_query_cpu_count_divisor_two(self):
# Expect to run the query on the tiny group
# Expect to run the query on the small group (driven by MemoryAsk),
# But the CpuAsk is around half of EffectiveParallelism.
coordinator_test_args = "-query_cpu_count_divisor=2 "
self._setup_three_exec_group_cluster(coordinator_test_args)
self._run_query_and_verify_profile(CPU_TEST_QUERY, CPU_DOP_OPTIONS,
["Executor Group: root.tiny-group", "EffectiveParallelism: 3",
"ExecutorGroupsConsidered: 1"])
self._set_query_options({'COMPUTE_PROCESSING_COST': 'true'})
self._run_query_and_verify_profile(CPU_TEST_QUERY,
["Executor Group: root.small-group",
"CpuAsk: 6", "EffectiveParallelism: 11",
"ExecutorGroupsConsidered: 2"])
# Check resource pools on the Web queries site and admission site
self._verify_query_num_for_resource_pool("root.tiny", 1)
self._verify_total_admitted_queries("root.tiny", 1)
self._verify_query_num_for_resource_pool("root.small", 1)
self._verify_total_admitted_queries("root.small", 1)
self.client.close()

@pytest.mark.execute_serially
def test_query_cpu_count_divisor_fraction(self):
# Expect to run the query on the large group
coordinator_test_args = "-query_cpu_count_divisor=0.03 "
coordinator_test_args = ("-min_processing_per_thread=550000 "
"-query_cpu_count_divisor=0.03 ")
self._setup_three_exec_group_cluster(coordinator_test_args)
options = copy.deepcopy(CPU_DOP_OPTIONS)
options['MT_DOP'] = '1'
self._run_query_and_verify_profile(CPU_TEST_QUERY, options,
self._set_query_options({
'COMPUTE_PROCESSING_COST': 'true',
'MAX_FRAGMENT_INSTANCES_PER_NODE': '1'})
self._run_query_and_verify_profile(CPU_TEST_QUERY,
["Executor Group: root.large-group", "EffectiveParallelism: 4",
"ExecutorGroupsConsidered: 3", "CpuAsk: 134",
"Verdict: Match"])

# Unset MAX_FRAGMENT_INSTANCES_PER_NODE.
self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': ''})

# Expect that a query still admitted to last group even if
# its resource requirement exceed the limit on that last executor group.
self._run_query_and_verify_profile(CPU_TEST_QUERY, CPU_DOP_OPTIONS,
["Executor Group: root.large-group", "EffectiveParallelism: 7",
"ExecutorGroupsConsidered: 3", "CpuAsk: 234",
self._run_query_and_verify_profile(CPU_TEST_QUERY,
["Executor Group: root.large-group", "EffectiveParallelism: 16",
"ExecutorGroupsConsidered: 3", "CpuAsk: 534",
"Verdict: no executor group set fit. Admit to last executor group set."])
# Check resource pools on the Web queries site and admission site
self._verify_query_num_for_resource_pool("root.large", 2)
@@ -1030,10 +1099,10 @@ class TestExecutorGroups(CustomClusterTestSuite):
coordinator_test_args = ("-query_cpu_count_divisor=0.03 "
"-skip_resource_checking_on_last_executor_group_set=false ")
self._setup_three_exec_group_cluster(coordinator_test_args)
self.client.set_configuration(CPU_DOP_OPTIONS)
self._set_query_options({'COMPUTE_PROCESSING_COST': 'true'})
result = self.execute_query_expect_failure(self.client, CPU_TEST_QUERY)
assert ("AnalysisException: The query does not fit largest executor group sets. "
"Reason: not enough cpu cores (require=234, max=192).") in str(result)
"Reason: not enough cpu cores (require=434, max=192).") in str(result)
self.client.close()

@pytest.mark.execute_serially
@@ -1043,26 +1112,24 @@ class TestExecutorGroups(CustomClusterTestSuite):
self._setup_three_exec_group_cluster(coordinator_test_args)

# Test that GROUPING_TEST_QUERY will get assigned to the large group.
self._run_query_and_verify_profile(GROUPING_TEST_QUERY, CPU_DOP_OPTIONS,
self._set_query_options({'COMPUTE_PROCESSING_COST': 'true'})
self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
"Verdict: Match", "CpuAsk: 6"],
["CpuAskUnbounded:"])
"Verdict: Match", "CpuAsk: 12"])

# Test that high_scan_cost_query will get assigned to the large group.
high_scan_cost_query = ("SELECT ss_item_sk FROM tpcds_parquet.store_sales "
"WHERE ss_item_sk < 1000000 GROUP BY ss_item_sk LIMIT 10")
options = copy.deepcopy(CPU_DOP_OPTIONS)
self._run_query_and_verify_profile(high_scan_cost_query, options,
self._run_query_and_verify_profile(high_scan_cost_query,
["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
"Verdict: Match", "CpuAsk: 6"],
["CpuAskUnbounded:"])
"Verdict: Match", "CpuAsk: 15"])

# Test that high_scan_cost_query will get assigned to the small group
# if NUM_SCANNER_THREADS is limited to 1.
options['NUM_SCANNER_THREADS'] = '1'
self._run_query_and_verify_profile(high_scan_cost_query, options,
["Executor Group: root.small-group", "ExecutorGroupsConsidered: 2",
"Verdict: Match", "CpuAsk: 4", "CpuAskUnbounded: 4"])
# if MAX_FRAGMENT_INSTANCES_PER_NODE is limited to 1.
self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '1'})
self._run_query_and_verify_profile(high_scan_cost_query,
["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
"Verdict: Match", "CpuAsk: 1"])

self.client.close()