IMPALA-13657: Connect Calcite planner to Impala Frontend framework

This commit hooks the Calcite planner into the plumbing created by
IMPALA-13653. The Calcite planner is now called from Impala's Frontend
code via four hooks:

- CalciteCompilerFactory: the factory class that creates
    the implementations of the parser, analysis, and single node
    planner hooks.
- CalciteParsedStatement: The class which holds the Calcite SqlNode
    AST.
- CalciteAnalysisDriver: The class that does the validation of the
    SqlNode AST
- CalciteSingleNodePlanner: The class that converts the AST to a
    logical plan, optimizes it, and converts it into an Impala
    PlanNode physical plan.

To run on Calcite, one needs to do two things:

1) set the USE_CALCITE_PLANNER env variable to true before starting
the cluster. This makes bin/setclasspath.sh add the Calcite planner jar
to the classpath; the jar is not on the classpath by default at the
time of this commit.
2) set the use_calcite_planner query option to true.

This commit makes the CalciteJniFrontend class obsolete. Once the
test cases are moved out of there, that class and others can be
removed.

Change-Id: I3b30571beb797ede827ef4d794b8daefb130ccb1
Reviewed-on: http://gerrit.cloudera.org:8080/22319
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com>
This commit is contained in:
Steve Carlin
2025-01-08 06:56:04 -08:00
parent a877cde76d
commit 706e1f026c
17 changed files with 675 additions and 53 deletions

View File

@@ -1343,6 +1343,9 @@ Status impala::SetQueryOption(TImpalaQueryOptions::type option, const string& va
}
case TImpalaQueryOptions::SYNC_HMS_EVENTS_STRICT_MODE: {
  query_options->__set_sync_hms_events_strict_mode(IsTrue(value));
  // Terminate this case explicitly: without the break, control falls through
  // into the USE_CALCITE_PLANNER case below and sets use_calcite_planner to the
  // value that was supplied for sync_hms_events_strict_mode.
  break;
}
case TImpalaQueryOptions::USE_CALCITE_PLANNER : {
query_options->__set_use_calcite_planner(IsTrue(value));
break;
}
case TImpalaQueryOptions::SKIP_UNNEEDED_UPDATES_COL_LIMIT: {

View File

@@ -51,7 +51,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
// plus one. Thus, the second argument to the DCHECK has to be updated every
// time we add or remove a query option to/from the enum TImpalaQueryOptions.
constexpr unsigned NUM_QUERY_OPTIONS =
TImpalaQueryOptions::MEM_ESTIMATE_SCALE_FOR_SPILLING_OPERATOR + 1;
TImpalaQueryOptions::USE_CALCITE_PLANNER + 1;
#define QUERY_OPTS_TABLE \
DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(), NUM_QUERY_OPTIONS); \
REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
@@ -374,6 +374,8 @@ constexpr unsigned NUM_QUERY_OPTIONS =
SKIP_UNNEEDED_UPDATES_COL_LIMIT, TQueryOptionLevel::ADVANCED) \
QUERY_OPT_FN(mem_estimate_scale_for_spilling_operator, \
MEM_ESTIMATE_SCALE_FOR_SPILLING_OPERATOR, TQueryOptionLevel::DEVELOPMENT) \
QUERY_OPT_FN(use_calcite_planner, USE_CALCITE_PLANNER, \
TQueryOptionLevel::ADVANCED) \
;
/// Enforce practical limits on some query options to avoid undesired query state.

View File

@@ -1022,6 +1022,9 @@ enum TImpalaQueryOptions {
// to either 1.0 (enabled) or 0.0 (disabled) and only set value in between for
// experimental purpose.
MEM_ESTIMATE_SCALE_FOR_SPILLING_OPERATOR = 190
// If True, use the Calcite planner for compilation
USE_CALCITE_PLANNER = 191
}
// The summary of a DML statement.

View File

@@ -774,6 +774,9 @@ struct TQueryOptions {
// See comment in ImpalaService.thrift
191: optional double mem_estimate_scale_for_spilling_operator = 0.0
// See comment in ImpalaService.thrift
192: optional bool use_calcite_planner = false;
}
// Impala currently has three types of sessions: Beeswax, HiveServer2 and external

View File

