IMPALA-12657: Improve ProcessingCost of ScanNode and NonGroupingAggregator

This patch improves the accuracy of the CPU ProcessingCost estimates
for several of the most CPU-intensive operators by basing the costs on
benchmark data. The general approach for a given operator was to run a
set of queries that exercised the operator under various conditions
(e.g. large vs. small row sizes and row counts, varying NDV, different
file formats) and capture the CPU time spent per unit of work, where
the unit of work might be measured as some number of rows, some number
of bytes, some number of predicates evaluated, or some combination of
these. The data was then analyzed to fit a simple model that predicts
the CPU consumption of a given operator from information available at
planning time.
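
For illustration only, fitting such a model amounts to a simple
least-squares regression. The sketch below uses made-up sample data
(the real analysis was performed offline over many benchmark runs):

import numpy as np

# Hypothetical per-query measurements: bytes materialized,
# rows * predicates evaluated, and the measured CPU cost.
bytes_materialized = np.array([1e8, 5e8, 1e9, 2e9])
rows_times_preds = np.array([2e6, 1e7, 4e7, 8e7])
measured_cost = np.array([1.5e6, 7.5e6, 1.55e7, 3.1e7])

# Fit measured_cost ~= c0 * bytes_materialized + c1 * rows_times_preds.
features = np.column_stack([bytes_materialized, rows_times_preds])
coeffs, _, _, _ = np.linalg.lstsq(features, measured_cost, rcond=None)
print(coeffs)  # roughly [0.0144, 0.0281] for data matching the model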

For example, the CPU ProcessingCost for a Parquet scan is estimated as:
TotalCost = (0.0144 * BytesMaterialized) + (0.0281 * Rows * PredicateCount)
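
As a sketch (not the actual planner implementation, which lives in the
Java frontend's ProcessingCost code), the formula corresponds to:

def parquet_scan_cost(bytes_materialized, rows, predicate_count):
  """Sketch of the benchmarked Parquet scan cost model."""
  return 0.0144 * bytes_materialized + 0.0281 * rows * predicate_count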

The coefficients (0.0144 and 0.0281) are derived from benchmarking
scans under a variety of conditions. Similar cost functions and
coefficients were derived for all of the benchmarked operators. The
coefficients for all the operators are normalized such that a single
unit of cost equates to roughly 100 nanoseconds of CPU time on an
r5d.4xlarge instance, so we would predict that an operator with a cost
of 10,000,000 completes in roughly one second on a single core.
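
Continuing the sketch, converting a cost into an estimated single-core
runtime is then a single multiplication:

NANOS_PER_COST_UNIT = 100  # 1 cost unit ~ 100 ns on an r5d.4xlarge

def estimated_core_seconds(cost):
  return cost * NANOS_PER_COST_UNIT / 1e9

# Example: a 1 GB Parquet scan returning 10 million rows with two
# predicates, using the coefficients above:
cost = 0.0144 * 1e9 + 0.0281 * 1e7 * 2  # ~14.96 million cost units
print(estimated_core_seconds(cost))     # ~1.5 single-core seconds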

Limitations:
* Costing only addresses CPU time spent and doesn't account for any IO
  or other wait time.
* Benchmarking scenarios didn't provide comprehensive coverage of the
  full range of data types, distributions, etc. More thorough
  benchmarking could improve the costing estimates further.
* This initial patch covers only a subset of the operators, focusing
  on those that are most common and most CPU-intensive. Specifically,
  the following operators are covered by this patch; all others
  continue to use the previous ProcessingCost code:
  AggregationNode
  DataStreamSink (exchange sender)
  ExchangeNode
  HashJoinNode
  HdfsScanNode
  HdfsTableSink
  NestedLoopJoinNode
  SortNode
  UnionNode

Benchmark-based costing of the remaining operators will be covered by
a future patch.

Future patches will automate the collection and analysis of the benchmark
data and the computation of the cost coefficients to simplify maintenance
of the costing as performance changes over time.

Change-Id: Icf1edd48d4ae255b7b3b7f5b228800d7bac7d2ca
Reviewed-on: http://gerrit.cloudera.org:8080/21279
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
commit 25a8d70664 (parent d437334e53)
Author:    David Rorke
Committer: Impala Public Jenkins
Date:      2024-04-08 22:18:21 -07:00

126 changed files with 18585 additions and 18198 deletions

@@ -452,14 +452,14 @@ class TestInsertHdfsWriterLimit(ImpalaTestSuite):
       "select ss_item_sk, ss_ticket_number, ss_store_sk " \
       "from tpcds_parquet.store_sales".format(unique_database)
     self.__run_insert_and_verify_instances(query, max_fs_writers=0,
-        expected_num_instances_per_host=[5, 5, 5],
+        expected_num_instances_per_host=[2, 2, 2],
         processing_cost_min_threads=1)
     # Partitioned insert can still be limited by max_fs_writers option.
     query = "create table {0}.test6 partitioned by (ss_store_sk) as " \
       "select ss_item_sk, ss_ticket_number, ss_store_sk " \
       "from tpcds_parquet.store_sales".format(unique_database)
     self.__run_insert_and_verify_instances(query, max_fs_writers=2,
-        expected_num_instances_per_host=[4, 5, 5],
+        expected_num_instances_per_host=[1, 2, 2],
         processing_cost_min_threads=1)

  def __run_insert_and_verify_instances(self, query, max_fs_writers=0, mt_dop=0,