The Planner calculates CpuAsk through a recursive call beginning at
Planner.computeBlockingAwareCores(), which is called after
Planner.computeEffectiveParallelism(). It performs blocking operator
analysis over the degree of parallelism selected during the
computeEffectiveParallelism() traversal. That selected degree of
parallelism, however, is already bounded by the min and max parallelism
config, derived from the PROCESSING_COST_MIN_THREADS and
MAX_FRAGMENT_INSTANCES_PER_NODE options respectively.
This patch calculates an unbounded version of CpuAsk that is not limited
by the min and max parallelism config. It is based purely on each
fragment's ProcessingCost and the query plan's relationship constraints
(for example, the number of JOIN BUILDER fragments must equal the number
of destination JOIN fragments for a partitioned join).
The frontend receives both the bounded and the unbounded CpuAsk values
from TQueryExecRequest on each executor group set selection round. The
unbounded CpuAsk is scaled down once using an nth-root-based sublinear
function, controlled by the total CPU count of the smallest executor
group set and the bounded CpuAsk value. A further linear scaling is then
applied to both the bounded and unbounded CpuAsk using the
QUERY_CPU_COUNT_DIVISOR option. The frontend then compares the scaled
unbounded CpuAsk against CpuMax to avoid assigning a query to a small
executor group set too soon. The last executor group set remains the
"catch-all" executor group set.
After this patch, setting COMPUTE_PROCESSING_COST=True will show the
following changes in the query profile:
- The "max-parallelism" fields in the query plan will all be set to
maximum parallelism based on ProcessingCost.
- The CpuAsk counter is changed to show the unbounded CpuAsk after
scaling.
- A new counter CpuAskBounded shows the bounded CpuAsk after scaling. If
QUERY_CPU_COUNT_DIVISOR=1 and the PLANNER_CPU_ASK slot counting strategy
is selected, CpuAskBounded is also the minimum total number of admission
slots given to the query.
- A new counter MaxParallelism shows the unbounded CpuAsk before
scaling.
- The EffectiveParallelism counter remains unchanged, showing the
bounded CpuAsk before scaling.
Testing:
- Update and pass FE test TpcdsCpuCostPlannerTest and
PlannerTest#testProcessingCost.
- Pass EE test tests/query_test/test_tpcds_queries.py
- Pass custom cluster test tests/custom_cluster/test_executor_groups.py
Change-Id: I5441e31088f90761062af35862be4ce09d116923
Reviewed-on: http://gerrit.cloudera.org:8080/21277
Reviewed-by: Kurt Deschler <kdeschle@cloudera.com>
Reviewed-by: Abhishek Rawat <arawat@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Impala has a concept of "admission control slots" - the amount of
parallelism that should be allowed on an Impala daemon. This defaults to
the number of processors per executor and can be overridden with the
--admission_control_slots flag.
Admission control slot accounting is described in IMPALA-8998. It
computes 'slots_to_use' for each backend based on the maximum number of
instances of any fragment on that backend. This can lead to slot
underestimation and query overadmission. For example, assume an executor
node with 48 CPU cores configured with --admission_control_slots=48 that
is assigned 4 non-blocking query fragments, each with 12 instances
scheduled on this executor. The IMPALA-8998 algorithm will request the
maximum instance count (12) as slots rather than the sum of all
non-blocking fragment instances (48). With the 36 remaining slots free,
the executor can still admit another fragment from a different query but
will potentially have CPU contention with the one that is currently
running.
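As a tiny illustration of the two accountings for this example:
  instances_per_fragment = [12, 12, 12, 12]  # 4 non-blocking fragments
  slots_largest_fragment = max(instances_per_fragment)  # 12 (IMPALA-8998)
  slots_sum_of_instances = sum(instances_per_fragment)  # 48 (true demand)
  print(slots_largest_fragment, slots_sum_of_instances)  # 12 48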
When COMPUTE_PROCESSING_COST is enabled, the Planner generates a CpuAsk
number that represents the CPU requirement of that query over a
particular executor group set. This number is an estimate of the largest
number of query fragment instances that can run in parallel without
waiting, given by the blocking operator analysis. Therefore, the
fragment trace that sums into that CpuAsk number can be translated into
'slots_to_use' as well, which is a closer approximation of the maximum
parallel execution of fragment instances.
This patch adds a new query option called SLOT_COUNT_STRATEGY to control
which admission control slot accounting to use. There are two possible
values:
- LARGEST_FRAGMENT, which is the original algorithm from IMPALA-8998.
This is still the default value for the SLOT_COUNT_STRATEGY option.
- PLANNER_CPU_ASK, which follows the fragment trace that contributes
towards the CpuAsk number. This strategy will schedule more admission
control slots than, or as many as, the LARGEST_FRAGMENT strategy.
To implement the PLANNER_CPU_ASK strategy, the Planner marks fragments
that contribute to CpuAsk as dominant fragments. It also passes the
max_slot_per_executor information that it knows about the executor group
set to the scheduler.
An AvgAdmissionSlotsPerExecutor counter is added to describe what the
Planner expects the average 'slots_to_use' per backend to be, which
follows this formula:
AvgAdmissionSlotsPerExecutor = ceil(CpuAsk / num_executors)
The actual 'slots_to_use' on each backend may differ from
AvgAdmissionSlotsPerExecutor, depending on what is scheduled on that
backend. 'slots_to_use' will be shown as the 'AdmissionSlots' counter
under each executor profile node.
Testing:
- Update test_executors.py with AvgAdmissionSlotsPerExecutor assertion.
- Pass test_tpcds_queries.py::TestTpcdsQueryWithProcessingCost.
- Add EE test test_processing_cost.py.
- Add FE test PlannerTest#testProcessingCostPlanAdmissionSlots.
Change-Id: I338ca96555bfe8d07afce0320b3688a0861663f2
Reviewed-on: http://gerrit.cloudera.org:8080/21257
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
IMPALA-11604 adds a hidden backend flag named query_cpu_count_divisor to
allow oversubscribing CPU cores beyond what is available in the executor
group set. This patch adds a query option with the same name and
function so that CPU core matching can be tuned for individual queries.
The query option takes precedence over the flag.
Testing:
- Add test case in test_executor_groups.py and query-options-test.cc
Change-Id: I34ab47bd67509a02790c3caedb3fde4d1b6eaa78
Reviewed-on: http://gerrit.cloudera.org:8080/20819
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
TestExecutorGroups.test_75_percent_availability can fail in certain
build/test setups when the last impalad does not start within the 5s
delay injection. This patch simplifies the test by launching fewer
impalads in total (reduced from 8 to 5, excluding the coordinator) and
increases the delay injection to ensure the test query runs on all five
executors. The test is renamed to test_partial_availability accordingly.
Testing:
- Run and pass the test against HDFS and S3.
Change-Id: I2e70f1dde10045c32c2bb4f6f78e8a707c9cd97d
Reviewed-on: http://gerrit.cloudera.org:8080/20712
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
The scan fragment did not follow PROCESSING_COST_MIN_THREADS set by the
user even when the total number of scan ranges would allow it. This
patch fixes the issue by exposing ScanNode.maxScannerThreads_ to
PlanFragment.adjustToMaxParallelism(). By using
ScanNode.maxScannerThreads_ as an upper bound, ScanNode does not need to
artificially lower its ProcessingCost when maxScannerThreads_ is lower
than the minimum parallelism dictated by the original ProcessingCost.
Thus, the synthetic ProcessingCost logic in the ScanNode class is
revised to apply only when the input cardinality is unknown (-1).
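Conceptually (names simplified; the actual logic lives in
PlanFragment.adjustToMaxParallelism()):
  def scan_fragment_parallelism(min_threads, cost_based_parallelism,
                                max_scanner_threads):
      # PROCESSING_COST_MIN_THREADS is honored as long as there are enough
      # scan ranges (maxScannerThreads_) to keep the instances busy;
      # maxScannerThreads_ acts purely as an upper bound.
      return min(max(min_threads, cost_based_parallelism), max_scanner_threads)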
This patch also does the following adjustments:
- Remove some dead code in Frontend.java and PlanFragment.java.
- Add sanity check such that PROCESSING_COST_MIN_THREADS <=
MAX_FRAGMENT_INSTANCES_PER_NODE.
- Tidy up test_query_cpu_count_divisor_default to reduce the number of
SET queries.
Testing:
- Update test_query_cpu_count_divisor_default to ensure that
PROCESSING_COST_MIN_THREADS is respected by the scan fragment and an
error is returned if PROCESSING_COST_MIN_THREADS is greater than
MAX_FRAGMENT_INSTANCES_PER_NODE.
- Pass test_executor_groups.py.
Change-Id: I69e5a80146d4ac41de5ef406fc2bdceffe3ec394
Reviewed-on: http://gerrit.cloudera.org:8080/20475
Reviewed-by: Kurt Deschler <kdeschle@cloudera.com>
Reviewed-by: Wenzhe Zhou <wzhou@cloudera.com>
Tested-by: Riza Suminto <riza.suminto@cloudera.com>
Added a custom cluster test for the number of executors used for
planning when no executor groups are healthy. The Planner should use the
number of executors from 'num_expected_executors' or
'expected_executor_group_sets' when executor groups aren't healthy.
Change-Id: Ib71ca0a5402c74d07ee875878f092d6d3827c6b7
Reviewed-on: http://gerrit.cloudera.org:8080/20419
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
The cardinality estimate in HdfsScanNode.java for count queries does not
account for the fact that the count optimization only scans metadata and
not the actual columns. An optimized count star scan returns only 1 row
per Parquet row group.
This patch overrides the scan cardinality with the total number of
files, which is the closest available estimate of the number of row
groups. A similar override already exists in IcebergScanNode.java.
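A minimal sketch of the idea (the real change is in HdfsScanNode.java's
cardinality computation):
  def count_star_scan_cardinality(num_files):
      # The optimized count(*) scan emits roughly one row per row group, and
      # the file count is the closest available estimate of the row-group
      # count, so it is used as the scan cardinality.
      return num_files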
Testing:
- Add count query testcases in test_query_cpu_count_divisor_default
- Pass core tests
Change-Id: Id5ce967657208057d50bd80adadac29ebb51cbc5
Reviewed-on: http://gerrit.cloudera.org:8080/20406
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
test_75_percent_availability fails against the Ozone and S3 test
environments because the test expects the string "SCAN HDFS" to be found
in the profile. Instead, the profile contains "SCAN OZONE" or "SCAN S3"
in the Ozone and S3 test environments respectively. This patch fixes the
test by removing that assertion from test_75_percent_availability. The
remaining assertions are enough to verify that the FE planner and BE
scheduler can see the cluster membership change.
Change-Id: Id14934d2fce0f6cf03242c36c0142bc697b4180e
Reviewed-on: http://gerrit.cloudera.org:8080/20259
Reviewed-by: Wenzhe Zhou <wzhou@cloudera.com>
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Scheduler::CheckEffectiveInstanceCount was added to check consistency
between FE planning and BE scheduling when COMPUTE_PROCESSING_COST=true.
This consistency can be broken if there is a cluster membership
change (a new executor comes online) between FE planning and BE
scheduling. For example, in an executor group of size 10 with a 90%
health threshold, the admission controller is allowed to run a query
when only 9 executors are available. If the 10th executor comes online
between FE planning and BE scheduling, CheckEffectiveInstanceCount can
fail and return an error.
This patch turns two error statuses in CheckEffectiveInstanceCount into
warnings, emitted either to the query profile as an InfoString or to the
WARNING log. The MAX_FRAGMENT_INSTANCES_PER_NODE violation check still
returns an error.
Testing:
- Add test_75_percent_availability
- Pass test_executors.py
Change-Id: Ieaf6a46c4f12dbf8b03d1618c2f090ab4f2ac665
Reviewed-on: http://gerrit.cloudera.org:8080/20231
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
IMPALA-12056 enables a child query to unset REQUEST_POOL if it was set
by Frontend.java as part of executor group selection. However, the
implementation missed calling setRequest_pool_set_by_frontend(false)
when REQUEST_POOL is explicitly set by the client request through the
impala-shell configuration. This caused child queries to always unset
REQUEST_POOL if the parent query was executed via impala-shell. This
patch fixes the issue by checking the query options that come from the
client.
This patch also tidies up null and empty REQUEST_POOL checks by using
StringUtils.isNotEmpty().
Testing:
- Add testcase in test_query_cpu_count_divisor_default
Change-Id: Ib5036859d51bc64f568da405f730c8f3ffebb742
Reviewed-on: http://gerrit.cloudera.org:8080/20189
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Riza Suminto <riza.suminto@cloudera.com>
IMPALA-12091 has a bug where scan fragment parallelism is always limited
solely by the ScanNode cost. If the ScanNode is colocated with other
query operators that have higher processing costs, the Planner will not
scale it up beyond what the ScanNode cost allows.
This patch fixes the problem in two ways. The first is to allow a scan
fragment to scale up further as long as it stays within the total
fragment cost and the number of effective scan ranges. The second is to
add a missing Math.max() in CostingSegment.java whose absence caused
lower fragment parallelism even when the total fragment cost is high.
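The intent of the first fix, as a rough sketch (variable names and the
exact placement of the max() are assumptions):
  import math
  def scan_fragment_parallelism(total_fragment_cost, min_cost_per_thread,
                                scan_node_parallelism, num_effective_ranges):
      # Scale by the *total* fragment cost, not just the ScanNode cost.
      cost_based = math.ceil(total_fragment_cost / min_cost_per_thread)
      # The missing max(): a cheap ScanNode must not pull the fragment below
      # what its total cost justifies.
      parallelism = max(cost_based, scan_node_parallelism)
      # But never exceed the number of effective scan ranges.
      return min(parallelism, num_effective_ranges)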
The IMPALA-10287 optimization is re-enabled to reduce the regression in
TPC-DS Q78. Ideally, the broadcast vs partitioned costing formula during
distributed planning should not rely on numInstance, but enabling this
optimization ensures a consistent query plan shape when comparing
against the MT_DOP plan. The optimization can still be disabled by
specifying USE_DOP_FOR_COSTING=false.
This patch also does some cleanup, including:
- Fix the "max-parallelism" value in the explain string.
- Turn a constant in ScanNode.rowMaterializationCost() into a backend
flag named scan_range_cost_factor for experimental purposes.
- Replace all references to ProcessingCost.isComputeCost() with
queryOptions.isCompute_processing_cost() directly.
- Add a Precondition in PlanFragment.getNumInstances() to verify that
the fragment's instance count is no longer modified after the costing
algorithm finishes.
Testing:
- Manually run TPCDS Q84 over tpcds10_parquet and confirm that the
leftmost scan fragment parallelism is raised from 12 (before the
patch) to 18 (after the patch).
- Add test in PlannerTest.testProcessingCost that reproduces the issue.
- Update compute stats test in test_executor_groups.py to maintain test
assertion.
- Pass core tests.
Change-Id: I7010f6c3bc48ae3f74e8db98a83f645b6c157226
Reviewed-on: http://gerrit.cloudera.org:8080/20024
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
The new processing cost-based planner changes (IMPALA-11604,
IMPALA-12091) will impact output writer parallelism for insert queries,
with the potential for more small files if the processing cost-based
planning results in too many writer fragments. It can further exacerbate
a problem introduced by MT_DOP (see IMPALA-8125).
The MAX_FS_WRITERS query option can help mitigate this. But even without
MAX_FS_WRITERS set, the default output writer parallelism should avoid
creating excessive writer parallelism for partitioned and unpartitioned
inserts.
This patch implements such a limit when using the cost-based planner. It
limits the number of writer fragments so that each writer fragment
writes at least 256MB of rows. This patch also allows CTAS (a kind of
DDL query) to be eligible for auto-scaling.
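The limit amounts to roughly the following (a sketch with assumed names):
  MIN_BYTES_PER_WRITER = 256 * 1024 * 1024  # each writer gets >= 256MB
  def limit_writer_fragments(estimated_output_bytes, cost_based_writer_count):
      # Cap writer parallelism so every writer fragment has at least 256MB of
      # rows to write, but always keep at least one writer.
      byte_based_limit = max(1, estimated_output_bytes // MIN_BYTES_PER_WRITER)
      return min(cost_based_writer_count, byte_based_limit)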
This patch also removes the comments about NUM_SCANNER_THREADS added by
IMPALA-12029, since they no longer apply after IMPALA-12091.
Testing:
- Add test cases in test_query_cpu_count_divisor_default
- Add test_processing_cost_writer_limit in test_insert.py
- Pass test_insert.py::TestInsertHdfsWriterLimit
- Pass test_executor_groups.py
Change-Id: I289c6ffcd6d7b225179cc9fb2f926390325a27e0
Reviewed-on: http://gerrit.cloudera.org:8080/19880
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Before this patch, Impala still relied on the MT_DOP option to decide
the degree of parallelism of the scan fragment when a query runs with
COMPUTE_PROCESSING_COST=1. This patch adds the scan node's processing
cost as another consideration to raise scan parallelism beyond MT_DOP.
The scan node cost is now adjusted to also consider the number of
effective scan ranges. Each scan range is given a weight of (0.5% *
min_processing_per_thread), which roughly means that one scan node
instance can handle at most 200 scan ranges.
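In other words, the per-range weight caps how many ranges one instance
handles (sketch using the constants quoted above):
  def max_ranges_per_scan_instance(min_processing_per_thread):
      range_weight = 0.005 * min_processing_per_thread  # 0.5% per scan range
      # An instance is full once its summed range weight reaches the
      # per-thread minimum, i.e. after 1 / 0.005 = 200 ranges.
      return min_processing_per_thread / range_weight
  print(max_ranges_per_scan_instance(10_000_000))  # 200.0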
Query option MAX_FRAGMENT_INSTANCES_PER_NODE is added as an upper bound
on scan parallelism when COMPUTE_PROCESSING_COST=true. If the number of
scan ranges is fewer than the maximum parallelism allowed by the scan
node's processing cost, that processing cost will be clamped down
to (min_processing_per_thread / number of scan ranges). Lowering
MAX_FRAGMENT_INSTANCES_PER_NODE can also clamp down the scan processing
cost in a similar way. For interior fragments, a combination of
MAX_FRAGMENT_INSTANCES_PER_NODE, PROCESSING_COST_MIN_THREADS, and the
number of available cores per node is used to determine the maximum
fragment parallelism per node. For the scan fragment, only the first two
are considered, to encourage the Frontend to choose a larger executor
group as needed.
Two new pieces of static state are added to exec-node.h:
is_mt_fragment_ and num_instances_per_node_. The backend code that
refers to the MT_DOP option is replaced with either is_mt_fragment_ or
num_instances_per_node_.
Two new criteria are added during effective parallelism calculation in
PlanFragment.adjustToMaxParallelism():
- If a fragment has a UnionNode, its parallelism is the maximum of its
input fragments' and its collocated ScanNode's expected parallelism.
- If a fragment only has a single ScanNode (and no UnionNode), its
parallelism is calculated in the same fashion as an interior fragment,
but it will not be lowered further since it has no child fragment to
compare with.
Admission control slots remain unchanged. This may cause a query to fail
admission if the Planner selects a scan parallelism that is higher than
the configured admission control slots value. Setting
MAX_FRAGMENT_INSTANCES_PER_NODE equal to or lower than the configured
admission control slots value can help lower scan parallelism and pass
the admission controller.
The previous workaround to control scan parallelism from IMPALA-12029 is
now removed. This patch also disables the IMPALA-10287 optimization when
COMPUTE_PROCESSING_COST=true. This is because IMPALA-10287 relies on a
fixed number of fragment instances in DistributedPlanner.java. However,
the effective parallelism calculation is done much later and may change
the final number of instances of the hash join fragment, rendering the
DistributionMode selected by IMPALA-10287 inaccurate.
This patch is benchmarked using single_node_perf_run.py with the
following parameters:
args="-gen_experimental_profile=true -default_query_options="
args+="mt_dop=4,compute_processing_cost=1,processing_cost_min_threads=1 "
./bin/single_node_perf_run.py --num_impalads=3 --scale=10 \
--workloads=tpcds --iterations=5 --table_formats=parquet/none/none \
--impalad_args="$args" \
--query_names=TPCDS-Q3,TPCDS-Q14-1,TPCDS-Q14-2,TPCDS-Q23-1,TPCDS-Q23-2,TPCDS-Q49,TPCDS-Q76,TPCDS-Q78,TPCDS-Q80A \
"IMPALA-12091~1" IMPALA-12091
The benchmark result is as follows:
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| Workload | Query | File Format | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%) | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | Tval |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| TPCDS(10) | TPCDS-Q23-1 | parquet / none / none | 4.62 | 4.54 | +1.92% | 0.23% | 1.59% | 5 | +2.32% | 1.15 | 2.67 |
| TPCDS(10) | TPCDS-Q14-1 | parquet / none / none | 5.82 | 5.76 | +1.08% | 5.27% | 3.89% | 5 | +2.04% | 0.00 | 0.37 |
| TPCDS(10) | TPCDS-Q23-2 | parquet / none / none | 4.65 | 4.58 | +1.38% | 1.97% | 0.48% | 5 | +0.81% | 0.87 | 1.51 |
| TPCDS(10) | TPCDS-Q49 | parquet / none / none | 1.49 | 1.48 | +0.46% | * 36.02% * | * 34.95% * | 5 | +1.26% | 0.58 | 0.02 |
| TPCDS(10) | TPCDS-Q14-2 | parquet / none / none | 3.76 | 3.75 | +0.39% | 1.67% | 0.58% | 5 | -0.03% | -0.58 | 0.49 |
| TPCDS(10) | TPCDS-Q78 | parquet / none / none | 2.80 | 2.80 | -0.04% | 1.32% | 1.33% | 5 | -0.42% | -0.29 | -0.05 |
| TPCDS(10) | TPCDS-Q80A | parquet / none / none | 2.87 | 2.89 | -0.51% | 1.33% | 0.40% | 5 | -0.01% | -0.29 | -0.82 |
| TPCDS(10) | TPCDS-Q3 | parquet / none / none | 0.18 | 0.19 | -1.29% | * 15.26% * | * 15.87% * | 5 | -0.54% | -0.87 | -0.13 |
| TPCDS(10) | TPCDS-Q76 | parquet / none / none | 1.08 | 1.11 | -2.98% | 0.92% | 1.70% | 5 | -3.99% | -2.02 | -3.47 |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
Testing:
- Pass PlannerTest.testProcessingCost
- Pass test_executor_groups.py
- Reenable test_tpcds_q51a in TestTpcdsQueryWithProcessingCost with
MAX_FRAGMENT_INSTANCES_PER_NODE set to 5
- Pass test_tpcds_queries.py::TestTpcdsQueryWithProcessingCost
- Pass core tests
Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Reviewed-on: http://gerrit.cloudera.org:8080/19807
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This reverts commit f932d78ad0.
The commit is reverted because it caused a significant regression for
non-optimized count star queries in Parquet format.
Several conflicts needed to be resolved manually:
- Removed the assertion against the 'NumFileMetadataRead' counter that
is lost with the revert.
- Adjusted the assertions in test_plain_count_star_optimization,
test_in_predicate_push_down, and test_partitioned_insert of
test_iceberg.py due to the missing improvement in the Parquet optimized
count star code path.
- Kept the "override" specifier in hdfs-parquet-scanner.h to pass
clang-tidy.
- Kept the Python 3 style of RuntimeError instantiation in
test_file_parser.py to pass check-python-syntax.sh.
Change-Id: Iefd8fd0838638f9db146f7b706e541fe2aaf01c1
Reviewed-on: http://gerrit.cloudera.org:8080/19843
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Wenzhe Zhou <wzhou@cloudera.com>
'Compute Stats' queries get scheduled on the smallest executor group set
since these queries don't do any real work. However, their child queries
also get scheduled on the smallest executor group set. This may not be
ideal for cases where a child query computes NDVs and counts on a big,
wide table.
This patch lets child queries unset the REQUEST_POOL query option if
that option was set by the frontend planner rather than the client. With
REQUEST_POOL unset, a child query can select the executor group set that
best fits its workload.
Testing:
- Add test in test_query_cpu_count_divisor_default
Change-Id: I6dc559aa161a27a7bd5d3034788cc6241490d3b5
Reviewed-on: http://gerrit.cloudera.org:8080/19832
Reviewed-by: Kurt Deschler <kdeschle@cloudera.com>
Reviewed-by: Wenzhe Zhou <wzhou@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
In a multiple executor group setup, some trivial queries like
"select 1;" fail admission with a "No mapping found for request" error
message. This patch fixes a bug where the Frontend does not set the
group name prefix when a query is not auto-scalable. In cases like a
trivial query run, the correct executor group name prefix is still
needed for the backend to correctly resolve the target pool.
Testing:
- Pass test_executor_groups.py
Change-Id: I89497c8f67bfd176c2b60fa1b70fe53f905bbab0
Reviewed-on: http://gerrit.cloudera.org:8080/19691
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
The Web UI queries page shows no resource pool unless one is specified
with a query option. The Planner may set TQueryCtx.request_pool in
TQueryExecRequest when auto scaling is enabled, but the backend ignores
TQueryCtx.request_pool in TQueryExecRequest when getting resource pools
for the Web UI.
This patch fixes the issue in ClientRequestState::request_pool() by
checking TQueryCtx.request_pool in TQueryExecRequest. It also removes
the error path in RequestPoolService::ResolveRequestPool() when the
requested pool is an empty string.
Testing:
- Updated TestExecutorGroups::test_query_cpu_count_divisor_default,
TestExecutorGroups::test_query_cpu_count_divisor_two, and
TestExecutorGroups::test_query_cpu_count_divisor_fraction to
verify resource pools on Web queries site and Web admission site.
- Updated expected error message in
TestAdmissionController::test_set_request_pool.
- Passed core test.
Change-Id: Iceacb3a8ec3bd15a8029ba05d064bbbb81e3a766
Reviewed-on: http://gerrit.cloudera.org:8080/19688
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Kurt Deschler <kdeschle@cloudera.com>
Reviewed-by: Abhishek Rawat <arawat@cloudera.com>
In a setup with multiple executor group sets, the Frontend tries to
match a query with the smallest executor group set that can fit the
memory and CPU requirements of the compiled query. There are kinds of
queries whose compiled plan fits any executor group set but does not
necessarily deliver the best performance there. An example is Impala's
COMPUTE STATS query: it does a full table scan and aggregates the stats,
has a fairly simple query plan shape, but can benefit from higher scan
parallelism.
This patch relaxes the scan fragment parallelism on the first round of
query planning, allowing the scan fragment to increase its parallelism
based on its ProcessingCost estimation. If the relaxed plan fits in an
executor group set, we replan once more with that executor group set but
with scan fragment parallelism brought back to MT_DOP, as sketched after
this paragraph. This extra round of query planning adds a couple of
milliseconds of overhead depending on the complexity of the query plan,
but it is necessary since the backend scheduler still expects at most
MT_DOP scan fragment instances. We can remove the extra replanning in
the future once scan node parallelism can be fully managed without
MT_DOP.
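A sketch of the two-round flow (not the actual Frontend.java control
flow; names are assumptions):
  def pick_group_set(group_sets, plan):
      # Smallest executor group set whose CPU and memory budgets fit the
      # relaxed plan; otherwise fall back to the last (largest) set.
      for gs in group_sets:  # assumed ordered from smallest to largest
          if plan["cpu_ask"] <= gs["cpu"] and plan["mem_ask"] <= gs["mem"]:
              return gs
      return group_sets[-1]
  def plan_query(compile_fn, group_sets):
      # Round 1: relax scan parallelism so the ProcessingCost-based
      # requirement is visible to executor group set selection.
      relaxed_plan = compile_fn(relax_scan_parallelism=True, group_set=None)
      chosen = pick_group_set(group_sets, relaxed_plan)
      # Round 2: replan for the chosen set with scan parallelism back at
      # MT_DOP, which is what the backend scheduler still expects.
      return compile_fn(relax_scan_parallelism=False, group_set=chosen)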
This patch also adds some improvements, including:
- Tune computeScanProcessingCost() to guard against scheduling too many
scan fragments by comparing against the actual scan range count that
the Planner knows.
- Use NUM_SCANNER_THREADS as a hint to cap the scan node cost during the
first round of planning.
- Multiply memory-related counters by the number of executors to make
them per group set rather than per node.
- Fix a bug in doCreateExecRequest() in the selection of the number of
executors for planning.
Testing:
- Pass test_executor_groups.py
- Add test cases in test_min_processing_per_thread_small.
- Raised impala.admission-control.max-query-mem-limit.root.small from
64MB to 70MB in llama-site-3-groups.xml so that the new grouping query
can fit in root.small pool.
Change-Id: I7a2276fbd344d00caa67103026661a3644b9a1f9
Reviewed-on: http://gerrit.cloudera.org:8080/19656
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Kurt Deschler <kdeschle@cloudera.com>
Reviewed-by: Wenzhe Zhou <wzhou@cloudera.com>
This patch adds a new admission control policy that attempts to
balance queries across multiple executor groups belonging to the
same request pool based on available memory and slots in each
executor group. This feature can be enabled by setting the startup
flag '-balance_queries_across_executor_groups=true'. The setting is
off by default.
Testing:
- Add e2e tests to verify the default policy and the new policy.
Change-Id: I25e851fb57c1d820c25cef5316f4ed800e4c6ac5
Reviewed-on: http://gerrit.cloudera.org:8080/19630
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Reviewed-by: Wenzhe Zhou <wzhou@cloudera.com>
This patch adds the flag skip_resource_checking_on_last_executor_group_set.
If this backend flag is set to true, memory and CPU resource checking
will be skipped when a query is being planned against the last (largest)
executor group set. Setting it to true ensures that a query will always
get admitted into the last executor group set if it does not fit in any
other group set.
Testing:
- Tune test_query_cpu_count_divisor_fraction to run two test cases:
CPU within limit and CPU outside limit.
- Add test_no_skip_resource_checking
Change-Id: I5848e4f67939d3dd2fb105c1ae4ca8e15f2e556f
Reviewed-on: http://gerrit.cloudera.org:8080/19649
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Wenzhe Zhou <wzhou@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Memory and CPU limit checking in executor group
selection (Frontend.java) should be skipped if the REQUEST_POOL query
option is set. Setting REQUEST_POOL means the user is specifying the
pool to run the query in, regardless of memory and CPU limits.
Testing:
- Add test cases in test_query_cpu_count_divisor_default
Change-Id: I14bf7fe71e2dda1099651b3edf62480e1fdbf845
Reviewed-on: http://gerrit.cloudera.org:8080/19645
Reviewed-by: Wenzhe Zhou <wzhou@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
This patch adds new profile counters under the Frontend profile node to
describe executor group set selection during query planning. It modifies
FrontendProfile.java to allow one level of TRuntimeProfileNode nesting
under the Frontend profile node. This makes it possible to group profile
counters specific to each executor group set under consideration.
The "fragment-costs" hint is renamed to "segment-costs". A new
"cpu-comparison-result" hint is added after "segment-costs" to help
navigate how the CPU sizing decision is made.
This patch also adds some function overloads in runtime-profile.cc to
hide TotalTime and InactiveTotalTime, which are meaningless for anything
under the Frontend profile node. Additional context is also added to the
AnalysisException thrown when none of the executor group sets fits the
query requirements.
This is how the Frontend profile node looks after running
TestExecutorGroups::test_query_cpu_count_divisor_fraction:
Frontend:
Referenced Tables: tpcds_parquet.store_sales
- CpuCountDivisor: 0.20
- ExecutorGroupsConsidered: 3 (3)
Executor group 1 (root.tiny):
Verdict: not enough cpu cores
- CpuAsk: 15 (15)
- CpuMax: 2 (2)
- EffectiveParallelism: 3 (3)
- MemoryAsk: 36.83 MB (38617088)
- MemoryMax: 64.00 MB (67108864)
Executor group 2 (root.small):
Verdict: not enough cpu cores
- CpuAsk: 25 (25)
- CpuMax: 16 (16)
- EffectiveParallelism: 5 (5)
- MemoryAsk: 36.83 MB (38624004)
- MemoryMax: 64.00 MB (67108864)
Executor group 3 (root.large):
Verdict: Match
- CpuAsk: 35 (35)
- CpuMax: 192 (192)
- EffectiveParallelism: 7 (7)
- MemoryAsk: 36.84 MB (38633570)
- MemoryMax: 8388608.00 GB (9007199254740992)
Testing:
- Pass core tests
Change-Id: I6c0ac7f5216d631e4439fe97702e21e06d2eda8a
Reviewed-on: http://gerrit.cloudera.org:8080/19628
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Tested-by: Riza Suminto <riza.suminto@cloudera.com>
Python 3 changed some object model methods:
- __nonzero__ was removed in favor of __bool__
- func_dict / func_name were removed in favor of __dict__ / __name__
- The next() function was deprecated in favor of __next__
(Code locations should use next(iter) rather than iter.next())
- metaclasses are specified a different way
- Locations that specify __eq__ should also specify __hash__
Python 3 also moved some packages around (urllib2, Queue, httplib,
etc), and this adapts the code to use the new locations (usually
handled on Python 2 via future). This also fixes the code to
avoid referencing exception variables outside the exception block
and variables outside of a comprehension. Several of these seem
like false positives, but it is better to avoid the warning.
This fixes these pylint warnings:
bad-python3-import
eq-without-hash
metaclass-assignment
next-method-called
nonzero-method
exception-escape
comprehension-escape
Testing:
- Ran core tests
- Ran release exhaustive tests
Change-Id: I988ae6c139142678b0d40f1f4170b892eabf25ee
Reviewed-on: http://gerrit.cloudera.org:8080/19592
Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Python 3 changes list operators such as range, map, and filter
to be lazy. Code that expects these operators to produce a list
immediately will fail, e.g.
Python 2:
range(0,5) == [0,1,2,3,4]
True
Python 3:
range(0,5) == [0,1,2,3,4]
False
The fix is to wrap locations with list(). i.e.
Python 3:
list(range(0,5)) == [0,1,2,3,4]
True
Since the base operators are now lazy, Python 3 also removes the
old lazy versions (e.g. xrange, ifilter, izip, etc). This uses
future's builtins package to convert the code to the Python 3
behavior (i.e. xrange -> future's builtins.range).
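For reference, the conversion pattern looks like this on Python 2 (the
'builtins' module comes from the future package there; on Python 3 it is
simply the standard builtin namespace):
  from __future__ import absolute_import
  from builtins import range  # Python-3-style lazy range on Python 2
  for i in range(3):
      print(i)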
Most of the changes were done via these futurize fixes:
- libfuturize.fixes.fix_xrange_with_import
- lib2to3.fixes.fix_map
- lib2to3.fixes.fix_filter
This eliminates the pylint warnings:
- xrange-builtin
- range-builtin-not-iterating
- map-builtin-not-iterating
- zip-builtin-not-iterating
- filter-builtin-not-iterating
- reduce-builtin
- deprecated-itertools-function
Testing:
- Ran core job
Change-Id: Ic7c082711f8eff451a1b5c085e97461c327edb5f
Reviewed-on: http://gerrit.cloudera.org:8080/19589
Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com>
Tested-by: Joe McDonnell <joemcdonnell@cloudera.com>
This takes steps to make Python 2 behave like Python 3 as
a way to flush out issues with running on Python 3. Specifically,
it handles two main differences:
1. Python 3 requires absolute imports within packages. This
can be emulated via "from __future__ import absolute_import"
2. Python 3 changed division to "true" division that doesn't
round to an integer. This can be emulated via
"from __future__ import division"
This changes all Python files to add imports for absolute_import
and division. For completeness, this also includes print_function in the
import.
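The added header and the division distinction look like:
  from __future__ import absolute_import, division, print_function
  # With 'division' in effect, / is true division even on Python 2; use //
  # wherever an integer result (an index, a record count) is required.
  assert 7 / 2 == 3.5
  assert 7 // 2 == 3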
I scrutinized each old-division location and converted some locations
to use the integer division '//' operator if it needed an integer
result (e.g. for indices, counts of records, etc). Some code was also using
relative imports and needed to be adjusted to handle absolute_import.
This fixes all Pylint warnings about no-absolute-import and old-division,
and these warnings are now banned.
Testing:
- Ran core tests
Change-Id: Idb0fcbd11f3e8791f5951c4944be44fb580e576b
Reviewed-on: http://gerrit.cloudera.org:8080/19588
Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com>
Tested-by: Joe McDonnell <joemcdonnell@cloudera.com>
Part 1 of IMPALA-11604 implements the ProcessingCost model for each
PlanNode and DataSink. This second part builds on top of the
ProcessingCost model by adjusting the number of instances of each
fragment after considering their production-consumption ratio, and then
finally returns a number representing the ideal CPU core count required
for a query to run efficiently. A more detailed explanation of the CPU
costing algorithm can be found in the three steps below.
I. Compute the total ProcessingCost of a fragment.
The costing algorithm splits a query fragment into several segments
divided by blocking PlanNode/DataSink boundaries. Each fragment segment
is a subtree of PlanNodes/DataSinks in the fragment with a DataSink or
blocking PlanNode as the root and non-blocking leaves. All other nodes
in the segment are non-blocking. PlanNodes or DataSinks that belong to
the same segment have their ProcessingCosts summed. A new CostingSegment
class is added to represent such a segment.
A fragment that has a blocking PlanNode or blocking DataSink is called a
blocking fragment. Currently, only JoinBuildSink is considered a
blocking DataSink. A fragment without any blocking nodes is called a
non-blocking fragment. Step III discusses blocking and non-blocking
fragments further.
Take the following fragment plan as an example, which is blocking since
it has 3 blocking PlanNodes: 12:AGGREGATE, 06:SORT, and 08:TOP-N.
F03:PLAN FRAGMENT [HASH(i_class)] hosts=3 instances=6 (adjusted from 12)
fragment-costs=[34974657, 2159270, 23752870, 22]
08:TOP-N [LIMIT=100]
| cost=900
|
07:ANALYTIC
| cost=23751970
|
06:SORT
| cost=2159270
|
12:AGGREGATE [FINALIZE]
| cost=34548320
|
11:EXCHANGE [HASH(i_class)]
cost=426337
In bottom-up order, there are four segments in F03:
Blocking segment 1: (11:EXCHANGE, 12:AGGREGATE)
Blocking segment 2: 06:SORT
Blocking segment 3: (07:ANALYTIC, 08:TOP-N)
Non-blocking segment 4: DataStreamSink of F03
Therefore we have:
PC(segment 1) = 426337+34548320
PC(segment 2) = 2159270
PC(segment 3) = 23751970+900
PC(segment 4) = 22
These per-segment costs are stored in a CostingSegment tree rooted at
PlanFragment.rootSegment_, and are [34974657, 2159270, 23752870, 22]
respectively after the post-order traversal.
This is implemented in PlanFragment.computeCostingSegment() and
PlanFragment.collectCostingSegmentHelper().
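The per-segment sums reduce to a simple accumulation (illustrative only;
the real computation walks the CostingSegment tree in post-order):
  segments = [
      [426337, 34548320],  # segment 1: 11:EXCHANGE + 12:AGGREGATE
      [2159270],           # segment 2: 06:SORT
      [23751970, 900],     # segment 3: 07:ANALYTIC + 08:TOP-N
      [22],                # segment 4: DataStreamSink of F03
  ]
  print([sum(costs) for costs in segments])
  # [34974657, 2159270, 23752870, 22]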
II. Compute the effective degree of parallelism (EDoP) of fragments.
The costing algorithm walks the PlanFragments of the query plan tree in
post-order traversal. Upon visiting a PlanFragment, the costing
algorithm attempts to adjust the number of instances (effective
parallelism) of that fragment by comparing the last segment's
ProcessingCost of its child and the production-consumption rate between
its adjacent segments from step I. To simplify this initial
implementation, the parallelism of a PlanFragment containing an
EmptySetNode, UnionNode, or ScanNode remains unchanged (it follows
MT_DOP).
This step is implemented at PlanFragment.traverseEffectiveParallelism().
III. Compute the EDoP of the query.
The effective parallelism of a query is an upper bound on the number of
CPU cores that can work on the query in parallel, considering the
overlap between fragment execution and blocking operators. We compute
this in a post-order traversal similar to step II and split the query
tree into blocking fragment subtrees similar to step I. The following is
an example query plan from TPCDS-Q12.
F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
|
13:MERGING-EXCHANGE [UNPARTITIONED]
|
F03:PLAN FRAGMENT [HASH(i_class)] hosts=3 instances=3 (adjusted from 12)
08:TOP-N [LIMIT=100]
|
07:ANALYTIC
|
06:SORT
|
12:AGGREGATE [FINALIZE]
|
11:EXCHANGE [HASH(i_class)]
|
F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=12
05:AGGREGATE [STREAMING]
|
04:HASH JOIN [INNER JOIN, BROADCAST]
|
|--F05:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
| JOIN BUILD
| |
| 10:EXCHANGE [BROADCAST]
| |
| F02:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
| 02:SCAN HDFS [tpcds10_parquet.date_dim, RANDOM]
|
03:HASH JOIN [INNER JOIN, BROADCAST]
|
|--F06:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
| JOIN BUILD
| |
| 09:EXCHANGE [BROADCAST]
| |
| F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
| 01:SCAN HDFS [tpcds10_parquet.item, RANDOM]
|
00:SCAN HDFS [tpcds10_parquet.web_sales, RANDOM]
A blocking fragment is a fragment that has a blocking PlanNode or
blocking DataSink in it. The costing algorithm splits the query plan
tree into blocking subtrees divided by blocking fragment boundaries.
Each blocking subtree has a blocking fragment as its root and
non-blocking fragments as the intermediate or leaf nodes. In the
TPCDS-Q12 example above, the query plan is divided into five blocking
subtrees: [(F05, F02), (F06, F01), F00, F03, F04].
A CoreCount is a container class that represents the CPU core
requirement of a subtree of a query or the query itself. Each blocking
subtree has its fragments' adjusted instance counts summed into a single
CoreCount. This means that all fragments within a blocking subtree can
run in parallel and should be assigned one core per fragment instance.
The CoreCount for each blocking subtree in the TPCDS-Q12 example is
[4, 4, 12, 3, 1].
Upon visiting a blocking fragment, the maximum of the current
CoreCount (rooted at that blocking fragment) and the previous blocking
subtrees' CoreCount is taken, and the algorithm continues up to the next
ancestor PlanFragment. The final CoreCount for the TPCDS-Q12 example is
12.
This step is implemented at Planner.computeBlockingAwareCores() and
PlanFragment.traverseBlockingAwareCores().
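For the TPCDS-Q12 example this boils down to the following (a
simplification; the real traversal can also add subtrees that overlap,
as in the Q15 trace shown later in this message):
  # Adjusted instance counts of the five blocking subtrees:
  # (F05, F02), (F06, F01), F00, F03, F04
  blocking_subtree_cores = [3 + 1, 3 + 1, 12, 3, 1]
  # Fragments inside a blocking subtree run concurrently, so their instance
  # counts are summed; across blocking boundaries the maximum is kept.
  print(max(blocking_subtree_cores))  # 12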
The resulting CoreCount at the root PlanFragment is then taken as the
ideal CPU core count / EDoP of the query. This number is compared
against the total CPU count of an Impala executor group to determine
whether the query fits to run in that group or not. A backend flag,
query_cpu_count_divisor, is added to help scale the EDoP of a query down
or up if needed.
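The fit check then reduces to roughly this (sketch; names are
assumptions):
  def fits_in_group_set(query_core_count, group_total_cpu, divisor=1.0):
      # query_cpu_count_divisor scales the query's EDoP before comparing it
      # with the group's total CPU count; values < 1 effectively inflate it.
      return query_core_count / divisor <= group_total_cpu
  print(fits_in_group_set(24, 16, divisor=2.0))  # True: 24 / 2 <= 16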
Two query options are added to control the entire computation of EDoP.
1. COMPUTE_PROCESSING_COST
Control whether to enable this CPU costing algorithm or not.
Must also set MT_DOP > 0 for this query option to take effect.
2. PROCESSING_COST_MIN_THREADS
Control the minimum number of fragment instances (threads) that the
costing algorithm is allowed to adjust. The costing algorithm is in
charge of increasing the fragment's instance count beyond this
minimum number through producer-consumer rate comparison. The maximum
number of fragment instances is the max of PROCESSING_COST_MIN_THREADS,
MT_DOP, and the number of cores per executor.
This patch also adds three backend flags to tune the algorithm.
1. query_cpu_count_divisor
Divide the CPU requirement of a query to fit the total available CPU
in the executor group. For example, setting the value to 2 will fit a
query with CPU requirement 2X into an executor group with total
available CPU X. Note that a fractional value less than 1 effectively
multiplies the query's CPU requirement. A valid value is > 0.0. The
default value is 1.
2. processing_cost_use_equal_expr_weight
If true, all expression evaluations are weighted equally to 1 during
the plan node's processing cost calculation. If false, expression
costs from IMPALA-2805 will be used. Defaults to true.
3. min_processing_per_thread
Minimum processing load (in processing cost units) that a fragment
instance needs to work on before the planner considers increasing the
instance count based on the processing cost rather than the MT_DOP
setting. The decision is per fragment. Setting this to a high number
will reduce the parallelism of a fragment (more workload per instance),
while setting it to a low number will increase parallelism (less
workload per instance). Actual parallelism might still be constrained by
the total number of cores in the selected executor group, MT_DOP, or the
PROCESSING_COST_MIN_THREADS query option. Must be a positive integer.
Currently defaults to 10M.
As an example, the following is the additional ProcessingCost
information printed to the coordinator log for Q3, Q12, and Q15, run on
TPCDS 10GB scale with 3 executors, MT_DOP=4,
PROCESSING_COST_MAX_THREADS=4, and
processing_cost_use_equal_expr_weight=false.
Q3
CoreCount={total=12 trace=F00:12}
Q12
CoreCount={total=12 trace=F00:12}
Q15
CoreCount={total=15 trace=N07:3+F00:12}
There are a few TODOs which will be done in follow-up tasks:
1. Factor in row width in ProcessingCost calculation (IMPALA-11972).
2. Tune the individual expression cost from IMPALA-2805.
3. Benchmark and tune min_processing_per_thread with an optimal value.
4. Revisit cases where cardinality is not available (getCardinality() or
getInputCardinality() return -1).
5. Bound SCAN and UNION fragments by ProcessingCost as well (need to
address IMPALA-8081).
Testing:
- Add TestTpcdsQueryWithProcessingCost, which is a similar run of
TestTpcdsQuery, but with COMPUTE_PROCESSING_COST=1 and MT_DOP=4.
Setting the log level to TRACE for PlanFragment and manually running
TestTpcdsQueryWithProcessingCost in the minicluster shows several
fragment instance count reductions from 12 to 9, 6, or 3 in the
coordinator log.
- Add PlannerTest#testProcessingCost
Adjusted fragment count is indicated by "(adjusted from 12)" in the
query profile.
- Add TestExecutorGroups::test_query_cpu_count_divisor.
Co-authored-by: Qifan Chen <qchen@cloudera.com>
Change-Id: Ibb2a796fdf78336e95991955d89c671ec82be62e
Reviewed-on: http://gerrit.cloudera.org:8080/19593
Reviewed-by: Wenzhe Zhou <wzhou@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Kurt Deschler <kdeschle@cloudera.com>
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
This patch removes executor groups from cluster membership once they no
longer have any executors, so that executor group configurations can be
updated without restarting all impalads in the cluster.
Testing:
- Added an e2e test to verify the new functionality.
Change-Id: I480b84b26a780d345216004f1a4657c7b95dda45
Reviewed-on: http://gerrit.cloudera.org:8080/19468
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Wenzhe Zhou <wzhou@cloudera.com>
IMPALA-11604 enables the planner to compute CPU usage for certain
queries and to select suitable executor groups to run them. The CPU
usage is expressed as the number of CPU cores required to process a
query.
This patch adds the CPU core limit, which is the maximum number of CPU
cores available per node and coordinator for each executor group, to the
pool service.
Testing:
- Passed core run.
- Verified that CPU cores were shown on the admission and
metrics pages of the Impala debug web server.
Change-Id: Id4c5ee519ce7c329b06ac821283e215a3560f525
Reviewed-on: http://gerrit.cloudera.org:8080/19366
Reviewed-by: Andrew Sherman <asherman@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
IMPALA-11470 adds support for the codegen cache; however, the admission
controller is not aware of the codegen cache's memory usage, even though
the codegen cache actually uses memory from the query memory quota. This
can result in query failures when running heavy workloads while the
admission controller has fully admitted queries.
This patch subtracts the codegen cache capacity from the admission
memory limit during initialization, thereby reserving the codegen
cache's memory consumption from the beginning and treating it as
separate memory independent of the query memory reservation.
It also reduces the maximum codegen cache memory from 20 percent to 10
percent and updates some test cases that failed due to the reduction of
the admit memory limit.
Tests:
Passed exhaustive tests.
Change-Id: Iebdc04ba1b91578d74684209a11c815225b8505a
Reviewed-on: http://gerrit.cloudera.org:8080/19377
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
IMPALA-11604 enables the planner to compute CPU usage for certain
queries and to select suitable executor groups to run them. The CPU
usage is expressed as the total amount of data to be processed per
query.
This patch adds the processing cost limit, which is the total amount of
data that each executor group can handle, to the pool service.
Testing:
- Passed core run.
- Verified that processing costs were shown on the admission and
metrics pages of the Impala debug web server.
Change-Id: I9bd2a7284eda47a969ef91e4be19f96d2af53449
Reviewed-on: http://gerrit.cloudera.org:8080/19121
Reviewed-by: Qifan Chen <qchen@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
After queries are cancelled, it can take some time (>30s in some
instances) to fully cancel all fragment instances and fully reclaim
reserved memory. The test and query limits were exactly matched, so any
extra reservation would prevent scheduling, causing the test to
frequently time out. With the fix, 1MB of extra memory is reserved to
break the tie, thus avoiding the timeout. The extra 1MB of memory can be
seen in logs printing agg_mem_reserved.
Rather than extend timeouts and make the test run longer, add a small
buffer to the admission limit to allow for fragment instance cleanup
while the test runs.
Change-Id: Iaee557ad87d3926589b30d6dcdd850e9af9b3476
Reviewed-on: http://gerrit.cloudera.org:8080/19092
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch provides count(star) optimization for ORC scans, similar to
the work done in IMPALA-5036 for Parquet scans. We use the stripes'
number-of-rows statistics when computing the count star instead of
materializing empty rows. The aggregate function is changed from a count
to a special sum function initialized to 0.
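Conceptually (not the actual scanner code):
  def optimized_orc_count_star(stripe_num_rows_stats):
      # Answer count(*) from the ORC stripe-level row-count statistics
      # instead of materializing empty rows; the special sum aggregate
      # starts at 0 and adds each stripe's row count.
      return sum(stripe_num_rows_stats)
  print(optimized_orc_count_star([100000, 250000, 75000]))  # 425000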
This count(star) optimization is disabled for full ACID tables because
the scanner might need to read and validate the 'currentTransaction'
column in the table's special schema.
This patch drops 'parquet' from names related to the count star
optimization. It also improves the count(star) operation in general by
serving the result just from the file's footer stats for both Parquet
and ORC. We unify the optimized count star and zero slot scan functions
into HdfsColumnarScanner.
The following table shows a performance comparison before and after the
patch. The primitive_count_star query targets the
tpch10_parquet.lineitem table (10GB scale TPC-H). Meanwhile,
count_star_parq and count_star_orc are modified primitive_count_star
queries that target the tpch_parquet.lineitem and tpch_orc_def.lineitem
tables respectively.
+-------------------+----------------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| Workload | Query | File Format | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%) | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | Tval |
+-------------------+----------------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| tpch_parquet | count_star_parq | parquet / none / none | 0.06 | 0.07 | -10.45% | 2.87% | * 25.51% * | 9 | -1.47% | -1.26 | -1.22 |
| tpch_orc_def | count_star_orc | orc / def / none | 0.06 | 0.08 | -22.37% | 6.22% | * 30.95% * | 9 | -1.85% | -1.16 | -2.14 |
| TARGETED-PERF(10) | primitive_count_star | parquet / none / none | 0.06 | 0.08 | I -30.40% | 2.68% | * 29.63% * | 9 | I -7.20% | -2.42 | -3.07 |
+-------------------+----------------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
Testing:
- Add PlannerTest.testOrcStatsAgg
- Add TestAggregationQueries::test_orc_count_star_optimization
- Exercise count(star) in TestOrc::test_misaligned_orc_stripes
- Pass core tests
Change-Id: I0fafa1182f97323aeb9ee39dd4e8ecd418fa6091
Reviewed-on: http://gerrit.cloudera.org:8080/18327
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch provides replan support for multiple executor group sets.
Each executor group set is associated with a distinct number of nodes
and a threshold for estimated memory per host in bytes, which can be
denoted as [<group_name_prefix>:<#nodes>, <threshold>].
In this patch, a query of type EXPLAIN, QUERY or DML can be compiled
more than once. In each attempt, per-host memory is estimated and
compared with the threshold of an executor group set. If the estimated
memory is no more than the threshold, the iteration process terminates
and the final plan is determined. The executor group set with that
threshold is selected to run the query.
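The selection loop is roughly as follows (a sketch with assumed field
names):
  def select_executor_group_set(group_sets, compile_fn):
      # group_sets are ordered by per-host memory threshold, e.g.
      # [("regular", 64 * 2**20), ("large", 8 * 2**50)].
      for name, mem_threshold in group_sets:
          plan = compile_fn(name)
          if plan["per_host_mem_estimate"] <= mem_threshold:
              break  # first group set whose threshold fits gets the query
      return name, plan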
A new query option 'enable_replan', defaulting to 1 (enabled), is added.
It can be set to 0 to disable this patch and to generate the distributed
plan for the default executor group.
To avoid long compilation times, the following enhancements are enabled.
Note that 1) can be disabled when a relevant metadata change is
detected.
1. Authorization is performed only for the first compilation;
2. openTransaction() is called for transactional queries in the first
compilation and the saved transaction info is used in subsequent
compilations. Similar logic is applied to Kudu transactional queries.
To facilitate testing, the patch imposes an artificial two executor
group setup in FE as follows.
1. [regular:<#nodes>, 64MB]
2. [large:<#nodes>, 8PB]
This setup is enabled when a new query option 'test_replan' is set
to 1 in backend tests, or RuntimeEnv.INSTANCE.isTestEnv() is true as
in most frontend tests. This query option is set to 0 by default.
Compilation time increases when a query is compiled in several
iterations, as shown below for several TPC-DS queries. The increase is
mostly due to redundant work in either single-node plan creation or the
value transfer graph recomputation phase. For small queries, the
increase can be avoided if they can be compiled in a single iteration by
properly setting the smallest threshold among all executor group sets.
For example, for the set of queries listed below, the smallest threshold
can be set to 320MB to catch both q15 and q21 in one compilation.
Queries  Estimated Memory  2-iterations (ms)  1-iteration (ms)  Increase
q1       408MB             60.14              25.75             133.56%
q11      1.37GB            261.00             109.61            138.11%
q10a     519MB             139.24             54.52             155.39%
q13      339MB             143.82             60.08             139.38%
q14a     3.56GB            762.68             312.92            143.73%
q14b     2.20GB            522.01             245.13            112.95%
q15      314MB             9.73               4.28              127.33%
q21      275MB             16.00              8.18              95.59%
q23a     1.50GB            461.69             231.78            99.19%
q23b     1.34GB            461.31             219.61            110.05%
q4       2.60GB            218.05             105.07            107.52%
q67      5.16GB            694.59             334.24            101.82%
Testing:
1. Almost all FE and BE tests are now run in the artificial two
executor group setup except a few where a specific cluster
configuration is desirable;
2. Ran core tests successfully;
3. Added a new observability test and a new query assignment test;
4. Disabled the concurrent inserts test (test_concurrent_inserts) and
the failing inserts test (test_failing_inserts) in local catalog mode
due to flakiness; both are reported in IMPALA-11189 and IMPALA-11191.
Change-Id: I75cf17290be2c64fd4b732a5505bdac31869712a
Reviewed-on: http://gerrit.cloudera.org:8080/18178
Reviewed-by: Qifan Chen <qchen@cloudera.com>
Tested-by: Qifan Chen <qchen@cloudera.com>
This adds metrics for each executor group set that expose the number
of executor groups, the number of healthy executor groups and the
total number of backends associated with that group set.
Testing:
Added an e2e test to verify metrics are updated correctly.
Change-Id: Ib39f940de830ef6302785aee30eeb847fa5deeba
Reviewed-on: http://gerrit.cloudera.org:8080/18142
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch introduces the concept of executor group sets. Each group
set specifies an executor group name prefix and an expected group
size (the number of executors in each group). Every executor group
that is a part of this set will have the same prefix which will
also be equivalent to the resource pool name that it maps to.
These sets are specified via a startup flag
'expected_executor_group_sets', which is a comma-separated list in
the format <executor_group_name_prefix>:<expected_group_size>.
Testing:
- Added unit tests
Change-Id: I9e0a3a5fe2b1f0b7507b7c096b7a3c373bc2e684
Reviewed-on: http://gerrit.cloudera.org:8080/18093
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
executor pools
This patch adds a test to verify that admission control accounting
works when using multiple coordinators and multiple executor groups
mapped to different resource pools and having different sizes.
Change-Id: If76d386d8de5730da937674ddd9a69aa1aa1355e
Reviewed-on: http://gerrit.cloudera.org:8080/17891
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch adds the ability to share the per-host stats for locally
admitted queries across all coordinators. This helps to get a more
consolidated view of the cluster for stats like slots_in_use and
mem_admitted when making local admission decisions.
Testing:
Added e2e py test
Change-Id: I2946832e0a89b077d0f3bec755e4672be2088243
Reviewed-on: http://gerrit.cloudera.org:8080/17683
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
With this fix, coordinator-only queries are submitted to a pseudo
executor group named "empty group (using coordinator only)" which is
empty. This allows running coordinator-only queries regardless of the
presence of any healthy executor groups.
Testing:
Added a custom cluster test and modified tests that relied on
coordinator-only queries being queued in the absence of executor groups.
Change-Id: I8fe098032744aa20bbbe4faddfc67e7a46ce03d5
Reviewed-on: http://gerrit.cloudera.org:8080/14183
Reviewed-by: Bikramjeet Vig <bikramjeet.vig@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Some tests in test_executor_groups immediately tried fetching the query
profile after executing it asynchronously to verify if the query was
queued. However there is a small window between the exec rpc returning
and the query being queued during which the query profile does not
contain any info about the query being queued. This was causing some
asserts in the test to fail.
Change-Id: I47070045250a12d86c99f9a30a956a268be5fa7e
Reviewed-on: http://gerrit.cloudera.org:8080/14810
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This change improves the cluster membership snapshot we maintain in the
frontend in cases where all executors have been shut down or none have
started yet.
Prior to this change, when configuring Impala with executor groups, the
planner might see an ExecutorMembershipSnapshot that has no executors in
it. This could happen if the first executor group had not started up
yet, or if all executor groups had been shutdown. If this happened, the
planner would make sub-optimal decisions, e.g. decide on a broadcast
join vs a partitioned hash join.
With this change, if no executors have been registered so far, the
planner will use the expected number of executors, which can be set using
the -num_expected_executors flag and is 20 by default. After executors
come online, the planner will use the size of the largest healthy
executor group, and it will hold on to the group's size even if it shuts
down or becomes unhealthy. This allows the planner to work on the
assumption that a healthy executor group of the same size will
eventually come online to execute the query.
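A condensed sketch of that selection logic, assuming the frontend simply
remembers the size of the largest healthy group it has seen (names are
illustrative, not the actual code):

    NUM_EXPECTED_EXECUTORS = 20  # default of the -num_expected_executors flag

    def planner_executor_count(largest_healthy_group_size_seen):
        # No executor has ever registered: fall back to the configured value.
        if largest_healthy_group_size_seen is None:
            return NUM_EXPECTED_EXECUTORS
        # Otherwise keep using the remembered size, even if that group has
        # since shut down or become unhealthy.
        return largest_healthy_group_size_seen

    assert planner_executor_count(None) == 20
    assert planner_executor_count(10) == 10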
Change-Id: Ib6b05326c82fb3ca625c015cfcdc38f891f5d4f9
Reviewed-on: http://gerrit.cloudera.org:8080/14756
Reviewed-by: Lars Volker <lv@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
The test was checking the incorrect invariant: the
slot mechanism only prevents more than that number of
queries from running on a backend. More queries can run on
the cluster, since a query's backends are freed up before
the query itself finishes.
It was a little tricky picking an appropriate metric
since there is no strong consistency between the
metrics, e.g. decrementing a metric after a backend
finishes may race with admitting the next query.
So I simply used the same metric used by the admission
controller in making decisions, which should be
strongly consistent w.r.t. admission control decisions.
Also remove the concurrency limit on the coordinator,
which seemed inconsistent with the purpose of the
test, because we only want concurrency to be limited
by the executors.
Testing:
Looped the test for a bit.
Change-Id: I910028919f248a3bf5de345e9eade9dbc4353ebd
Reviewed-on: http://gerrit.cloudera.org:8080/14606
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This integrates mt_dop with the "slots" mechanism that's used
for non-default executor groups.
The idea is simple - the degree of parallelism on a backend
determines the number of slots consumed. The effective
degree of parallelism is used, not the raw mt_dop setting.
E.g. if the query only has a single input split and executes
only a single fragment instance on a host, we don't want
to count the full mt_dop value for admission control.
--admission_control_slots is added as a new flag that
replaces --max_concurrent_queries, since the name better
reflects the concept. --max_concurrent_queries is kept
for backwards compatibility and has the same meaning
as --admission_control_slots.
The admission control logic is extended to take this into
account. We also add an immediate rejection code path
since it is now possible for queries to not be admittable
based on the # of available slots.
We only factor in the "width" of the plan - i.e. the number
of instances of fragments. We don't account for the number
of distinct fragments, since they may not actually execute
in parallel with each other because of dependencies.
This number is added to the per-host profile as the
"AdmissionSlots" counter.
Testing:
Added unit tests for rejection and queue/admit checks.
Also includes a fix for IMPALA-9054 where we increase
the timeout.
Added end-to-end tests:
* test_admission_slots in test_mt_dop.py that checks the
admission slot calculation via the profile.
* End-to-end admission test that exercises the admit
immediately and queueing code paths.
Added checks to test_verify_metrics (which runs after
end-to-end tests) to ensure that the per-backend
slots in use goes to 0 when the cluster is quiesced.
Change-Id: I7b6b6262ef238df26b491352656a26e4163e46e5
Reviewed-on: http://gerrit.cloudera.org:8080/14357
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
With this patch, all executor groups with at least one executor
will have a metric added that displays the number of queries
(admitted by the local coordinator) running on them. The metric
is removed only when the group has no executors in it. It gets updated
when either the cluster membership changes or a query gets admitted or
released by the admission controller. Also adds the ability to delete
metrics from a metric group after registration.
Testing:
- Added a custom cluster test and a BE metric test.
- Had to modify some metric tests that relied on ordering of metrics by
their name.
Change-Id: I58cde8699c33af8b87273437e9d8bf6371a34539
Reviewed-on: http://gerrit.cloudera.org:8080/14103
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
In some situations, users might actually expect not to have a healthy
executor group around, e.g. when they're starting one and it takes a
while to come online. This change makes the queuing reason more generic
and drops the "unhealthy" concept from it to reduce confusion.
Change-Id: Idceab7fb56335bab9d787b0f351a41e6efd7dd59
Reviewed-on: http://gerrit.cloudera.org:8080/14210
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Changes the Coordinator to release admitted memory when each Backend
completes, rather than waiting for the entire query to complete before
releasing admitted memory. When the Coordinator detects that a Backend
has completed (via ControlService::ReportExecStatus) it updates the
state of the Backend in Coordinator::BackendResourceState.
BackendResourceState tracks the state of the admitted resources for
each Backend and decides when the resources for a group of Backend
states should be released. BackendResourceState defines a state machine
to help coordinate the state of the admitted memory for each Backend.
It guarantees that by the time the query is shutdown, all Backends
release their admitted memory.
BackendResourceState implements three rules to control the rate at
which the Coordinator releases admitted memory from the
AdmissionController (a rough sketch of the resulting release bound
follows the flag list below):
* Resources are released at most once every 1 second; this prevents
short-lived queries from causing high load on the admission controller
* Resources are released at most O(log(num_backends)) times; the
BackendResourceStates can release multiple BackendStates from the
AdmissionController at a time
* All pending resources are released if the only remaining Backend is
the Coordinator Backend; this is useful for result spooling where all
Backends may complete, except for the Coordinator Backend
Exposes the following hidden startup flags to help tune the heuristics
above:
--batched_release_decay_factor
* Defaults to 2
* Controls the base value for the O(log(num_backends)) bound when
batching the release of Backends.
--release_backend_states_delay_ms
* Defaults to 1000 milliseconds
* Controls how often Backends can release their resources.
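The exact batching condition lives in BackendResourceState; the following
is only a back-of-the-envelope Python illustration of the logarithmic
bound under the default decay factor of 2:

    def max_batched_releases(num_backends, decay_factor=2):
        # Count how many times the backend count can be shrunk by the decay
        # factor before reaching one; an upper bound on release batches.
        releases, remaining = 0, num_backends
        while remaining > 1:
            remaining = (remaining + decay_factor - 1) // decay_factor
            releases += 1
        return max(releases, 1)

    # A query over 64 backends releases admitted memory in at most ~6
    # batches instead of 64 individual updates.
    print(max_batched_releases(64))  # 6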
Testing:
* Ran core tests
* Added new tests to test_result_spooling.py and
test_admission_controller.py
Change-Id: I88bb11e0ede7574568020e0277dd8ac8d2586dc9
Reviewed-on: http://gerrit.cloudera.org:8080/14104
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This patch adds 3 metrics under a new metric group called
"cluster-membership" that keep track of the number of executor groups
that have at least one live executor, the number of executor groups that
are in a healthy state, and the number of backends registered with the
statestore.
Testing:
Modified tests to use these metrics for verification.
Change-Id: I7745ea1c7c6778d3fb5e59adbc873697beb0f3b9
Reviewed-on: http://gerrit.cloudera.org:8080/13979
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This change adds support for running queries inside a single admission
control pool on one of several, disjoint sets of executors called
"executor groups".
Executors can be configured with an executor group through the newly
added '--executor_groups' flag. Note that in anticipation of future
changes, the flag already uses the plural form, but only a single
executor group may be specified for now. Each executor group
specification can optionally contain a minimum size, separated by a
':', e.g. --executor_groups default-pool-1:3. Only when the cluster
membership contains at least that number of executors for the group
will it be considered for admission.
Executor groups are mapped to resource pools by their name: An executor
group can service queries from a resource pool if the pool name is a
prefix of the group name separated by a '-'. For example, queries in
pool poolA can be serviced by executor groups named poolA-1 and poolA-2,
but not by groups named foo or poolB-1.
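A minimal Python sketch of this prefix-matching rule (the function name is
illustrative):

    def group_serves_pool(group_name, pool_name):
        # A group can service a pool if the pool name plus '-' is a prefix
        # of the group name.
        return group_name.startswith(pool_name + "-")

    assert group_serves_pool("poolA-1", "poolA")
    assert group_serves_pool("poolA-2", "poolA")
    assert not group_serves_pool("foo", "poolA")
    assert not group_serves_pool("poolB-1", "poolA")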
During scheduling, executor groups are considered in alphabetical order.
This means that one group is filled up entirely before a subsequent
group is considered for admission. Groups also need to pass a health
check before being considered; in particular, they must contain at least
the minimum number of executors specified.
If no group is specified during startup, executors are added to the
default executor group. If - during admission - no executor group for a
pool can be found and the default group is non-empty, then the default
group is considered. The default group does not have a minimum size.
This change inverts the order of scheduling and admission. Prior to this
change, queries were scheduled before submitting them to the admission
controller. Now the admission controller computes schedules for all
candidate executor groups before each admission attempt. If the cluster
membership has not changed, then the schedules of the previous attempt
will be reused. This means that queries will no longer fail if the
cluster membership changes while they are queued in the admission
controller.
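A high-level Python sketch of that admission loop; make_schedule and
can_admit are hypothetical hooks standing in for the scheduler and the
admission checks, and the real controller is more involved:

    def try_admit(query, membership, cache, make_schedule, can_admit):
        # Recompute candidate schedules only when the membership changes;
        # otherwise reuse the schedules from the previous attempt.
        if cache.get("version") != membership["version"]:
            cache["version"] = membership["version"]
            cache["schedules"] = [
                (g, make_schedule(query, g)) for g in membership["groups"]
            ]
        # Groups are tried in order; the first one with capacity wins.
        for group, schedule in cache["schedules"]:
            if can_admit(group, schedule):
                return group, schedule
        return None  # remain queued and retry on the next attempt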
This change also alters the default behavior when using a dedicated
coordinator and no executors have registered yet. Prior to this change,
a query would fail immediately with an error ("No executors registered
in group"). Now a query will get queued and wait until executors show
up, or it times out after the pool's queue timeout period.
Testing:
This change adds a new custom cluster test for executor groups. It
makes use of new capabilities added to start-impala-cluster.py to bring
up additional executors into an already running cluster.
Additionally, this change adds an instructional implementation of
executor group based autoscaling, which can be used during development.
It also adds a helper to run queries concurrently. Both are used in a
new test to exercise the executor group logic and to prevent regressions
to these tools.
In addition to these tests, the existing tests for the admission
controller (both BE and EE tests) thoroughly exercise the changed code.
Some of them required changes themselves to reflect the new behavior.
I looped the new tests (test_executor_groups and test_auto_scaling) for
a night (110 iterations each) without any issues.
I also started an autoscaling cluster with a single group and ran
TPC-DS, TPC-H, and test_queries on it successfully.
Known limitations:
When using executor groups, only a single coordinator and a single AC
pool (i.e. the default pool) are supported. Executors do not include the
number of currently running queries in their statestore updates and so
admission controllers are not aware of the number of queries admitted by
other controllers per host.
Change-Id: I8a1d0900f2a82bd2fc0a906cc094e442cffa189b
Reviewed-on: http://gerrit.cloudera.org:8080/13550
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>