IMPALA-10992 Planner changes for estimated peak memory

This patch provides replan support for multiple executor group sets.
Each executor group set is associated with a distinct number of nodes
and a threshold for estimated memory per host in bytes that can be
denoted as [<group_name_prefix>:<#nodes>, <threshold>].

With this patch, a query of type EXPLAIN, QUERY, or DML can be compiled
more than once. In each attempt, per-host memory is estimated and
compared with the threshold of an executor group set. If the estimated
memory is no more than the threshold, the iteration terminates and the
final plan is determined; the executor group set with that threshold is
selected to run the query.
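
A minimal sketch of the iteration, assuming hypothetical helper names
(compile_plan, group_set fields) since the real logic lives in the Java
frontend:

  # Hypothetical sketch of the replan loop. Group sets are tried in
  # increasing threshold order until the per-host estimate fits.
  def compile_with_replan(query, group_sets, compile_plan):
    """compile_plan(query, num_nodes) -> (plan, estimated_bytes_per_host)."""
    plan, group_set = None, None
    for group_set in sorted(group_sets, key=lambda g: g.threshold_bytes):
      plan, estimated_bytes = compile_plan(query, group_set.num_nodes)
      if estimated_bytes <= group_set.threshold_bytes:
        break  # final plan found; this group set runs the query
    # If nothing fit, the plan for the largest group set is kept.
    return plan, group_set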

A new query option 'enable_replan', which defaults to 1 (enabled), is
added. It can be set to 0 to disable replanning and generate the
distributed plan for the default executor group only.
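
For example, a test can turn replanning off on a connected client (a
hedged usage sketch; set_configuration() is the same test-client call
used in the diff below):

  def disable_replan(client):
    # With enable_replan=0 the planner compiles once and produces the
    # distributed plan for the default executor group.
    client.set_configuration({'enable_replan': '0'})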

To avoid long compilation times, the following enhancements are applied;
a sketch of the reuse pattern follows the list. Note that 1) can be
disabled when a relevant metadata change is detected.

 1. Authorization is performed only for the 1st compilation;
 2. openTransaction() is called for transactional queries in the 1st
    compilation, and the saved transaction info is reused in
    subsequent compilations. Similar logic is applied to Kudu
    transactional queries.
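
A minimal sketch of the reuse pattern, with hypothetical names
(do_authorization, open_transaction); the actual logic lives in the
Java frontend:

  # Hypothetical sketch: one-time work from the 1st compilation attempt
  # is cached and reused by the later attempts.
  class CompilationState:
    def __init__(self):
      self.transaction = None  # saved transaction info

    def prepare(self, query, attempt, do_authorization, open_transaction):
      if attempt == 0:
        do_authorization(query)  # 1) authorize only in the 1st compilation
        if query.is_transactional:
          self.transaction = open_transaction(query)  # 2) open once, save
      return self.transaction  # later attempts reuse the saved info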

To facilitate testing, the patch imposes an artificial two-executor-group
setup in the FE as follows.

 1. [regular:<#nodes>, 64MB]
 2. [large:<#nodes>, 8PB]

This setup is enabled when the new query option 'test_replan' is set
to 1 in backend tests, or when RuntimeEnv.INSTANCE.isTestEnv() is true,
as in most frontend tests. The option defaults to 0.
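
For instance, a backend test can opt into this setup per client (a
hedged sketch; set_configuration() is the call used in the diff below):

  def enable_test_replan(client):
    # With test_replan=1 the FE imposes the artificial
    # [regular, 64MB] / [large, 8PB] group sets defined above.
    client.set_configuration({'test_replan': '1'})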

Compilation time increases when a query is compiled in several
iterations, as shown below for several TPC-DS queries. The increase is
mostly due to redundant work in either the single-node plan creation or
the value-transfer-graph recomputation phase. For small queries, the
increase can be avoided if they can be compiled in a single iteration by
setting the smallest threshold among all executor group sets
appropriately. For example, for the queries listed below, setting the
smallest threshold to 320MB would catch both q15 and q21 in one
compilation.

                             Compilation time (ms)
 Query   Estimated memory   2 iterations   1 iteration   Increase
 q1            408MB            60.14          25.75      133.56%
 q11          1.37GB           261.00         109.61      138.11%
 q10a          519MB           139.24          54.52      155.39%
 q13           339MB           143.82          60.08      139.38%
 q14a         3.56GB           762.68         312.92      143.73%
 q14b         2.20GB           522.01         245.13      112.95%
 q15           314MB             9.73           4.28      127.33%
 q21           275MB            16.00           8.18       95.59%
 q23a         1.50GB           461.69         231.78       99.19%
 q23b         1.34GB           461.31         219.61      110.05%
 q4           2.60GB           218.05         105.07      107.52%
 q67          5.16GB           694.59         334.24      101.82%
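
As a quick sanity check of the 320MB suggestion, only q15 (314MB) and
q21 (275MB) in the table fall at or below that threshold (a small
illustrative snippet, not part of the patch):

  # Estimated per-host memory in MB, taken from the table above.
  estimates_mb = {'q1': 408, 'q13': 339, 'q15': 314, 'q21': 275}
  one_iteration = [q for q, mb in sorted(estimates_mb.items()) if mb <= 320]
  print(one_iteration)  # ['q15', 'q21']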

Testing:
 1. Almost all FE and BE tests now run in the artificial two
    executor group setup, except a few where a specific cluster
    configuration is desirable;
 2. Ran core tests successfully;
 3. Added a new observability test and a new query assignment test;
 4. Disabled the concurrent inserts test (test_concurrent_inserts) and
    the failing inserts test (test_failing_inserts) in local catalog
    mode due to flakiness, reported in IMPALA-11189 and IMPALA-11191
    respectively.

Change-Id: I75cf17290be2c64fd4b732a5505bdac31869712a
Reviewed-on: http://gerrit.cloudera.org:8080/18178
Reviewed-by: Qifan Chen <qchen@cloudera.com>
Tested-by: Qifan Chen <qchen@cloudera.com>

@@ -680,6 +680,62 @@ class TestExecutorGroups(CustomClusterTestSuite):
    self.client.close()
    second_coord_client.close()

  @pytest.mark.execute_serially
  def test_query_assignment_with_two_exec_groups(self):
    """This test verifies that query assignment works with two executor groups that
    differ in the number of executors and the memory limit of each."""
    # A small query with estimated memory per host of 10MB that can run on the small
    # executor group.
    SMALL_QUERY = "select count(*) from tpcds_parquet.date_dim;"
    # A large query with estimated memory per host of 132MB that can only run on
    # the large executor group.
    LARGE_QUERY = "select * from tpcds_parquet.store_sales where ss_item_sk = 1 limit 50;"
    # The path to the resources directory which contains the admission control
    # config files.
    RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test",
                                 "resources")
    # Define two group sets: small and large.
    fs_allocation_path = os.path.join(RESOURCES_DIR, "fair-scheduler-2-groups.xml")
    # Define the min-query-mem-limit and max-query-mem-limit properties of the two sets:
    #   small: [0, 64MB]
    #   large: [64MB+1Byte, 8PB]
    llama_site_path = os.path.join(RESOURCES_DIR, "llama-site-2-groups.xml")
    # Start with a regular admission config with multiple pools and no resource limits.
    self._restart_coordinators(num_coordinators=1,
        extra_args="-vmodule admission-controller=3 "
                   "-expected_executor_group_sets=root.small:2,root.large:3 "
                   "-fair_scheduler_allocation_path %s "
                   "-llama_site_path %s" % (fs_allocation_path, llama_site_path))
    # Create fresh clients.
    self.create_impala_clients()
    # Add an exec group with a single admission slot and 2 executors.
    self._add_executor_group("group", 2, admission_control_slots=1,
                             resource_pool="root.small", extra_args="-mem_limit=2g")
    # Add another exec group with a single admission slot and 3 executors.
    self._add_executor_group("group", 3, admission_control_slots=1,
                             resource_pool="root.large", extra_args="-mem_limit=2g")
    assert self._get_num_executor_groups(only_healthy=True) == 2
    assert self._get_num_executor_groups(only_healthy=True,
                                         exec_group_set_prefix="root.small") == 1
    assert self._get_num_executor_groups(only_healthy=True,
                                         exec_group_set_prefix="root.large") == 1
    # Expect the small query to run on the small group.
    result = self.execute_query_expect_success(self.client, SMALL_QUERY)
    assert "Executor Group: root.small-group" in str(result.runtime_profile)
    # Expect the large query to run on the large group.
    result = self.execute_query_expect_success(self.client, LARGE_QUERY)
    assert "Executor Group: root.large-group" in str(result.runtime_profile)
    # Forcing the large query to run on the small group should fail.
    self.client.set_configuration({'request_pool': 'small'})
    result = self.execute_query_expect_failure(self.client, LARGE_QUERY)
    assert "The query does not fit any executor group set" in str(result)
    self.client.close()

  @pytest.mark.execute_serially
  def test_per_exec_group_set_metrics(self):
    """This test verifies that the metrics for each exec group set are updated