@@ -54,6 +54,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
@@ -300,7 +301,6 @@ public class Frontend {
// but other planners may set their own planner values
public static final String PLANNER_PROFILE = "PlannerInfo";
public static final String PLANNER_TYPE = "PlannerType";
private static final String PLANNER = "OriginalPlanner";
/**
* Plan-time context that allows capturing various artifacts created
@@ -2053,7 +2053,7 @@ public class Frontend {
// a wrapper of the getTExecRequest is in the factory so the implementation
// can handle various planner fallback execution logic (e.g. allowing one
// planner, if execution fails, to call a different planner)
TExecRequest result = getTExecRequest(new CompilerFactoryImpl(), planCtx, timeline);
TExecRequest result = getTExecRequestWithFallback(planCtx, timeline);
timeline.markEvent("Planning finished");
result.setTimeline(timeline.toThrift());
result.setProfile(FrontendProfile.getCurrent().emitAsThrift());
@@ -2322,10 +2322,50 @@ public class Frontend {
}
}
/**
 * Builds the TExecRequest, trying the Calcite compiler first when
 * getCalciteCompilerFactory() supplies one and otherwise (or after a tolerated
 * Calcite failure) compiling with CompilerFactoryImpl.
 *
 * A failure from the Calcite attempt is rethrown unless
 * shouldFallbackToRegularPlanner() allows the fallback. The planner string of the
 * factory that was used last is added to the profile.
 */
private TExecRequest getTExecRequestWithFallback(
    PlanCtx planCtx, EventSequence timeline) throws ImpalaException {
  CompilerFactory factory = getCalciteCompilerFactory(planCtx);
  TExecRequest execRequest = null;

  if (factory != null) {
    try {
      execRequest = getTExecRequest(factory, planCtx, timeline);
    } catch (Exception calciteFailure) {
      boolean fallbackAllowed = shouldFallbackToRegularPlanner(planCtx);
      if (!fallbackAllowed) {
        throw calciteFailure;
      }
      LOG.info("Calcite planner failed: ", calciteFailure);
      timeline.markEvent("Failing over from Calcite planner");
    }
  }

  // No request yet: either no Calcite factory was available or the Calcite
  // attempt failed and falling back is permitted, so use the original planner.
  if (execRequest == null) {
    LOG.info("Using Original Planner.");
    factory = new CompilerFactoryImpl();
    execRequest = getTExecRequest(factory, planCtx, timeline);
  }

  addPlannerToProfile(factory.getPlannerString());
  return execRequest;
}
/**
 * Decides whether a failure in the Calcite planner may be retried with the
 * original planner.
 *
 * The statement is parsed with Impala's own Parser: fallback is allowed only when
 * the result is not a QueryStmt. If that parse itself throws, no fallback happens.
 *
 * TODO: Need a fallback flag for various modes. In production we will most likely
 * want to fall back to the original planner, while in testing we might want the
 * query to fail. Some cases should always fall back, e.g. when the statement fails
 * at parse time because it is not a select statement.
 */
private boolean shouldFallbackToRegularPlanner(PlanCtx planCtx) {
  TQueryCtx queryCtx = planCtx.getQueryContext();
  try {
    Object parsedByImpala = Parser.parse(queryCtx.client_request.stmt,
        queryCtx.client_request.query_options);
    boolean isQueryStmt = parsedByImpala instanceof QueryStmt;
    return !isQueryStmt;
  } catch (Exception parseFailure) {
    // The original parser rejected the statement as well: do not fall back.
    return false;
  }
}
private TExecRequest getTExecRequest(CompilerFactory compilerFactory,
PlanCtx planCtx, EventSequence timeline) throws ImpalaException {
TQueryCtx queryCtx = planCtx.getQueryContext();
addPlannerToProfile(PLANNER);
LOG.info("Analyzing query: " + queryCtx.client_request.stmt + " db: "
+ queryCtx.session.database);
@@ -2763,7 +2803,6 @@ public class Frontend {
AnalysisResult analysisResult = analysisCtx.analyzeAndAuthorize(compilerFactory,
parsedStmt, stmtTableCache, authzChecker_.get(),
planCtx.compilationState_.disableAuthorization());
// need to re-fetch the parsedStatement because analysisResult can rewrite the
// statement.
parsedStmt = analysisResult.getParsedStmt();
@@ -3534,7 +3573,25 @@ public class Frontend {
}
}
private CompilerFactory getCompilerFactory(PlanCtx ctx) {
return new CompilerFactoryImpl();
/**
 * Returns a CalciteCompilerFactory instance when the use_calcite_planner query
 * option is set and the factory class can be loaded and instantiated reflectively;
 * returns null otherwise, in which case the caller uses the original planner.
 *
 * The class is looked up by name so that Frontend has no compile-time reference to
 * the Calcite planner classes.
 */
@Nullable
private CompilerFactory getCalciteCompilerFactory(PlanCtx ctx) {
  TQueryOptions queryOptions = ctx.getQueryContext().client_request.getQuery_options();
  LOG.info("Searching for planner to use...");
  if (!queryOptions.isUse_calcite_planner()) {
    return null;
  }
  try {
    // Class.newInstance() is deprecated; go through the no-arg constructor instead.
    // Reflective instantiation never yields null, so no null check is needed here.
    CompilerFactory calciteFactory = (CompilerFactory) Class.forName(
        "org.apache.impala.calcite.service." +
        "CalciteCompilerFactory").getDeclaredConstructor().newInstance();
    LOG.info("Found Calcite Planner, using it.");
    return calciteFactory;
  } catch (Exception e) {
    LOG.info("Could not find Calcite planner, using original planner: " + e);
    return null;
  }
}
}

View File

@@ -0,0 +1,155 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.calcite.service;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.config.CalciteConnectionConfigImpl;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.runtime.CalciteContextException;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql.validate.SqlValidatorUtil;
import org.apache.impala.analysis.AnalysisContext;
import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
import org.apache.impala.analysis.AnalysisDriver;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.ParsedStatement;
import org.apache.impala.analysis.StmtMetadataLoader;
import org.apache.impala.analysis.StmtMetadataLoader.StmtTableCache;
import org.apache.impala.authorization.AuthorizationContext;
import org.apache.impala.authorization.AuthorizationFactory;
import org.apache.impala.calcite.operators.ImpalaOperatorTable;
import org.apache.impala.calcite.schema.ImpalaCalciteCatalogReader;
import org.apache.impala.calcite.type.ImpalaTypeSystemImpl;
import org.apache.impala.calcite.util.SimplifiedAnalyzer;
import org.apache.impala.calcite.validate.ImpalaConformance;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.planner.PlannerContext;
import org.apache.impala.planner.SingleNodePlannerIntf;
import org.apache.impala.thrift.TQueryCtx;
/**
* The CalciteAnalysisDriver is the implementation of AnalysisDriver which validates
* the AST produced by Calcite.
*/
public class CalciteAnalysisDriver implements AnalysisDriver {
public final TQueryCtx queryCtx_;
public final AuthorizationFactory authzFactory_;
public final CalciteParsedStatement parsedStmt_;
// Calcite AST
public SqlNode validatedNode_;
public RelDataTypeFactory typeFactory_;
public SqlValidator sqlValidator_;
// CalciteCatalogReader is a context class that holds global information that
// may be needed by the CalciteTable object
private CalciteCatalogReader reader_;
private final AnalysisContext ctx_;
private final StmtTableCache stmtTableCache_;
private final AuthorizationContext authzCtx_;
private final Analyzer analyzer_;
public CalciteAnalysisDriver(AnalysisContext ctx, ParsedStatement parsedStmt,
StmtTableCache stmtTableCache,
AuthorizationContext authzCtx) {
queryCtx_ = ctx.getQueryCtx();
authzFactory_ = ctx.getAuthzFactory();
parsedStmt_ = (CalciteParsedStatement) parsedStmt;
ctx_ = ctx;
stmtTableCache_ = stmtTableCache;
authzCtx_ = authzCtx;
analyzer_ = createAnalyzer(ctx, stmtTableCache, authzCtx);
}
@Override
public AnalysisResult analyze() {
try {
reader_ = CalciteMetadataHandler.createCalciteCatalogReader(stmtTableCache_,
queryCtx_, queryCtx_.session.database);
CalciteMetadataHandler.populateCalciteSchema(reader_, ctx_.getCatalog(),
parsedStmt_.getTablesInQuery(null));
typeFactory_ = new JavaTypeFactoryImpl(new ImpalaTypeSystemImpl());
sqlValidator_ = SqlValidatorUtil.newValidator(
ImpalaOperatorTable.getInstance(),
reader_, typeFactory_,
SqlValidator.Config.DEFAULT
// Impala requires identifier expansion (tpcds test queries fail
// without this)
.withIdentifierExpansion(true)
.withConformance(ImpalaConformance.INSTANCE)
);
validatedNode_ = sqlValidator_.validate(parsedStmt_.getParsedSqlNode());
return new CalciteAnalysisResult(this);
} catch (ImpalaException e) {
return new CalciteAnalysisResult(this, e);
} catch (CalciteContextException e) {
return new CalciteAnalysisResult(this,
new AnalysisException(e.getMessage(), e.getCause()));
}
}
public Analyzer createAnalyzer(AnalysisContext ctx, StmtTableCache stmtTableCache,
AuthorizationContext authzCtx) {
return new SimplifiedAnalyzer(stmtTableCache, ctx.getQueryCtx(),
ctx.getAuthzFactory(), null);
}
public RelDataTypeFactory getTypeFactory() {
return typeFactory_;
}
public CalciteCatalogReader getCatalogReader() {
return reader_;
}
public SqlValidator getSqlValidator() {
return sqlValidator_;
}
public SqlNode getValidatedNode() {
return validatedNode_;
}
public Analyzer getAnalyzer() {
return analyzer_;
}
public ParsedStatement getParsedStmt() {
return parsedStmt_;
}
}

View File

@@ -0,0 +1,74 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.calcite.service;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
import org.apache.impala.analysis.AnalysisDriver;
import org.apache.impala.common.ImpalaException;
/**
 * CalciteAnalysisResult is an AnalysisResult that additionally carries the Calcite
 * artifacts held by the CalciteAnalysisDriver at the time the result is created.
 */
public class CalciteAnalysisResult extends AnalysisResult {

  // CalciteCatalogReader is a context class that holds global information that
  // may be needed by the CalciteTable object
  private final CalciteCatalogReader reader_;

  private final RelDataTypeFactory typeFactory_;

  private final SqlValidator sqlValidator_;

  // Calcite AST
  private final SqlNode validatedNode_;

  /** Creates a result without an associated exception. */
  public CalciteAnalysisResult(CalciteAnalysisDriver analysisDriver) {
    this(analysisDriver, null);
  }

  /**
   * Creates a result from the driver's current state; 'e' is handed to the
   * AnalysisResult constructor together with the driver's parsed statement and
   * analyzer.
   */
  public CalciteAnalysisResult(CalciteAnalysisDriver analysisDriver,
      ImpalaException e) {
    super(analysisDriver.getParsedStmt(), analysisDriver.getAnalyzer(), e);
    this.reader_ = analysisDriver.getCatalogReader();
    this.typeFactory_ = analysisDriver.getTypeFactory();
    this.sqlValidator_ = analysisDriver.getSqlValidator();
    this.validatedNode_ = analysisDriver.getValidatedNode();
  }

  /** Returns the catalog reader taken from the driver. */
  public CalciteCatalogReader getCatalogReader() {
    return reader_;
  }

  /** Returns the validator taken from the driver. */
  public SqlValidator getSqlValidator() {
    return sqlValidator_;
  }

  /** Returns the validated AST taken from the driver. */
  public SqlNode getValidatedNode() {
    return validatedNode_;
  }

  /** Returns the type factory taken from the driver. */
  public RelDataTypeFactory getTypeFactory() {
    return typeFactory_;
  }
}

View File

@@ -0,0 +1,102 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.calcite.service;
import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.impala.analysis.AnalysisContext;
import org.apache.impala.analysis.AnalysisDriver;
import org.apache.impala.analysis.ParsedStatement;
import org.apache.impala.analysis.StmtMetadataLoader.StmtTableCache;
import org.apache.impala.authorization.AuthorizationContext;
import org.apache.impala.calcite.operators.ImpalaOperatorTable;
import org.apache.impala.catalog.BuiltinsDb;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.UnsupportedFeatureException;
import org.apache.impala.planner.SingleNodePlannerIntf;
import org.apache.impala.planner.PlannerContext;
import org.apache.impala.service.CompilerFactory;
import org.apache.impala.thrift.TQueryCtx;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * CompilerFactory implementation that creates the Calcite versions of the parsed
 * statement, the analysis driver and the single node planner.
 */
public class CalciteCompilerFactory implements CompilerFactory {

  protected static final Logger LOG =
      LoggerFactory.getLogger(CalciteCompilerFactory.class.getName());

  private static final String PLANNER = "CalcitePlanner";

  // One-time setup when the class is initialized: set Calcite's default charset
  // property, then create the ImpalaOperatorTable from the builtins database.
  static {
    System.setProperty("calcite.default.charset", "UTF8");
    ImpalaOperatorTable.create(BuiltinsDb.getInstance());
  }

  /**
   * Parses the statement of 'queryCtx' into a CalciteParsedStatement.
   * Throws if a query option is set that checkOptionSupportedInCalcite() rejects.
   */
  public ParsedStatement createParsedStatement(TQueryCtx queryCtx)
      throws ImpalaException {
    // check that all options are supported, throws Exception if not.
    checkOptionSupportedInCalcite(queryCtx);

    ClassLoader factoryLoader = this.getClass().getClassLoader();
    Thread.currentThread().setContextClassLoader(factoryLoader);

    // Needed for Calcite's JaninoRelMetadataProvider
    JaninoRelMetadataProvider metadataProvider =
        JaninoRelMetadataProvider.of(DefaultRelMetadataProvider.INSTANCE);
    RelMetadataQuery.THREAD_PROVIDERS.set(metadataProvider);

    return new CalciteParsedStatement(queryCtx);
  }

  /** Creates the CalciteAnalysisDriver for the given parsed statement. */
  public AnalysisDriver createAnalysisDriver(AnalysisContext ctx,
      ParsedStatement parsedStmt, StmtTableCache stmtTableCache,
      AuthorizationContext authzCtx) throws AnalysisException {
    return new CalciteAnalysisDriver(ctx, parsedStmt, stmtTableCache, authzCtx);
  }

  /** Creates the CalciteSingleNodePlanner for the given planner context. */
  public SingleNodePlannerIntf createSingleNodePlanner(PlannerContext ctx)
      throws ImpalaException {
    return new CalciteSingleNodePlanner(ctx);
  }

  /** Returns the name under which this planner is reported. */
  public String getPlannerString() {
    return PLANNER;
  }

  /**
   * Throws an UnsupportedFeatureException when the query options of 'queryCtx'
   * request a feature the Calcite planner does not handle: decimal v1
   * (IMPALA-13530) or approximate count distinct (IMPALA-13529).
   */
  public static void checkOptionSupportedInCalcite(TQueryCtx queryCtx)
      throws ImpalaException {
    // IMPALA-13530
    boolean decimalV2 =
        queryCtx.getClient_request().getQuery_options().isDecimal_v2();
    if (!decimalV2) {
      throw new UnsupportedFeatureException("Decimal v1 not supported in Calcite, " +
          "falling back to Impala compiler.");
    }

    // IMPALA-13529
    boolean appxCountDistinct =
        queryCtx.getClient_request().getQuery_options().isAppx_count_distinct();
    if (appxCountDistinct) {
      throw new UnsupportedFeatureException("Approximate count distinct is not " +
          "supported in Calcite, falling back to Impala compiler.");
    }
  }
}

View File

@@ -107,7 +107,9 @@ public class CalciteJniFrontend extends JniFrontend {
CalciteMetadataHandler mdHandler = null;
if (!optionSupportedInCalcite(queryCtx)) {
try {
CalciteCompilerFactory.checkOptionSupportedInCalcite(queryCtx.getTQueryCtx());
} catch (UnsupportedFeatureException e) {
return runThroughOriginalPlanner(thriftQueryContext, queryCtx);
}
@@ -200,20 +202,6 @@ public class CalciteJniFrontend extends JniFrontend {
}
}
private boolean optionSupportedInCalcite(QueryContext queryCtx) {
// IMPALA-13530
if (!queryCtx.getTQueryCtx().getClient_request().getQuery_options().isDecimal_v2()) {
return false;
}
// IMPALA-13529
if (queryCtx.getTQueryCtx().getClient_request().getQuery_options()
.isAppx_count_distinct()) {
return false;
}
return true;
}
private static void loadCalciteImpalaFunctions() {
ImpalaOperatorTable.create(BuiltinsDb.getInstance());
}

View File

@@ -47,6 +47,8 @@ import org.apache.impala.catalog.FeView;
import org.apache.impala.catalog.HdfsTable;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.UnsupportedFeatureException;
import org.apache.impala.thrift.TQueryCtx;
import com.google.common.collect.Sets;
import java.util.ArrayList;
@@ -92,7 +94,8 @@ public class CalciteMetadataHandler implements CompilerStep {
// load the relevant tables in the query from catalogd
this.stmtTableCache_ = stmtMetadataLoader.loadTables(tableVisitor.tableNames_);
this.reader_ = createCalciteCatalogReader(queryCtx, stmtTableCache_);
this.reader_ = createCalciteCatalogReader(stmtTableCache_,
queryCtx.getTQueryCtx(), queryCtx.getCurrentDb());
// populate calcite schema. This step needs to be done after the loader because the
// schema needs to contain the columns in the table for validation, which cannot
@@ -108,24 +111,24 @@ public class CalciteMetadataHandler implements CompilerStep {
* Since the individual Table objects have reference to the Schema, this also serves
* as a way to give the tables Context information about the general query.
*/
private CalciteCatalogReader createCalciteCatalogReader(
CalciteJniFrontend.QueryContext queryCtx,
StmtMetadataLoader.StmtTableCache stmtTableCache) {
public static CalciteCatalogReader createCalciteCatalogReader(
StmtMetadataLoader.StmtTableCache stmtTableCache, TQueryCtx queryCtx,
String database) {
RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl(new ImpalaTypeSystemImpl());
Properties props = new Properties();
props.setProperty(CalciteConnectionProperty.CASE_SENSITIVE.camelName(), "false");
CalciteConnectionConfig config = new CalciteConnectionConfigImpl(props);
CalciteSchema rootSchema = CalciteSchema.createRootSchema(true);
return new ImpalaCalciteCatalogReader(rootSchema,
Collections.singletonList(queryCtx.getCurrentDb()),
typeFactory, config, queryCtx.getTQueryCtx(), stmtTableCache);
Collections.singletonList(database),
typeFactory, config, queryCtx, stmtTableCache);
}
/**
* Populate the CalciteSchema with tables being used by this query. Returns a
* list of tables in the query that are not found in the database.
*/
private static List<String> populateCalciteSchema(CalciteCatalogReader reader,
public static List<String> populateCalciteSchema(CalciteCatalogReader reader,
FeCatalog catalog, Set<TableName> tableNames) throws ImpalaException {
List<String> notFoundTables = new ArrayList<>();
CalciteSchema rootSchema = reader.getRootSchema();
@@ -178,7 +181,7 @@ public class CalciteMetadataHandler implements CompilerStep {
* TableVisitor walks through the AST and places all the tables into
* tableNames
*/
private static class TableVisitor extends SqlBasicVisitor<Void> {
public static class TableVisitor extends SqlBasicVisitor<Void> {
private final String currentDb_;
public final Set<TableName> tableNames_ = new HashSet<>();

View File

@@ -24,6 +24,7 @@ import org.apache.calcite.plan.hep.HepMatchOrder;
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rel.rules.CoreRules;
@@ -52,15 +53,19 @@ public class CalciteOptimizer implements CompilerStep {
protected static final Logger LOG =
LoggerFactory.getLogger(CalciteOptimizer.class.getName());
private final CalciteValidator validator_;
private final CalciteCatalogReader reader_;
public CalciteOptimizer(CalciteAnalysisResult analysisResult) {
this.reader_ = analysisResult.getCatalogReader();
}
public CalciteOptimizer(CalciteValidator validator) {
this.validator_ = validator;
this.reader_ = validator.getCatalogReader();
}
public ImpalaPlanRel optimize(RelNode logPlan) throws ImpalaException {
RelBuilder relBuilder = RelFactories.LOGICAL_BUILDER.create(logPlan.getCluster(),
validator_.getCatalogReader());
reader_);
// Run some essential rules needed to create working RelNodes before doing
// optimization

View File

@@ -0,0 +1,88 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.calcite.service;
import java.util.Set;
import org.apache.calcite.sql.SqlExplain;
import org.apache.calcite.sql.SqlNode;
import org.apache.impala.analysis.AnalysisContext;
import org.apache.impala.analysis.AnalysisDriver;
import org.apache.impala.analysis.ParsedStatement;
import org.apache.impala.analysis.TableName;
import org.apache.impala.analysis.StmtMetadataLoader;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.thrift.TQueryCtx;
/**
 * Implementation of the ParsedStatement hook that holds the Calcite AST
 * parsed from the sql String.
 */
public class CalciteParsedStatement implements ParsedStatement {

  // Original statement text.
  private final String sql_;

  // True if the top-level parsed node was a SqlExplain.
  private final boolean isExplain_;

  // Parsed AST; for an EXPLAIN this is the statement being explained.
  private final SqlNode parsedNode_;

  // Tables collected from parsedNode_ by CalciteMetadataHandler.TableVisitor.
  private final Set<TableName> tableNames_;

  /**
   * Parses the statement text of 'queryCtx' with CalciteQueryParser, unwraps a
   * top-level EXPLAIN, and collects the tables referenced by the statement using
   * the session database of 'queryCtx'.
   */
  public CalciteParsedStatement(TQueryCtx queryCtx) throws ImpalaException {
    sql_ = queryCtx.client_request.stmt;

    SqlNode rootNode = new CalciteQueryParser(sql_).parse();
    isExplain_ = rootNode instanceof SqlExplain;
    parsedNode_ = isExplain_ ? ((SqlExplain) rootNode).getExplicandum() : rootNode;

    CalciteMetadataHandler.TableVisitor visitor =
        new CalciteMetadataHandler.TableVisitor(queryCtx.session.database);
    parsedNode_.accept(visitor);
    tableNames_ = visitor.tableNames_;
  }

  /** Returns the tables collected at construction time; 'loader' is not used. */
  @Override
  public Set<TableName> getTablesInQuery(StmtMetadataLoader loader) {
    return tableNames_;
  }

  /** Returns the parsed SqlNode (with any EXPLAIN wrapper removed). */
  @Override
  public Object getTopLevelNode() {
    return parsedNode_;
  }

  /** Returns true if the statement was wrapped in an EXPLAIN. */
  @Override
  public boolean isExplain() {
    return isExplain_;
  }

  /** Always returns true. */
  @Override
  public boolean isQueryStmt() {
    return true;
  }

  /** Returns the original statement text. */
  @Override
  public String toSql() {
    return sql_;
  }

  /** Returns the parsed SqlNode (with any EXPLAIN wrapper removed). */
  public SqlNode getParsedSqlNode() {
    return parsedNode_;
  }
}

View File

@@ -40,22 +40,25 @@ public class CalcitePhysPlanCreator implements CompilerStep {
protected static final Logger LOG =
LoggerFactory.getLogger(CalcitePhysPlanCreator.class.getName());
private final CalciteJniFrontend.QueryContext queryCtx_;
private final Analyzer analyzer_;
private final PlannerContext plannerContext_;
public CalcitePhysPlanCreator(Analyzer analyzer, PlannerContext ctx) {
analyzer_ = analyzer;
plannerContext_ = ctx;
}
public CalcitePhysPlanCreator(CalciteMetadataHandler mdHandler,
CalciteJniFrontend.QueryContext queryCtx) throws ImpalaException {
this.queryCtx_ = queryCtx;
// TODO: IMPALA-13011: Awkward call for authorization here. Authorization
// will be done at validation time, but this is needed here for the Analyzer
// instantiation.
AuthorizationFactory authzFactory =
AuthorizationUtil.authzFactoryFrom(BackendConfig.INSTANCE);
this.analyzer_ = new SimplifiedAnalyzer(mdHandler.getStmtTableCache(),
queryCtx_.getTQueryCtx(), authzFactory, null);
queryCtx.getTQueryCtx(), authzFactory, null);
this.plannerContext_ =
new PlannerContext(analyzer_, queryCtx_.getTQueryCtx(), queryCtx_.getTimeline());
new PlannerContext(analyzer_, queryCtx.getTQueryCtx(), queryCtx.getTimeline());
}

View File

@@ -36,16 +36,20 @@ public class CalciteQueryParser implements CompilerStep {
protected static final Logger LOG =
LoggerFactory.getLogger(CalciteQueryParser.class.getName());
private final CalciteJniFrontend.QueryContext queryCtx_;
private final String sqlStmt_;
public CalciteQueryParser(CalciteJniFrontend.QueryContext queryCtx) {
this.queryCtx_ = queryCtx;
this.sqlStmt_ = queryCtx.getStmt();
}
public CalciteQueryParser(String stmt) {
this.sqlStmt_ = stmt;
}
public SqlNode parse() throws ParseException {
try {
// Create an SQL parser
SqlParser parser = SqlParser.create(queryCtx_.getStmt(),
SqlParser parser = SqlParser.create(sqlStmt_,
SqlParser.config().withParserFactory(ImpalaSqlParserImpl.FACTORY)
.withConformance(ImpalaConformance.INSTANCE)
.withQuoting(Quoting.BACK_TICK_BACKSLASH)

View File

@@ -17,6 +17,7 @@
package org.apache.impala.calcite.service;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.ConventionTraitDef;
import org.apache.calcite.plan.RelOptCluster;
@@ -43,6 +44,9 @@ import org.apache.impala.calcite.operators.ImpalaConvertletTable;
import org.apache.calcite.sql2rel.StandardConvertletTable;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.prepare.CalciteCatalogReader;
import java.util.List;
import org.slf4j.Logger;
@@ -59,25 +63,41 @@ public class CalciteRelNodeConverter implements CompilerStep {
private static final RelOptTable.ViewExpander NOOP_EXPANDER =
(type, query, schema, path) -> null;
private final CalciteValidator validator_;
private final RelOptCluster cluster_;
private final RelOptPlanner planner_;
public CalciteRelNodeConverter(CalciteValidator validator) {
this.validator_ = validator;
private final RelDataTypeFactory typeFactory_;
private final SqlValidator sqlValidator_;
private final CalciteCatalogReader reader_;
public CalciteRelNodeConverter(CalciteAnalysisResult analysisResult) {
this.typeFactory_ = analysisResult.getTypeFactory();
this.reader_ = analysisResult.getCatalogReader();
this.sqlValidator_ = analysisResult.getSqlValidator();
this.planner_ = new VolcanoPlanner();
planner_.addRelTraitDef(ConventionTraitDef.INSTANCE);
cluster_ =
RelOptCluster.create(planner_, new RexBuilder(validator_.getTypeFactory()));
RelOptCluster.create(planner_, new RexBuilder(typeFactory_));
}
public CalciteRelNodeConverter(CalciteValidator validator) {
this.typeFactory_ = validator.getTypeFactory();
this.reader_ = validator.getCatalogReader();
this.sqlValidator_ = validator.getSqlValidator();
this.planner_ = new VolcanoPlanner();
planner_.addRelTraitDef(ConventionTraitDef.INSTANCE);
cluster_ =
RelOptCluster.create(planner_, new RexBuilder(typeFactory_));
}
public RelNode convert(SqlNode validatedNode) {
SqlToRelConverter relConverter = new SqlToRelConverter(
NOOP_EXPANDER,
validator_.getSqlValidator(),
validator_.getCatalogReader(),
sqlValidator_,
reader_,
cluster_,
ImpalaConvertletTable.INSTANCE,
SqlToRelConverter.config().withCreateValuesRel(false));
@@ -98,7 +118,7 @@ public class CalciteRelNodeConverter implements CompilerStep {
logDebug(subQueryRemovedPlan);
RelBuilder relBuilder = RelFactories.LOGICAL_BUILDER.create(cluster_,
validator_.getCatalogReader());
reader_);
RelNode decorrelatedPlan =
RelDecorrelator.decorrelateQuery(subQueryRemovedPlan, relBuilder);
@@ -110,10 +130,6 @@ public class CalciteRelNodeConverter implements CompilerStep {
return cluster_;
}
public CalciteValidator getValidator() {
return validator_;
}
@Override
public void logDebug(Object resultObject) {
if (!(resultObject instanceof RelNode)) {

View File

@@ -0,0 +1,110 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.calcite.service;
import java.util.ArrayList;
import java.util.List;
import org.apache.calcite.rel.RelNode;
import org.apache.impala.analysis.AnalysisDriver;
import org.apache.impala.analysis.ExprSubstitutionMap;
import org.apache.impala.analysis.ParsedStatement;
import org.apache.impala.calcite.rel.node.ImpalaPlanRel;
import org.apache.impala.calcite.rel.node.NodeWithExprs;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.planner.DataSink;
import org.apache.impala.planner.PlannerContext;
import org.apache.impala.planner.PlanNode;
import org.apache.impala.planner.PlanRootSink;
import org.apache.impala.planner.SingleNodePlannerIntf;
import org.apache.impala.thrift.TColumn;
import org.apache.impala.thrift.TResultSetMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Implementation of the SingleNodePlannerIntf which returns a PlanNode
 * to the Impala framework and provides information needed by the
 * framework after planning.
 *
 * createSingleNodePlan() must be called first: the data sink, the column
 * labels and the result set metadata are all derived from the output
 * expressions of the plan root that it produces.
 */
public class CalciteSingleNodePlanner implements SingleNodePlannerIntf {
  protected static final Logger LOG =
      LoggerFactory.getLogger(CalciteSingleNodePlanner.class.getName());

  private final PlannerContext ctx_;

  private final CalciteAnalysisResult analysisResult_;

  // Root of the physical plan together with its output expressions. Stays null
  // until createSingleNodePlan() has created the physical plan.
  private NodeWithExprs rootNode_;

  /**
   * @param ctx planner context; its analysis result is cast to
   *     CalciteAnalysisResult, so a ClassCastException is thrown if the context
   *     carries any other kind of analysis result.
   */
  public CalciteSingleNodePlanner(PlannerContext ctx) {
    ctx_ = ctx;
    analysisResult_ = (CalciteAnalysisResult) ctx.getAnalysisResult();
  }

  /**
   * Runs the planning pipeline on the validated statement held by the analysis
   * result: conversion to a RelNode tree, optimization, and creation of the
   * physical Impala plan. The value transfer graph of the analyzer is computed
   * once the physical plan exists.
   *
   * @return the root PlanNode of the physical plan
   * @throws ImpalaException if any of the planning steps fails
   */
  @Override
  public PlanNode createSingleNodePlan() throws ImpalaException {
    // Convert the query to RelNodes which can be optimized
    CalciteRelNodeConverter relNodeConverter =
        new CalciteRelNodeConverter(analysisResult_);
    RelNode logicalPlan = relNodeConverter.convert(analysisResult_.getValidatedNode());

    // Optimize the query
    CalciteOptimizer optimizer = new CalciteOptimizer(analysisResult_);
    ImpalaPlanRel optimizedPlan = optimizer.optimize(logicalPlan);

    // Create Physical Impala PlanNodes
    CalcitePhysPlanCreator physPlanCreator =
        new CalcitePhysPlanCreator(analysisResult_.getAnalyzer(), ctx_);
    rootNode_ = physPlanCreator.create(optimizedPlan);

    analysisResult_.getAnalyzer().computeValueTransferGraph();
    return rootNode_.planNode_;
  }

  /**
   * Creates the DataSink needed by the framework. Only the original planner
   * requires the substitution map passed in, so it is ignored here.
   *
   * @param rootNodeSmap unused by this implementation
   * @return a PlanRootSink over the output expressions of the plan root
   * @throws IllegalStateException if createSingleNodePlan() has not been called
   */
  @Override
  public DataSink createDataSink(ExprSubstitutionMap rootNodeSmap) {
    return new PlanRootSink(getPlannedRoot().outputExprs_);
  }

  /**
   * Returns one label per output expression of the plan root, in order. Each
   * label is the toString() of the corresponding expression.
   *
   * @throws IllegalStateException if createSingleNodePlan() has not been called
   */
  @Override
  public List<String> getColLabels() {
    NodeWithExprs root = getPlannedRoot();
    int colCnt = root.outputExprs_.size();
    List<String> colLabels = new ArrayList<>(colCnt);
    for (int i = 0; i < colCnt; ++i) {
      colLabels.add(root.outputExprs_.get(i).toString());
    }
    return colLabels;
  }

  /**
   * Builds the result set metadata from the output expressions of the plan
   * root: one TColumn per expression, named by the expression's toString() and
   * typed by the thrift form of the expression's type.
   *
   * @param parsedStmt unused by this implementation
   * @throws IllegalStateException if createSingleNodePlan() has not been called
   */
  @Override
  public TResultSetMetadata getTResultSetMetadata(ParsedStatement parsedStmt) {
    NodeWithExprs root = getPlannedRoot();
    TResultSetMetadata metadata = new TResultSetMetadata();
    int colCnt = root.outputExprs_.size();
    for (int i = 0; i < colCnt; ++i) {
      TColumn colDesc = new TColumn(root.outputExprs_.get(i).toString(),
          root.outputExprs_.get(i).getType().toThrift());
      metadata.addToColumns(colDesc);
    }
    return metadata;
  }

  /**
   * Returns the plan root produced by createSingleNodePlan(), failing with a
   * descriptive error instead of a bare NullPointerException when the plan has
   * not been created yet.
   */
  private NodeWithExprs getPlannedRoot() {
    if (rootNode_ == null) {
      throw new IllegalStateException(
          "createSingleNodePlan() must be called before the plan output is used");
    }
    return rootNode_;
  }
}

View File

@@ -20,6 +20,7 @@ import logging
import pytest
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.test_dimensions import (add_mandatory_exec_option)
LOG = logging.getLogger(__name__)
@@ -30,7 +31,12 @@ class TestCalcitePlanner(CustomClusterTestSuite):
def setup_class(cls):
super(TestCalcitePlanner, cls).setup_class()
@classmethod
def add_test_dimensions(cls):
# Keep the inherited test dimensions, then register use_calcite_planner=true as a
# mandatory exec option for this suite: the Calcite planner is only used when that
# query option is set.
super(TestCalcitePlanner, cls).add_test_dimensions()
add_mandatory_exec_option(cls, 'use_calcite_planner', 'true')
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(start_args="--use_calcite_planner=true")
@CustomClusterTestSuite.with_args(start_args="--env_vars=USE_CALCITE_PLANNER=true")
def test_calcite_frontend(self, vector, unique_database):
self.run_test_case('QueryTest/calcite', vector, use_db=unique_database)