mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
IMPALA-12980: Translate CpuAsk into admission control slots
Impala has a concept of "admission control slots" - the amount of parallelism that should be allowed on an Impala daemon. This defaults to the number of processors per executor and can be overridden with -–admission_control_slots flag. Admission control slot accounting is described in IMPALA-8998. It computes 'slots_to_use' for each backend based on the maximum number of instances of any fragment on that backend. This can lead to slot underestimation and query overadmission. For example, assume an executor node with 48 CPU cores and configured with -–admission_control_slots=48. It is assigned 4 non-blocking query fragments, each has 12 instances scheduled in this executor. IMPALA-8998 algorithm will request the max instance (12) slots rather than the sum of all non-blocking fragment instances (48). With the 36 remaining slots free, the executor can still admit another fragment from a different query but will potentially have CPU contention with the one that is currently running. When COMPUTE_PROCESSING_COST is enabled, Planner will generate a CpuAsk number that represents the cpu requirement of that query over a particular executor group set. This number is an estimation of the largest number of query fragment instances that can run in parallel without waiting, given by the blocking operator analysis. Therefore, the fragment trace that sums into that CpuAsk number can be translated into 'slots_to_use' as well, which will be a closer resemblance of maximum parallel execution of fragment instances. This patch adds a new query option called SLOT_COUNT_STRATEGY to control which admission control slot accounting to use. There are two possible values: - LARGEST_FRAGMENT, which is the original algorithm from IMPALA-8998. This is still the default value for the SLOT_COUNT_STRATEGY option. - PLANNER_CPU_ASK, which will follow the fragment trace that contributes towards CpuAsk number. This strategy will schedule more or equal admission control slots than the LARGEST_FRAGMENT strategy. To do the PLANNER_CPU_ASK strategy, the Planner will mark fragments that contribute to CpuAsk as dominant fragments. It also passes max_slot_per_executor information that it knows about the executor group set to the scheduler. AvgAdmissionSlotsPerExecutor counter is added to describe what Planner thinks the average 'slots_to_use' per backend will be, which follows this formula: AvgAdmissionSlotsPerExecutor = ceil(CpuAsk / num_executors) Actual 'slots_to_use' in each backend may differ than AvgAdmissionSlotsPerExecutor, depending on what is scheduled on that backend. 'slots_to_use' will be shown as 'AdmissionSlots' counter under each executor profile node. Testing: - Update test_executors.py with AvgAdmissionSlotsPerExecutor assertion. - Pass test_tpcds_queries.py::TestTpcdsQueryWithProcessingCost. - Add EE test test_processing_cost.py. - Add FE test PlannerTest#testProcessingCostPlanAdmissionSlots. Change-Id: I338ca96555bfe8d07afce0320b3688a0861663f2 Reviewed-on: http://gerrit.cloudera.org:8080/21257 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
committed by
Impala Public Jenkins
parent
6a079be290
commit
6abfdbc56c
@@ -42,6 +42,32 @@ CPU_TEST_QUERY = "select * from tpcds_parquet.store_sales where ss_item_sk = 1 l
|
||||
GROUPING_TEST_QUERY = ("select ss_item_sk from tpcds_parquet.store_sales"
|
||||
" group by (ss_item_sk) order by ss_item_sk limit 10")
|
||||
|
||||
# TPC-DS Q1 to test slightly more complex query.
|
||||
TPCDS_Q1 = """
|
||||
with customer_total_return as (
|
||||
select sr_customer_sk as ctr_customer_sk,
|
||||
sr_store_sk as ctr_store_sk,
|
||||
sum(SR_RETURN_AMT) as ctr_total_return
|
||||
from tpcds_partitioned_parquet_snap.store_returns,
|
||||
tpcds_partitioned_parquet_snap.date_dim
|
||||
where sr_returned_date_sk = d_date_sk
|
||||
and d_year = 2000
|
||||
group by sr_customer_sk, sr_store_sk
|
||||
) select c_customer_id
|
||||
from customer_total_return ctr1,
|
||||
tpcds_partitioned_parquet_snap.store,
|
||||
tpcds_partitioned_parquet_snap.customer
|
||||
where ctr1.ctr_total_return > (
|
||||
select avg(ctr_total_return) * 1.2
|
||||
from customer_total_return ctr2
|
||||
where ctr1.ctr_store_sk = ctr2.ctr_store_sk)
|
||||
and s_store_sk = ctr1.ctr_store_sk
|
||||
and s_state = 'TN'
|
||||
and ctr1.ctr_customer_sk = c_customer_sk
|
||||
order by c_customer_id
|
||||
limit 100
|
||||
"""
|
||||
|
||||
DEFAULT_RESOURCE_POOL = "default-pool"
|
||||
|
||||
|
||||
@@ -851,14 +877,14 @@ class TestExecutorGroups(CustomClusterTestSuite):
|
||||
|
||||
# Create fresh client
|
||||
self.create_impala_clients()
|
||||
# Add an exec group with 8 admission slots and 1 executors.
|
||||
self._add_executor_group("group", 1, admission_control_slots=8,
|
||||
# Add an exec group with 4 admission slots and 1 executors.
|
||||
self._add_executor_group("group", 1, admission_control_slots=4,
|
||||
resource_pool="root.tiny", extra_args="-mem_limit=2g")
|
||||
# Add an exec group with 8 admission slots and 2 executors.
|
||||
self._add_executor_group("group", 2, admission_control_slots=8,
|
||||
resource_pool="root.small", extra_args="-mem_limit=2g")
|
||||
# Add another exec group with 8 admission slots and 3 executors.
|
||||
self._add_executor_group("group", 3, admission_control_slots=8,
|
||||
# Add another exec group with 64 admission slots and 3 executors.
|
||||
self._add_executor_group("group", 3, admission_control_slots=64,
|
||||
resource_pool="root.large", extra_args="-mem_limit=2g")
|
||||
assert self._get_num_executor_groups(only_healthy=True) == 3
|
||||
assert self._get_num_executor_groups(only_healthy=True,
|
||||
@@ -909,32 +935,38 @@ class TestExecutorGroups(CustomClusterTestSuite):
|
||||
self._setup_three_exec_group_cluster(coordinator_test_args)
|
||||
self.client.clear_configuration()
|
||||
|
||||
# The default query options for this test.
|
||||
# Some test case will change these options along the test, but should eventually
|
||||
# restored to this default values.
|
||||
self._set_query_options({
|
||||
'COMPUTE_PROCESSING_COST': 'true',
|
||||
'SLOT_COUNT_STRATEGY': 'PLANNER_CPU_ASK'})
|
||||
|
||||
# Expect to run the query on the small group by default.
|
||||
self._set_query_options({'COMPUTE_PROCESSING_COST': 'true'})
|
||||
self._run_query_and_verify_profile(CPU_TEST_QUERY,
|
||||
["Executor Group: root.small-group", "EffectiveParallelism: 11",
|
||||
"ExecutorGroupsConsidered: 2"])
|
||||
["Executor Group: root.small-group", "EffectiveParallelism: 10",
|
||||
"ExecutorGroupsConsidered: 2", "AvgAdmissionSlotsPerExecutor: 5"])
|
||||
|
||||
# Test disabling COMPUTE_PROCESING_COST. This will produce non-MT plan.
|
||||
self._set_query_options({'COMPUTE_PROCESSING_COST': 'false'})
|
||||
self._run_query_and_verify_profile(CPU_TEST_QUERY,
|
||||
["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
|
||||
"Verdict: Match"],
|
||||
["EffectiveParallelism:", "CpuAsk:"])
|
||||
["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
|
||||
|
||||
# Test COMPUTE_PROCESING_COST=false and MT_DOP=2.
|
||||
self._set_query_options({'MT_DOP': '2'})
|
||||
self._run_query_and_verify_profile(CPU_TEST_QUERY,
|
||||
["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
|
||||
"Verdict: Match"],
|
||||
["EffectiveParallelism:", "CpuAsk:"])
|
||||
["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
|
||||
|
||||
# Test COMPUTE_PROCESING_COST=true and MT_DOP=2.
|
||||
# COMPUTE_PROCESING_COST should override MT_DOP.
|
||||
self._set_query_options({'COMPUTE_PROCESSING_COST': 'true'})
|
||||
self._run_query_and_verify_profile(CPU_TEST_QUERY,
|
||||
["Executor Group: root.small-group", "EffectiveParallelism: 11",
|
||||
"ExecutorGroupsConsidered: 2"])
|
||||
["Executor Group: root.small-group", "EffectiveParallelism: 10",
|
||||
"ExecutorGroupsConsidered: 2", "AvgAdmissionSlotsPerExecutor: 5"])
|
||||
|
||||
# Unset MT_DOP
|
||||
self._set_query_options({'MT_DOP': '0'})
|
||||
@@ -947,7 +979,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
|
||||
"from tpcds_parquet.store_sales where ss_sold_date_sk < 2452184"
|
||||
).format(unique_database, "store_sales_subset"),
|
||||
["Executor Group: root.small", "ExecutorGroupsConsidered: 2",
|
||||
"Verdict: Match", "CpuAsk: 10"])
|
||||
"Verdict: Match", "CpuAsk: 10", "AvgAdmissionSlotsPerExecutor: 5"])
|
||||
|
||||
compute_stats_query = ("compute stats {0}.{1}").format(
|
||||
unique_database, "store_sales_subset")
|
||||
@@ -961,6 +993,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
|
||||
["ExecutorGroupsConsidered: 1",
|
||||
"Verdict: Assign to first group because query is not auto-scalable"],
|
||||
["Query Options (set by configuration): REQUEST_POOL=",
|
||||
"EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:",
|
||||
"Executor Group:"])
|
||||
self._verify_total_admitted_queries("root.small", 4)
|
||||
self._verify_total_admitted_queries("root.large", 2)
|
||||
@@ -972,7 +1005,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
|
||||
["Query Options (set by configuration): REQUEST_POOL=root.small",
|
||||
"ExecutorGroupsConsidered: 1",
|
||||
"Verdict: Assign to first group because query is not auto-scalable"],
|
||||
["Executor Group:"])
|
||||
["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
|
||||
self._verify_total_admitted_queries("root.small", 6)
|
||||
self.client.clear_configuration()
|
||||
|
||||
@@ -983,7 +1016,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
|
||||
["Query Options (set by configuration): REQUEST_POOL=root.large",
|
||||
"ExecutorGroupsConsidered: 1",
|
||||
"Verdict: Assign to first group because query is not auto-scalable"],
|
||||
["Executor Group:"])
|
||||
["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
|
||||
self._verify_total_admitted_queries("root.large", 4)
|
||||
|
||||
# Test that REQUEST_POOL will override executor group selection
|
||||
@@ -992,7 +1025,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
|
||||
"Executor Group: root.large-group",
|
||||
("Verdict: query option REQUEST_POOL=root.large is set. "
|
||||
"Memory and cpu limit checking is skipped."),
|
||||
"EffectiveParallelism: 13", "ExecutorGroupsConsidered: 1"])
|
||||
"EffectiveParallelism: 12", "ExecutorGroupsConsidered: 1",
|
||||
"AvgAdmissionSlotsPerExecutor: 4"])
|
||||
|
||||
# Test setting REQUEST_POOL=root.large and disabling COMPUTE_PROCESSING_COST
|
||||
self._set_query_options({'COMPUTE_PROCESSING_COST': 'false'})
|
||||
@@ -1002,7 +1036,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
|
||||
("Verdict: query option REQUEST_POOL=root.large is set. "
|
||||
"Memory and cpu limit checking is skipped."),
|
||||
"ExecutorGroupsConsidered: 1"],
|
||||
["EffectiveParallelism:", "CpuAsk:"])
|
||||
["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
|
||||
|
||||
# Unset REQUEST_POOL and restore COMPUTE_PROCESSING_COST.
|
||||
self._set_query_options({
|
||||
@@ -1010,23 +1044,22 @@ class TestExecutorGroups(CustomClusterTestSuite):
|
||||
'COMPUTE_PROCESSING_COST': 'true'})
|
||||
|
||||
# Test that empty REQUEST_POOL should have no impact.
|
||||
self.client.set_configuration({'REQUEST_POOL': ''})
|
||||
self._run_query_and_verify_profile(CPU_TEST_QUERY,
|
||||
["Executor Group: root.small-group", "ExecutorGroupsConsidered: 2",
|
||||
"Verdict: Match"],
|
||||
["Executor Group: root.small-group", "EffectiveParallelism: 10",
|
||||
"ExecutorGroupsConsidered: 2", "AvgAdmissionSlotsPerExecutor: 5"],
|
||||
["Query Options (set by configuration): REQUEST_POOL="])
|
||||
self.client.clear_configuration()
|
||||
|
||||
# Test that GROUPING_TEST_QUERY will get assigned to the large group.
|
||||
self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
|
||||
["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
|
||||
"Verdict: Match", "CpuAsk: 12"])
|
||||
"Verdict: Match", "CpuAsk: 12", "AvgAdmissionSlotsPerExecutor: 4"])
|
||||
|
||||
# ENABLE_REPLAN=false should force query to run in first group (tiny).
|
||||
self._set_query_options({'ENABLE_REPLAN': 'false'})
|
||||
self._run_query_and_verify_profile(TEST_QUERY,
|
||||
["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
|
||||
"Verdict: Assign to first group because query option ENABLE_REPLAN=false"])
|
||||
"Verdict: Assign to first group because query option ENABLE_REPLAN=false"],
|
||||
["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
|
||||
# Unset ENABLE_REPLAN.
|
||||
self._set_query_options({'ENABLE_REPLAN': ''})
|
||||
|
||||
@@ -1035,37 +1068,38 @@ class TestExecutorGroups(CustomClusterTestSuite):
|
||||
self._run_query_and_verify_profile("SELECT 1",
|
||||
["Executor Group: empty group (using coordinator only)",
|
||||
"ExecutorGroupsConsidered: 1",
|
||||
"Verdict: Assign to first group because the number of nodes is 1"])
|
||||
"Verdict: Assign to first group because the number of nodes is 1"],
|
||||
["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
|
||||
|
||||
# CREATE/DROP database should work and assigned to tiny group.
|
||||
self._run_query_and_verify_profile(
|
||||
"CREATE DATABASE test_non_scalable_query;",
|
||||
["ExecutorGroupsConsidered: 1",
|
||||
"Verdict: Assign to first group because query is not auto-scalable"],
|
||||
["Executor Group:"])
|
||||
["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
|
||||
self._run_query_and_verify_profile(
|
||||
"DROP DATABASE test_non_scalable_query;",
|
||||
["ExecutorGroupsConsidered: 1",
|
||||
"Verdict: Assign to first group because query is not auto-scalable"],
|
||||
["Executor Group:"])
|
||||
["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
|
||||
|
||||
# Test combination of PROCESSING_COST_MIN_THREADS and MAX_FRAGMENT_INSTANCES_PER_NODE.
|
||||
self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '3'})
|
||||
self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
|
||||
["Executor Group: root.large-group", "EffectiveParallelism: 9",
|
||||
"ExecutorGroupsConsidered: 3"])
|
||||
"ExecutorGroupsConsidered: 3", "AvgAdmissionSlotsPerExecutor: 3"])
|
||||
self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '4'})
|
||||
self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
|
||||
["Executor Group: root.large-group", "EffectiveParallelism: 12",
|
||||
"ExecutorGroupsConsidered: 3"])
|
||||
"ExecutorGroupsConsidered: 3", "AvgAdmissionSlotsPerExecutor: 4"])
|
||||
self._set_query_options({'PROCESSING_COST_MIN_THREADS': '2'})
|
||||
self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
|
||||
["Executor Group: root.large-group", "EffectiveParallelism: 12",
|
||||
"ExecutorGroupsConsidered: 3"])
|
||||
"ExecutorGroupsConsidered: 3", "AvgAdmissionSlotsPerExecutor: 4"])
|
||||
self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '2'})
|
||||
self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
|
||||
["Executor Group: root.small-group", "EffectiveParallelism: 4",
|
||||
"ExecutorGroupsConsidered: 2"])
|
||||
"ExecutorGroupsConsidered: 2", "AvgAdmissionSlotsPerExecutor: 2"])
|
||||
self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '1'})
|
||||
result = self.execute_query_expect_failure(self.client, CPU_TEST_QUERY)
|
||||
status = (r"PROCESSING_COST_MIN_THREADS \(2\) can not be larger than "
|
||||
@@ -1081,13 +1115,13 @@ class TestExecutorGroups(CustomClusterTestSuite):
|
||||
self._run_query_and_verify_profile(
|
||||
"SELECT count(*) FROM tpcds_parquet.store_sales",
|
||||
["Executor Group: root.small-group", "EffectiveParallelism: 10",
|
||||
"ExecutorGroupsConsidered: 2"])
|
||||
"ExecutorGroupsConsidered: 2", "AvgAdmissionSlotsPerExecutor: 5"])
|
||||
|
||||
# Test optimized count star query with 383 scan ranges assign to tiny group.
|
||||
self._run_query_and_verify_profile(
|
||||
"SELECT count(*) FROM tpcds_parquet.store_sales WHERE ss_sold_date_sk < 2451200",
|
||||
["Executor Group: root.tiny-group", "EffectiveParallelism: 2",
|
||||
"ExecutorGroupsConsidered: 1"])
|
||||
"ExecutorGroupsConsidered: 1", "AvgAdmissionSlotsPerExecutor: 2"])
|
||||
|
||||
# Test optimized count star query with 1 scan range detected as trivial query
|
||||
# and assign to tiny group.
|
||||
@@ -1095,20 +1129,21 @@ class TestExecutorGroups(CustomClusterTestSuite):
|
||||
"SELECT count(*) FROM tpcds_parquet.date_dim",
|
||||
["Executor Group: empty group (using coordinator only)",
|
||||
"ExecutorGroupsConsidered: 1",
|
||||
"Verdict: Assign to first group because the number of nodes is 1"])
|
||||
"Verdict: Assign to first group because the number of nodes is 1"],
|
||||
["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
|
||||
|
||||
# Test unoptimized count star query assign to small group.
|
||||
self._run_query_and_verify_profile(
|
||||
("SELECT count(*) FROM tpcds_parquet.store_sales "
|
||||
"WHERE ss_ext_discount_amt != 0.3857"),
|
||||
["Executor Group: root.small-group", "EffectiveParallelism: 10",
|
||||
"ExecutorGroupsConsidered: 2"])
|
||||
"ExecutorGroupsConsidered: 2", "AvgAdmissionSlotsPerExecutor: 5"])
|
||||
|
||||
# Test zero slot scan query assign to small group.
|
||||
self._run_query_and_verify_profile(
|
||||
"SELECT count(ss_sold_date_sk) FROM tpcds_parquet.store_sales",
|
||||
["Executor Group: root.small-group", "EffectiveParallelism: 10",
|
||||
"ExecutorGroupsConsidered: 2"])
|
||||
"ExecutorGroupsConsidered: 2", "AvgAdmissionSlotsPerExecutor: 5"])
|
||||
# END testing count queries
|
||||
|
||||
# BEGIN testing insert + MAX_FS_WRITER
|
||||
@@ -1119,7 +1154,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
|
||||
"select id, year from functional_parquet.alltypes"
|
||||
).format(unique_database, "test_ctas1"),
|
||||
["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
|
||||
"Verdict: Match", "CpuAsk: 1"])
|
||||
"Verdict: Match", "CpuAsk: 1", "AvgAdmissionSlotsPerExecutor: 1"])
|
||||
self.__verify_fs_writers(result, 1, [0, 1])
|
||||
|
||||
# Test unpartitioned insert, small scan, no MAX_FS_WRITER, with limit.
|
||||
@@ -1129,7 +1164,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
|
||||
"select id, year from functional_parquet.alltypes limit 100000"
|
||||
).format(unique_database, "test_ctas2"),
|
||||
["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
|
||||
"Verdict: Match", "CpuAsk: 2"])
|
||||
"Verdict: Match", "CpuAsk: 2", "AvgAdmissionSlotsPerExecutor: 2"])
|
||||
self.__verify_fs_writers(result, 1, [0, 2])
|
||||
|
||||
# Test partitioned insert, small scan, no MAX_FS_WRITER.
|
||||
@@ -1139,7 +1174,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
|
||||
"select id, year from functional_parquet.alltypes"
|
||||
).format(unique_database, "test_ctas3"),
|
||||
["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
|
||||
"Verdict: Match", "CpuAsk: 1"])
|
||||
"Verdict: Match", "CpuAsk: 1", "AvgAdmissionSlotsPerExecutor: 1"])
|
||||
self.__verify_fs_writers(result, 1, [0, 1])
|
||||
|
||||
# Test unpartitioned insert, large scan, no MAX_FS_WRITER.
|
||||
@@ -1148,7 +1183,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
|
||||
"select ss_item_sk, ss_ticket_number, ss_store_sk "
|
||||
"from tpcds_parquet.store_sales").format(unique_database, "test_ctas4"),
|
||||
["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
|
||||
"Verdict: Match", "CpuAsk: 13"])
|
||||
"Verdict: Match", "CpuAsk: 13", "AvgAdmissionSlotsPerExecutor: 5"])
|
||||
self.__verify_fs_writers(result, 1, [0, 4, 4, 5])
|
||||
|
||||
# Test partitioned insert, large scan, no MAX_FS_WRITER.
|
||||
@@ -1157,7 +1192,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
|
||||
"select ss_item_sk, ss_ticket_number, ss_store_sk "
|
||||
"from tpcds_parquet.store_sales").format(unique_database, "test_ctas5"),
|
||||
["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
|
||||
"Verdict: Match", "CpuAsk: 15"])
|
||||
"Verdict: Match", "CpuAsk: 15", "AvgAdmissionSlotsPerExecutor: 5"])
|
||||
self.__verify_fs_writers(result, 3, [0, 5, 5, 5])
|
||||
|
||||
# Test partitioned insert, large scan, high MAX_FS_WRITER.
|
||||
@@ -1167,7 +1202,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
|
||||
"select ss_item_sk, ss_ticket_number, ss_store_sk "
|
||||
"from tpcds_parquet.store_sales").format(unique_database, "test_ctas6"),
|
||||
["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
|
||||
"Verdict: Match", "CpuAsk: 15"])
|
||||
"Verdict: Match", "CpuAsk: 15", "AvgAdmissionSlotsPerExecutor: 5"])
|
||||
self.__verify_fs_writers(result, 3, [0, 5, 5, 5])
|
||||
|
||||
# Test partitioned insert, large scan, low MAX_FS_WRITER.
|
||||
@@ -1177,7 +1212,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
|
||||
"select ss_item_sk, ss_ticket_number, ss_store_sk "
|
||||
"from tpcds_parquet.store_sales").format(unique_database, "test_ctas7"),
|
||||
["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
|
||||
"Verdict: Match", "CpuAsk: 14"])
|
||||
"Verdict: Match", "CpuAsk: 14", "AvgAdmissionSlotsPerExecutor: 5"])
|
||||
self.__verify_fs_writers(result, 2, [0, 4, 5, 5])
|
||||
|
||||
# Test that non-CTAS unpartitioned insert works. MAX_FS_WRITER=2.
|
||||
@@ -1186,7 +1221,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
|
||||
"select ss_item_sk, ss_ticket_number, ss_store_sk "
|
||||
"from tpcds_parquet.store_sales").format(unique_database, "test_ctas4"),
|
||||
["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
|
||||
"Verdict: Match", "CpuAsk: 13"])
|
||||
"Verdict: Match", "CpuAsk: 13", "AvgAdmissionSlotsPerExecutor: 5"])
|
||||
self.__verify_fs_writers(result, 1, [0, 4, 4, 5])
|
||||
|
||||
# Test that non-CTAS partitioned insert works. MAX_FS_WRITER=2.
|
||||
@@ -1196,20 +1231,48 @@ class TestExecutorGroups(CustomClusterTestSuite):
|
||||
"select ss_item_sk, ss_ticket_number, ss_store_sk "
|
||||
"from tpcds_parquet.store_sales").format(unique_database, "test_ctas7"),
|
||||
["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
|
||||
"Verdict: Match", "CpuAsk: 14"])
|
||||
"Verdict: Match", "CpuAsk: 14", "AvgAdmissionSlotsPerExecutor: 5"])
|
||||
self.__verify_fs_writers(result, 2, [0, 4, 5, 5])
|
||||
|
||||
# Unset MAX_FS_WRITERS.
|
||||
self._set_query_options({'MAX_FS_WRITERS': ''})
|
||||
# END testing insert + MAX_FS_WRITER
|
||||
|
||||
# BEGIN test slot count strategy
|
||||
# Unset SLOT_COUNT_STRATEGY to use default strategy, which is max # of instances
|
||||
# of any fragment on that backend.
|
||||
# TPCDS_Q1 at root.large_group will have following CoreCount trace:
|
||||
# CoreCount={total=16 trace=F15:3+F01:1+F14:3+F03:1+F13:3+F05:1+F12:3+F07:1},
|
||||
# coresRequired=16
|
||||
self._set_query_options({'SLOT_COUNT_STRATEGY': ''})
|
||||
result = self._run_query_and_verify_profile(TPCDS_Q1,
|
||||
["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
|
||||
"Verdict: Match", "CpuAsk: 16",
|
||||
"AdmissionSlots: 1" # coordinator and executors all have 1 slot
|
||||
],
|
||||
["AvgAdmissionSlotsPerExecutor:", "AdmissionSlots: 6"])
|
||||
|
||||
# Test with SLOT_COUNT_STRATEGY='PLANNER_CPU_ASK'.
|
||||
self._set_query_options({'SLOT_COUNT_STRATEGY': 'PLANNER_CPU_ASK'})
|
||||
result = self._run_query_and_verify_profile(TPCDS_Q1,
|
||||
["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
|
||||
"Verdict: Match", "CpuAsk: 16", "AvgAdmissionSlotsPerExecutor: 6",
|
||||
# coordinator has 1 slot
|
||||
"AdmissionSlots: 1",
|
||||
# 1 executor has F15:1+F01:1+F14:1+F03:1+F13:1+F05:1+F12:1+F07:1 = 8 slots
|
||||
"AdmissionSlots: 8",
|
||||
# 2 executors have F15:1+F14:1+F13:1+F12:1 = 4 slots
|
||||
"AdmissionSlots: 4"
|
||||
])
|
||||
# END test slot count strategy
|
||||
|
||||
# Check resource pools on the Web queries site and admission site
|
||||
self._verify_query_num_for_resource_pool("root.small", 10)
|
||||
self._verify_query_num_for_resource_pool("root.tiny", 5)
|
||||
self._verify_query_num_for_resource_pool("root.large", 12)
|
||||
self._verify_query_num_for_resource_pool("root.large", 14)
|
||||
self._verify_total_admitted_queries("root.small", 11)
|
||||
self._verify_total_admitted_queries("root.tiny", 8)
|
||||
self._verify_total_admitted_queries("root.large", 16)
|
||||
self._verify_total_admitted_queries("root.large", 18)
|
||||
|
||||
@pytest.mark.execute_serially
|
||||
def test_query_cpu_count_divisor_two(self):
|
||||
@@ -1217,29 +1280,35 @@ class TestExecutorGroups(CustomClusterTestSuite):
|
||||
# But the CpuAsk is around half of EffectiveParallelism.
|
||||
coordinator_test_args = "-query_cpu_count_divisor=2 "
|
||||
self._setup_three_exec_group_cluster(coordinator_test_args)
|
||||
self._set_query_options({'COMPUTE_PROCESSING_COST': 'true'})
|
||||
self._set_query_options({
|
||||
'COMPUTE_PROCESSING_COST': 'true',
|
||||
'SLOT_COUNT_STRATEGY': 'PLANNER_CPU_ASK'})
|
||||
self._run_query_and_verify_profile(CPU_TEST_QUERY,
|
||||
["Executor Group: root.small-group",
|
||||
"CpuAsk: 6", "EffectiveParallelism: 11",
|
||||
"CpuCountDivisor: 2", "ExecutorGroupsConsidered: 2"])
|
||||
"CpuAsk: 5", "EffectiveParallelism: 10",
|
||||
"CpuCountDivisor: 2", "ExecutorGroupsConsidered: 2",
|
||||
"AvgAdmissionSlotsPerExecutor: 5"])
|
||||
|
||||
# Test that QUERY_CPU_COUNT_DIVISOR option can override
|
||||
# query_cpu_count_divisor flag.
|
||||
self._set_query_options({'QUERY_CPU_COUNT_DIVISOR': '1.0'})
|
||||
self._run_query_and_verify_profile(CPU_TEST_QUERY,
|
||||
["Executor Group: root.small-group",
|
||||
"CpuAsk: 11", "EffectiveParallelism: 11",
|
||||
"CpuCountDivisor: 1", "ExecutorGroupsConsidered: 2"])
|
||||
"CpuAsk: 10", "EffectiveParallelism: 10",
|
||||
"CpuCountDivisor: 1", "ExecutorGroupsConsidered: 2",
|
||||
"AvgAdmissionSlotsPerExecutor: 5"])
|
||||
self._set_query_options({'QUERY_CPU_COUNT_DIVISOR': '0.5'})
|
||||
self._run_query_and_verify_profile(CPU_TEST_QUERY,
|
||||
["Executor Group: root.large-group",
|
||||
"CpuAsk: 22", "EffectiveParallelism: 11",
|
||||
"CpuCountDivisor: 0.5", "ExecutorGroupsConsidered: 3"])
|
||||
"CpuAsk: 24", "EffectiveParallelism: 10",
|
||||
"CpuCountDivisor: 0.5", "ExecutorGroupsConsidered: 3",
|
||||
"AvgAdmissionSlotsPerExecutor: 4"])
|
||||
self._set_query_options({'QUERY_CPU_COUNT_DIVISOR': '2.0'})
|
||||
self._run_query_and_verify_profile(CPU_TEST_QUERY,
|
||||
["Executor Group: root.small-group",
|
||||
"CpuAsk: 6", "EffectiveParallelism: 11",
|
||||
"CpuCountDivisor: 2", "ExecutorGroupsConsidered: 2"])
|
||||
"CpuAsk: 5", "EffectiveParallelism: 10",
|
||||
"CpuCountDivisor: 2", "ExecutorGroupsConsidered: 2",
|
||||
"AvgAdmissionSlotsPerExecutor: 5"])
|
||||
|
||||
# Check resource pools on the Web queries site and admission site
|
||||
self._verify_query_num_for_resource_pool("root.small", 3)
|
||||
@@ -1255,11 +1324,12 @@ class TestExecutorGroups(CustomClusterTestSuite):
|
||||
self._setup_three_exec_group_cluster(coordinator_test_args)
|
||||
self._set_query_options({
|
||||
'COMPUTE_PROCESSING_COST': 'true',
|
||||
'SLOT_COUNT_STRATEGY': 'PLANNER_CPU_ASK',
|
||||
'MAX_FRAGMENT_INSTANCES_PER_NODE': '1'})
|
||||
self._run_query_and_verify_profile(CPU_TEST_QUERY,
|
||||
["Executor Group: root.large-group", "EffectiveParallelism: 4",
|
||||
"ExecutorGroupsConsidered: 3", "CpuAsk: 134",
|
||||
"Verdict: Match"])
|
||||
["Executor Group: root.large-group", "EffectiveParallelism: 3",
|
||||
"ExecutorGroupsConsidered: 3", "CpuAsk: 100",
|
||||
"Verdict: Match", "AvgAdmissionSlotsPerExecutor: 1"])
|
||||
|
||||
# Unset MAX_FRAGMENT_INSTANCES_PER_NODE.
|
||||
self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': ''})
|
||||
@@ -1267,9 +1337,10 @@ class TestExecutorGroups(CustomClusterTestSuite):
|
||||
# Expect that a query still admitted to last group even if
|
||||
# its resource requirement exceed the limit on that last executor group.
|
||||
self._run_query_and_verify_profile(CPU_TEST_QUERY,
|
||||
["Executor Group: root.large-group", "EffectiveParallelism: 16",
|
||||
["Executor Group: root.large-group", "EffectiveParallelism: 15",
|
||||
"ExecutorGroupsConsidered: 3", "CpuAsk: 534",
|
||||
"Verdict: no executor group set fit. Admit to last executor group set."])
|
||||
"Verdict: no executor group set fit. Admit to last executor group set.",
|
||||
"AvgAdmissionSlotsPerExecutor: 5"])
|
||||
|
||||
# Check resource pools on the Web queries site and admission site
|
||||
self._verify_query_num_for_resource_pool("root.large", 2)
|
||||
@@ -1282,10 +1353,12 @@ class TestExecutorGroups(CustomClusterTestSuite):
|
||||
coordinator_test_args = ("-query_cpu_count_divisor=0.03 "
|
||||
"-skip_resource_checking_on_last_executor_group_set=false ")
|
||||
self._setup_three_exec_group_cluster(coordinator_test_args)
|
||||
self._set_query_options({'COMPUTE_PROCESSING_COST': 'true'})
|
||||
self._set_query_options({
|
||||
'COMPUTE_PROCESSING_COST': 'true',
|
||||
'SLOT_COUNT_STRATEGY': 'PLANNER_CPU_ASK'})
|
||||
result = self.execute_query_expect_failure(self.client, CPU_TEST_QUERY)
|
||||
assert ("AnalysisException: The query does not fit largest executor group sets. "
|
||||
"Reason: not enough cpu cores (require=434, max=192).") in str(result)
|
||||
"Reason: not enough cpu cores (require=400, max=192).") in str(result)
|
||||
|
||||
@pytest.mark.execute_serially
|
||||
def test_min_processing_per_thread_small(self):
|
||||
@@ -1294,24 +1367,29 @@ class TestExecutorGroups(CustomClusterTestSuite):
|
||||
self._setup_three_exec_group_cluster(coordinator_test_args)
|
||||
|
||||
# Test that GROUPING_TEST_QUERY will get assigned to the large group.
|
||||
self._set_query_options({'COMPUTE_PROCESSING_COST': 'true'})
|
||||
self._set_query_options({
|
||||
'COMPUTE_PROCESSING_COST': 'true',
|
||||
'SLOT_COUNT_STRATEGY': 'PLANNER_CPU_ASK'})
|
||||
self._run_query_and_verify_profile(GROUPING_TEST_QUERY,
|
||||
["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
|
||||
"Verdict: Match", "CpuAsk: 15"])
|
||||
"Verdict: Match", "CpuAsk: 15",
|
||||
"AvgAdmissionSlotsPerExecutor: 5"])
|
||||
|
||||
# Test that high_scan_cost_query will get assigned to the large group.
|
||||
high_scan_cost_query = ("SELECT ss_item_sk FROM tpcds_parquet.store_sales "
|
||||
"WHERE ss_item_sk < 1000000 GROUP BY ss_item_sk LIMIT 10")
|
||||
self._run_query_and_verify_profile(high_scan_cost_query,
|
||||
["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
|
||||
"Verdict: Match", "CpuAsk: 18"])
|
||||
"Verdict: Match", "CpuAsk: 18",
|
||||
"AvgAdmissionSlotsPerExecutor: 6"])
|
||||
|
||||
# Test that high_scan_cost_query will get assigned to the small group
|
||||
# if MAX_FRAGMENT_INSTANCES_PER_NODE is limited to 1.
|
||||
self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '1'})
|
||||
self._run_query_and_verify_profile(high_scan_cost_query,
|
||||
["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
|
||||
"Verdict: Match", "CpuAsk: 1"])
|
||||
"Verdict: Match", "CpuAsk: 1",
|
||||
"AvgAdmissionSlotsPerExecutor: 1"])
|
||||
|
||||
# Check resource pools on the Web queries site and admission site
|
||||
self._verify_query_num_for_resource_pool("root.tiny", 1)
|
||||
|
||||
Reference in New Issue
Block a user