diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc index 462716dc6..a8998ecf6 100644 --- a/be/src/exprs/expr-test.cc +++ b/be/src/exprs/expr-test.cc @@ -7312,6 +7312,7 @@ int main(int argc, char **argv) { executor_->PushExecOption("ENABLE_EXPR_REWRITES=0"); executor_->PushExecOption("DISABLE_CODEGEN=0"); executor_->PushExecOption("EXEC_SINGLE_NODE_ROWS_THRESHOLD=0"); + executor_->PushExecOption("DISABLE_CODEGEN_ROWS_THRESHOLD=0"); cout << endl << "Running with codegen" << endl; ret = RUN_ALL_TESTS(); if (ret != 0) return ret; diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc index f4c4f05f0..8dcd7afd3 100644 --- a/be/src/service/query-options.cc +++ b/be/src/service/query-options.cc @@ -366,8 +366,7 @@ Status impala::SetQueryOption(const string& key, const string& value, StringParser::ParseResult status; int val = StringParser::StringToInt(value.c_str(), value.size(), &status); if (status != StringParser::PARSE_SUCCESS) { - return Status(Substitute("Invalid number of runtime filters: '$0'.", - value.c_str())); + return Status(Substitute("Invalid number of runtime filters: '$0'.", value)); } if (val < 0) { return Status(Substitute("Invalid number of runtime filters: '$0'. " @@ -493,6 +492,19 @@ Status impala::SetQueryOption(const string& key, const string& value, } break; } + case TImpalaQueryOptions::DISABLE_CODEGEN_ROWS_THRESHOLD: { + StringParser::ParseResult status; + int val = StringParser::StringToInt(value.c_str(), value.size(), &status); + if (status != StringParser::PARSE_SUCCESS) { + return Status(Substitute("Invalid threshold: '$0'.", value)); + } + if (val < 0) { + return Status(Substitute( + "Invalid threshold: '$0'. Only positive values are allowed.", val)); + } + query_options->__set_disable_codegen_rows_threshold(val); + break; + } default: // We hit this DCHECK(false) if we forgot to add the corresponding entry here // when we add a new query option. diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h index 1f5624cd2..603c783ee 100644 --- a/be/src/service/query-options.h +++ b/be/src/service/query-options.h @@ -35,7 +35,7 @@ class TQueryOptions; // the DCHECK. #define QUERY_OPTS_TABLE\ DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\ - TImpalaQueryOptions::DEFAULT_JOIN_DISTRIBUTION_MODE + 1);\ + TImpalaQueryOptions::DISABLE_CODEGEN_ROWS_THRESHOLD + 1);\ QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\ QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR)\ QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\ @@ -92,6 +92,7 @@ class TQueryOptions; QUERY_OPT_FN(parquet_array_resolution, PARQUET_ARRAY_RESOLUTION)\ QUERY_OPT_FN(parquet_read_statistics, PARQUET_READ_STATISTICS)\ QUERY_OPT_FN(default_join_distribution_mode, DEFAULT_JOIN_DISTRIBUTION_MODE)\ + QUERY_OPT_FN(disable_codegen_rows_threshold, DISABLE_CODEGEN_ROWS_THRESHOLD)\ ; diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index f622ed444..c31cd7f21 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ b/common/thrift/ImpalaInternalService.thrift @@ -251,6 +251,10 @@ struct TQueryOptions { // cardinality, e.g., because of missing table statistics. 56: optional TJoinDistributionMode default_join_distribution_mode = TJoinDistributionMode.BROADCAST + + // If the number of rows processed per node is below the threshold codegen will be + // automatically disabled by the planner. + 57: optional i32 disable_codegen_rows_threshold = 50000 } // Impala currently has two types of sessions: Beeswax and HiveServer2 diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift index fb0016cf3..ec82bf15d 100644 --- a/common/thrift/ImpalaService.thrift +++ b/common/thrift/ImpalaService.thrift @@ -274,7 +274,11 @@ enum TImpalaQueryOptions { // Join distribution mode that is used when the join inputs have an unknown // cardinality, e.g., because of missing table statistics. - DEFAULT_JOIN_DISTRIBUTION_MODE + DEFAULT_JOIN_DISTRIBUTION_MODE, + + // If the number of rows processed per node is below the threshold and disable_codegen + // is unset, codegen will be automatically be disabled by the planner. + DISABLE_CODEGEN_ROWS_THRESHOLD, } // The summary of a DML statement. diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java index 223362fd9..48b772a1d 100644 --- a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java @@ -218,7 +218,7 @@ public class HBaseScanNode extends ScanNode { } // TODO: take actual regions into account - numNodes_ = tbl.getNumNodes(); + numNodes_ = Math.max(1, tbl.getNumNodes()); if (LOG.isTraceEnabled()) { LOG.trace("computeStats HbaseScan: #nodes=" + Integer.toString(numNodes_)); } diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java index 7c6c5e345..b1aa5ba00 100644 --- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java @@ -255,7 +255,7 @@ public class KuduScanNode extends ScanNode { protected void computeStats(Analyzer analyzer) { super.computeStats(analyzer); // Update the number of nodes to reflect the hosts that have relevant data. - numNodes_ = hostIndexSet_.size(); + numNodes_ = Math.max(1, hostIndexSet_.size()); // Update the cardinality inputCardinality_ = cardinality_ = kuduTable_.getNumRows(); diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java index 31a061b93..936847a1c 100644 --- a/fe/src/main/java/org/apache/impala/planner/Planner.java +++ b/fe/src/main/java/org/apache/impala/planner/Planner.java @@ -38,7 +38,6 @@ import org.apache.impala.common.PrintUtils; import org.apache.impala.common.RuntimeEnv; import org.apache.impala.service.BackendConfig; import org.apache.impala.thrift.TExplainLevel; -import org.apache.impala.thrift.TPartitionType; import org.apache.impala.thrift.TQueryCtx; import org.apache.impala.thrift.TQueryExecRequest; import org.apache.impala.thrift.TQueryOptions; @@ -91,24 +90,7 @@ public class Planner { ctx_.getAnalysisResult().getTimeline().markEvent("Single node plan created"); ArrayList fragments = null; - // Determine the maximum number of rows processed by any node in the plan tree - MaxRowsProcessedVisitor visitor = new MaxRowsProcessedVisitor(); - singleNodePlan.accept(visitor); - long maxRowsProcessed = visitor.get() == -1 ? Long.MAX_VALUE : visitor.get(); - boolean isSmallQuery = - maxRowsProcessed < ctx_.getQueryOptions().exec_single_node_rows_threshold; - if (isSmallQuery) { - // Execute on a single node and disable codegen for small results - ctx_.getQueryOptions().setNum_nodes(1); - ctx_.getQueryCtx().disable_codegen_hint = true; - if (maxRowsProcessed < ctx_.getQueryOptions().batch_size || - maxRowsProcessed < 1024 && ctx_.getQueryOptions().batch_size == 0) { - // Only one scanner thread for small queries - ctx_.getQueryOptions().setNum_scanner_threads(1); - } - // disable runtime filters - ctx_.getQueryOptions().setRuntime_filter_mode(TRuntimeFilterMode.OFF); - } + checkForSmallQueryOptimization(singleNodePlan); // Join rewrites. invertJoins(singleNodePlan, ctx_.isSingleNodeExec()); @@ -167,6 +149,10 @@ public class Planner { } rootFragment.setOutputExprs(resultExprs); + // The check for disabling codegen uses estimates of rows per node so must be done + // on the distributed plan. + checkForDisableCodegen(rootFragment.getPlanRoot()); + if (LOG.isTraceEnabled()) { LOG.trace("desctbl: " + ctx_.getRootAnalyzer().getDescTbl().debugString()); LOG.trace("resultexprs: " + Expr.debugString(rootFragment.getOutputExprs())); @@ -279,6 +265,9 @@ public class Planner { PrintUtils.printBytes(request.getPer_host_mem_estimate()))); hasHeader = true; } + if (request.query_ctx.disable_codegen_hint) { + str.append("Codegen disabled by planner\n"); + } // IMPALA-1983 In the case of corrupt stats, issue a warning for all queries except // child queries of 'compute stats'. @@ -482,6 +471,42 @@ public class Planner { return newJoinNode; } + private void checkForSmallQueryOptimization(PlanNode singleNodePlan) { + MaxRowsProcessedVisitor visitor = new MaxRowsProcessedVisitor(); + singleNodePlan.accept(visitor); + // TODO: IMPALA-3335: support the optimization for plans with joins. + if (!visitor.valid() || visitor.foundJoinNode()) return; + // This optimization executes the plan on a single node so the threshold must + // be based on the total number of rows processed. + long maxRowsProcessed = visitor.getMaxRowsProcessed(); + int threshold = ctx_.getQueryOptions().exec_single_node_rows_threshold; + if (maxRowsProcessed < threshold) { + // Execute on a single node and disable codegen for small results + ctx_.getQueryOptions().setNum_nodes(1); + ctx_.getQueryCtx().disable_codegen_hint = true; + if (maxRowsProcessed < ctx_.getQueryOptions().batch_size || + maxRowsProcessed < 1024 && ctx_.getQueryOptions().batch_size == 0) { + // Only one scanner thread for small queries + ctx_.getQueryOptions().setNum_scanner_threads(1); + } + // disable runtime filters + ctx_.getQueryOptions().setRuntime_filter_mode(TRuntimeFilterMode.OFF); + } + } + + private void checkForDisableCodegen(PlanNode distributedPlan) { + MaxRowsProcessedVisitor visitor = new MaxRowsProcessedVisitor(); + distributedPlan.accept(visitor); + if (!visitor.valid()) return; + // This heuristic threshold tries to determine if the per-node codegen time will + // reduce per-node execution time enough to justify the cost of codegen. Per-node + // execution time is correlated with the number of rows flowing through the plan. + if (visitor.getMaxRowsProcessedPerNode() + < ctx_.getQueryOptions().getDisable_codegen_rows_threshold()) { + ctx_.getQueryCtx().disable_codegen_hint = true; + } + } + /** * Insert a sort node on top of the plan, depending on the clustered/noclustered * plan hint and on the 'sort.columns' table property. If clustering is enabled in diff --git a/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java b/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java index d4b3da9c2..56cf047a6 100644 --- a/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java +++ b/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java @@ -17,8 +17,9 @@ package org.apache.impala.util; -import org.apache.impala.planner.NestedLoopJoinNode; -import org.apache.impala.planner.HashJoinNode; +import com.google.common.base.Preconditions; +import org.apache.impala.planner.JoinNode; +import org.apache.impala.planner.PlanFragment; import org.apache.impala.planner.PlanNode; import org.apache.impala.planner.ScanNode; @@ -27,40 +28,67 @@ import org.apache.impala.planner.ScanNode; */ public class MaxRowsProcessedVisitor implements Visitor { - private boolean abort_ = false; - private long result_ = -1l; + // True if we should abort because we don't have valid estimates + // for a plan node. + private boolean valid_ = true; + private boolean foundJoinNode_ = false; + + // Max number of rows processed across all instances of a plan node. + private long maxRowsProcessed_ = 0; + + // Max number of rows processed per backend impala daemon for a plan node. + private long maxRowsProcessedPerNode_ = 0; @Override public void visit(PlanNode caller) { - if (abort_) return; + if (!valid_) return; + if (caller instanceof JoinNode) foundJoinNode_ = true; + PlanFragment fragment = caller.getFragment(); + int numNodes = fragment == null ? 1 : fragment.getNumNodes(); if (caller instanceof ScanNode) { - long tmp = caller.getInputCardinality(); + long numRows = caller.getInputCardinality(); ScanNode scan = (ScanNode) caller; boolean missingStats = scan.isTableMissingStats() || scan.hasCorruptTableStats(); // In the absence of collection stats, treat scans on collections as if they // have no limit. - if (scan.isAccessingCollectionType() || (missingStats && !scan.hasLimit())) { - abort_ = true; + if (scan.isAccessingCollectionType() + || (missingStats && !(scan.hasLimit() && scan.getConjuncts().isEmpty()))) { + valid_ = false; return; } - result_ = Math.max(result_, tmp); - } else if (caller instanceof HashJoinNode || caller instanceof NestedLoopJoinNode) { - // Revisit when multiple scan nodes can be executed in a single fragment, IMPALA-561 - abort_ = true; - return; + maxRowsProcessed_ = Math.max(maxRowsProcessed_, numRows); + maxRowsProcessedPerNode_ = Math.max(maxRowsProcessedPerNode_, + (long)Math.ceil(numRows / (double)numNodes)); } else { long in = caller.getInputCardinality(); long out = caller.getCardinality(); if ((in == -1) || (out == -1)) { - abort_ = true; + valid_ = false; return; } - result_ = Math.max(result_, Math.max(in, out)); + long numRows = Math.max(in, out); + maxRowsProcessed_ = Math.max(maxRowsProcessed_, numRows); + maxRowsProcessedPerNode_ = Math.max(maxRowsProcessedPerNode_, + (long)Math.ceil(numRows / (double)numNodes)); } } - public long get() { - return abort_ ? -1 : result_; + public boolean valid() { return valid_; } + + public long getMaxRowsProcessed() { + Preconditions.checkState(valid_); + return maxRowsProcessed_; } + + public long getMaxRowsProcessedPerNode() { + Preconditions.checkState(valid_); + return maxRowsProcessedPerNode_; + } + + public boolean foundJoinNode() { + Preconditions.checkState(valid_); + return foundJoinNode_; + } + } diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java index 62c8d0dcf..b920555f9 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java @@ -252,6 +252,13 @@ public class PlannerTest extends PlannerTestBase { runPlannerTestFile("small-query-opt", options); } + @Test + public void testDisableCodegenOptimization() { + TQueryOptions options = new TQueryOptions(); + options.setDisable_codegen_rows_threshold(3000); + runPlannerTestFile("disable-codegen", options, false); + } + @Test public void testSingleNodeNlJoin() { TQueryOptions options = new TQueryOptions(); diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test b/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test new file mode 100644 index 000000000..ffabd6bc3 --- /dev/null +++ b/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test @@ -0,0 +1,232 @@ +# Rows per node is < 3000: codegen should be disabled. +select count(*) from functional.alltypes +---- DISTRIBUTEDPLAN +Per-Host Resource Reservation: Memory=0B +Per-Host Resource Estimates: Memory=138.00MB +Codegen disabled by planner + +PLAN-ROOT SINK +| +03:AGGREGATE [FINALIZE] +| output: count:merge(*) +| +02:EXCHANGE [UNPARTITIONED] +| +01:AGGREGATE +| output: count(*) +| +00:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +==== +# Rows per node is > 3000: codegen should be enabled. +select count(*) from functional.alltypesagg +---- DISTRIBUTEDPLAN +Per-Host Resource Reservation: Memory=0B +Per-Host Resource Estimates: Memory=90.00MB + +PLAN-ROOT SINK +| +03:AGGREGATE [FINALIZE] +| output: count:merge(*) +| +02:EXCHANGE [UNPARTITIONED] +| +01:AGGREGATE +| output: count(*) +| +00:SCAN HDFS [functional.alltypesagg] + partitions=11/11 files=11 size=814.73KB +==== +# No stats on functional_parquet: codegen should be disabled. +select count(*) from functional_parquet.alltypes +---- DISTRIBUTEDPLAN +Per-Host Resource Reservation: Memory=0B +Per-Host Resource Estimates: Memory=20.00MB +WARNING: The following tables are missing relevant table and/or column statistics. +functional_parquet.alltypes + +PLAN-ROOT SINK +| +03:AGGREGATE [FINALIZE] +| output: count:merge(*) +| +02:EXCHANGE [UNPARTITIONED] +| +01:AGGREGATE +| output: count(*) +| +00:SCAN HDFS [functional_parquet.alltypes] + partitions=24/24 files=24 size=178.13KB +==== +# > 3000 rows returned to coordinator: codegen should be enabled +select * from functional_parquet.alltypes +---- DISTRIBUTEDPLAN +Per-Host Resource Reservation: Memory=0B +Per-Host Resource Estimates: Memory=128.00MB +WARNING: The following tables are missing relevant table and/or column statistics. +functional_parquet.alltypes + +PLAN-ROOT SINK +| +01:EXCHANGE [UNPARTITIONED] +| +00:SCAN HDFS [functional_parquet.alltypes] + partitions=24/24 files=24 size=178.13KB +==== +# Optimisation is enabled for join producing < 3000 rows +select count(*) +from functional.alltypes t1 +join functional.alltypestiny t2 on t1.id = t2.id +---- DISTRIBUTEDPLAN +Per-Host Resource Reservation: Memory=136.00MB +Per-Host Resource Estimates: Memory=138.00MB +Codegen disabled by planner + +PLAN-ROOT SINK +| +06:AGGREGATE [FINALIZE] +| output: count:merge(*) +| +05:EXCHANGE [UNPARTITIONED] +| +03:AGGREGATE +| output: count(*) +| +02:HASH JOIN [INNER JOIN, BROADCAST] +| hash predicates: t1.id = t2.id +| runtime filters: RF000 <- t2.id +| +|--04:EXCHANGE [BROADCAST] +| | +| 01:SCAN HDFS [functional.alltypestiny t2] +| partitions=4/4 files=4 size=460B +| +00:SCAN HDFS [functional.alltypes t1] + partitions=24/24 files=24 size=478.45KB + runtime filters: RF000 -> t1.id +==== +# Optimisation is disabled by cross join producing > 3000 rows +select count(*) from functional.alltypes t1, functional.alltypes t2 +---- DISTRIBUTEDPLAN +Per-Host Resource Reservation: Memory=0B +Per-Host Resource Estimates: Memory=138.00MB + +PLAN-ROOT SINK +| +06:AGGREGATE [FINALIZE] +| output: count:merge(*) +| +05:EXCHANGE [UNPARTITIONED] +| +03:AGGREGATE +| output: count(*) +| +02:NESTED LOOP JOIN [CROSS JOIN, BROADCAST] +| +|--04:EXCHANGE [BROADCAST] +| | +| 01:SCAN HDFS [functional.alltypes t2] +| partitions=24/24 files=24 size=478.45KB +| +00:SCAN HDFS [functional.alltypes t1] + partitions=24/24 files=24 size=478.45KB +==== +# Optimisation is enabled for union producing < 3000 rows +select count(*) from ( + select * from functional.alltypes + union all + select * from functional.alltypestiny) v +---- DISTRIBUTEDPLAN +Per-Host Resource Reservation: Memory=0B +Per-Host Resource Estimates: Memory=170.00MB +Codegen disabled by planner + +PLAN-ROOT SINK +| +05:AGGREGATE [FINALIZE] +| output: count:merge(*) +| +04:EXCHANGE [UNPARTITIONED] +| +03:AGGREGATE +| output: count(*) +| +00:UNION +| pass-through-operands: all +| +|--02:SCAN HDFS [functional.alltypestiny] +| partitions=4/4 files=4 size=460B +| +01:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +==== +# Optimisation is disabled by union producing > 3000 rows +select count(*) from ( + select * from functional.alltypes + union all + select * from functional.alltypes) v +---- DISTRIBUTEDPLAN +Per-Host Resource Reservation: Memory=0B +Per-Host Resource Estimates: Memory=266.00MB + +PLAN-ROOT SINK +| +05:AGGREGATE [FINALIZE] +| output: count:merge(*) +| +04:EXCHANGE [UNPARTITIONED] +| +03:AGGREGATE +| output: count(*) +| +00:UNION +| pass-through-operands: all +| +|--02:SCAN HDFS [functional.alltypes] +| partitions=24/24 files=24 size=478.45KB +| +01:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +==== +# Scan with limit on large table: the number of rows scanned is bounded, +# codegen should be disabled +select sum(l_discount) +from (select * from tpch.lineitem limit 1000) v +---- DISTRIBUTEDPLAN +Per-Host Resource Reservation: Memory=0B +Per-Host Resource Estimates: Memory=274.00MB +Codegen disabled by planner + +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: sum(tpch.lineitem.l_discount) +| +02:EXCHANGE [UNPARTITIONED] +| limit: 1000 +| +00:SCAN HDFS [tpch.lineitem] + partitions=1/1 files=1 size=718.94MB + limit: 1000 +==== +# Scan with limit and predicates on large table: any number of rows could be scanned: +# codegen should be enabled +select sum(l_discount) +from (select * from tpch.lineitem where l_orderkey > 100 limit 1000) v +---- DISTRIBUTEDPLAN +Per-Host Resource Reservation: Memory=0B +Per-Host Resource Estimates: Memory=274.00MB + +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: sum(tpch.lineitem.l_discount) +| +02:EXCHANGE [UNPARTITIONED] +| limit: 1000 +| +00:SCAN HDFS [tpch.lineitem] + partitions=1/1 files=1 size=718.94MB + predicates: l_orderkey > 100 + limit: 1000 +==== diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test index fb0cd3dae..e49938cfd 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test @@ -676,6 +676,7 @@ select * from functional.alltypes where 1 = 2 ---- DISTRIBUTEDPLAN Per-Host Resource Reservation: Memory=0B Per-Host Resource Estimates: Memory=10.00MB +Codegen disabled by planner F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK @@ -687,6 +688,7 @@ PLAN-ROOT SINK ---- PARALLELPLANS Per-Host Resource Reservation: Memory=0B Per-Host Resource Estimates: Memory=10.00MB +Codegen disabled by planner F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK @@ -702,6 +704,7 @@ from functional.alltypes ---- DISTRIBUTEDPLAN Per-Host Resource Reservation: Memory=40.00MB Per-Host Resource Estimates: Memory=24.00MB +Codegen disabled by planner F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK @@ -738,6 +741,7 @@ F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 ---- PARALLELPLANS Per-Host Resource Reservation: Memory=80.00MB Per-Host Resource Estimates: Memory=48.00MB +Codegen disabled by planner F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK @@ -1080,6 +1084,7 @@ where year=2009 and month=05 ---- DISTRIBUTEDPLAN Per-Host Resource Reservation: Memory=0B Per-Host Resource Estimates: Memory=16.03MB +Codegen disabled by planner F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1 WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false] @@ -1096,6 +1101,7 @@ WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false] ---- PARALLELPLANS Per-Host Resource Reservation: Memory=0B Per-Host Resource Estimates: Memory=32.03MB +Codegen disabled by planner F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=2 WRITE TO HDFS [functional.alltypesnopart, OVERWRITE=false] diff --git a/testdata/workloads/functional-query/queries/QueryTest/disable-codegen.test b/testdata/workloads/functional-query/queries/QueryTest/disable-codegen.test new file mode 100644 index 000000000..dee0126c1 --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/disable-codegen.test @@ -0,0 +1,30 @@ +==== +---- QUERY +# alltypes has 7300 rows - codegen should be enabled if there +# are < 1000 backend daemons. +set disable_codegen_rows_threshold=8; +select count(*) from alltypes t1 + join alltypestiny t2 on t1.id = t2.id +---- RESULTS +8 +---- TYPES +bigint +---- RUNTIME_PROFILE +# Verify that codegen was enabled for join and scan +row_regex: .*Build Side Codegen Enabled.* +row_regex: .*TEXT Codegen Enabled.* +==== +---- QUERY +# alltypes has 7300 rows - codegen should be disabled regardless +# of # of backend impala daemons. +set disable_codegen_rows_threshold=8000; +select count(*) from alltypes t1 + join alltypestiny t2 on t1.id = t2.id +---- RESULTS +8 +---- TYPES +bigint +---- RUNTIME_PROFILE +# Verify that codegen was disabled +row_regex: .*Codegen Disabled: disabled due to optimization hints.* +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test index c22095b7f..9de548048 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test +++ b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test @@ -29,6 +29,7 @@ explain select id from alltypes; ---- RESULTS: VERIFY_IS_EQUAL 'Per-Host Resource Reservation: Memory=0B' 'Per-Host Resource Estimates: Memory=16.00MB' +'Codegen disabled by planner' '' 'F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1' 'PLAN-ROOT SINK' @@ -50,6 +51,7 @@ explain select id from alltypes where month in (1, 2, 3); ---- RESULTS: VERIFY_IS_EQUAL 'Per-Host Resource Reservation: Memory=0B' 'Per-Host Resource Estimates: Memory=16.00MB' +'Codegen disabled by planner' '' 'F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1' 'PLAN-ROOT SINK' @@ -96,6 +98,7 @@ explain select id from alltypes where year = 2010; ---- RESULTS: VERIFY_IS_EQUAL 'Per-Host Resource Reservation: Memory=0B' 'Per-Host Resource Estimates: Memory=16.00MB' +'Codegen disabled by planner' '' 'F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1' 'PLAN-ROOT SINK' @@ -118,6 +121,7 @@ explain select id from alltypes where year = 2010; ---- RESULTS: VERIFY_IS_EQUAL 'Per-Host Resource Reservation: Memory=0B' 'Per-Host Resource Estimates: Memory=16.00MB' +'Codegen disabled by planner' '' 'F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1' 'PLAN-ROOT SINK' diff --git a/testdata/workloads/targeted-perf/queries/primitive_small_join_1.test b/testdata/workloads/targeted-perf/queries/primitive_small_join_1.test new file mode 100644 index 000000000..c6c5df28b --- /dev/null +++ b/testdata/workloads/targeted-perf/queries/primitive_small_join_1.test @@ -0,0 +1,10 @@ +==== +---- QUERY: primitive_small_join_1 +-- Description : join with small input on both sides +-- Target test case : Small queries where cost of codegen exceeds benefit. +SELECT count(*) +FROM (SELECT * FROM customer LIMIT 100000) c +JOIN nation ON c_nationkey = n_nationkey +---- RESULTS +---- TYPES +==== diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py index dda020347..d5841b270 100644 --- a/tests/common/impala_test_suite.py +++ b/tests/common/impala_test_suite.py @@ -682,7 +682,8 @@ class ImpalaTestSuite(BaseTestSuite): cluster_sizes = ALL_NODES_ONLY return create_exec_option_dimension(cluster_sizes, disable_codegen_options, batch_sizes, - exec_single_node_option=exec_single_node_option) + exec_single_node_option=exec_single_node_option, + disable_codegen_rows_threshold_options=[0]) @classmethod def exploration_strategy(cls): diff --git a/tests/common/test_dimensions.py b/tests/common/test_dimensions.py index 20ccbeb00..4171e1f2f 100644 --- a/tests/common/test_dimensions.py +++ b/tests/common/test_dimensions.py @@ -135,17 +135,23 @@ def create_single_exec_option_dimension(): """Creates an exec_option dimension that will produce a single test vector""" return create_exec_option_dimension(cluster_sizes=ALL_NODES_ONLY, disable_codegen_options=[False], + # Make sure codegen kicks in for functional.alltypes. + disable_codegen_rows_threshold_options=[5000], batch_sizes=[0]) def create_exec_option_dimension(cluster_sizes=ALL_CLUSTER_SIZES, disable_codegen_options=ALL_DISABLE_CODEGEN_OPTIONS, batch_sizes=ALL_BATCH_SIZES, - sync_ddl=None, exec_single_node_option=[0]): + sync_ddl=None, exec_single_node_option=[0], + # We already run with codegen on and off explicitly - + # don't need automatic toggling. + disable_codegen_rows_threshold_options=[0]): exec_option_dimensions = { 'abort_on_error': [1], 'exec_single_node_rows_threshold': exec_single_node_option, 'batch_size': batch_sizes, 'disable_codegen': disable_codegen_options, + 'disable_codegen_rows_threshold': disable_codegen_rows_threshold_options, 'num_nodes': cluster_sizes} if sync_ddl is not None: diff --git a/tests/query_test/test_codegen.py b/tests/query_test/test_codegen.py new file mode 100644 index 000000000..8a4390a9d --- /dev/null +++ b/tests/query_test/test_codegen.py @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Tests end-to-end codegen behaviour. + +from tests.common.impala_test_suite import ImpalaTestSuite +from tests.common.test_dimensions import create_exec_option_dimension_from_dict + +class TestCodegen(ImpalaTestSuite): + @classmethod + def get_workload(self): + return 'functional-query' + + @classmethod + def add_test_dimensions(cls): + super(TestCodegen, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension_from_dict({ + 'exec_single_node_rows_threshold' : [0]})) + # No need to run this on all file formats. Run it on text/none, which has stats + # computed. + cls.ImpalaTestMatrix.add_constraint( + lambda v: v.get_value('table_format').file_format == 'text' and + v.get_value('table_format').compression_codec == 'none') + + def test_disable_codegen(self, vector): + """Test that codegen is enabled/disabled by the planner as expected.""" + self.run_test_case('QueryTest/disable-codegen', vector) diff --git a/tests/query_test/test_decimal_queries.py b/tests/query_test/test_decimal_queries.py index 036d879c9..a250ae2b6 100644 --- a/tests/query_test/test_decimal_queries.py +++ b/tests/query_test/test_decimal_queries.py @@ -34,7 +34,9 @@ class TestDecimalQueries(ImpalaTestSuite): cls.ImpalaTestMatrix.add_dimension( create_exec_option_dimension_from_dict({ 'decimal_v2' : ['false', 'true'], - 'batch_size' : [0, 1]})) + 'batch_size' : [0, 1], + 'disable_codegen' : ['false', 'true'], + 'disable_codegen_rows_threshold' : [0]})) # Hive < 0.11 does not support decimal so we can't run these tests against the other # file formats. # TODO: Enable them on Hive >= 0.11. diff --git a/tests/query_test/test_scanners_fuzz.py b/tests/query_test/test_scanners_fuzz.py index 82b4ec98e..8e521d8d1 100644 --- a/tests/query_test/test_scanners_fuzz.py +++ b/tests/query_test/test_scanners_fuzz.py @@ -150,6 +150,7 @@ class TestScannersFuzzing(ImpalaTestSuite): query_options = copy(vector.get_value('exec_option')) query_options['batch_size'] = batch_size query_options['disable_codegen'] = disable_codegen + query_options['disable_codegen_rows_threshold'] = 0 try: result = self.execute_query(query, query_options = query_options) LOG.info('\n'.join(result.log)) diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py index e8e4eadf4..eab599401 100644 --- a/tests/query_test/test_udfs.py +++ b/tests/query_test/test_udfs.py @@ -261,6 +261,7 @@ class TestUdfExecution(TestUdfBase): super(TestUdfExecution, cls).add_test_dimensions() cls.ImpalaTestMatrix.add_dimension( create_exec_option_dimension_from_dict({"disable_codegen" : [False, True], + "disable_codegen_rows_threshold" : [0], "exec_single_node_rows_threshold" : [0,100], "enable_expr_rewrites" : [False, True]})) # There is no reason to run these tests using all dimensions.