From c4d284f3cc22a8e0f99b3974973ca2bbfa277943 Mon Sep 17 00:00:00 2001 From: Tim Armstrong Date: Fri, 9 Jun 2017 11:04:30 -0700 Subject: [PATCH] IMPALA-5483: Automatically disable codegen for small queries This is similar to the single-node execution optimisation, but applies to slightly larger queries that should run in a distributed manner but won't benefit from codegen. This adds a new query option disable_codegen_rows_threshold that defaults to 50,000. If fewer than this number of rows are processed by a plan node per impalad, the cost of codegen almost certainly outweighs the benefit. Using rows processed as a threshold is justified by a simple model that assumes the cost of codegen and execution per row for the same operation are proportional. E.g. if x is the complexity of the operation, n is the number of rows processed, C is a constant factor giving the cost of codegen and Ec/Ei are constant factor giving the cost of codegen'd and interpreted execution and d, then the cost of the codegen'd operator is C * x + Ec * x * n and the cost of the interpreted operator is Ei * x * n. Rearranging means that interpretation is cheaper if n < C / (Ei - Ec), i.e. that (at least with the simplified model) it makes sense to choose interpretation or codegen based on a constant threshold. The model also implies that it is somewhat safer to choose codegen because the additional cost of codegen is O(1) but the additional cost of interpretation is O(n). I ran some experiments with TPC-H Q1, varying the input table size, to determine what the cut-over point where codegen was beneficial was. The cutover was around 150k rows per node for both text and parquet. At 50k rows per node disabling codegen was very beneficial - around 0.12s versus 0.24s. To be somewhat conservative I set the default threshold to 50k rows. On more complex queries, e.g. TPC-H Q10, the cutover tends to be higher because there are plan nodes that process many fewer than the max rows. Fix a couple of minor issues in the frontend - the numNodes_ calculation could return 0 for Kudu, and the single node optimization didn't handle the case where for a scan node with conjuncts, a limit and missing stats correctly (it considered the estimate still valid.) Testing: Updated e2e tests that set disable_codegen to set disable_codegen_rows_threshold to 0, so that those tests run both with and without codegen still. Added an e2e test to make sure that the optimisation is applied in the backend. Added planner tests for various cases where codegen should and shouldn't be disabled. Perf: Added a targeted perf test for a join+agg over a small input, which benefits from this change. Change-Id: I273bcee58641f5b97de52c0b2caab043c914b32e Reviewed-on: http://gerrit.cloudera.org:8080/7153 Reviewed-by: Tim Armstrong Tested-by: Impala Public Jenkins --- be/src/exprs/expr-test.cc | 1 + be/src/service/query-options.cc | 16 +- be/src/service/query-options.h | 3 +- common/thrift/ImpalaInternalService.thrift | 4 + common/thrift/ImpalaService.thrift | 6 +- .../apache/impala/planner/HBaseScanNode.java | 2 +- .../apache/impala/planner/KuduScanNode.java | 2 +- .../org/apache/impala/planner/Planner.java | 63 +++-- .../impala/util/MaxRowsProcessedVisitor.java | 62 +++-- .../apache/impala/planner/PlannerTest.java | 7 + .../queries/PlannerTest/disable-codegen.test | 232 ++++++++++++++++++ .../PlannerTest/resource-requirements.test | 6 + .../queries/QueryTest/disable-codegen.test | 30 +++ .../QueryTest/stats-extrapolation.test | 4 + .../queries/primitive_small_join_1.test | 10 + tests/common/impala_test_suite.py | 3 +- tests/common/test_dimensions.py | 8 +- tests/query_test/test_codegen.py | 41 ++++ tests/query_test/test_decimal_queries.py | 4 +- tests/query_test/test_scanners_fuzz.py | 1 + tests/query_test/test_udfs.py | 1 + 21 files changed, 461 insertions(+), 45 deletions(-) create mode 100644 testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test create mode 100644 testdata/workloads/functional-query/queries/QueryTest/disable-codegen.test create mode 100644 testdata/workloads/targeted-perf/queries/primitive_small_join_1.test create mode 100644 tests/query_test/test_codegen.py diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc index 462716dc6..a8998ecf6 100644 --- a/be/src/exprs/expr-test.cc +++ b/be/src/exprs/expr-test.cc @@ -7312,6 +7312,7 @@ int main(int argc, char **argv) { executor_->PushExecOption("ENABLE_EXPR_REWRITES=0"); executor_->PushExecOption("DISABLE_CODEGEN=0"); executor_->PushExecOption("EXEC_SINGLE_NODE_ROWS_THRESHOLD=0"); + executor_->PushExecOption("DISABLE_CODEGEN_ROWS_THRESHOLD=0"); cout << endl << "Running with codegen" << endl; ret = RUN_ALL_TESTS(); if (ret != 0) return ret; diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc index f4c4f05f0..8dcd7afd3 100644 --- a/be/src/service/query-options.cc +++ b/be/src/service/query-options.cc @@ -366,8 +366,7 @@ Status impala::SetQueryOption(const string& key, const string& value, StringParser::ParseResult status; int val = StringParser::StringToInt(value.c_str(), value.size(), &status); if (status != StringParser::PARSE_SUCCESS) { - return Status(Substitute("Invalid number of runtime filters: '$0'.", - value.c_str())); + return Status(Substitute("Invalid number of runtime filters: '$0'.", value)); } if (val < 0) { return Status(Substitute("Invalid number of runtime filters: '$0'. " @@ -493,6 +492,19 @@ Status impala::SetQueryOption(const string& key, const string& value, } break; } + case TImpalaQueryOptions::DISABLE_CODEGEN_ROWS_THRESHOLD: { + StringParser::ParseResult status; + int val = StringParser::StringToInt(value.c_str(), value.size(), &status); + if (status != StringParser::PARSE_SUCCESS) { + return Status(Substitute("Invalid threshold: '$0'.", value)); + } + if (val < 0) { + return Status(Substitute( + "Invalid threshold: '$0'. Only positive values are allowed.", val)); + } + query_options->__set_disable_codegen_rows_threshold(val); + break; + } default: // We hit this DCHECK(false) if we forgot to add the corresponding entry here // when we add a new query option. diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h index 1f5624cd2..603c783ee 100644 --- a/be/src/service/query-options.h +++ b/be/src/service/query-options.h @@ -35,7 +35,7 @@ class TQueryOptions; // the DCHECK. #define QUERY_OPTS_TABLE\ DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\ - TImpalaQueryOptions::DEFAULT_JOIN_DISTRIBUTION_MODE + 1);\ + TImpalaQueryOptions::DISABLE_CODEGEN_ROWS_THRESHOLD + 1);\ QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\ QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR)\ QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\ @@ -92,6 +92,7 @@ class TQueryOptions; QUERY_OPT_FN(parquet_array_resolution, PARQUET_ARRAY_RESOLUTION)\ QUERY_OPT_FN(parquet_read_statistics, PARQUET_READ_STATISTICS)\ QUERY_OPT_FN(default_join_distribution_mode, DEFAULT_JOIN_DISTRIBUTION_MODE)\ + QUERY_OPT_FN(disable_codegen_rows_threshold, DISABLE_CODEGEN_ROWS_THRESHOLD)\ ; diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index f622ed444..c31cd7f21 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ b/common/thrift/ImpalaInternalService.thrift @@ -251,6 +251,10 @@ struct TQueryOptions { // cardinality, e.g., because of missing table statistics. 56: optional TJoinDistributionMode default_join_distribution_mode = TJoinDistributionMode.BROADCAST + + // If the number of rows processed per node is below the threshold codegen will be + // automatically disabled by the planner. + 57: optional i32 disable_codegen_rows_threshold = 50000 } // Impala currently has two types of sessions: Beeswax and HiveServer2 diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift index fb0016cf3..ec82bf15d 100644 --- a/common/thrift/ImpalaService.thrift +++ b/common/thrift/ImpalaService.thrift @@ -274,7 +274,11 @@ enum TImpalaQueryOptions { // Join distribution mode that is used when the join inputs have an unknown // cardinality, e.g., because of missing table statistics. - DEFAULT_JOIN_DISTRIBUTION_MODE + DEFAULT_JOIN_DISTRIBUTION_MODE, + + // If the number of rows processed per node is below the threshold and disable_codegen + // is unset, codegen will be automatically be disabled by the planner. + DISABLE_CODEGEN_ROWS_THRESHOLD, } // The summary of a DML statement. diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java index 223362fd9..48b772a1d 100644 --- a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java @@ -218,7 +218,7 @@ public class HBaseScanNode extends ScanNode { } // TODO: take actual regions into account - numNodes_ = tbl.getNumNodes(); + numNodes_ = Math.max(1, tbl.getNumNodes()); if (LOG.isTraceEnabled()) { LOG.trace("computeStats HbaseScan: #nodes=" + Integer.toString(numNodes_)); } diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java index 7c6c5e345..b1aa5ba00 100644 --- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java @@ -255,7 +255,7 @@ public class KuduScanNode extends ScanNode { protected void computeStats(Analyzer analyzer) { super.computeStats(analyzer); // Update the number of nodes to reflect the hosts that have relevant data. - numNodes_ = hostIndexSet_.size(); + numNodes_ = Math.max(1, hostIndexSet_.size()); // Update the cardinality inputCardinality_ = cardinality_ = kuduTable_.getNumRows(); diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java index 31a061b93..936847a1c 100644 --- a/fe/src/main/java/org/apache/impala/planner/Planner.java +++ b/fe/src/main/java/org/apache/impala/planner/Planner.java @@ -38,7 +38,6 @@ import org.apache.impala.common.PrintUtils; import org.apache.impala.common.RuntimeEnv; import org.apache.impala.service.BackendConfig; import org.apache.impala.thrift.TExplainLevel; -import org.apache.impala.thrift.TPartitionType; import org.apache.impala.thrift.TQueryCtx; import org.apache.impala.thrift.TQueryExecRequest; import org.apache.impala.thrift.TQueryOptions; @@ -91,24 +90,7 @@ public class Planner { ctx_.getAnalysisResult().getTimeline().markEvent("Single node plan created"); ArrayList fragments = null; - // Determine the maximum number of rows processed by any node in the plan tree - MaxRowsProcessedVisitor visitor = new MaxRowsProcessedVisitor(); - singleNodePlan.accept(visitor); - long maxRowsProcessed = visitor.get() == -1 ? Long.MAX_VALUE : visitor.get(); - boolean isSmallQuery = - maxRowsProcessed < ctx_.getQueryOptions().exec_single_node_rows_threshold; - if (isSmallQuery) { - // Execute on a single node and disable codegen for small results - ctx_.getQueryOptions().setNum_nodes(1); - ctx_.getQueryCtx().disable_codegen_hint = true; - if (maxRowsProcessed < ctx_.getQueryOptions().batch_size || - maxRowsProcessed < 1024 && ctx_.getQueryOptions().batch_size == 0) { - // Only one scanner thread for small queries - ctx_.getQueryOptions().setNum_scanner_threads(1); - } - // disable runtime filters - ctx_.getQueryOptions().setRuntime_filter_mode(TRuntimeFilterMode.OFF); - } + checkForSmallQueryOptimization(singleNodePlan); // Join rewrites. invertJoins(singleNodePlan, ctx_.isSingleNodeExec()); @@ -167,6 +149,10 @@ public class Planner { } rootFragment.setOutputExprs(resultExprs); + // The check for disabling codegen uses estimates of rows per node so must be done + // on the distributed plan. + checkForDisableCodegen(rootFragment.getPlanRoot()); + if (LOG.isTraceEnabled()) { LOG.trace("desctbl: " + ctx_.getRootAnalyzer().getDescTbl().debugString()); LOG.trace("resultexprs: " + Expr.debugString(rootFragment.getOutputExprs())); @@ -279,6 +265,9 @@ public class Planner { PrintUtils.printBytes(request.getPer_host_mem_estimate()))); hasHeader = true; } + if (request.query_ctx.disable_codegen_hint) { + str.append("Codegen disabled by planner\n"); + } // IMPALA-1983 In the case of corrupt stats, issue a warning for all queries except // child queries of 'compute stats'. @@ -482,6 +471,42 @@ public class Planner { return newJoinNode; } + private void checkForSmallQueryOptimization(PlanNode singleNodePlan) { + MaxRowsProcessedVisitor visitor = new MaxRowsProcessedVisitor(); + singleNodePlan.accept(visitor); + // TODO: IMPALA-3335: support the optimization for plans with joins. + if (!visitor.valid() || visitor.foundJoinNode()) return; + // This optimization executes the plan on a single node so the threshold must + // be based on the total number of rows processed. + long maxRowsProcessed = visitor.getMaxRowsProcessed(); + int threshold = ctx_.getQueryOptions().exec_single_node_rows_threshold; + if (maxRowsProcessed < threshold) { + // Execute on a single node and disable codegen for small results + ctx_.getQueryOptions().setNum_nodes(1); + ctx_.getQueryCtx().disable_codegen_hint = true; + if (maxRowsProcessed < ctx_.getQueryOptions().batch_size || + maxRowsProcessed < 1024 && ctx_.getQueryOptions().batch_size == 0) { + // Only one scanner thread for small queries + ctx_.getQueryOptions().setNum_scanner_threads(1); + } + // disable runtime filters + ctx_.getQueryOptions().setRuntime_filter_mode(TRuntimeFilterMode.OFF); + } + } + + private void checkForDisableCodegen(PlanNode distributedPlan) { + MaxRowsProcessedVisitor visitor = new MaxRowsProcessedVisitor(); + distributedPlan.accept(visitor); + if (!visitor.valid()) return; + // This heuristic threshold tries to determine if the per-node codegen time will + // reduce per-node execution time enough to justify the cost of codegen. Per-node + // execution time is correlated with the number of rows flowing through the plan. + if (visitor.getMaxRowsProcessedPerNode() + < ctx_.getQueryOptions().getDisable_codegen_rows_threshold()) { + ctx_.getQueryCtx().disable_codegen_hint = true; + } + } + /** * Insert a sort node on top of the plan, depending on the clustered/noclustered * plan hint and on the 'sort.columns' table property. If clustering is enabled in diff --git a/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java b/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java index d4b3da9c2..56cf047a6 100644 --- a/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java +++ b/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java @@ -17,8 +17,9 @@ package org.apache.impala.util; -import org.apache.impala.planner.NestedLoopJoinNode; -import org.apache.impala.planner.HashJoinNode; +import com.google.common.base.Preconditions; +import org.apache.impala.planner.JoinNode; +import org.apache.impala.planner.PlanFragment; import org.apache.impala.planner.PlanNode; import org.apache.impala.planner.ScanNode; @@ -27,40 +28,67 @@ import org.apache.impala.planner.ScanNode; */ public class MaxRowsProcessedVisitor implements Visitor { - private boolean abort_ = false; - private long result_ = -1l; + // True if we should abort because we don't have valid estimates + // for a plan node. + private boolean valid_ = true; + private boolean foundJoinNode_ = false; + + // Max number of rows processed across all instances of a plan node. + private long maxRowsProcessed_ = 0; + + // Max number of rows processed per backend impala daemon for a plan node. + private long maxRowsProcessedPerNode_ = 0; @Override public void visit(PlanNode caller) { - if (abort_) return; + if (!valid_) return; + if (caller instanceof JoinNode) foundJoinNode_ = true; + PlanFragment fragment = caller.getFragment(); + int numNodes = fragment == null ? 1 : fragment.getNumNodes(); if (caller instanceof ScanNode) { - long tmp = caller.getInputCardinality(); + long numRows = caller.getInputCardinality(); ScanNode scan = (ScanNode) caller; boolean missingStats = scan.isTableMissingStats() || scan.hasCorruptTableStats(); // In the absence of collection stats, treat scans on collections as if they // have no limit. - if (scan.isAccessingCollectionType() || (missingStats && !scan.hasLimit())) { - abort_ = true; + if (scan.isAccessingCollectionType() + || (missingStats && !(scan.hasLimit() && scan.getConjuncts().isEmpty()))) { + valid_ = false; return; } - result_ = Math.max(result_, tmp); - } else if (caller instanceof HashJoinNode || caller instanceof NestedLoopJoinNode) { - // Revisit when multiple scan nodes can be executed in a single fragment, IMPALA-561 - abort_ = true; - return; + maxRowsProcessed_ = Math.max(maxRowsProcessed_, numRows); + maxRowsProcessedPerNode_ = Math.max(maxRowsProcessedPerNode_, + (long)Math.ceil(numRows / (double)numNodes)); } else { long in = caller.getInputCardinality(); long out = caller.getCardinality(); if ((in == -1) || (out == -1)) { - abort_ = true; + valid_ = false; return; } - result_ = Math.max(result_, Math.max(in, out)); + long numRows = Math.max(in, out); + maxRowsProcessed_ = Math.max(maxRowsProcessed_, numRows); + maxRowsProcessedPerNode_ = Math.max(maxRowsProcessedPerNode_, + (long)Math.ceil(numRows / (double)numNodes)); } } - public long get() { - return abort_ ? -1 : result_; + public boolean valid() { return valid_; } + + public long getMaxRowsProcessed() { + Preconditions.checkState(valid_); + return maxRowsProcessed_; } + + public long getMaxRowsProcessedPerNode() { + Preconditions.checkState(valid_); + return maxRowsProcessedPerNode_; + } + + public boolean foundJoinNode() { + Preconditions.checkState(valid_); + return foundJoinNode_; + } + } diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java index 62c8d0dcf..b920555f9 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java @@ -252,6 +252,13 @@ public class PlannerTest extends PlannerTestBase { runPlannerTestFile("small-query-opt", options); } + @Test + public void testDisableCodegenOptimization() { + TQueryOptions options = new TQueryOptions(); + options.setDisable_codegen_rows_threshold(3000); + runPlannerTestFile("disable-codegen", options, false); + } + @Test public void testSingleNodeNlJoin() { TQueryOptions options = new TQueryOptions(); diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test b/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test new file mode 100644 index 000000000..ffabd6bc3 --- /dev/null +++ b/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test @@ -0,0 +1,232 @@ +# Rows per node is < 3000: codegen should be disabled. +select count(*) from functional.alltypes +---- DISTRIBUTEDPLAN +Per-Host Resource Reservation: Memory=0B +Per-Host Resource Estimates: Memory=138.00MB +Codegen disabled by planner + +PLAN-ROOT SINK +| +03:AGGREGATE [FINALIZE] +| output: count:merge(*) +| +02:EXCHANGE [UNPARTITIONED] +| +01:AGGREGATE +| output: count(*) +| +00:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +==== +# Rows per node is > 3000: codegen should be enabled. +select count(*) from functional.alltypesagg +---- DISTRIBUTEDPLAN +Per-Host Resource Reservation: Memory=0B +Per-Host Resource Estimates: Memory=90.00MB + +PLAN-ROOT SINK +| +03:AGGREGATE [FINALIZE] +| output: count:merge(*) +| +02:EXCHANGE [UNPARTITIONED] +| +01:AGGREGATE +| output: count(*) +| +00:SCAN HDFS [functional.alltypesagg] + partitions=11/11 files=11 size=814.73KB +==== +# No stats on functional_parquet: codegen should be disabled. +select count(*) from functional_parquet.alltypes +---- DISTRIBUTEDPLAN +Per-Host Resource Reservation: Memory=0B +Per-Host Resource Estimates: Memory=20.00MB +WARNING: The following tables are missing relevant table and/or column statistics. +functional_parquet.alltypes + +PLAN-ROOT SINK +| +03:AGGREGATE [FINALIZE] +| output: count:merge(*) +| +02:EXCHANGE [UNPARTITIONED] +| +01:AGGREGATE +| output: count(*) +| +00:SCAN HDFS [functional_parquet.alltypes] + partitions=24/24 files=24 size=178.13KB +==== +# > 3000 rows returned to coordinator: codegen should be enabled +select * from functional_parquet.alltypes +---- DISTRIBUTEDPLAN +Per-Host Resource Reservation: Memory=0B +Per-Host Resource Estimates: Memory=128.00MB +WARNING: The following tables are missing relevant table and/or column statistics. +functional_parquet.alltypes + +PLAN-ROOT SINK +| +01:EXCHANGE [UNPARTITIONED] +| +00:SCAN HDFS [functional_parquet.alltypes] + partitions=24/24 files=24 size=178.13KB +==== +# Optimisation is enabled for join producing < 3000 rows +select count(*) +from functional.alltypes t1 +join functional.alltypestiny t2 on t1.id = t2.id +---- DISTRIBUTEDPLAN +Per-Host Resource Reservation: Memory=136.00MB +Per-Host Resource Estimates: Memory=138.00MB +Codegen disabled by planner + +PLAN-ROOT SINK +| +06:AGGREGATE [FINALIZE] +| output: count:merge(*) +| +05:EXCHANGE [UNPARTITIONED] +| +03:AGGREGATE +| output: count(*) +| +02:HASH JOIN [INNER JOIN, BROADCAST] +| hash predicates: t1.id = t2.id +| runtime filters: RF000 <- t2.id +| +|--04:EXCHANGE [BROADCAST] +| | +| 01:SCAN HDFS [functional.alltypestiny t2] +| partitions=4/4 files=4 size=460B +| +00:SCAN HDFS [functional.alltypes t1] + partitions=24/24 files=24 size=478.45KB + runtime filters: RF000 -> t1.id +==== +# Optimisation is disabled by cross join producing > 3000 rows +select count(*) from functional.alltypes t1, functional.alltypes t2 +---- DISTRIBUTEDPLAN +Per-Host Resource Reservation: Memory=0B +Per-Host Resource Estimates: Memory=138.00MB + +PLAN-ROOT SINK +| +06:AGGREGATE [FINALIZE] +| output: count:merge(*) +| +05:EXCHANGE [UNPARTITIONED] +| +03:AGGREGATE +| output: count(*) +| +02:NESTED LOOP JOIN [CROSS JOIN, BROADCAST] +| +|--04:EXCHANGE [BROADCAST] +| | +| 01:SCAN HDFS [functional.alltypes t2] +| partitions=24/24 files=24 size=478.45KB +| +00:SCAN HDFS [functional.alltypes t1] + partitions=24/24 files=24 size=478.45KB +==== +# Optimisation is enabled for union producing < 3000 rows +select count(*) from ( + select * from functional.alltypes + union all + select * from functional.alltypestiny) v +---- DISTRIBUTEDPLAN +Per-Host Resource Reservation: Memory=0B +Per-Host Resource Estimates: Memory=170.00MB +Codegen disabled by planner + +PLAN-ROOT SINK +| +05:AGGREGATE [FINALIZE] +| output: count:merge(*) +| +04:EXCHANGE [UNPARTITIONED] +| +03:AGGREGATE +| output: count(*) +| +00:UNION +| pass-through-operands: all +| +|--02:SCAN HDFS [functional.alltypestiny] +| partitions=4/4 files=4 size=460B +| +01:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +==== +# Optimisation is disabled by union producing > 3000 rows +select count(*) from ( + select * from functional.alltypes + union all + select * from functional.alltypes) v +---- DISTRIBUTEDPLAN +Per-Host Resource Reservation: Memory=0B +Per-Host Resource Estimates: Memory=266.00MB + +PLAN-ROOT SINK +| +05:AGGREGATE [FINALIZE] +| output: count:merge(*) +| +04:EXCHANGE [UNPARTITIONED] +| +03:AGGREGATE +| output: count(*) +| +00:UNION +| pass-through-operands: all +| +|--02:SCAN HDFS [functional.alltypes] +| partitions=24/24 files=24 size=478.45KB +| +01:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +==== +# Scan with limit on large table: the number of rows scanned is bounded, +# codegen should be disabled +select sum(l_discount) +from (select * from tpch.lineitem limit 1000) v +---- DISTRIBUTEDPLAN +Per-Host Resource Reservation: Memory=0B +Per-Host Resource Estimates: Memory=274.00MB +Codegen disabled by planner + +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: sum(tpch.lineitem.l_discount) +| +02:EXCHANGE [UNPARTITIONED] +| limit: 1000 +| +00:SCAN HDFS [tpch.lineitem] + partitions=1/1 files=1 size=718.94MB + limit: 1000 +==== +# Scan with limit and predicates on large table: any number of rows could be scanned: +# codegen should be enabled +select sum(l_discount) +from (select * from tpch.lineitem where l_orderkey > 100 limit 1000) v +---- DISTRIBUTEDPLAN +Per-Host Resource Reservation: Memory=0B +Per-Host Resource Estimates: Memory=274.00MB + +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: sum(tpch.lineitem.l_discount) +| +02:EXCHANGE [UNPARTITIONED] +| limit: 1000 +| +00:SCAN HDFS [tpch.lineitem] + partitions=1/1 files=1 size=718.94MB + predicates: l_orderkey > 100 + limit: 1000 +==== diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test index fb0cd3dae..e49938cfd 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test @@ -676,6 +676,7 @@ select * from functional.alltypes where 1 = 2 ---- DISTRIBUTEDPLAN Per-Host Resource Reservation: Memory=0B Per-Host Resource Estimates: Memory=10.00MB +Codegen disabled by planner F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK @@ -687,6 +688,7 @@ PLAN-ROOT SINK ---- PARALLELPLANS Per-Host Resource Reservation: Memory=0B Per-Host Resource Estimates: Memory=10.00MB +Codegen disabled by planner F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK @@ -702,6 +704,7 @@ from functional.alltypes ---- DISTRIBUTEDPLAN Per-Host Resource Reservation: Memory=40.00MB Per-Host Resource Estimates: Memory=24.00MB +Codegen disabled by planner F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK @@ -738,6 +741,7 @@ F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 ---- PARALLELPLANS Per-Host Resource Reservation: Memory=80.00MB Per-Host Resource Estimates: Memory=48.00MB +Codegen disabled by planner F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK @@ -1080,6 +1084,7 @@ where year=2009 and month=05 ---- DISTRIBUTEDPLAN Per-Host Resource Reservation: Memory=0B Per-Host Resource Estimates: Memory=16.03MB +Codegen disabled by planner F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1 WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false] @@ -1096,6 +1101,7 @@ WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false] ---- PARALLELPLANS Per-Host Resource Reservation: Memory=0B Per-Host Resource Estimates: Memory=32.03MB +Codegen disabled by planner F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=2 WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false] diff --git a/testdata/workloads/functional-query/queries/QueryTest/disable-codegen.test b/testdata/workloads/functional-query/queries/QueryTest/disable-codegen.test new file mode 100644 index 000000000..dee0126c1 --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/disable-codegen.test @@ -0,0 +1,30 @@ +==== +---- QUERY +# alltypes has 7300 rows - codegen should be enabled if there +# are < 1000 backend daemons. +set disable_codegen_rows_threshold=8; +select count(*) from alltypes t1 + join alltypestiny t2 on t1.id = t2.id +---- RESULTS +8 +---- TYPES +bigint +---- RUNTIME_PROFILE +# Verify that codegen was enabled for join and scan +row_regex: .*Build Side Codegen Enabled.* +row_regex: .*TEXT Codegen Enabled.* +==== +---- QUERY +# alltypes has 7300 rows - codegen should be disabled regardless +# of # of backend impala daemons. +set disable_codegen_rows_threshold=8000; +select count(*) from alltypes t1 + join alltypestiny t2 on t1.id = t2.id +---- RESULTS +8 +---- TYPES +bigint +---- RUNTIME_PROFILE +# Verify that codegen was disabled +row_regex: .*Codegen Disabled: disabled due to optimization hints.* +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test index c22095b7f..9de548048 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test +++ b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test @@ -29,6 +29,7 @@ explain select id from alltypes; ---- RESULTS: VERIFY_IS_EQUAL 'Per-Host Resource Reservation: Memory=0B' 'Per-Host Resource Estimates: Memory=16.00MB' +'Codegen disabled by planner' '' 'F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1' 'PLAN-ROOT SINK' @@ -50,6 +51,7 @@ explain select id from alltypes where month in (1, 2, 3); ---- RESULTS: VERIFY_IS_EQUAL 'Per-Host Resource Reservation: Memory=0B' 'Per-Host Resource Estimates: Memory=16.00MB' +'Codegen disabled by planner' '' 'F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1' 'PLAN-ROOT SINK' @@ -96,6 +98,7 @@ explain select id from alltypes where year = 2010; ---- RESULTS: VERIFY_IS_EQUAL 'Per-Host Resource Reservation: Memory=0B' 'Per-Host Resource Estimates: Memory=16.00MB' +'Codegen disabled by planner' '' 'F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1' 'PLAN-ROOT SINK' @@ -118,6 +121,7 @@ explain select id from alltypes where year = 2010; ---- RESULTS: VERIFY_IS_EQUAL 'Per-Host Resource Reservation: Memory=0B' 'Per-Host Resource Estimates: Memory=16.00MB' +'Codegen disabled by planner' '' 'F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1' 'PLAN-ROOT SINK' diff --git a/testdata/workloads/targeted-perf/queries/primitive_small_join_1.test b/testdata/workloads/targeted-perf/queries/primitive_small_join_1.test new file mode 100644 index 000000000..c6c5df28b --- /dev/null +++ b/testdata/workloads/targeted-perf/queries/primitive_small_join_1.test @@ -0,0 +1,10 @@ +==== +---- QUERY: primitive_small_join_1 +-- Description : join with small input on both sides +-- Target test case : Small queries where cost of codegen exceeds benefit. +SELECT count(*) +FROM (SELECT * FROM customer LIMIT 100000) c +JOIN nation ON c_nationkey = n_nationkey +---- RESULTS +---- TYPES +==== diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py index dda020347..d5841b270 100644 --- a/tests/common/impala_test_suite.py +++ b/tests/common/impala_test_suite.py @@ -682,7 +682,8 @@ class ImpalaTestSuite(BaseTestSuite): cluster_sizes = ALL_NODES_ONLY return create_exec_option_dimension(cluster_sizes, disable_codegen_options, batch_sizes, - exec_single_node_option=exec_single_node_option) + exec_single_node_option=exec_single_node_option, + disable_codegen_rows_threshold_options=[0]) @classmethod def exploration_strategy(cls): diff --git a/tests/common/test_dimensions.py b/tests/common/test_dimensions.py index 20ccbeb00..4171e1f2f 100644 --- a/tests/common/test_dimensions.py +++ b/tests/common/test_dimensions.py @@ -135,17 +135,23 @@ def create_single_exec_option_dimension(): """Creates an exec_option dimension that will produce a single test vector""" return create_exec_option_dimension(cluster_sizes=ALL_NODES_ONLY, disable_codegen_options=[False], + # Make sure codegen kicks in for functional.alltypes. + disable_codegen_rows_threshold_options=[5000], batch_sizes=[0]) def create_exec_option_dimension(cluster_sizes=ALL_CLUSTER_SIZES, disable_codegen_options=ALL_DISABLE_CODEGEN_OPTIONS, batch_sizes=ALL_BATCH_SIZES, - sync_ddl=None, exec_single_node_option=[0]): + sync_ddl=None, exec_single_node_option=[0], + # We already run with codegen on and off explicitly - + # don't need automatic toggling. + disable_codegen_rows_threshold_options=[0]): exec_option_dimensions = { 'abort_on_error': [1], 'exec_single_node_rows_threshold': exec_single_node_option, 'batch_size': batch_sizes, 'disable_codegen': disable_codegen_options, + 'disable_codegen_rows_threshold': disable_codegen_rows_threshold_options, 'num_nodes': cluster_sizes} if sync_ddl is not None: diff --git a/tests/query_test/test_codegen.py b/tests/query_test/test_codegen.py new file mode 100644 index 000000000..8a4390a9d --- /dev/null +++ b/tests/query_test/test_codegen.py @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Tests end-to-end codegen behaviour. + +from tests.common.impala_test_suite import ImpalaTestSuite +from tests.common.test_dimensions import create_exec_option_dimension_from_dict + +class TestCodegen(ImpalaTestSuite): + @classmethod + def get_workload(self): + return 'functional-query' + + @classmethod + def add_test_dimensions(cls): + super(TestCodegen, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension_from_dict({ + 'exec_single_node_rows_threshold' : [0]})) + # No need to run this on all file formats. Run it on text/none, which has stats + # computed. + cls.ImpalaTestMatrix.add_constraint( + lambda v: v.get_value('table_format').file_format == 'text' and + v.get_value('table_format').compression_codec == 'none') + + def test_disable_codegen(self, vector): + """Test that codegen is enabled/disabled by the planner as expected.""" + self.run_test_case('QueryTest/disable-codegen', vector) diff --git a/tests/query_test/test_decimal_queries.py b/tests/query_test/test_decimal_queries.py index 036d879c9..a250ae2b6 100644 --- a/tests/query_test/test_decimal_queries.py +++ b/tests/query_test/test_decimal_queries.py @@ -34,7 +34,9 @@ class TestDecimalQueries(ImpalaTestSuite): cls.ImpalaTestMatrix.add_dimension( create_exec_option_dimension_from_dict({ 'decimal_v2' : ['false', 'true'], - 'batch_size' : [0, 1]})) + 'batch_size' : [0, 1], + 'disable_codegen' : ['false', 'true'], + 'disable_codegen_rows_threshold' : [0]})) # Hive < 0.11 does not support decimal so we can't run these tests against the other # file formats. # TODO: Enable them on Hive >= 0.11. diff --git a/tests/query_test/test_scanners_fuzz.py b/tests/query_test/test_scanners_fuzz.py index 82b4ec98e..8e521d8d1 100644 --- a/tests/query_test/test_scanners_fuzz.py +++ b/tests/query_test/test_scanners_fuzz.py @@ -150,6 +150,7 @@ class TestScannersFuzzing(ImpalaTestSuite): query_options = copy(vector.get_value('exec_option')) query_options['batch_size'] = batch_size query_options['disable_codegen'] = disable_codegen + query_options['disable_codegen_rows_threshold'] = 0 try: result = self.execute_query(query, query_options = query_options) LOG.info('\n'.join(result.log)) diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py index e8e4eadf4..eab599401 100644 --- a/tests/query_test/test_udfs.py +++ b/tests/query_test/test_udfs.py @@ -261,6 +261,7 @@ class TestUdfExecution(TestUdfBase): super(TestUdfExecution, cls).add_test_dimensions() cls.ImpalaTestMatrix.add_dimension( create_exec_option_dimension_from_dict({"disable_codegen" : [False, True], + "disable_codegen_rows_threshold" : [0], "exec_single_node_rows_threshold" : [0,100], "enable_expr_rewrites" : [False, True]})) # There is no reason to run these tests using all dimensions.