diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc index 151316917..ec1e09c27 100644 --- a/be/src/service/query-options.cc +++ b/be/src/service/query-options.cc @@ -1343,6 +1343,9 @@ Status impala::SetQueryOption(TImpalaQueryOptions::type option, const string& va } case TImpalaQueryOptions::SYNC_HMS_EVENTS_STRICT_MODE: { query_options->__set_sync_hms_events_strict_mode(IsTrue(value)); + } + case TImpalaQueryOptions::USE_CALCITE_PLANNER : { + query_options->__set_use_calcite_planner(IsTrue(value)); break; } case TImpalaQueryOptions::SKIP_UNNEEDED_UPDATES_COL_LIMIT: { diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h index 3a50a3862..5e21229e1 100644 --- a/be/src/service/query-options.h +++ b/be/src/service/query-options.h @@ -51,7 +51,7 @@ typedef std::unordered_map // plus one. Thus, the second argument to the DCHECK has to be updated every // time we add or remove a query option to/from the enum TImpalaQueryOptions. constexpr unsigned NUM_QUERY_OPTIONS = - TImpalaQueryOptions::MEM_ESTIMATE_SCALE_FOR_SPILLING_OPERATOR + 1; + TImpalaQueryOptions::USE_CALCITE_PLANNER + 1; #define QUERY_OPTS_TABLE \ DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(), NUM_QUERY_OPTIONS); \ REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \ @@ -374,6 +374,8 @@ constexpr unsigned NUM_QUERY_OPTIONS = SKIP_UNNEEDED_UPDATES_COL_LIMIT, TQueryOptionLevel::ADVANCED) \ QUERY_OPT_FN(mem_estimate_scale_for_spilling_operator, \ MEM_ESTIMATE_SCALE_FOR_SPILLING_OPERATOR, TQueryOptionLevel::DEVELOPMENT) \ + QUERY_OPT_FN(use_calcite_planner, USE_CALCITE_PLANNER, \ + TQueryOptionLevel::ADVANCED) \ ; /// Enforce practical limits on some query options to avoid undesired query state. 
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift index 1d400b0b9..88c4de5c4 100644 --- a/common/thrift/ImpalaService.thrift +++ b/common/thrift/ImpalaService.thrift @@ -1022,6 +1022,9 @@ enum TImpalaQueryOptions { // to either 1.0 (enabled) or 0.0 (disabled) and only set value in between for // experimental purpose. MEM_ESTIMATE_SCALE_FOR_SPILLING_OPERATOR = 190 + + // If True, use the Calcite planner for compilation + USE_CALCITE_PLANNER = 191 } // The summary of a DML statement. diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift index f4087874c..b1ce5fd62 100644 --- a/common/thrift/Query.thrift +++ b/common/thrift/Query.thrift @@ -774,6 +774,9 @@ struct TQueryOptions { // See comment in ImpalaService.thrift 191: optional double mem_estimate_scale_for_spilling_operator = 0.0 + + // See comment in ImpalaService.thrift + 192: optional bool use_calcite_planner = false; } // Impala currently has three types of sessions: Beeswax, HiveServer2 and external diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index 4c86fa3b4..ce8ee8162 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -54,6 +54,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileSystem; @@ -300,7 +301,6 @@ public class Frontend { // but other planners may set their own planner values public static final String PLANNER_PROFILE = "PlannerInfo"; public static final String PLANNER_TYPE = "PlannerType"; - private static final String PLANNER = "OriginalPlanner"; /** * Plan-time context that allows capturing various artifacts created @@ -2053,7 +2053,7 @@ public class Frontend 
{ // a wrapper of the getTExecRequest is in the factory so the implementation // can handle various planner fallback execution logic (e.g. allowing one // planner, if execution fails, to call a different planner) - TExecRequest result = getTExecRequest(new CompilerFactoryImpl(), planCtx, timeline); + TExecRequest result = getTExecRequestWithFallback(planCtx, timeline); timeline.markEvent("Planning finished"); result.setTimeline(timeline.toThrift()); result.setProfile(FrontendProfile.getCurrent().emitAsThrift()); @@ -2322,10 +2322,50 @@ public class Frontend { } } + private TExecRequest getTExecRequestWithFallback( + PlanCtx planCtx, EventSequence timeline) throws ImpalaException { + TExecRequest request = null; + CompilerFactory compilerFactory = getCalciteCompilerFactory(planCtx); + if (compilerFactory != null) { + try { + request = getTExecRequest(compilerFactory, planCtx, timeline); + } catch (Exception e) { + if (!shouldFallbackToRegularPlanner(planCtx)) { + throw e; + } + LOG.info("Calcite planner failed: ", e); + timeline.markEvent("Failing over from Calcite planner"); + } + } + if (request == null) { + LOG.info("Using Original Planner."); + // use the original planner if Calcite planner is not set or fallback + // for Calcite planner is enabled. + compilerFactory = new CompilerFactoryImpl(); + request = getTExecRequest(compilerFactory, planCtx, timeline); + } + addPlannerToProfile(compilerFactory.getPlannerString()); + return request; + } + + private boolean shouldFallbackToRegularPlanner(PlanCtx planCtx) { + // TODO: Need a fallback flag for various modes. In production, we will most + // likely want to fallback to the original planner, but in testing, we might want + // the query to fail. + // There are some cases where we will always want to fallback, e.g. if the statement + // fails at parse time because it is not a select statement. 
+ TQueryCtx queryCtx = planCtx.getQueryContext(); + try { + return !(Parser.parse(queryCtx.client_request.stmt, + queryCtx.client_request.query_options) instanceof QueryStmt); + } catch (Exception e) { + return false; + } + } + private TExecRequest getTExecRequest(CompilerFactory compilerFactory, PlanCtx planCtx, EventSequence timeline) throws ImpalaException { TQueryCtx queryCtx = planCtx.getQueryContext(); - addPlannerToProfile(PLANNER); LOG.info("Analyzing query: " + queryCtx.client_request.stmt + " db: " + queryCtx.session.database); @@ -2763,7 +2803,6 @@ public class Frontend { AnalysisResult analysisResult = analysisCtx.analyzeAndAuthorize(compilerFactory, parsedStmt, stmtTableCache, authzChecker_.get(), planCtx.compilationState_.disableAuthorization()); - // need to re-fetch the parsedStatement because analysisResult can rewrite the // statement. parsedStmt = analysisResult.getParsedStmt(); @@ -3534,7 +3573,28 @@ public class Frontend { } } - private CompilerFactory getCompilerFactory(PlanCtx ctx) { - return new CompilerFactoryImpl(); + /** + * Returns a new CalciteCompilerFactory if the USE_CALCITE_PLANNER query option is set + * and the factory class can be loaded and instantiated. Otherwise logs the reason and + * returns null, in which case the caller uses the original planner. + */ + @Nullable + private CompilerFactory getCalciteCompilerFactory(PlanCtx ctx) { + TQueryOptions queryOptions = ctx.getQueryContext().client_request.getQuery_options(); + LOG.info("Searching for planner to use..."); + if (queryOptions.isUse_calcite_planner()) { + try { + // Class.newInstance() is deprecated; instantiate through the no-arg constructor + // instead. A successful call never returns null, so no null check is needed. + CompilerFactory calciteFactory = (CompilerFactory) Class.forName( + "org.apache.impala.calcite.service.CalciteCompilerFactory") + .getDeclaredConstructor().newInstance(); + LOG.info("Found Calcite Planner, using it."); + return calciteFactory; + } catch (Exception e) { + LOG.info("Could not find Calcite planner, using original planner: " + e); + } + } + return null; } }
diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteAnalysisDriver.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteAnalysisDriver.java new file mode 100644 index 000000000..85a2258d2 --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteAnalysisDriver.java @@ -0,0 +1,155 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +package org.apache.impala.calcite.service; + +import java.util.Collections; +import java.util.Properties; +import java.util.Set; + +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.config.CalciteConnectionConfigImpl; +import org.apache.calcite.config.CalciteConnectionProperty; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.runtime.CalciteContextException; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.validate.SqlValidator; +import org.apache.calcite.sql.validate.SqlValidatorUtil; +import org.apache.impala.analysis.AnalysisContext; +import org.apache.impala.analysis.AnalysisContext.AnalysisResult; +import org.apache.impala.analysis.AnalysisDriver; +import org.apache.impala.analysis.Analyzer; +import org.apache.impala.analysis.ParsedStatement; +import org.apache.impala.analysis.StmtMetadataLoader; +import org.apache.impala.analysis.StmtMetadataLoader.StmtTableCache; +import org.apache.impala.authorization.AuthorizationContext; +import org.apache.impala.authorization.AuthorizationFactory; +import org.apache.impala.calcite.operators.ImpalaOperatorTable; +import org.apache.impala.calcite.schema.ImpalaCalciteCatalogReader; +import org.apache.impala.calcite.type.ImpalaTypeSystemImpl; +import org.apache.impala.calcite.util.SimplifiedAnalyzer; +import org.apache.impala.calcite.validate.ImpalaConformance; +import org.apache.impala.common.AnalysisException; +import org.apache.impala.common.ImpalaException; +import org.apache.impala.planner.PlannerContext; +import org.apache.impala.planner.SingleNodePlannerIntf; +import org.apache.impala.thrift.TQueryCtx; + +/** + * The CalciteAnalysisDriver is the implementation of AnalysisDriver which validates + * the AST produced by Calcite. 
+ */ +public class CalciteAnalysisDriver implements AnalysisDriver { + + public final TQueryCtx queryCtx_; + + public final AuthorizationFactory authzFactory_; + + public final CalciteParsedStatement parsedStmt_; + + // Calcite AST + public SqlNode validatedNode_; + + public RelDataTypeFactory typeFactory_; + + public SqlValidator sqlValidator_; + + // CalciteCatalogReader is a context class that holds global information that + // may be needed by the CalciteTable object + private CalciteCatalogReader reader_; + + private final AnalysisContext ctx_; + + private final StmtTableCache stmtTableCache_; + + private final AuthorizationContext authzCtx_; + + private final Analyzer analyzer_; + + public CalciteAnalysisDriver(AnalysisContext ctx, ParsedStatement parsedStmt, + StmtTableCache stmtTableCache, + AuthorizationContext authzCtx) { + queryCtx_ = ctx.getQueryCtx(); + authzFactory_ = ctx.getAuthzFactory(); + parsedStmt_ = (CalciteParsedStatement) parsedStmt; + ctx_ = ctx; + stmtTableCache_ = stmtTableCache; + authzCtx_ = authzCtx; + analyzer_ = createAnalyzer(ctx, stmtTableCache, authzCtx); + } + + @Override + public AnalysisResult analyze() { + try { + reader_ = CalciteMetadataHandler.createCalciteCatalogReader(stmtTableCache_, + queryCtx_, queryCtx_.session.database); + CalciteMetadataHandler.populateCalciteSchema(reader_, ctx_.getCatalog(), + parsedStmt_.getTablesInQuery(null)); + + typeFactory_ = new JavaTypeFactoryImpl(new ImpalaTypeSystemImpl()); + sqlValidator_ = SqlValidatorUtil.newValidator( + ImpalaOperatorTable.getInstance(), + reader_, typeFactory_, + SqlValidator.Config.DEFAULT + // Impala requires identifier expansion (tpcds test queries fail + // without this) + .withIdentifierExpansion(true) + .withConformance(ImpalaConformance.INSTANCE) + ); + validatedNode_ = sqlValidator_.validate(parsedStmt_.getParsedSqlNode()); + return new CalciteAnalysisResult(this); + } catch (ImpalaException e) { + return new CalciteAnalysisResult(this, e); + } catch 
(CalciteContextException e) { + return new CalciteAnalysisResult(this, + new AnalysisException(e.getMessage(), e.getCause())); + } + } + + public Analyzer createAnalyzer(AnalysisContext ctx, StmtTableCache stmtTableCache, + AuthorizationContext authzCtx) { + return new SimplifiedAnalyzer(stmtTableCache, ctx.getQueryCtx(), + ctx.getAuthzFactory(), null); + } + + public RelDataTypeFactory getTypeFactory() { + return typeFactory_; + } + + public CalciteCatalogReader getCatalogReader() { + return reader_; + } + + public SqlValidator getSqlValidator() { + return sqlValidator_; + } + + public SqlNode getValidatedNode() { + return validatedNode_; + } + + public Analyzer getAnalyzer() { + return analyzer_; + } + + public ParsedStatement getParsedStmt() { + return parsedStmt_; + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteAnalysisResult.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteAnalysisResult.java new file mode 100644 index 000000000..d85cd7cb4 --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteAnalysisResult.java @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.calcite.service; + +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.validate.SqlValidator; +import org.apache.impala.analysis.AnalysisContext.AnalysisResult; +import org.apache.impala.analysis.AnalysisDriver; +import org.apache.impala.common.ImpalaException; + +/** + * CalciteAnalysisResult is an AnalysisResult with added analysis result + * members produced by the CalciteAnalyzer + */ +public class CalciteAnalysisResult extends AnalysisResult { + // Calcite AST + private final SqlNode validatedNode_; + + private final RelDataTypeFactory typeFactory_; + + private final SqlValidator sqlValidator_; + + // CalciteCatalogReader is a context class that holds global information that + // may be needed by the CalciteTable object + private final CalciteCatalogReader reader_; + + public CalciteAnalysisResult(CalciteAnalysisDriver analysisDriver) { + this(analysisDriver, null); + } + + public CalciteAnalysisResult(CalciteAnalysisDriver analysisDriver, + ImpalaException e) { + super(analysisDriver.getParsedStmt(), analysisDriver.getAnalyzer(), e); + validatedNode_ = analysisDriver.getValidatedNode(); + typeFactory_ = analysisDriver.getTypeFactory(); + sqlValidator_ = analysisDriver.getSqlValidator(); + reader_ = analysisDriver.getCatalogReader(); + } + + public CalciteCatalogReader getCatalogReader() { + return reader_; + } + + public SqlValidator getSqlValidator() { + return sqlValidator_; + } + + public SqlNode getValidatedNode() { + return validatedNode_; + } + + public RelDataTypeFactory getTypeFactory() { + return typeFactory_; + } + + +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteCompilerFactory.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteCompilerFactory.java new file mode 100644 index 000000000..98e518d9a --- /dev/null +++ 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteCompilerFactory.java @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.service; + +import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider; +import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.impala.analysis.AnalysisContext; +import org.apache.impala.analysis.AnalysisDriver; +import org.apache.impala.analysis.ParsedStatement; +import org.apache.impala.analysis.StmtMetadataLoader.StmtTableCache; +import org.apache.impala.authorization.AuthorizationContext; +import org.apache.impala.calcite.operators.ImpalaOperatorTable; +import org.apache.impala.catalog.BuiltinsDb; +import org.apache.impala.common.AnalysisException; +import org.apache.impala.common.ImpalaException; +import org.apache.impala.common.UnsupportedFeatureException; +import org.apache.impala.planner.SingleNodePlannerIntf; +import org.apache.impala.planner.PlannerContext; +import org.apache.impala.service.CompilerFactory; +import org.apache.impala.thrift.TQueryCtx; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + 
+/** + * Factory implementation which creates Compiler implementation classes. + */ +public class CalciteCompilerFactory implements CompilerFactory { + protected static final Logger LOG = + LoggerFactory.getLogger(CalciteCompilerFactory.class.getName()); + + static { + System.setProperty("calcite.default.charset", "UTF8"); + } + + private static final String PLANNER = "CalcitePlanner"; + + static { + ImpalaOperatorTable.create(BuiltinsDb.getInstance()); + } + + public ParsedStatement createParsedStatement(TQueryCtx queryCtx) + throws ImpalaException { + + // check that all options are supported, throws Exception if not. + checkOptionSupportedInCalcite(queryCtx); + + Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader()); + + // Needed for Calcite's JaninoRelMetadataProvider + RelMetadataQuery.THREAD_PROVIDERS.set( + JaninoRelMetadataProvider.of(DefaultRelMetadataProvider.INSTANCE)); + return new CalciteParsedStatement(queryCtx); + } + + public AnalysisDriver createAnalysisDriver(AnalysisContext ctx, + ParsedStatement parsedStmt, StmtTableCache stmtTableCache, + AuthorizationContext authzCtx) throws AnalysisException { + return new CalciteAnalysisDriver(ctx, parsedStmt, stmtTableCache, authzCtx); + } + + public SingleNodePlannerIntf createSingleNodePlanner(PlannerContext ctx) + throws ImpalaException { + return new CalciteSingleNodePlanner(ctx); + } + + public String getPlannerString() { + return PLANNER; + } + + public static void checkOptionSupportedInCalcite(TQueryCtx queryCtx) + throws ImpalaException { + // IMPALA-13530 + if (!queryCtx.getClient_request().getQuery_options().isDecimal_v2()) { + throw new UnsupportedFeatureException("Decimal v1 not supported in Calcite, " + + "falling back to Impala compiler."); + } + + // IMPALA-13529 + if (queryCtx.getClient_request().getQuery_options() + .isAppx_count_distinct()) { + throw new UnsupportedFeatureException("Approximate count distinct is not " + + "supported in Calcite, falling back to 
Impala compiler."); + } + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteJniFrontend.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteJniFrontend.java index 6d75a90cb..fe8d3ee0b 100644 --- a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteJniFrontend.java +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteJniFrontend.java @@ -107,7 +107,9 @@ public class CalciteJniFrontend extends JniFrontend { CalciteMetadataHandler mdHandler = null; - if (!optionSupportedInCalcite(queryCtx)) { + try { + CalciteCompilerFactory.checkOptionSupportedInCalcite(queryCtx.getTQueryCtx()); + } catch (UnsupportedFeatureException e) { return runThroughOriginalPlanner(thriftQueryContext, queryCtx); } @@ -200,20 +202,6 @@ public class CalciteJniFrontend extends JniFrontend { } } - private boolean optionSupportedInCalcite(QueryContext queryCtx) { - // IMPALA-13530 - if (!queryCtx.getTQueryCtx().getClient_request().getQuery_options().isDecimal_v2()) { - return false; - } - - // IMPALA-13529 - if (queryCtx.getTQueryCtx().getClient_request().getQuery_options() - .isAppx_count_distinct()) { - return false; - } - return true; - } - private static void loadCalciteImpalaFunctions() { ImpalaOperatorTable.create(BuiltinsDb.getInstance()); } diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteMetadataHandler.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteMetadataHandler.java index 49fda4bc1..ff5375e8a 100644 --- a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteMetadataHandler.java +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteMetadataHandler.java @@ -47,6 +47,8 @@ import org.apache.impala.catalog.FeView; import org.apache.impala.catalog.HdfsTable; import org.apache.impala.common.ImpalaException; import 
org.apache.impala.common.UnsupportedFeatureException; +import org.apache.impala.thrift.TQueryCtx; + import com.google.common.collect.Sets; import java.util.ArrayList; @@ -92,7 +94,8 @@ public class CalciteMetadataHandler implements CompilerStep { // load the relevant tables in the query from catalogd this.stmtTableCache_ = stmtMetadataLoader.loadTables(tableVisitor.tableNames_); - this.reader_ = createCalciteCatalogReader(queryCtx, stmtTableCache_); + this.reader_ = createCalciteCatalogReader(stmtTableCache_, + queryCtx.getTQueryCtx(), queryCtx.getCurrentDb()); // populate calcite schema. This step needs to be done after the loader because the // schema needs to contain the columns in the table for validation, which cannot @@ -108,24 +111,24 @@ public class CalciteMetadataHandler implements CompilerStep { * Since the individual Table objects have reference to the Schema, this also serves * as a way to give the tables Context information about the general query. */ - private CalciteCatalogReader createCalciteCatalogReader( - CalciteJniFrontend.QueryContext queryCtx, - StmtMetadataLoader.StmtTableCache stmtTableCache) { + public static CalciteCatalogReader createCalciteCatalogReader( + StmtMetadataLoader.StmtTableCache stmtTableCache, TQueryCtx queryCtx, + String database) { RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl(new ImpalaTypeSystemImpl()); Properties props = new Properties(); props.setProperty(CalciteConnectionProperty.CASE_SENSITIVE.camelName(), "false"); CalciteConnectionConfig config = new CalciteConnectionConfigImpl(props); CalciteSchema rootSchema = CalciteSchema.createRootSchema(true); return new ImpalaCalciteCatalogReader(rootSchema, - Collections.singletonList(queryCtx.getCurrentDb()), - typeFactory, config, queryCtx.getTQueryCtx(), stmtTableCache); + Collections.singletonList(database), + typeFactory, config, queryCtx, stmtTableCache); } /** * Populate the CalciteSchema with tables being used by this query. 
Returns a * list of tables in the query that are not found in the database. */ - private static List populateCalciteSchema(CalciteCatalogReader reader, + public static List populateCalciteSchema(CalciteCatalogReader reader, FeCatalog catalog, Set tableNames) throws ImpalaException { List notFoundTables = new ArrayList<>(); CalciteSchema rootSchema = reader.getRootSchema(); @@ -178,7 +181,7 @@ public class CalciteMetadataHandler implements CompilerStep { * TableVisitor walks through the AST and places all the tables into * tableNames */ - private static class TableVisitor extends SqlBasicVisitor { + public static class TableVisitor extends SqlBasicVisitor { private final String currentDb_; public final Set tableNames_ = new HashSet<>(); diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteOptimizer.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteOptimizer.java index e5ae54d98..47130c674 100644 --- a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteOptimizer.java +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteOptimizer.java @@ -24,6 +24,7 @@ import org.apache.calcite.plan.hep.HepMatchOrder; import org.apache.calcite.plan.hep.HepPlanner; import org.apache.calcite.plan.hep.HepProgram; import org.apache.calcite.plan.hep.HepProgramBuilder; +import org.apache.calcite.prepare.CalciteCatalogReader; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.RelFactories; import org.apache.calcite.rel.rules.CoreRules; @@ -52,15 +53,19 @@ public class CalciteOptimizer implements CompilerStep { protected static final Logger LOG = LoggerFactory.getLogger(CalciteOptimizer.class.getName()); - private final CalciteValidator validator_; + private final CalciteCatalogReader reader_; + + public CalciteOptimizer(CalciteAnalysisResult analysisResult) { + this.reader_ = analysisResult.getCatalogReader(); + } public 
CalciteOptimizer(CalciteValidator validator) { - this.validator_ = validator; + this.reader_ = validator.getCatalogReader(); } public ImpalaPlanRel optimize(RelNode logPlan) throws ImpalaException { RelBuilder relBuilder = RelFactories.LOGICAL_BUILDER.create(logPlan.getCluster(), - validator_.getCatalogReader()); + reader_); // Run some essential rules needed to create working RelNodes before doing // optimization diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteParsedStatement.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteParsedStatement.java new file mode 100644 index 000000000..16bd759e7 --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteParsedStatement.java @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.calcite.service; + +import java.util.Set; + +import org.apache.calcite.sql.SqlExplain; +import org.apache.calcite.sql.SqlNode; +import org.apache.impala.analysis.AnalysisContext; +import org.apache.impala.analysis.AnalysisDriver; +import org.apache.impala.analysis.ParsedStatement; +import org.apache.impala.analysis.TableName; +import org.apache.impala.analysis.StmtMetadataLoader; +import org.apache.impala.common.ImpalaException; +import org.apache.impala.thrift.TQueryCtx; + +/** + * Implementation of the ParsedStatement hook that holds the AST that + * is parsed from the SQL string. + */ +public class CalciteParsedStatement implements ParsedStatement { + private final SqlNode parsedNode_; + private final boolean isExplain_; + private final String sql_; + private final Set tableNames_; + + public CalciteParsedStatement(TQueryCtx queryCtx) throws ImpalaException { + sql_ = queryCtx.client_request.stmt; + CalciteQueryParser queryParser = new CalciteQueryParser(sql_); + SqlNode parsedSqlNode = queryParser.parse(); + isExplain_ = parsedSqlNode instanceof SqlExplain; + if (isExplain_) { + parsedSqlNode = ((SqlExplain) parsedSqlNode).getExplicandum(); + } + parsedNode_ = parsedSqlNode; + + CalciteMetadataHandler.TableVisitor tableVisitor = + new CalciteMetadataHandler.TableVisitor(queryCtx.session.database); + + parsedNode_.accept(tableVisitor); + + tableNames_ = tableVisitor.tableNames_; + } + + @Override + public Set getTablesInQuery(StmtMetadataLoader loader) { + return tableNames_; + } + + @Override + public Object getTopLevelNode() { + return parsedNode_; + } + + @Override + public boolean isExplain() { + return isExplain_; + } + + @Override + public boolean isQueryStmt() { + return true; + } + + @Override + public String toSql() { + return sql_; + } + + public SqlNode getParsedSqlNode() { + return parsedNode_; + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalcitePhysPlanCreator.java
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalcitePhysPlanCreator.java index e65b1c2ea..868329ba8 100644 --- a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalcitePhysPlanCreator.java +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalcitePhysPlanCreator.java @@ -40,22 +40,25 @@ public class CalcitePhysPlanCreator implements CompilerStep { protected static final Logger LOG = LoggerFactory.getLogger(CalcitePhysPlanCreator.class.getName()); - private final CalciteJniFrontend.QueryContext queryCtx_; private final Analyzer analyzer_; private final PlannerContext plannerContext_; + public CalcitePhysPlanCreator(Analyzer analyzer, PlannerContext ctx) { + analyzer_ = analyzer; + plannerContext_ = ctx; + } + public CalcitePhysPlanCreator(CalciteMetadataHandler mdHandler, CalciteJniFrontend.QueryContext queryCtx) throws ImpalaException { - this.queryCtx_ = queryCtx; // TODO: IMPALA-13011: Awkward call for authorization here. Authorization // will be done at validation time, but this is needed here for the Analyzer // instantiation. 
AuthorizationFactory authzFactory = AuthorizationUtil.authzFactoryFrom(BackendConfig.INSTANCE); this.analyzer_ = new SimplifiedAnalyzer(mdHandler.getStmtTableCache(), - queryCtx_.getTQueryCtx(), authzFactory, null); + queryCtx.getTQueryCtx(), authzFactory, null); this.plannerContext_ = - new PlannerContext(analyzer_, queryCtx_.getTQueryCtx(), queryCtx_.getTimeline()); + new PlannerContext(analyzer_, queryCtx.getTQueryCtx(), queryCtx.getTimeline()); } diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteQueryParser.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteQueryParser.java index 898cd9147..90ba2e6af 100644 --- a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteQueryParser.java +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteQueryParser.java @@ -36,16 +36,20 @@ public class CalciteQueryParser implements CompilerStep { protected static final Logger LOG = LoggerFactory.getLogger(CalciteQueryParser.class.getName()); - private final CalciteJniFrontend.QueryContext queryCtx_; + private final String sqlStmt_; public CalciteQueryParser(CalciteJniFrontend.QueryContext queryCtx) { - this.queryCtx_ = queryCtx; + this.sqlStmt_ = queryCtx.getStmt(); + } + + public CalciteQueryParser(String stmt) { + this.sqlStmt_ = stmt; } public SqlNode parse() throws ParseException { try { // Create an SQL parser - SqlParser parser = SqlParser.create(queryCtx_.getStmt(), + SqlParser parser = SqlParser.create(sqlStmt_, SqlParser.config().withParserFactory(ImpalaSqlParserImpl.FACTORY) .withConformance(ImpalaConformance.INSTANCE) .withQuoting(Quoting.BACK_TICK_BACKSLASH) diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteRelNodeConverter.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteRelNodeConverter.java index a230205d2..71cb49b1d 100644 --- 
a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteRelNodeConverter.java +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteRelNodeConverter.java @@ -17,6 +17,7 @@ package org.apache.impala.calcite.service; +import org.apache.calcite.rel.type.RelDataTypeFactory; import com.google.common.collect.ImmutableList; import org.apache.calcite.plan.ConventionTraitDef; import org.apache.calcite.plan.RelOptCluster; @@ -43,6 +44,9 @@ import org.apache.impala.calcite.operators.ImpalaConvertletTable; import org.apache.calcite.sql2rel.StandardConvertletTable; import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.sql.validate.SqlValidator; +import org.apache.calcite.prepare.CalciteCatalogReader; + import java.util.List; import org.slf4j.Logger; @@ -59,25 +63,41 @@ public class CalciteRelNodeConverter implements CompilerStep { private static final RelOptTable.ViewExpander NOOP_EXPANDER = (type, query, schema, path) -> null; - private final CalciteValidator validator_; - private final RelOptCluster cluster_; private final RelOptPlanner planner_; - public CalciteRelNodeConverter(CalciteValidator validator) { - this.validator_ = validator; + private final RelDataTypeFactory typeFactory_; + + private final SqlValidator sqlValidator_; + + private final CalciteCatalogReader reader_; + + public CalciteRelNodeConverter(CalciteAnalysisResult analysisResult) { + this.typeFactory_ = analysisResult.getTypeFactory(); + this.reader_ = analysisResult.getCatalogReader(); + this.sqlValidator_ = analysisResult.getSqlValidator(); this.planner_ = new VolcanoPlanner(); planner_.addRelTraitDef(ConventionTraitDef.INSTANCE); cluster_ = - RelOptCluster.create(planner_, new RexBuilder(validator_.getTypeFactory())); + RelOptCluster.create(planner_, new RexBuilder(typeFactory_)); + } + + public CalciteRelNodeConverter(CalciteValidator validator) { + this.typeFactory_ = validator.getTypeFactory(); + this.reader_ = 
validator.getCatalogReader(); + this.sqlValidator_ = validator.getSqlValidator(); + this.planner_ = new VolcanoPlanner(); + planner_.addRelTraitDef(ConventionTraitDef.INSTANCE); + cluster_ = + RelOptCluster.create(planner_, new RexBuilder(typeFactory_)); } public RelNode convert(SqlNode validatedNode) { SqlToRelConverter relConverter = new SqlToRelConverter( NOOP_EXPANDER, - validator_.getSqlValidator(), - validator_.getCatalogReader(), + sqlValidator_, + reader_, cluster_, ImpalaConvertletTable.INSTANCE, SqlToRelConverter.config().withCreateValuesRel(false)); @@ -98,7 +118,7 @@ public class CalciteRelNodeConverter implements CompilerStep { logDebug(subQueryRemovedPlan); RelBuilder relBuilder = RelFactories.LOGICAL_BUILDER.create(cluster_, - validator_.getCatalogReader()); + reader_); RelNode decorrelatedPlan = RelDecorrelator.decorrelateQuery(subQueryRemovedPlan, relBuilder); @@ -110,10 +130,6 @@ public class CalciteRelNodeConverter implements CompilerStep { return cluster_; } - public CalciteValidator getValidator() { - return validator_; - } - @Override public void logDebug(Object resultObject) { if (!(resultObject instanceof RelNode)) { diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteSingleNodePlanner.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteSingleNodePlanner.java new file mode 100644 index 000000000..a89c112f3 --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteSingleNodePlanner.java @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.service; + + +import java.util.ArrayList; +import java.util.List; + +import org.apache.calcite.rel.RelNode; +import org.apache.impala.analysis.AnalysisDriver; +import org.apache.impala.analysis.ExprSubstitutionMap; +import org.apache.impala.analysis.ParsedStatement; +import org.apache.impala.calcite.rel.node.ImpalaPlanRel; +import org.apache.impala.calcite.rel.node.NodeWithExprs; +import org.apache.impala.common.ImpalaException; +import org.apache.impala.planner.DataSink; +import org.apache.impala.planner.PlannerContext; +import org.apache.impala.planner.PlanNode; +import org.apache.impala.planner.PlanRootSink; +import org.apache.impala.planner.SingleNodePlannerIntf; +import org.apache.impala.thrift.TColumn; +import org.apache.impala.thrift.TResultSetMetadata; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Implementation of the SingleNodePlannerIntf which returns a PlanNode + * to the Impala framework and provides information needed by the + * framework after planning. 
+ */ +public class CalciteSingleNodePlanner implements SingleNodePlannerIntf { + + protected static final Logger LOG = + LoggerFactory.getLogger(CalciteSingleNodePlanner.class.getName()); + private final PlannerContext ctx_; + private final CalciteAnalysisResult analysisResult_; + private NodeWithExprs rootNode_; + + public CalciteSingleNodePlanner(PlannerContext ctx) { + ctx_ = ctx; + analysisResult_ = (CalciteAnalysisResult) ctx.getAnalysisResult(); + } + + public PlanNode createSingleNodePlan() throws ImpalaException { + // Convert the query to RelNodes which can be optimized + CalciteRelNodeConverter relNodeConverter = + new CalciteRelNodeConverter(analysisResult_); + RelNode logicalPlan = relNodeConverter.convert(analysisResult_.getValidatedNode()); + + // Optimize the query + CalciteOptimizer optimizer = new CalciteOptimizer(analysisResult_); + ImpalaPlanRel optimizedPlan = optimizer.optimize(logicalPlan); + + // Create Physical Impala PlanNodes + CalcitePhysPlanCreator physPlanCreator = + new CalcitePhysPlanCreator(analysisResult_.getAnalyzer(), ctx_); + rootNode_ = physPlanCreator.create(optimizedPlan); + + analysisResult_.getAnalyzer().computeValueTransferGraph(); + return rootNode_.planNode_; + } + + /** + * Creates the DataSink needed by the framework. Only the original planner + * requires the substitution map passed in.
+ */ + @Override + public DataSink createDataSink(ExprSubstitutionMap rootNodeSmap) { + return new PlanRootSink(rootNode_.outputExprs_); + } + + @Override + public List getColLabels() { + List colLabels = new ArrayList<>(); + int colCnt = rootNode_.outputExprs_.size(); + for (int i = 0; i < colCnt; ++i) { + colLabels.add(rootNode_.outputExprs_.get(i).toString()); + } + return colLabels; + } + + @Override + public TResultSetMetadata getTResultSetMetadata(ParsedStatement parsedStmt) { + TResultSetMetadata metadata = new TResultSetMetadata(); + int colCnt = rootNode_.outputExprs_.size(); + for (int i = 0; i < colCnt; ++i) { + TColumn colDesc = new TColumn(rootNode_.outputExprs_.get(i).toString(), + rootNode_.outputExprs_.get(i).getType().toThrift()); + metadata.addToColumns(colDesc); + } + return metadata; + } +} diff --git a/tests/custom_cluster/test_calcite_planner.py b/tests/custom_cluster/test_calcite_planner.py index d9016f90e..062f75e33 100644 --- a/tests/custom_cluster/test_calcite_planner.py +++ b/tests/custom_cluster/test_calcite_planner.py @@ -20,6 +20,7 @@ import logging import pytest from tests.common.custom_cluster_test_suite import CustomClusterTestSuite +from tests.common.test_dimensions import (add_mandatory_exec_option) LOG = logging.getLogger(__name__) @@ -30,7 +31,12 @@ class TestCalcitePlanner(CustomClusterTestSuite): def setup_class(cls): super(TestCalcitePlanner, cls).setup_class() + @classmethod + def add_test_dimensions(cls): + super(TestCalcitePlanner, cls).add_test_dimensions() + add_mandatory_exec_option(cls, 'use_calcite_planner', 'true') + @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args(start_args="--use_calcite_planner=true") + @CustomClusterTestSuite.with_args(start_args="--env_vars=USE_CALCITE_PLANNER=true") def test_calcite_frontend(self, vector, unique_database): self.run_test_case('QueryTest/calcite', vector, use_db=unique_database)