From b39cd79ae84c415e0aebec2c2b4d7690d2a0cc7a Mon Sep 17 00:00:00 2001
From: Steve Carlin
Date: Tue, 5 Mar 2024 16:58:30 -0800
Subject: [PATCH] IMPALA-12872: Use Calcite for optimization - part 1: simple queries

This is the first commit to use the Calcite library to parse, analyze,
and optimize queries.

The hook for the planner is an override of the JniFrontend. The
CalciteJniFrontend class is the driver that walks through each of the
Calcite steps, which are as follows:

CalciteQueryParser: Takes the query string and outputs an AST in the
form of Calcite's SqlNode object.

CalciteMetadataHandler: Iterates through the SqlNode from the previous
step and makes sure all essential table metadata is retrieved from
catalogd.

CalciteValidator: Validates the SqlNode tree, akin to the Impala
Analyzer.

CalciteRelNodeConverter: Converts the AST into a logical plan. In this
first commit, the only logical nodes used are LogicalTableScan and
LogicalProject. The LogicalTableScan serves as the node that reads from
an HDFS table, and the LogicalProject projects only the columns used in
the query. In later versions, the LogicalProject will also handle
function changes.

CalciteOptimizer: Optimizes the query. In this commit it is a no-op,
but later versions will perform logical optimizations via Calcite's
rule mechanism.

CalcitePhysPlanCreator: Converts the Calcite RelNode logical tree into
Impala's PlanNode physical tree.

ExecRequestCreator: Runs the existing Impala steps that turn a single
node plan into a distributed plan. It also creates the TExecRequest
object needed by the runtime server.

Only some very basic queries work with this commit. These include:
select * from tbl   <-- only needs the LogicalTableScan
select c1 from tbl  <-- also uses the LogicalProject

The CalciteJniFrontend performs some basic checks to make sure only
select statements are processed. Any non-query statement reverts to
the current Impala planner. In this iteration, any query beyond the
minimal ones listed above results in a caught exception, after which
the query is run through the current Impala planner.

The tests that do work can be found in calcite.test and run through the
custom cluster test test_calcite_planner.py.

This iteration should support all types with the exception of complex
types. Calcite does not have a STRING type, so the string type is
represented as VARCHAR(MAXINT), similar to how Hive represents its
STRING type. The ImpalaTypeConverter class converts Impala Type objects
to the corresponding Calcite objects.

Authorization does not yet work with this commit. A Jira has been filed
(IMPALA-13011) to deal with this.
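As a quick reference for trying the patch out, the sketch below shows one
way to bring up a cluster with the experimental planner. The flag, the
environment variable, and the JNI frontend class name are the ones added
by this patch; the shell command and example table are only illustrative.

  # Build java/calcite-planner first so that
  # target/calcite-build-classpath.txt exists; set-classpath.sh errors out
  # otherwise. The --use_calcite_planner flag makes start-impala-cluster.py
  # pass -jni_frontend_class=org/apache/impala/calcite/service/CalciteJniFrontend
  # to each impalad and set USE_CALCITE_PLANNER=true so that set-classpath.sh
  # adds the calcite-planner jars to the CLASSPATH.
  bin/start-impala-cluster.py --use_calcite_planner=true

  # Simple select statements are planned through Calcite; non-select
  # statements and unsupported queries fall back to the original planner.
  # (functional.alltypes is just one of the standard test tables.)
  bin/impala-shell.sh -q "select id from functional.alltypes"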
Change-Id: I453fd75b7b705f4d7de1ed73c3e24cafad0b8c98 Reviewed-on: http://gerrit.cloudera.org:8080/21109 Tested-by: Impala Public Jenkins Reviewed-by: Joe McDonnell --- bin/set-classpath.sh | 16 + bin/start-impala-cluster.py | 14 +- .../org/apache/impala/analysis/TableName.java | 3 +- .../apache/impala/planner/PlannerContext.java | 9 +- .../org/apache/impala/service/Frontend.java | 13 + .../apache/impala/service/JniFrontend.java | 4 + java/calcite-planner/pom.xml | 108 +++++ .../rel/node/ConvertToImpalaRelRules.java | 59 +++ .../calcite/rel/node/ImpalaHdfsScanRel.java | 125 +++++ .../calcite/rel/node/ImpalaPlanRel.java | 34 ++ .../calcite/rel/node/ImpalaProjectRel.java | 112 +++++ .../calcite/rel/node/NodeWithExprs.java | 39 ++ .../rel/node/ParentPlanRelContext.java | 62 +++ .../calcite/rel/phys/ImpalaHdfsScanNode.java | 53 +++ .../calcite/rel/util/CreateExprVisitor.java | 125 +++++ .../impala/calcite/schema/CalciteDb.java | 62 +++ .../impala/calcite/schema/CalciteTable.java | 209 +++++++++ .../schema/ImpalaCalciteCatalogReader.java | 48 ++ .../calcite/service/CalciteJniFrontend.java | 241 ++++++++++ .../service/CalciteMetadataHandler.java | 224 +++++++++ .../calcite/service/CalciteOptimizer.java | 89 ++++ .../service/CalcitePhysPlanCreator.java | 99 ++++ .../calcite/service/CalciteQueryParser.java | 57 +++ .../service/CalciteRelNodeConverter.java | 96 ++++ .../calcite/service/CalciteValidator.java | 88 ++++ .../impala/calcite/service/CompilerStep.java | 26 ++ .../calcite/service/ExecRequestCreator.java | 427 ++++++++++++++++++ .../calcite/type/ImpalaTypeConverter.java | 166 +++++++ .../calcite/type/ImpalaTypeSystemImpl.java | 185 ++++++++ .../calcite/validate/ImpalaConformance.java | 156 +++++++ java/pom.xml | 1 + .../queries/QueryTest/calcite.test | 117 +++++ tests/custom_cluster/test_calcite_planner.py | 40 ++ tests/util/workload_management.py | 2 +- 34 files changed, 3103 insertions(+), 6 deletions(-) create mode 100644 java/calcite-planner/pom.xml create mode 100644 java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ConvertToImpalaRelRules.java create mode 100644 java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaHdfsScanRel.java create mode 100644 java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaPlanRel.java create mode 100644 java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaProjectRel.java create mode 100644 java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/NodeWithExprs.java create mode 100644 java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ParentPlanRelContext.java create mode 100644 java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/phys/ImpalaHdfsScanNode.java create mode 100644 java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/util/CreateExprVisitor.java create mode 100644 java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteDb.java create mode 100644 java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteTable.java create mode 100644 java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/ImpalaCalciteCatalogReader.java create mode 100644 java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteJniFrontend.java create mode 100644 java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteMetadataHandler.java create mode 100644 java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteOptimizer.java create 
mode 100644 java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalcitePhysPlanCreator.java create mode 100644 java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteQueryParser.java create mode 100644 java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteRelNodeConverter.java create mode 100644 java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteValidator.java create mode 100644 java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CompilerStep.java create mode 100644 java/calcite-planner/src/main/java/org/apache/impala/calcite/service/ExecRequestCreator.java create mode 100644 java/calcite-planner/src/main/java/org/apache/impala/calcite/type/ImpalaTypeConverter.java create mode 100644 java/calcite-planner/src/main/java/org/apache/impala/calcite/type/ImpalaTypeSystemImpl.java create mode 100644 java/calcite-planner/src/main/java/org/apache/impala/calcite/validate/ImpalaConformance.java create mode 100644 testdata/workloads/functional-query/queries/QueryTest/calcite.test create mode 100644 tests/custom_cluster/test_calcite_planner.py diff --git a/bin/set-classpath.sh b/bin/set-classpath.sh index bbdc214bd..abfe54e43 100644 --- a/bin/set-classpath.sh +++ b/bin/set-classpath.sh @@ -48,6 +48,22 @@ fi CLASSPATH=$(cat $FE_CP_FILE):"$CLASSPATH" +# Currently the Calcite planner is experimental and not included by default. +# If the USE_CALCITE_PLANNER is explicitly set, then the jar dependencies +# are added to the CLASSPATH +USE_CALCITE_PLANNER=${USE_CALCITE_PLANNER:-false} +if [ "true" = "$USE_CALCITE_PLANNER" ]; then + + CALCITE_CP_FILE="$IMPALA_HOME/java/calcite-planner/target/calcite-build-classpath.txt" + if [ ! -s "$CALCITE_CP_FILE" ]; then + >&2 echo Calcite front-end classpath file $CALCITE_CP_FILE missing. + >&2 echo Build the Calcite front-end first. + return 1 + fi + CLASSPATH="$CLASSPATH":$(cat $CALCITE_CP_FILE):\ +"$IMPALA_HOME"/java/calcite-planner/target/calcite-planner-${IMPALA_VERSION}.jar: +fi + if [[ "${1:-notest}" = "test" ]]; then FE_TEST_CP_FILE="$IMPALA_HOME/fe/target/test-classpath.txt" CLASSPATH=$(cat $FE_TEST_CP_FILE):"$CLASSPATH" diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py index 5efe2188d..9d2ad99af 100755 --- a/bin/start-impala-cluster.py +++ b/bin/start-impala-cluster.py @@ -163,7 +163,7 @@ parser.add_option("--enable_catalogd_ha", dest="enable_catalogd_ha", help="If true, enables CatalogD HA - the cluster will be launched " "with two catalogd instances as Active-Passive HA pair.") parser.add_option("--jni_frontend_class", dest="jni_frontend_class", - action="store", default="org/apache/impala/service/JniFrontend", + action="store", default="", help="Use a custom java frontend interface.") parser.add_option("--enable_statestored_ha", dest="enable_statestored_ha", action="store_true", default=False, @@ -187,6 +187,10 @@ parser.add_option("--tuple_cache_capacity", dest="tuple_cache_capacity", parser.add_option("--tuple_cache_eviction_policy", dest="tuple_cache_eviction_policy", default="LRU", help="This specifies the cache eviction policy to use " "for the tuple cache.") +parser.add_option("--use_calcite_planner", default="False", type="choice", + choices=["true", "True", "false", "False"], + help="If true, use the Calcite planner for query optimization " + "instead of the Impala planner") # For testing: list of comma-separated delays, in milliseconds, that delay impalad catalog # replica initialization. 
The ith delay is applied to the ith impalad. @@ -649,7 +653,7 @@ def build_impalad_arg_lists(cluster_size, num_coordinators, use_exclusive_coordi args = "{args} -geospatial_library={geospatial_library}".format( args=args, geospatial_library=options.geospatial_library) - if options.jni_frontend_class: + if options.jni_frontend_class != "": args = "-jni_frontend_class={jni_frontend_class} {args}".format( jni_frontend_class=options.jni_frontend_class, args=args) @@ -658,6 +662,12 @@ def build_impalad_arg_lists(cluster_size, num_coordinators, use_exclusive_coordi else: args = "-allow_tuple_caching=true {args}".format(args=args) + if options.use_calcite_planner.lower() == 'true': + args = "-jni_frontend_class={jni_frontend_class} {args}".format( + jni_frontend_class="org/apache/impala/calcite/service/CalciteJniFrontend", + args=args) + os.environ["USE_CALCITE_PLANNER"] = "true" + # Appended at the end so they can override previous args. if i < len(per_impalad_args): args = "{args} {per_impalad_args}".format( diff --git a/fe/src/main/java/org/apache/impala/analysis/TableName.java b/fe/src/main/java/org/apache/impala/analysis/TableName.java index 1f0efc67e..864b48577 100644 --- a/fe/src/main/java/org/apache/impala/analysis/TableName.java +++ b/fe/src/main/java/org/apache/impala/analysis/TableName.java @@ -66,9 +66,8 @@ public class TableName { // Avoid "db1." and ".tbl1" being treated as the same. We resolve ".tbl1" as // "default.tbl1". But we reject "db1." since it only gives the database name. if (fullName == null || fullName.trim().endsWith(".")) return null; - // TODO: upgrade Guava to 15+ to use splitToList instead List parts = Lists.newArrayList(Splitter.on('.').trimResults() - .omitEmptyStrings().split(fullName.toLowerCase())); + .omitEmptyStrings().splitToList(fullName.toLowerCase())); if (parts.size() == 1) { return new TableName(Catalog.DEFAULT_DB, parts.get(0)); } diff --git a/fe/src/main/java/org/apache/impala/planner/PlannerContext.java b/fe/src/main/java/org/apache/impala/planner/PlannerContext.java index a54e3ae4d..0102def36 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlannerContext.java +++ b/fe/src/main/java/org/apache/impala/planner/PlannerContext.java @@ -68,6 +68,7 @@ public class PlannerContext { private final EventSequence timeline_; private final TQueryCtx queryCtx_; private final QueryStmt queryStmt_; + private final Analyzer rootAnalyzer_; public PlannerContext (AnalysisResult analysisResult, TQueryCtx queryCtx, EventSequence timeline) { @@ -85,14 +86,20 @@ public class PlannerContext { } else { queryStmt_ = analysisResult.getQueryStmt(); } + rootAnalyzer_ = analysisResult.getAnalyzer(); } // Constructor useful for an external planner module public PlannerContext(TQueryCtx queryCtx, EventSequence timeline) { + this((Analyzer) null, queryCtx, timeline); + } + + public PlannerContext(Analyzer analyzer, TQueryCtx queryCtx, EventSequence timeline) { queryCtx_ = queryCtx; timeline_ = timeline; analysisResult_ = null; queryStmt_ = null; + rootAnalyzer_ = analyzer; } public QueryStmt getQueryStmt() { return queryStmt_; } @@ -100,7 +107,7 @@ public class PlannerContext { public TQueryOptions getQueryOptions() { return getRootAnalyzer().getQueryOptions(); } public AnalysisResult getAnalysisResult() { return analysisResult_; } public EventSequence getTimeline() { return timeline_; } - public Analyzer getRootAnalyzer() { return analysisResult_.getAnalyzer(); } + public Analyzer getRootAnalyzer() { return rootAnalyzer_; } public boolean isSingleNodeExec() { return 
getQueryOptions().num_nodes == 1; } public PlanNodeId getNextNodeId() { return nodeIdGenerator_.getNextId(); } public PlanFragmentId getNextFragmentId() { return fragmentIdGenerator_.getNextId(); } diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index cd7363ebb..c82c269a7 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -273,6 +273,12 @@ public class Frontend { private static final String AVG_ADMISSION_SLOTS_PER_EXECUTOR = "AvgAdmissionSlotsPerExecutor"; + // info about the planner used. In this code, we will always use the Original planner, + // but other planners may set their own planner values + public static final String PLANNER_PROFILE = "PlannerInfo"; + public static final String PLANNER_TYPE = "PlannerType"; + private static final String PLANNER = "OriginalPlanner"; + /** * Plan-time context that allows capturing various artifacts created * during the process. @@ -2135,6 +2141,7 @@ public class Frontend { private TExecRequest getTExecRequest(PlanCtx planCtx, EventSequence timeline) throws ImpalaException { TQueryCtx queryCtx = planCtx.getQueryContext(); + addPlannerToProfile(PLANNER); LOG.info("Analyzing query: " + queryCtx.client_request.stmt + " db: " + queryCtx.session.database); @@ -2500,6 +2507,12 @@ public class Frontend { } } + public static void addPlannerToProfile(String planner) { + TRuntimeProfileNode profile = createTRuntimeProfileNode(PLANNER_PROFILE); + addInfoString(profile, PLANNER_TYPE, planner); + FrontendProfile.getCurrent().addChildrenProfile(profile); + } + private TExecRequest doCreateExecRequest(PlanCtx planCtx, EventSequence timeline) throws ImpalaException { TQueryCtx queryCtx = planCtx.getQueryContext(); diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java index 5f08f0ab3..31caa3bf4 100644 --- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java +++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java @@ -1046,4 +1046,8 @@ public class JniFrontend { } return ""; } + + public Frontend getFrontend() { + return frontend_; + } } diff --git a/java/calcite-planner/pom.xml b/java/calcite-planner/pom.xml new file mode 100644 index 000000000..9ec53d8b6 --- /dev/null +++ b/java/calcite-planner/pom.xml @@ -0,0 +1,108 @@ + + + + + org.apache.impala + impala-parent + 4.4.0-SNAPSHOT + + 4.0.0 + + calcite-planner + 4.4.0-SNAPSHOT + jar + + calcite-planner + + + + org.apache.impala + impala-frontend + 4.4.0-SNAPSHOT + + + org.apache.calcite + calcite-core + 1.36.0 + + + org.apache.calcite.avatica + avatica-core + 1.23.0 + + + + + + + + org.apache.maven.plugins + maven-resources-plugin + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.11.0 + + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-dependency-plugin + 3.5.0 + + + copy-dependencies + package + + copy-dependencies + + + pom + runtime + true + + + + + write-classpath + + build-classpath + + + ${project.build.directory}/calcite-build-classpath.txt + runtime + pom + + + + + + + diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ConvertToImpalaRelRules.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ConvertToImpalaRelRules.java new file mode 100644 index 000000000..fa6390d9a --- /dev/null +++ 
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ConvertToImpalaRelRules.java @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.rel.node; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rel.logical.LogicalTableScan; + +/** + * ConvertToImpalaRelRules. Contains the rules used to change the Calcite RelNodes + * to Impala RelNodes. These Impala RelNodes are responsible for creating the + * physical PlanNode plan. The Calcite RelNode and Impala RelNodes map one to one + * with each other. + */ +public class ConvertToImpalaRelRules { + + public static class ImpalaProjectRule extends RelOptRule { + public ImpalaProjectRule() { + super(operand(LogicalProject.class, any())); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final LogicalProject project = call.rel(0); + call.transformTo(new ImpalaProjectRel(project)); + } + } + + public static class ImpalaScanRule extends RelOptRule { + + public ImpalaScanRule() { + super(operand(LogicalTableScan.class, none())); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final LogicalTableScan scan = call.rel(0); + call.transformTo(new ImpalaHdfsScanRel(scan)); + } + } + +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaHdfsScanRel.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaHdfsScanRel.java new file mode 100644 index 000000000..70fabba3a --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaHdfsScanRel.java @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.calcite.rel.node; + +import com.google.common.base.Preconditions; + +import org.apache.calcite.rel.core.TableScan; +import org.apache.impala.analysis.BaseTableRef; +import org.apache.impala.analysis.Expr; +import org.apache.impala.analysis.SlotDescriptor; +import org.apache.impala.analysis.SlotRef; +import org.apache.impala.analysis.TupleDescriptor; +import org.apache.impala.calcite.rel.phys.ImpalaHdfsScanNode; +import org.apache.impala.calcite.schema.CalciteTable; +import org.apache.impala.catalog.FeFsPartition; +import org.apache.impala.catalog.HdfsTable; +import org.apache.impala.common.ImpalaException; +import org.apache.impala.planner.PlanNode; +import org.apache.impala.planner.PlanNodeId; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * ImpalaHdfsScanRel. Calcite RelNode which maps to an Impala TableScan node. + */ +public class ImpalaHdfsScanRel extends TableScan + implements ImpalaPlanRel { + + public ImpalaHdfsScanRel(TableScan scan) { + super(scan.getCluster(), scan.getTraitSet(), scan.getHints(), scan.getTable()); + } + + @Override + public NodeWithExprs getPlanNode(ParentPlanRelContext context) throws ImpalaException { + + CalciteTable table = (CalciteTable) getTable(); + + BaseTableRef baseTblRef = + table.createBaseTableRef(context.ctx_.getRootAnalyzer()); + + // Create the Tuple Descriptor which will contain only the relevant columns + // from the table needed for the query. + TupleDescriptor tupleDesc = table.createTupleAndSlotDesc(baseTblRef, + getInputRefFieldNames(context), context.ctx_.getRootAnalyzer()); + + // outputExprs will contain all the needed columns from the table + List outputExprs = createScanOutputExprs(tupleDesc.getSlots()); + + List impalaPartitions = table.getPrunedPartitions( + context.ctx_.getRootAnalyzer(), tupleDesc); + + // TODO: filters are not handled yet, nor are partitions + List filterConjuncts = new ArrayList<>(); + List partitionConjuncts = new ArrayList<>(); + + PlanNodeId nodeId = context.ctx_.getNextNodeId(); + + PlanNode physicalNode = new ImpalaHdfsScanNode(nodeId, tupleDesc, impalaPartitions, + baseTblRef, null, partitionConjuncts, filterConjuncts); + physicalNode.init(context.ctx_.getRootAnalyzer()); + + return new NodeWithExprs(physicalNode, outputExprs); + } + + /** + * Return a list of (SlotRef) expressions for the scan node. The list will + * be the size of the # of columns in the table. We initialize the list to + * contain null values for all elements. + * + * If a column isn't projected out by the parent of the scan node, the array + * location for the column will remain null. + */ + private List createScanOutputExprs(List slotDescs) { + int totalCols = getRowType().getFieldNames().size(); + + // Initialize all fields to null. + // TODO: Should this be a map instead of a list? See IMPALA-12961 for details. + List scanOutputExprs = new ArrayList<>(Collections.nCopies(totalCols, null)); + + HdfsTable table = ((CalciteTable) getTable()).getHdfsTable(); + Preconditions.checkState(totalCols == table.getColumns().size()); + int nonPartitionedCols = totalCols - table.getNumClusteringCols(); + for (SlotDescriptor slotDesc : slotDescs) { + // On a "select *" with partitioned columns, the partition columns occur after the + // nonpartitioned columns. But Impala displays the partition columns first. The + // modular arithmetic provides the correct position number for Impala. 
+ int position = + (slotDesc.getColumn().getPosition() + nonPartitionedCols) % totalCols; + scanOutputExprs.set(position, new SlotRef(slotDesc)); + } + return scanOutputExprs; + } + + + private List getInputRefFieldNames(ParentPlanRelContext context) { + // If the parent context didn't pass in input refs, we will select all the + // columns from the table. + if (context.inputRefs_ == null) { + return getRowType().getFieldNames(); + } + + List inputRefFieldNames = new ArrayList<>(); + for (Integer i : context.inputRefs_) { + inputRefFieldNames.add(getRowType().getFieldNames().get(i)); + } + return inputRefFieldNames; + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaPlanRel.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaPlanRel.java new file mode 100644 index 000000000..aca02da40 --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaPlanRel.java @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.rel.node; + +import org.apache.impala.common.ImpalaException; + + +/** + * ImpalaPlanRel. Interface used for all Impala intermediary RelNodes + */ +public interface ImpalaPlanRel { + + /** + * getPlanNode returns a NodeWithExprs object, a thin structure containing + * the plan node along with the output expressions generated by the plan node. + */ + public NodeWithExprs getPlanNode(ParentPlanRelContext context) throws ImpalaException; + +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaProjectRel.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaProjectRel.java new file mode 100644 index 000000000..fc9706fe8 --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaProjectRel.java @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.calcite.rel.node; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexNode; +import org.apache.impala.analysis.Analyzer; +import org.apache.impala.analysis.Expr; +import org.apache.impala.calcite.rel.util.CreateExprVisitor; +import org.apache.impala.common.ImpalaException; + +import java.util.List; + + +/** + * ImpalaProjectRel is the Impala specific RelNode corresponding to + * Project. + * + * There is no PlanNode equivalent for ProjectRel in Impala. The output expressions + * are generated and passed to its parent PlanNode (or the final output exprs if the + * Project is at the top of the tree). The input references are passed into the child + * PlanNode for pruning purposes (HdfsScan would only need to select out the columns + * that are being used). + */ +public class ImpalaProjectRel extends Project + implements ImpalaPlanRel { + + public ImpalaProjectRel(Project project) { + super(project.getCluster(), project.getTraitSet(), project.getInput(), + project.getProjects(), project.getRowType()); + } + + // Needed for Calcite framework + private ImpalaProjectRel(RelOptCluster cluster, RelTraitSet traits, + RelNode input, List projects, RelDataType rowType) { + super(cluster, traits, input, projects, rowType); + } + + // Needed for Calcite framework + @Override + public Project copy(RelTraitSet traitSet, RelNode input, List projects, + RelDataType rowType) { + return new ImpalaProjectRel(getCluster(), traitSet, input, projects, rowType); + } + + @Override + public NodeWithExprs getPlanNode(ParentPlanRelContext context) throws ImpalaException { + + NodeWithExprs inputWithExprs = getChildPlanNode(context); + + // get the output exprs for this node that are needed by the parent node. + List outputExprs = + createProjectExprs(context.ctx_.getRootAnalyzer(), inputWithExprs); + + // There is no Impala Plan Node mapped to Project, so we just return the child + // PlanNode. However, the outputExprs change with the Project. + return new NodeWithExprs(inputWithExprs.planNode_, outputExprs); + } + + /** + * Translate the RexNode expressions in the Project to Impala Exprs. 
+ */ + private List createProjectExprs(Analyzer basicAnalyzer, + NodeWithExprs inputNodeWithExprs) + throws ImpalaException { + ImpalaPlanRel inputRel = (ImpalaPlanRel) getInput(0); + + CreateExprVisitor visitor = + new CreateExprVisitor(inputNodeWithExprs.outputExprs_); + + ImmutableList.Builder builder = new ImmutableList.Builder(); + for (RexNode rexNode : getProjects()) { + Expr projectExpr = CreateExprVisitor.getExpr(visitor, rexNode); + Preconditions.checkNotNull(projectExpr, + "Visitor returned null Impala expr for RexNode %s", rexNode); + builder.add(projectExpr); + } + return builder.build(); + } + + private NodeWithExprs getChildPlanNode(ParentPlanRelContext context + ) throws ImpalaException { + ImpalaPlanRel relInput = (ImpalaPlanRel) getInput(0); + ParentPlanRelContext.Builder builder = + new ParentPlanRelContext.Builder(context, this); + builder.setInputRefs(RelOptUtil.InputFinder.bits(getProjects(), null)); + return relInput.getPlanNode(builder.build()); + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/NodeWithExprs.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/NodeWithExprs.java new file mode 100644 index 000000000..88f724479 --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/NodeWithExprs.java @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.rel.node; + +import org.apache.impala.analysis.Expr; +import org.apache.impala.planner.PlanNode; + +import java.util.List; + +/** + * NodeWithExprs: A return class that contains a PlanNode and its output exprs + * In the case of the Calcite Project RelNode, the PlanNode stays the same, but the + * output expressions change. This is why the outputExprs cannot be a member of + * PlanNode + */ +public class NodeWithExprs { + public final PlanNode planNode_; + public final List outputExprs_; + + public NodeWithExprs(PlanNode planNode, List outputExprs) { + this.planNode_ = planNode; + this.outputExprs_ = outputExprs; + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ParentPlanRelContext.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ParentPlanRelContext.java new file mode 100644 index 000000000..1ef6591fc --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ParentPlanRelContext.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.rel.node; + +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.impala.analysis.Analyzer; +import org.apache.impala.planner.PlannerContext; + +/** + * ParentPlanRelContext is passed into each layer of the Impala + * RelNodes so the child can make certain decisions based on its + * parent. + */ +public class ParentPlanRelContext { + + // ctx: This doesn't change throughout the tree + public final PlannerContext ctx_; + + // The input refs used by the parent PlanRel Node + public final ImmutableBitSet inputRefs_; + + private ParentPlanRelContext(Builder builder) { + this.ctx_ = builder.context_; + this.inputRefs_ = builder.inputRefs_; + } + + public static class Builder { + private PlannerContext context_; + private ImmutableBitSet inputRefs_; + + public Builder(PlannerContext plannerContext) { + this.context_ = plannerContext; + } + + public Builder(ParentPlanRelContext planRelContext, ImpalaPlanRel planRel) { + this.context_ = planRelContext.ctx_; + } + + public void setInputRefs(ImmutableBitSet inputRefs) { + this.inputRefs_ = inputRefs; + } + + public ParentPlanRelContext build() { + return new ParentPlanRelContext(this); + } + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/phys/ImpalaHdfsScanNode.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/phys/ImpalaHdfsScanNode.java new file mode 100644 index 000000000..c23041e51 --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/phys/ImpalaHdfsScanNode.java @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.rel.phys; + +import org.apache.impala.analysis.Analyzer; +import org.apache.impala.analysis.Expr; +import org.apache.impala.analysis.MultiAggregateInfo; +import org.apache.impala.analysis.TableRef; +import org.apache.impala.analysis.TupleDescriptor; +import org.apache.impala.catalog.FeFsPartition; +import org.apache.impala.planner.HdfsScanNode; +import org.apache.impala.planner.PlanNodeId; + +import java.util.List; + +/** + * ImpalaHdfsScanNode. Extends the HdfsScanNode to bypass processing of the + * assignedConjuncts. 
+ */ +public class ImpalaHdfsScanNode extends HdfsScanNode { + + private final List assignedConjuncts_; + + public ImpalaHdfsScanNode(PlanNodeId id, TupleDescriptor tupleDesc, + List partitions, + TableRef hdfsTblRef, MultiAggregateInfo aggInfo, List partConjuncts, + List assignedConjuncts) { + super(id, tupleDesc, assignedConjuncts, partitions, hdfsTblRef, aggInfo, + partConjuncts, false); + this.assignedConjuncts_ = assignedConjuncts; + } + + @Override + public void assignConjuncts(Analyzer analyzer) { + // ignore analyzer and retrieve the processed Calcite conjuncts + this.conjuncts_ = assignedConjuncts_; + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/util/CreateExprVisitor.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/util/CreateExprVisitor.java new file mode 100644 index 000000000..e8884741e --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/util/CreateExprVisitor.java @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.rel.util; + +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexCorrelVariable; +import org.apache.calcite.rex.RexDynamicParam; +import org.apache.calcite.rex.RexFieldAccess; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexLocalRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexOver; +import org.apache.calcite.rex.RexPatternFieldRef; +import org.apache.calcite.rex.RexRangeRef; +import org.apache.calcite.rex.RexSubQuery; +import org.apache.calcite.rex.RexTableInputRef; +import org.apache.calcite.rex.RexVisitorImpl; +import org.apache.impala.analysis.Expr; +import org.apache.impala.common.AnalysisException; +import org.apache.impala.common.ImpalaException; + +import java.util.List; + +/** + * CreateExprVisitor will generate Impala expressions for function calls and literals. + * TODO: In this iteration, it only handles input refs. 
+ */ +public class CreateExprVisitor extends RexVisitorImpl { + + private final List inputExprs_; + + public CreateExprVisitor(List inputExprs) { + super(false); + this.inputExprs_ = inputExprs; + } + + @Override + public Expr visitInputRef(RexInputRef rexInputRef) { + return inputExprs_.get(rexInputRef.getIndex()); + } + + @Override + public Expr visitCall(RexCall rexCall) { + throw new RuntimeException("Not supported"); + } + + @Override + public Expr visitLiteral(RexLiteral rexLiteral) { + throw new RuntimeException("Not supported"); + } + + @Override + public Expr visitLocalRef(RexLocalRef localRef) { + throw new RuntimeException("Not supported"); + } + + @Override + public Expr visitOver(RexOver over) { + throw new RuntimeException("Not supported"); + } + + @Override + public Expr visitCorrelVariable(RexCorrelVariable correlVariable) { + throw new RuntimeException("Not supported"); + } + + @Override + public Expr visitDynamicParam(RexDynamicParam dynamicParam) { + throw new RuntimeException("Not supported"); + } + + @Override + public Expr visitRangeRef(RexRangeRef rangeRef) { + throw new RuntimeException("Not supported"); + } + + @Override + public Expr visitFieldAccess(RexFieldAccess fieldAccess) { + throw new RuntimeException("Not supported"); + } + + @Override + public Expr visitSubQuery(RexSubQuery subQuery) { + throw new RuntimeException("Not supported"); + } + + @Override + public Expr visitTableInputRef(RexTableInputRef fieldRef) { + throw new RuntimeException("Not supported"); + } + + @Override + public Expr visitPatternFieldRef(RexPatternFieldRef fieldRef) { + throw new RuntimeException("Not supported"); + } + + /** + * Wrapper around visitor which catches the unchecked RuntimeException and throws + * an ImpalaException. + */ + public static Expr getExpr(CreateExprVisitor visitor, RexNode operand) + throws ImpalaException { + try { + return operand.accept(visitor); + } catch (Exception e) { + throw new AnalysisException(e); + } + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteDb.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteDb.java new file mode 100644 index 000000000..9e4890a63 --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteDb.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.calcite.schema; + +import com.google.common.collect.ImmutableMap; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.impl.AbstractSchema; +import org.apache.impala.catalog.FeTable; + +import java.util.HashMap; +import java.util.Map; + +public class CalciteDb extends AbstractSchema { + + private final Map tableMap_; + + private CalciteDb(Map tableMap) { + this.tableMap_ = tableMap; + } + + @Override + protected Map getTableMap() { + return tableMap_; + } + + public static class Builder { + private CalciteCatalogReader reader_; + + private final Map tableMap_ = new HashMap<>(); + + public Builder(CalciteCatalogReader reader) { + this.reader_ = reader; + } + + public Builder addTable(String tableName, FeTable table) { + if (!tableMap_.containsKey(tableName)) { + tableMap_.put(tableName.toLowerCase(), new CalciteTable(table, reader_)); + } + return this; + } + + public CalciteDb build() { + return new CalciteDb(ImmutableMap.copyOf(tableMap_)); + } + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteTable.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteTable.java new file mode 100644 index 000000000..0fa849f84 --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteTable.java @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.calcite.schema; + +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.plan.RelOptAbstractTable; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.prepare.Prepare; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelDistribution; +import org.apache.calcite.rel.RelReferentialConstraint; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.schema.ColumnStrategy; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.Statistic; +import org.apache.calcite.schema.Table; +import org.apache.calcite.sql.SqlAccessType; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.validate.SqlModality; +import org.apache.calcite.sql.validate.SqlMonotonicity; +import org.apache.calcite.sql2rel.InitializerContext; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.impala.analysis.Analyzer; +import org.apache.impala.analysis.BaseTableRef; +import org.apache.impala.analysis.Expr; +import org.apache.impala.analysis.Path; +import org.apache.impala.analysis.SlotDescriptor; +import org.apache.impala.analysis.SlotRef; +import org.apache.impala.analysis.TableRef; +import org.apache.impala.analysis.TupleDescriptor; +import org.apache.impala.catalog.Column; +import org.apache.impala.catalog.FeFsPartition; +import org.apache.impala.catalog.FeTable; +import org.apache.impala.catalog.HdfsTable; +import org.apache.impala.calcite.type.ImpalaTypeConverter; +import org.apache.impala.calcite.type.ImpalaTypeSystemImpl; +import org.apache.impala.planner.HdfsPartitionPruner; +import org.apache.impala.common.AnalysisException; +import org.apache.impala.common.ImpalaException; +import org.apache.impala.common.Pair; + +import com.google.common.collect.ImmutableList; + +import java.util.ArrayList; +import java.util.List; + + +public class CalciteTable extends RelOptAbstractTable + implements Table, Prepare.PreparingTable { + + private final HdfsTable table_; + + + private final List qualifiedTableName_; + + public CalciteTable(FeTable table, CalciteCatalogReader reader) { + super(reader, table.getName(), buildColumnsForRelDataType(table)); + this.table_ = (HdfsTable) table; + this.qualifiedTableName_ = table.getTableName().toPath(); + } + + private static RelDataType buildColumnsForRelDataType(FeTable table) { + RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl(new ImpalaTypeSystemImpl()); + + RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory); + // skip clustering columns, save them for the end + for (Column column : table.getColumnsInHiveOrder()) { + RelDataType type = + ImpalaTypeConverter.createRelDataType(typeFactory, column.getType()); + builder.add(column.getName(), type); + } + return builder.build(); + } + + public BaseTableRef createBaseTableRef(Analyzer analyzer + ) throws ImpalaException { + + TableRef tblRef = new TableRef(qualifiedTableName_, null); + + Path resolvedPath = analyzer.resolvePath(tblRef.getPath(), Path.PathType.TABLE_REF); + + BaseTableRef baseTblRef = new BaseTableRef(tblRef, resolvedPath); + baseTblRef.analyze(analyzer); + return baseTblRef; + } + + // Create tuple and slot descriptors for this base table + public TupleDescriptor createTupleAndSlotDesc(BaseTableRef baseTblRef, + List 
fieldNames, Analyzer analyzer) throws ImpalaException { + // create the slot descriptors corresponding to this tuple descriptor + // by supplying the field names from Calcite's output schema for this node + for (int i = 0; i < fieldNames.size(); i++) { + String fieldName = fieldNames.get(i); + SlotRef slotref = + new SlotRef(Path.createRawPath(baseTblRef.getUniqueAlias(), fieldName)); + slotref.analyze(analyzer); + SlotDescriptor slotDesc = slotref.getDesc(); + if (slotDesc.getType().isCollectionType()) { + throw new AnalysisException(String.format(fieldName + " " + + "is a complex type (array/map/struct) column. " + + "This is not currently supported.")); + } + slotDesc.setIsMaterialized(true); + } + TupleDescriptor tupleDesc = baseTblRef.getDesc(); + return tupleDesc; + } + + /** + * Return the pruned partitions + * TODO: Currently all partitions are returned since filters aren't yet supported. + */ + public List getPrunedPartitions(Analyzer analyzer, + TupleDescriptor tupleDesc) throws ImpalaException { + HdfsPartitionPruner pruner = new HdfsPartitionPruner(tupleDesc); + // TODO: pass in the conjuncts needed. An empty conjunct will return all partitions. + List conjuncts = new ArrayList<>(); + Pair, List> impalaPair = + pruner.prunePartitions(analyzer, conjuncts, true, + null); + return impalaPair.first; + } + + public HdfsTable getHdfsTable() { + return table_; + } + + @Override + public List getQualifiedName() { + return qualifiedTableName_; + } + + @Override + public boolean rolledUpColumnValidInsideAgg(String column, + SqlCall call, SqlNode parent, CalciteConnectionConfig config) { + return true; + } + + @Override + public Schema.TableType getJdbcTableType() { + return Schema.TableType.TABLE; + } + + @Override + public boolean isRolledUp(String column) { + return false; + } + + @Override + public Statistic getStatistic() { + return null; + } + + @Override + public RelDataType getRowType(final RelDataTypeFactory typeFactory) { + return getRowType(); + } + + @Override + public T unwrap(Class arg0) { + // Generic unwrap needed by the Calcite framework to process the table. + return arg0.isInstance(this) ? arg0.cast(this) : null; + } + + @Override + public boolean columnHasDefaultValue(RelDataType rowType, int ordinal, + InitializerContext initializerContext) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isTemporal() { + return false; + } + + @Override + public boolean supportsModality(SqlModality modality) { + return true; + } + + @Override + public SqlAccessType getAllowedAccess() { + return SqlAccessType.ALL; + } + + @Override + public SqlMonotonicity getMonotonicity(String columnName) { + return SqlMonotonicity.NOT_MONOTONIC; + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/ImpalaCalciteCatalogReader.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/ImpalaCalciteCatalogReader.java new file mode 100644 index 000000000..dc7cb5b9c --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/ImpalaCalciteCatalogReader.java @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.schema; + +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.impala.analysis.StmtMetadataLoader; +import org.apache.impala.thrift.TQueryCtx; + +import java.util.List; + +public class ImpalaCalciteCatalogReader extends CalciteCatalogReader { + private final TQueryCtx queryCtx_; + private final StmtMetadataLoader.StmtTableCache stmtTableCache_; + + public ImpalaCalciteCatalogReader(CalciteSchema rootSchema, List defaultSchema, + RelDataTypeFactory typeFactory, CalciteConnectionConfig config, TQueryCtx queryCtx, + StmtMetadataLoader.StmtTableCache stmtTableCache) { + super(rootSchema, defaultSchema, typeFactory, config); + this.queryCtx_ = queryCtx; + this.stmtTableCache_ = stmtTableCache; + } + + public TQueryCtx getTQueryCtx() { + return queryCtx_; + } + + public StmtMetadataLoader.StmtTableCache getStmtTableCache() { + return stmtTableCache_; + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteJniFrontend.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteJniFrontend.java new file mode 100644 index 000000000..af5c8225f --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteJniFrontend.java @@ -0,0 +1,241 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.calcite.service; + +import org.apache.impala.util.EventSequence; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider; +import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.sql.SqlNode; +import org.apache.impala.calcite.rel.node.NodeWithExprs; +import org.apache.impala.calcite.rel.node.ImpalaPlanRel; +import org.apache.impala.common.ImpalaException; +import org.apache.impala.common.InternalException; +import org.apache.impala.common.JniUtil; +import org.apache.impala.service.Frontend; +import org.apache.impala.service.FrontendProfile; +import org.apache.impala.service.JniFrontend; +import org.apache.impala.thrift.TExecRequest; +import org.apache.impala.thrift.TQueryCtx; +import org.apache.impala.thrift.TQueryOptions; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; + +import java.util.List; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * CalciteJniFrontend. This is a frontend that uses the Calcite code + * to walk through all the steps of compiling the query (e.g. parsing, validating, + * etc... to generate a TExecRequest that can be used by the execution engine. + */ +public class CalciteJniFrontend extends JniFrontend { + + protected static final Logger LOG = + LoggerFactory.getLogger(CalciteJniFrontend.class.getName()); + + private final static TBinaryProtocol.Factory protocolFactory_ = + new TBinaryProtocol.Factory(); + + private static Pattern SEMI_JOIN = Pattern.compile("\\bsemi\\sjoin\\b", + Pattern.CASE_INSENSITIVE); + + private static Pattern ANTI_JOIN = Pattern.compile("\\banti\\sjoin\\b", + Pattern.CASE_INSENSITIVE); + + public CalciteJniFrontend(byte[] thriftBackendConfig, boolean isBackendTest) + throws ImpalaException, TException { + super(thriftBackendConfig, isBackendTest); + } + + /** + * Jni wrapper for Frontend.createExecRequest(). Accepts a serialized + * TQueryContext; returns a serialized TQueryExecRequest. + */ + @Override + public byte[] createExecRequest(byte[] thriftQueryContext) + throws ImpalaException { + // Needed for Calcite's JaninoRelMetadataProvider + Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader()); + + QueryContext queryCtx = new QueryContext(thriftQueryContext, getFrontend()); + if (!canStmtBePlannedThroughCalcite(queryCtx)) { + return runThroughOriginalPlanner(thriftQueryContext, queryCtx); + } + + try (FrontendProfile.Scope scope = FrontendProfile.createNewWithScope()) { + LOG.info("Using Calcite Planner for the following query: " + queryCtx.getStmt()); + + // Parse the query + RelMetadataQuery.THREAD_PROVIDERS.set( + JaninoRelMetadataProvider.of(DefaultRelMetadataProvider.INSTANCE)); + CalciteQueryParser queryParser = new CalciteQueryParser(queryCtx); + SqlNode parsedSqlNode = queryParser.parse(); + markEvent(queryParser, parsedSqlNode, queryCtx, "Parsed query"); + + // Make sure the metadata cache has all the info for the query. 
+ CalciteMetadataHandler mdHandler = + new CalciteMetadataHandler(parsedSqlNode, queryCtx); + markEvent(mdHandler, null, queryCtx, "Loaded tables"); + + // Validate the parsed query + CalciteValidator validator = new CalciteValidator(mdHandler, queryCtx); + SqlNode validatedNode = validator.validate(parsedSqlNode); + markEvent(mdHandler, validatedNode, queryCtx, "Validated query"); + + // Convert the query to RelNodes which can be optimized + CalciteRelNodeConverter relNodeConverter = new CalciteRelNodeConverter(validator); + RelNode logicalPlan = relNodeConverter.convert(validatedNode); + markEvent(mdHandler, logicalPlan, queryCtx, "Created initial logical plan"); + + // Optimize the query + CalciteOptimizer optimizer = new CalciteOptimizer(validator); + ImpalaPlanRel optimizedPlan = optimizer.optimize(logicalPlan); + markEvent(mdHandler, optimizedPlan, queryCtx, "Optimized logical plan"); + + // Create Physical Impala PlanNodes + CalcitePhysPlanCreator physPlanCreator = + new CalcitePhysPlanCreator(mdHandler, queryCtx); + NodeWithExprs rootNode = physPlanCreator.create(optimizedPlan); + markEvent(mdHandler, rootNode, queryCtx, "Created physical plan"); + + // Create exec request for the server + ExecRequestCreator execRequestCreator = + new ExecRequestCreator(physPlanCreator, queryCtx, mdHandler); + TExecRequest execRequest = execRequestCreator.create(rootNode); + markEvent(mdHandler, execRequest, queryCtx, "Created exec request"); + + TSerializer serializer = new TSerializer(protocolFactory_); + byte[] serializedRequest = serializer.serialize(execRequest); + queryCtx.getTimeline().markEvent("Serialized request"); + + return serializedRequest; + } catch (Exception e) { + LOG.info("Calcite planner failed."); + LOG.info("Exception: " + e); + if (e != null) { + LOG.info("Stack Trace:" + ExceptionUtils.getStackTrace(e)); + throw new InternalException(e.getMessage()); + } + throw new RuntimeException(e); + } + } + + /** + * Use information about the query syntax to see if this can be handled + * by Calcite + */ + private boolean canStmtBePlannedThroughCalcite(QueryContext queryCtx) { + String stringWithFirstRealWord = queryCtx.getStmt(); + String[] lines = stringWithFirstRealWord.split("\n"); + // Get rid of comments and blank lines which start the query. We need to find + // the first real word. + // TODO: IMPALA-12976: need to make this more generic. 
Certain patterns aren't caught + // here like /* */ + for (String line : lines) { + if (line.trim().startsWith("--") || line.trim().equals("")) { + stringWithFirstRealWord = stringWithFirstRealWord.replaceFirst(line + "\n", ""); + } else { + break; + } + } + stringWithFirstRealWord = stringWithFirstRealWord.trim(); + String beforeStripString; + do { + beforeStripString = stringWithFirstRealWord; + stringWithFirstRealWord = StringUtils.stripStart(stringWithFirstRealWord, "("); + stringWithFirstRealWord = StringUtils.stripStart(stringWithFirstRealWord, null); + } while (!stringWithFirstRealWord.equals(beforeStripString)); + return StringUtils.startsWithIgnoreCase(stringWithFirstRealWord, "select") || + StringUtils.startsWithIgnoreCase(stringWithFirstRealWord, "values") || + StringUtils.startsWithIgnoreCase(stringWithFirstRealWord, "with"); + } + + /** + * Fallback planner method + */ + public byte[] runThroughOriginalPlanner(byte[] thriftQueryContext, + QueryContext queryCtx) throws ImpalaException { + LOG.info("Using Impala Planner for the following query: " + queryCtx.getStmt()); + return super.createExecRequest(thriftQueryContext); + } + + private void markEvent(CompilerStep compilerStep, Object stepResult, + QueryContext queryCtx, String stepMessage) { + LOG.info(stepMessage); + queryCtx.getTimeline().markEvent(stepMessage); + if (LOG.isDebugEnabled()) { + compilerStep.logDebug(stepResult); + } + } + + public static class QueryContext { + private final TQueryCtx queryCtx_; + private final String stmt_; + private final String currentDb_; + private final Frontend frontend_; + private final EventSequence timeline_; + + public QueryContext(byte[] thriftQueryContext, + Frontend frontend) throws ImpalaException { + this.queryCtx_ = new TQueryCtx(); + JniUtil.deserializeThrift(protocolFactory_, queryCtx_, thriftQueryContext); + + // hack to match the code in Frontend.java: + // If unset, set MT_DOP to 0 to simplify the rest of the code. + if (queryCtx_.getClient_request() != null && + queryCtx_.getClient_request().getQuery_options() != null) { + if (!queryCtx_.getClient_request().getQuery_options().isSetMt_dop()) { + queryCtx_.getClient_request().getQuery_options().setMt_dop(0); + } + } + + this.frontend_ = frontend; + this.stmt_ = queryCtx_.getClient_request().getStmt(); + this.currentDb_ = queryCtx_.getSession().getDatabase(); + this.timeline_ = new EventSequence("Frontend Timeline (Calcite Planner)"); + } + + public TQueryCtx getTQueryCtx() { + return queryCtx_; + } + + public Frontend getFrontend() { + return frontend_; + } + + public String getStmt() { + return stmt_; + } + + public String getCurrentDb() { + return currentDb_; + } + + public EventSequence getTimeline() { + return timeline_; + } + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteMetadataHandler.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteMetadataHandler.java new file mode 100644 index 000000000..6e24eb1a7 --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteMetadataHandler.java @@ -0,0 +1,224 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.service; + +import com.google.common.base.Splitter; +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.config.CalciteConnectionConfigImpl; +import org.apache.calcite.config.CalciteConnectionProperty; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.sql.SqlBasicCall; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlJoin; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlSelect; +import org.apache.calcite.sql.util.SqlBasicVisitor; +import org.apache.impala.analysis.StmtMetadataLoader; +import org.apache.impala.analysis.TableName; +import org.apache.impala.calcite.schema.CalciteDb; +import org.apache.impala.calcite.schema.CalciteTable; +import org.apache.impala.calcite.schema.ImpalaCalciteCatalogReader; +import org.apache.impala.calcite.type.ImpalaTypeSystemImpl; +import org.apache.impala.catalog.Column; +import org.apache.impala.catalog.FeCatalog; +import org.apache.impala.catalog.FeDb; +import org.apache.impala.catalog.FeTable; +import org.apache.impala.catalog.FeView; +import org.apache.impala.catalog.HdfsTable; +import org.apache.impala.common.ImpalaException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * CalciteMetadataHandler. Responsible for loading the tables for a query + * from catalogd into the coordinator and populating the Calcite schema with + * these tables. + */ +public class CalciteMetadataHandler implements CompilerStep { + + protected static final Logger LOG = + LoggerFactory.getLogger(CalciteMetadataHandler.class.getName()); + + // StmtTableCache needed by Impala's Analyzer class at planning time. 
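+ // It maps each TableName referenced in the query to its loaded FeTable.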
+ private final StmtMetadataLoader.StmtTableCache stmtTableCache_; + + // CalciteCatalogReader is a context class that holds global information that + // may be needed by the CalciteTable object + private final CalciteCatalogReader reader_; + + public CalciteMetadataHandler(SqlNode parsedNode, + CalciteJniFrontend.QueryContext queryCtx) throws ImpalaException { + + StmtMetadataLoader stmtMetadataLoader = new StmtMetadataLoader( + queryCtx.getFrontend(), queryCtx.getCurrentDb(), queryCtx.getTimeline()); + + // retrieve all the tablenames in the query, will be in tableVisitor.tableNames + TableVisitor tableVisitor = new TableVisitor(queryCtx.getCurrentDb()); + parsedNode.accept(tableVisitor); + + // load the relevant tables in the query from catalogd + this.stmtTableCache_ = stmtMetadataLoader.loadTables(tableVisitor.tableNames_); + + this.reader_ = createCalciteCatalogReader(queryCtx, stmtTableCache_); + + // populate calcite schema. This step needs to be done after the loader because the + // schema needs to contain the columns in the table for validation, which cannot + // be done when it's an IncompleteTable + populateCalciteSchema(reader_, queryCtx.getFrontend().getCatalog(), + tableVisitor.tableNames_); + } + + /** + * Creates CalciteCatalogReader object which will contain information about the schema. + * Since the individual Table objects have reference to the Schema, this also serves + * as a way to give the tables Context information about the general query. + */ + private CalciteCatalogReader createCalciteCatalogReader( + CalciteJniFrontend.QueryContext queryCtx, + StmtMetadataLoader.StmtTableCache stmtTableCache) { + RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl(new ImpalaTypeSystemImpl()); + Properties props = new Properties(); + props.setProperty(CalciteConnectionProperty.CASE_SENSITIVE.camelName(), "false"); + CalciteConnectionConfig config = new CalciteConnectionConfigImpl(props); + CalciteSchema rootSchema = CalciteSchema.createRootSchema(true); + return new ImpalaCalciteCatalogReader(rootSchema, + Collections.singletonList(queryCtx.getCurrentDb()), + typeFactory, config, queryCtx.getTQueryCtx(), stmtTableCache); + } + + /** + * Populate the CalciteSchema with tables being used by this query. + */ + private void populateCalciteSchema(CalciteCatalogReader reader, + FeCatalog catalog, Set tableNames) { + CalciteSchema rootSchema = reader.getRootSchema(); + Map dbSchemas = new HashMap<>(); + for (TableName tableName : tableNames) { + FeDb db = catalog.getDb(tableName.getDb()); + // db is not found, this will probably fail in the validation step + if (db == null) { + continue; + } + + // table is not found, this will probably fail in the validation step + FeTable feTable = db.getTable(tableName.getTbl()); + if (feTable == null) { + continue; + } + + // populate the dbschema with its table, creating the dbschema if it's the + // first instance seen in the query. 
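+ // Database and table names were already lower-cased by TableVisitor; the extra
+ // toLowerCase() calls keep the builder map and the schema entries consistent
+ // with Impala's case-insensitive name resolution.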
+ CalciteDb.Builder dbBuilder = + dbSchemas.getOrDefault(tableName.getDb(), new CalciteDb.Builder(reader)); + dbBuilder.addTable(tableName.getTbl().toLowerCase(), feTable); + dbSchemas.put(tableName.getDb().toLowerCase(), dbBuilder); + } + + // add all databases to the root schema + for (String dbName : dbSchemas.keySet()) { + rootSchema.add(dbName, dbSchemas.get(dbName.toLowerCase()).build()); + } + } + + public StmtMetadataLoader.StmtTableCache getStmtTableCache() { + return stmtTableCache_; + } + + public CalciteCatalogReader getCalciteCatalogReader() { + return reader_; + } + + /** + * TableVisitor walks through the AST and places all the tables into + * tableNames + */ + private static class TableVisitor extends SqlBasicVisitor { + private final String currentDb_; + public final Set tableNames_ = new HashSet<>(); + + public TableVisitor(String currentDb) { + this.currentDb_ = currentDb.toLowerCase(); + } + + @Override + public Void visit(SqlCall call) { + if (call.getKind() == SqlKind.SELECT) { + SqlSelect select = (SqlSelect) call; + if (select.getFrom() != null) { + tableNames_.addAll(getTableNames(select.getFrom())); + } + } + return super.visit(call); + } + + private List getTableNames(SqlNode fromNode) { + List localTableNames = new ArrayList<>(); + if (fromNode instanceof SqlIdentifier) { + String tableName = fromNode.toString(); + List parts = Splitter.on('.').splitToList(tableName); + // TODO: 'complex' tables ignored for now + if (parts.size() == 1) { + localTableNames.add(new TableName( + currentDb_.toLowerCase(), parts.get(0).toLowerCase())); + } else if (parts.size() == 2) { + localTableNames.add( + new TableName(parts.get(0).toLowerCase(), parts.get(1).toLowerCase())); + } + } + + // Join node has the tables in the left and right node. + if (fromNode instanceof SqlJoin) { + localTableNames.addAll(getTableNames(((SqlJoin) fromNode).getLeft())); + localTableNames.addAll(getTableNames(((SqlJoin) fromNode).getRight())); + } + + // Put references in the schema too + if (fromNode instanceof SqlBasicCall) { + SqlBasicCall basicCall = (SqlBasicCall) fromNode; + if (basicCall.getKind().equals(SqlKind.AS)) { + localTableNames.addAll(getTableNames(basicCall.operand(0))); + } + } + return localTableNames; + } + } + + @Override + public void logDebug(Object resultObject) { + LOG.debug("Loaded tables: " + stmtTableCache_.tables.values().stream() + .map(feTable -> feTable.getName().toString()) + .collect(Collectors.joining( ", " ))); + } +} + diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteOptimizer.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteOptimizer.java new file mode 100644 index 000000000..7ab8c1174 --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteOptimizer.java @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.service; + +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.RelOptCostImpl; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.hep.HepPlanner; +import org.apache.calcite.plan.hep.HepProgramBuilder; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider; +import org.apache.calcite.sql.SqlExplainFormat; +import org.apache.calcite.sql.SqlExplainLevel; +import org.apache.impala.calcite.rel.node.ConvertToImpalaRelRules; +import org.apache.impala.calcite.rel.node.ImpalaPlanRel; +import org.apache.impala.common.ImpalaException; +import org.apache.impala.common.InternalException; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * CalciteOptimizer. Responsible for optimizing the plan into its final + * Calcite form. The final Calcite form will be an ImpalaPlanRel node which + * will contain code that maps the node into a physical Impala PlanNode. + */ +public class CalciteOptimizer implements CompilerStep { + protected static final Logger LOG = + LoggerFactory.getLogger(CalciteOptimizer.class.getName()); + + private final CalciteValidator validator_; + + public CalciteOptimizer(CalciteValidator validator) { + this.validator_ = validator; + } + + public ImpalaPlanRel optimize(RelNode logPlan) throws ImpalaException { + HepProgramBuilder builder = new HepProgramBuilder(); + + // rules to convert Calcite nodes into ImpalaPlanRel nodes + builder.addRuleCollection( + ImmutableList.of( + new ConvertToImpalaRelRules.ImpalaScanRule(), + new ConvertToImpalaRelRules.ImpalaProjectRule())); + + HepPlanner planner = new HepPlanner(builder.build(), + logPlan.getCluster().getPlanner().getContext(), + false, null, RelOptCostImpl.FACTORY); + logPlan.getCluster().setMetadataProvider(JaninoRelMetadataProvider.DEFAULT); + planner.setRoot(logPlan); + RelNode optimizedPlan = planner.findBestExp(); + if (!(optimizedPlan instanceof ImpalaPlanRel)) { + throw new InternalException("Could not generate Impala RelNode plan. 
Plan " + + "is \n" + getDebugString(optimizedPlan)); + } + return (ImpalaPlanRel) optimizedPlan; + } + + public String getDebugString(Object optimizedPlan) { + return RelOptUtil.dumpPlan("[Impala plan]", (RelNode) optimizedPlan, + SqlExplainFormat.TEXT, SqlExplainLevel.NON_COST_ATTRIBUTES); + } + + @Override + public void logDebug(Object resultObject) { + if (!(resultObject instanceof RelNode)) { + LOG.debug("Finished optimizer step, but unknown result: " + resultObject); + return; + } + LOG.debug(getDebugString(resultObject)); + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalcitePhysPlanCreator.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalcitePhysPlanCreator.java new file mode 100644 index 000000000..0c3d459ab --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalcitePhysPlanCreator.java @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.service; + +import org.apache.impala.calcite.rel.node.NodeWithExprs; +import org.apache.impala.calcite.rel.node.ImpalaPlanRel; +import org.apache.impala.authorization.AuthorizationFactory; +import org.apache.impala.analysis.Analyzer; +import org.apache.impala.analysis.Expr; +import org.apache.impala.calcite.rel.node.ParentPlanRelContext; +import org.apache.impala.common.ImpalaException; +import org.apache.impala.planner.PlannerContext; +import org.apache.impala.planner.PlanNode; +import org.apache.impala.service.BackendConfig; +import org.apache.impala.util.AuthorizationUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * CalcitePhysPlanCreator. This class is responsible for turning an ImpalaPlanRel + * Calcite plan into the Impala physical PlanNode plan for a single node. + */ +public class CalcitePhysPlanCreator implements CompilerStep { + protected static final Logger LOG = + LoggerFactory.getLogger(CalcitePhysPlanCreator.class.getName()); + + private final CalciteJniFrontend.QueryContext queryCtx_; + private final Analyzer analyzer_; + private final PlannerContext plannerContext_; + + public CalcitePhysPlanCreator(CalciteMetadataHandler mdHandler, + CalciteJniFrontend.QueryContext queryCtx) throws ImpalaException { + this.queryCtx_ = queryCtx; + // TODO: IMPALA-13011: Awkward call for authorization here. Authorization + // will be done at validation time, but this is needed here for the Analyzer + // instantiation. 
+ AuthorizationFactory authzFactory = + AuthorizationUtil.authzFactoryFrom(BackendConfig.INSTANCE); + this.analyzer_ = new Analyzer(mdHandler.getStmtTableCache(), + queryCtx_.getTQueryCtx(), authzFactory, null); + this.plannerContext_ = + new PlannerContext(analyzer_, queryCtx_.getTQueryCtx(), queryCtx_.getTimeline()); + + } + + /** + * returns the root plan node along with its output expressions. + */ + public NodeWithExprs create(ImpalaPlanRel optimizedPlan) throws ImpalaException { + ParentPlanRelContext.Builder builder = + new ParentPlanRelContext.Builder(plannerContext_); + NodeWithExprs rootNodeWithExprs = optimizedPlan.getPlanNode(builder.build()); + if (LOG.isDebugEnabled()) { + LOG.debug("Printing PlanNode tree..."); + printPlanNodeTree(rootNodeWithExprs.planNode_, ""); + } + return rootNodeWithExprs; + } + + public void printPlanNodeTree(PlanNode node, String prefix) { + LOG.debug(prefix + node.getClass()); + for (PlanNode child : node.getChildren()) { + printPlanNodeTree(child, " " + prefix); + } + } + + public Analyzer getAnalyzer() { + return analyzer_; + } + + public PlannerContext getPlannerContext() { + return plannerContext_; + } + + @Override + public void logDebug(Object resultObject) { + if (!(resultObject instanceof NodeWithExprs)) { + LOG.debug("Finished physical plan step, but unknown result: " + resultObject); + return; + } + LOG.debug("Physical Plan: " + resultObject); + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteQueryParser.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteQueryParser.java new file mode 100644 index 000000000..b3e97cfec --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteQueryParser.java @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.service; + +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.parser.SqlParseException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * CalciteQueryParser. Responsible for turning a String query statement into + * a Calcite SqlNode. 
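+ *
+ * For example, a statement like "select c1 from tbl" parses into a SqlSelect
+ * whose select list and FROM clause are SqlIdentifier nodes.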
+ */ +public class CalciteQueryParser implements CompilerStep { + protected static final Logger LOG = + LoggerFactory.getLogger(CalciteQueryParser.class.getName()); + + private final CalciteJniFrontend.QueryContext queryCtx_; + + public CalciteQueryParser(CalciteJniFrontend.QueryContext queryCtx) { + this.queryCtx_ = queryCtx; + } + + public SqlNode parse() throws SqlParseException { + // Create an SQL parser + SqlParser parser = SqlParser.create(queryCtx_.getStmt()); + + // Parse the query into an AST + SqlNode sqlNode = parser.parseQuery(); + return sqlNode; + } + + public void logDebug(Object resultObject) { + if (!(resultObject instanceof SqlNode)) { + LOG.debug("Parser produced an unknown output: " + resultObject); + return; + } + LOG.debug("Parsed node: " + resultObject); + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteRelNodeConverter.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteRelNodeConverter.java new file mode 100644 index 000000000..7f10c51d3 --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteRelNodeConverter.java @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.service; + +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.volcano.VolcanoPlanner; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.sql.SqlExplainFormat; +import org.apache.calcite.sql.SqlExplainLevel; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql2rel.SqlToRelConverter; +import org.apache.calcite.sql2rel.StandardConvertletTable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * CalciteRelNodeConverter. Responsible for converting a Calcite AST SqlNode into + * a logical (pre-optimized) plan. 
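+ *
+ * For a simple projection query such as "select c1 from tbl", the converted
+ * tree is roughly (illustrative):
+ *   LogicalProject(c1=[$0])
+ *     LogicalTableScan(table=[[db, tbl]])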
+ */ +public class CalciteRelNodeConverter implements CompilerStep { + protected static final Logger LOG = + LoggerFactory.getLogger(CalciteRelNodeConverter.class.getName()); + + private static final RelOptTable.ViewExpander NOOP_EXPANDER = + (type, query, schema, path) -> null; + + private final CalciteValidator validator_; + + private final RelOptCluster cluster_; + + private final RelOptPlanner planner_; + + public CalciteRelNodeConverter(CalciteValidator validator) { + this.validator_ = validator; + this.planner_ = new VolcanoPlanner(); + planner_.addRelTraitDef(ConventionTraitDef.INSTANCE); + cluster_ = + RelOptCluster.create(planner_, new RexBuilder(validator_.getTypeFactory())); + } + + public RelNode convert(SqlNode validatedNode) { + SqlToRelConverter relConverter = new SqlToRelConverter( + NOOP_EXPANDER, + validator_.getSqlValidator(), + validator_.getCatalogReader(), + cluster_, + StandardConvertletTable.INSTANCE, + SqlToRelConverter.config()); + + // Convert the valid AST into a logical plan + RelRoot root = relConverter.convertQuery(validatedNode, false, true); + RelNode relNode = root.project(); + logDebug(relNode); + return relNode; + } + + public RelOptCluster getCluster() { + return cluster_; + } + + public CalciteValidator getValidator() { + return validator_; + } + + @Override + public void logDebug(Object resultObject) { + if (!(resultObject instanceof RelNode)) { + LOG.debug("RelNodeConverter produced an unknown output: " + resultObject); + return; + } + LOG.info(RelOptUtil.dumpPlan("[Logical plan]", (RelNode) resultObject, + SqlExplainFormat.TEXT, SqlExplainLevel.NON_COST_ATTRIBUTES)); + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteValidator.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteValidator.java new file mode 100644 index 000000000..7c455f97e --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteValidator.java @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.service; + +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.validate.SqlValidator; +import org.apache.calcite.sql.validate.SqlValidatorUtil; +import org.apache.impala.calcite.type.ImpalaTypeSystemImpl; +import org.apache.impala.calcite.validate.ImpalaConformance; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * SqlValidator. Responsible for validating the parsed SQL AST. 
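+ *
+ * Validation resolves table and column references against the Calcite schema
+ * populated by CalciteMetadataHandler (unqualified table names resolve against
+ * the session's current database) and derives a type for every expression.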
+ */ +public class CalciteValidator implements CompilerStep { + protected static final Logger LOG = + LoggerFactory.getLogger(CalciteValidator.class.getName()); + + private final CalciteMetadataHandler mdHandler; + private final CalciteJniFrontend.QueryContext queryCtx; + private final RelDataTypeFactory typeFactory; + private final CalciteCatalogReader catalogReader; + private final SqlValidator sqlValidator; + + public CalciteValidator(CalciteMetadataHandler mdHandler, + CalciteJniFrontend.QueryContext queryCtx) { + this.mdHandler = mdHandler; + this.queryCtx = queryCtx; + this.typeFactory = new JavaTypeFactoryImpl(new ImpalaTypeSystemImpl()); + this.catalogReader = mdHandler.getCalciteCatalogReader(); + + this.sqlValidator = SqlValidatorUtil.newValidator( + SqlStdOperatorTable.instance(), + catalogReader, typeFactory, + SqlValidator.Config.DEFAULT + .withConformance(ImpalaConformance.INSTANCE) + ); + } + + public SqlNode validate(SqlNode parsedNode) { + // Validate the initial AST + SqlNode node = sqlValidator.validate(parsedNode); + return node; + } + + public RelDataTypeFactory getTypeFactory() { + return typeFactory; + } + + public SqlValidator getSqlValidator() { + return sqlValidator; + } + + public CalciteCatalogReader getCatalogReader() { + return catalogReader; + } + + @Override + public void logDebug(Object resultObject) { + if (!(resultObject instanceof SqlNode)) { + LOG.debug("Finished validator step, but unknown result: " + resultObject); + return; + } + LOG.debug("Validated node: " + resultObject); + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CompilerStep.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CompilerStep.java new file mode 100644 index 000000000..7bc5d126e --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CompilerStep.java @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.service; + +/** + * CompilerStep. An interface to be used by all top level actions run by the + * Calcite compiler. + */ +public interface CompilerStep { + void logDebug(Object resultObject); +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/ExecRequestCreator.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/ExecRequestCreator.java new file mode 100644 index 000000000..96c0a990c --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/ExecRequestCreator.java @@ -0,0 +1,427 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.service; + +import org.apache.impala.calcite.rel.node.NodeWithExprs; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.impala.analysis.Analyzer; +import org.apache.impala.analysis.Expr; +import org.apache.impala.analysis.JoinOperator; +import org.apache.impala.catalog.FeTable; +import org.apache.impala.catalog.HdfsTable; +import org.apache.impala.common.ImpalaException; +import org.apache.impala.planner.DataPartition; +import org.apache.impala.planner.DistributedPlanner; +import org.apache.impala.planner.JoinNode; +import org.apache.impala.planner.NestedLoopJoinNode; +import org.apache.impala.planner.ParallelPlanner; +import org.apache.impala.planner.PlanFragment; +import org.apache.impala.planner.PlanNode; +import org.apache.impala.planner.Planner; +import org.apache.impala.planner.PlannerContext; +import org.apache.impala.planner.PlanRootSink; +import org.apache.impala.planner.RuntimeFilterGenerator; +import org.apache.impala.planner.SingleNodePlanner; +import org.apache.impala.planner.SingularRowSrcNode; +import org.apache.impala.planner.SubplanNode; +import org.apache.impala.service.Frontend; +import org.apache.impala.service.FrontendProfile; +import org.apache.impala.thrift.TColumn; +import org.apache.impala.thrift.TExecRequest; +import org.apache.impala.thrift.TExplainLevel; +import org.apache.impala.thrift.TNetworkAddress; +import org.apache.impala.thrift.TPlanExecInfo; +import org.apache.impala.thrift.TPlanFragment; +import org.apache.impala.thrift.TQueryCtx; +import org.apache.impala.thrift.TQueryExecRequest; +import org.apache.impala.thrift.TResultSetMetadata; +import org.apache.impala.thrift.TRuntimeFilterMode; +import org.apache.impala.thrift.TRuntimeProfileNode; +import org.apache.impala.thrift.TStmtType; +import org.apache.impala.util.EventSequence; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + + +/** + * ExecRequestCreator. Responsible for taking a PlanNode and the output Expr list + * from the top level PlanNode and convert it into a TExecRequest thrift object + * which is needed by the backend executor code. The input PlanNode tree is + * an optimized logical tree based on the Calcite rules. This class is also + * responsible for creating physical node optimizations which are located in + * the SingleNodePlanner and DistributedPlanner. + * + * TODO: This class is very similar to the Frontend.createExecRequest method and + * contains duplicate code. Accordingly, This class and that method should be + * refactored to prevent this duplication. 
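+ *
+ * The overall flow is: validate the single node plan, let the DistributedPlanner
+ * break it into plan fragments, generate runtime filters, and package the
+ * fragments into a TQueryExecRequest together with the explain string and the
+ * result set metadata.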
+ **/ +public class ExecRequestCreator implements CompilerStep { + protected static final Logger LOG = + LoggerFactory.getLogger(ExecRequestCreator.class.getName()); + + private final CalcitePhysPlanCreator physPlanCreator; + private final CalciteJniFrontend.QueryContext queryCtx; + private final CalciteMetadataHandler mdHandler; + + public ExecRequestCreator(CalcitePhysPlanCreator physPlanCreator, + CalciteJniFrontend.QueryContext queryCtx, CalciteMetadataHandler mdHandler) { + this.physPlanCreator = physPlanCreator; + this.queryCtx = queryCtx; + this.mdHandler = mdHandler; + } + + /** + * create() is the main public method responsible for taking the NodeWithExprs + * object containing the PlanNode and output Expr list and returning the TExecRequest. + */ + public TExecRequest create(NodeWithExprs nodeWithExprs) throws ImpalaException { + TExecRequest request = createExecRequest(nodeWithExprs.planNode_, + queryCtx.getTQueryCtx(), physPlanCreator.getPlannerContext(), + physPlanCreator.getAnalyzer(), nodeWithExprs.outputExprs_, + mdHandler.getStmtTableCache().tables.values()); + + return request; + } + + /** + * Create an exec request for Impala to execute based on the supplied plan. This + * method is similar to the Frontend.createExecRequest method and needs to be + * refactored. + */ + private TExecRequest createExecRequest(PlanNode planNodeRoot, TQueryCtx queryCtx, + PlannerContext plannerContext, Analyzer analyzer, List outputExprs, + Collection tables) throws ImpalaException { + List fragments = + createPlans(planNodeRoot, analyzer, plannerContext, outputExprs); + PlanFragment planFragmentRoot = fragments.get(0); + + TQueryExecRequest queryExecRequest = new TQueryExecRequest(); + TExecRequest result = createExecRequest(queryCtx, planFragmentRoot, + queryExecRequest); + queryExecRequest.setHost_list(getHostLocations(tables)); + queryExecRequest.setCores_required(-1); + + // compute resource requirements of the final plan + Planner.computeResourceReqs(fragments, queryCtx, queryExecRequest, + plannerContext, true /*isQuery*/); + + // create the plan's exec-info and assign fragment idx + int idx = 0; + for (PlanFragment planRoot : fragments) { + TPlanExecInfo tPlanExecInfo = Frontend.createPlanExecInfo(planRoot, queryCtx); + queryExecRequest.addToPlan_exec_info(tPlanExecInfo); + for (TPlanFragment fragment : tPlanExecInfo.fragments) { + fragment.setIdx(idx++); + } + } + + // create EXPLAIN output after setting everything else + queryExecRequest.setQuery_ctx(queryCtx); // needed by getExplainString() + + List allFragments = planFragmentRoot.getNodesPreOrder(); + // to mimic the original planner behavior, use EXTENDED mode explain except for + // EXPLAIN statements. + // TODO: support explain plans + // TExplainLevel explainLevel = + // isExplain ? 
plannerContext.getQueryOptions().getExplain_level() : + // TExplainLevel.EXTENDED; + TExplainLevel explainLevel = TExplainLevel.EXTENDED; + // if (isExplain) { + // result.setStmt_type(TStmtType.EXPLAIN); + // } + String explainString = getExplainString(allFragments, explainLevel, plannerContext); + queryExecRequest.setQuery_plan(explainString); + + + queryCtx.setDesc_tbl_serialized( + plannerContext.getRootAnalyzer().getDescTbl().toSerializedThrift()); + + plannerContext.getTimeline().markEvent("Execution request created"); + EventSequence eventSequence = plannerContext.getTimeline(); + result.setTimeline(eventSequence.toThrift()); + + TRuntimeProfileNode calciteProfile = + this.queryCtx.getFrontend().createTRuntimeProfileNode(Frontend.PLANNER_PROFILE); + this.queryCtx.getFrontend().addPlannerToProfile("CalcitePlanner"); + result.setProfile(FrontendProfile.getCurrent().emitAsThrift()); + result.setProfile_children(FrontendProfile.getCurrent().emitChildrenAsThrift()); + return result; + } + + List createPlans(PlanNode planNodeRoot, Analyzer analyzer, + PlannerContext ctx, List outputExprs) throws ImpalaException { + // Create the values transfer graph in the Analyzer. Note that Calcite plans + // don't register equijoin predicates in the Analyzer's GlobalState since + // Calcite should have already done the predicate inferencing analysis. + // Hence, the GlobalState's registeredValueTransfers will be empty. It is + // still necessary to instantiate the graph because otherwise + // RuntimeFilterGenerator tries to de-reference it and encounters NPE. + analyzer.computeValueTransferGraph(); + Planner.checkForSmallQueryOptimization(planNodeRoot, ctx); + + // Although the Calcite plan creates the relative order among different + // joins, currently it does not swap left and right inputs if the right + // input has higher estimated cardinality. Do this through Impala's method + // since we are using Impala's cardinality estimates in the physical planning. + invertJoins(planNodeRoot, ctx.isSingleNodeExec(), ctx.getRootAnalyzer()); + SingleNodePlanner.validatePlan(ctx, planNodeRoot); + + List fragments = + createPlanFragments(planNodeRoot, ctx, analyzer, outputExprs); + PlanFragment planFragmentRoot = fragments.get(0); + List rootFragments; + if (Planner.useParallelPlan(ctx)) { + ParallelPlanner parallelPlanner = new ParallelPlanner(ctx); + // The rootFragmentList contains the 'root' fragments of each of + // the parallel plans + rootFragments = parallelPlanner.createPlans(planFragmentRoot); + ctx.getTimeline().markEvent("Parallel plans created"); + } else { + rootFragments = new ArrayList(Arrays.asList(planFragmentRoot)); + } + return rootFragments; + } + + /** + * Create one or more plan fragments corresponding to the supplied single node physical + * plan. This function calls Impala's DistributedPlanner to create the plan fragments + * and does some post-processing. It is loosely based on Impala's Planner.createPlan() + * function. + */ + private List createPlanFragments(PlanNode planNodeRoot, + PlannerContext ctx, Analyzer analyzer, + List outputExprs) throws ImpalaException { + + DistributedPlanner distributedPlanner = new DistributedPlanner(ctx); + List fragments; + + if (ctx.isSingleNodeExec()) { + // create one fragment containing the entire single-node plan tree + fragments = Lists.newArrayList(new PlanFragment( + ctx.getNextFragmentId(), planNodeRoot, DataPartition.UNPARTITIONED)); + } else { + fragments = new ArrayList<>(); + // Create distributed plan. 
For insert/CTAS without limit, + // isPartitioned should be true. + // TODO: only query statements are currently supported + // final boolean isPartitioned = stmtType_ == + // TStmtType.DML && !planNodeRoot.hasLimit(); + boolean isPartitioned = false; + distributedPlanner.createPlanFragments(planNodeRoot, isPartitioned, fragments); + } + + PlanFragment rootFragment = fragments.get(fragments.size() - 1); + // Create runtime filters. + if (ctx.getQueryOptions().getRuntime_filter_mode() != TRuntimeFilterMode.OFF) { + RuntimeFilterGenerator.generateRuntimeFilters(ctx, rootFragment.getPlanRoot()); + ctx.getTimeline().markEvent("Runtime filters computed"); + } + + rootFragment.verifyTree(); + + List resultExprs = outputExprs; + rootFragment.setSink(new PlanRootSink(resultExprs)); + + Planner.checkForDisableCodegen(rootFragment.getPlanRoot(), ctx); + // finalize exchanges: this ensures that for hash partitioned joins, the partitioning + // keys on both sides of the join have compatible data types + for (PlanFragment fragment: fragments) { + fragment.finalizeExchanges(analyzer); + } + + Collections.reverse(fragments); + ctx.getTimeline().markEvent("Distributed plan created"); + return fragments; + } + + private TExecRequest createExecRequest(TQueryCtx queryCtx, + PlanFragment planFragmentRoot, TQueryExecRequest queryExecRequest) { + TExecRequest result = new TExecRequest(); + // NOTE: the below 4 are mandatory fields + result.setQuery_options(queryCtx.getClient_request().getQuery_options()); + + // TODO: Need to populate these 3 fields + result.setAccess_events(new ArrayList<>()); + result.setAnalysis_warnings(new ArrayList<>()); + result.setUser_has_profile_access(true); + + result.setQuery_exec_request(queryExecRequest); + + // TODO: only query currently supported + // result.setStmt_type(stmtType_); + // result.getQuery_exec_request().setStmt_type(stmtType_); + result.setStmt_type(TStmtType.QUERY); + result.getQuery_exec_request().setStmt_type(TStmtType.QUERY); + + // fill in the metadata using the root fragment's PlanRootSink + Preconditions.checkState(planFragmentRoot.hasSink()); + List outputExprs = new ArrayList<>(); + + planFragmentRoot.getSink().collectExprs(outputExprs); + result.setResult_set_metadata(createQueryResultSetMetadata(outputExprs)); + + return result; + } + + // TODO: Refactor and share Impala's getExplainString() + private String getExplainString(List fragments, + TExplainLevel explainLevel, PlannerContext ctx) { + if (explainLevel.ordinal() < TExplainLevel.VERBOSE.ordinal()) { + // Print the non-fragmented parallel plan. + return fragments.get(0).getExplainString(ctx.getQueryOptions(), explainLevel); + } + + StringBuffer sb = new StringBuffer(); + // Print the fragmented parallel plan. 
+ for (int i = 0; i < fragments.size(); ++i) { + PlanFragment fragment = fragments.get(i); + sb.append(fragment.getExplainString(ctx.getQueryOptions(), explainLevel)); + if (i < fragments.size() - 1) { + sb.append("\n"); + } + } + return sb.toString(); + } + + private TResultSetMetadata createQueryResultSetMetadata(List outputExprs) { + TResultSetMetadata metadata = new TResultSetMetadata(); + int colCnt = outputExprs.size(); + for (int i = 0; i < colCnt; ++i) { + TColumn colDesc = new TColumn(outputExprs.get(i).toString(), + outputExprs.get(i).getType().toThrift()); + metadata.addToColumns(colDesc); + } + return metadata; + } + + private List getHostLocations(Collection tables) { + Set hostLocations = new HashSet<>(); + for (FeTable table : tables) { + if (table instanceof HdfsTable) { + hostLocations.addAll(((HdfsTable) table).getHostIndex().getList()); + } + } + return new ArrayList<>(hostLocations); + } + + /** + * Traverses the plan tree rooted at 'root' and inverts joins in the following + * situations: + * 1. If the left-hand side is a SingularRowSrcNode then we invert the join because + * then the build side is guaranteed to have only a single row. + * 2. There is no backend support for distributed non-equi right outer/semi joins, + * so we invert them (any distributed left semi/outer join is ok). + * 3. If we estimate that the inverted join is cheaper (see isInvertedJoinCheaper()). + * Do not invert if relevant stats are missing. + * The first two inversion rules are independent of the presence/absence of stats. + * Left Null Aware Anti Joins are never inverted due to lack of backend support. + * Joins that originate from query blocks with a straight join hint are not inverted. + * The 'isLocalPlan' parameter indicates whether the plan tree rooted at 'root' + * will be executed locally within one machine, i.e., without any data exchanges. + * Return true if any join in the plan rooted at 'root' was inverted. + * + * TODO: This should be replaced once we conclude the changes contained in this method + * are safe to be pushed to Planner.invertJoins, i.e., they do not cause any + * performance regressions with Impala FE. A couple of differences in this version + * of invertJoins include: + * - The computeStats is done here. The Calcite planner will eventually have + * the computeStats built in during optimization time, but the join stats here + * are specific for Impala. So we need an explicit computeStats call only for + * Impala here. + * - We only need to do the computeStats when the plan changes via inversion, so + * the method returns a boolean whenever the inversion happens. + * - A Jira has been filed for this: IMPALA-12958. Probably the best fix for this + * is to put the inversion inside a Calcite rule. + * + **/ + private static boolean invertJoins(PlanNode root, boolean isLocalPlan, + Analyzer analyzer) { + boolean inverted = false; + if (root instanceof SubplanNode) { + inverted |= invertJoins(root.getChild(0), isLocalPlan, analyzer); + inverted |= invertJoins(root.getChild(1), true, analyzer); + } else { + for (PlanNode child: root.getChildren()) { + inverted |= invertJoins(child, isLocalPlan, analyzer); + } + } + + if (root instanceof JoinNode) { + JoinNode joinNode = (JoinNode) root; + JoinOperator joinOp = joinNode.getJoinOp(); + + if (!joinNode.isInvertible(isLocalPlan)) { + if (inverted) { + // Re-compute tuple ids since their order must correspond to the order + // of children. 
+ root.computeTupleIds(); + // Re-compute stats since PK-FK inference and cardinality may have changed after + // inversion. + root.computeStats(analyzer); + } + return inverted; + } + + if (joinNode.getChild(0) instanceof SingularRowSrcNode) { + // Always place a singular row src on the build side because it + // only produces a single row. + joinNode.invertJoin(); + inverted = true; + } else if (!isLocalPlan && joinNode instanceof NestedLoopJoinNode && + (joinOp.isRightSemiJoin() || joinOp.isRightOuterJoin())) { + // The current join is a distributed non-equi right outer or semi join + // which has no backend support. Invert the join to make it executable. + joinNode.invertJoin(); + inverted = true; + } else if (Planner.isInvertedJoinCheaper(joinNode, isLocalPlan)) { + joinNode.invertJoin(); + inverted = true; + } + // Re-compute the numNodes and numInstances based on the new input order + joinNode.recomputeNodes(); + } + + if (inverted) { + // Re-compute tuple ids because the backend assumes that their order corresponds to + // the order of children. + root.computeTupleIds(); + // Re-compute stats since PK-FK inference and cardinality may have changed after + // inversion. + root.computeStats(analyzer); + } + return inverted; + } + + @Override + public void logDebug(Object resultObject) { + if (!(resultObject instanceof TExecRequest)) { + LOG.debug("Finished create exec request step, but unknown result: " + resultObject); + } + LOG.debug("Exec request: " + resultObject); + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/type/ImpalaTypeConverter.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/type/ImpalaTypeConverter.java new file mode 100644 index 000000000..8a93f6541 --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/type/ImpalaTypeConverter.java @@ -0,0 +1,166 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.calcite.type; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlCollation; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.ConversionUtil; +import org.apache.impala.catalog.PrimitiveType; +import org.apache.impala.catalog.ScalarType; +import org.apache.impala.catalog.Type; +import org.apache.impala.thrift.TPrimitiveType; + +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A utility class that holds methods that convert impala types to Calcite + * types and vice versa. + * + * One important distinction is the different classes used here. On the Impala side, + * there are: + * - Normalized Type names (e.g. Type.BOOLEAN). These are pre-created types. This becomes + * important for types with precisions like decimal, char, and varchar. The Type.DECIMAL + * (and char and varchar) do not have any precision (or scale) associated with them. In + * the function signatures, all precisions are allowed, so this type is used when + * describing them. + * - types with precisions. These also use Types (also of type ScalarType), but the + * precision (and scale) is included with the datatype. + * + * On the Calcite side, there are: + * - Normalized RelDataTypes. While theoretically we should have been able to use + * SqlTypeName to have the same purpose as the Impala default dataypes, there is no + * SqlTypeName.STRING. Therefore, we needed to resort to RelDataTypes for this purpose. + * - types with precisions. The normal RelDataType is used. + */ +public class ImpalaTypeConverter { + + // Maps Impala default types to Calcite default types. + private static Map impalaToCalciteMap; + + static { + RexBuilder rexBuilder = + new RexBuilder(new JavaTypeFactoryImpl(new ImpalaTypeSystemImpl())); + RelDataTypeFactory factory = rexBuilder.getTypeFactory(); + Map map = new HashMap<>(); + map.put(Type.BOOLEAN, factory.createSqlType(SqlTypeName.BOOLEAN)); + map.put(Type.TINYINT, factory.createSqlType(SqlTypeName.TINYINT)); + map.put(Type.SMALLINT, factory.createSqlType(SqlTypeName.SMALLINT)); + map.put(Type.INT, factory.createSqlType(SqlTypeName.INTEGER)); + map.put(Type.BIGINT, factory.createSqlType(SqlTypeName.BIGINT)); + map.put(Type.FLOAT, factory.createSqlType(SqlTypeName.FLOAT)); + map.put(Type.DOUBLE, factory.createSqlType(SqlTypeName.DOUBLE)); + map.put(Type.TIMESTAMP, factory.createSqlType(SqlTypeName.TIMESTAMP)); + map.put(Type.DATE, factory.createSqlType(SqlTypeName.DATE)); + map.put(Type.BINARY, factory.createSqlType(SqlTypeName.BINARY)); + map.put(Type.STRING, factory.createSqlType(SqlTypeName.VARCHAR, Integer.MAX_VALUE)); + map.put(Type.NULL, factory.createSqlType(SqlTypeName.NULL)); + + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (Type t : map.keySet()) { + RelDataType r = map.get(t); + builder.put(t, factory.createTypeWithNullability(r, true)); + } + impalaToCalciteMap = builder.build(); + } + + /** + * Create a new RelDataType given the Impala type. 
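+ *
+ * DECIMAL, CHAR and VARCHAR carry their precision/length over; every other type
+ * maps to one of the pre-built nullable Calcite types, e.g. Impala's STRING
+ * maps to VARCHAR(Integer.MAX_VALUE).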
+ */ + public static RelDataType createRelDataType(RelDataTypeFactory factory, + Type impalaType) { + if (impalaType == null) { + return null; + } + TPrimitiveType primitiveType = impalaType.getPrimitiveType().toThrift(); + ScalarType scalarType = (ScalarType) impalaType; + switch (primitiveType) { + case DECIMAL: + RelDataType decimalDefinedRetType = factory.createSqlType(SqlTypeName.DECIMAL, + scalarType.decimalPrecision(), scalarType.decimalScale()); + return factory.createTypeWithNullability(decimalDefinedRetType, true); + case VARCHAR: + RelDataType varcharType = factory.createSqlType(SqlTypeName.VARCHAR, + scalarType.getLength()); + return factory.createTypeWithNullability(varcharType, true); + case CHAR: + RelDataType charType = factory.createSqlType(SqlTypeName.CHAR, + scalarType.getLength()); + return factory.createTypeWithNullability(charType, true); + default: + Type normalizedImpalaType = getImpalaType(primitiveType); + return impalaToCalciteMap.get(normalizedImpalaType); + } + } + + /** + * Create Impala types given primitive types. + * Primitive types should not be exposed outside of this class. + */ + public static Type getImpalaType(TPrimitiveType argType) { + // Char and decimal contain precisions and need to be treated separately from + // the rest. The precisions for this case are unknown though, as we are only given + // a "primitivetype'. + switch (argType) { + case CHAR: + return Type.CHAR; + case VARCHAR: + return Type.VARCHAR; + case DECIMAL: + return Type.DECIMAL; + case BOOLEAN: + return Type.BOOLEAN; + case TINYINT: + return Type.TINYINT; + case SMALLINT: + return Type.SMALLINT; + case INT: + return Type.INT; + case BIGINT: + return Type.BIGINT; + case FLOAT: + return Type.FLOAT; + case DOUBLE: + return Type.DOUBLE; + case TIMESTAMP: + return Type.TIMESTAMP; + case DATE: + return Type.DATE; + case STRING: + return Type.STRING; + case FIXED_UDA_INTERMEDIATE: + return Type.FIXED_UDA_INTERMEDIATE; + case NULL_TYPE: + return Type.NULL; + case BINARY: + return Type.BINARY; + default: + throw new RuntimeException("Unknown type " + argType); + } + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/type/ImpalaTypeSystemImpl.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/type/ImpalaTypeSystemImpl.java new file mode 100644 index 000000000..c812b461f --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/type/ImpalaTypeSystemImpl.java @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package org.apache.impala.calcite.type;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeSystemImpl;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.impala.analysis.ArithmeticExpr;
+import org.apache.impala.analysis.TypesUtil;
+import org.apache.impala.catalog.ScalarType;
+import org.apache.impala.catalog.Type;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
+/**
+ * ImpalaTypeSystemImpl contains constants specific to Impala datatypes
+ * that are used by Calcite.
+ * Many of these constants were copied from HiveTypeSystemImpl in the Hive
+ * repository. They are not fully tested at this point, but since Hive
+ * datatypes are similar to Impala datatypes, these definitions are a
+ * reasonable starting point; they may change later as more code gets added
+ * and some of them turn out not to fit Impala.
+ */
+public class ImpalaTypeSystemImpl extends RelDataTypeSystemImpl {
+  protected static final Logger LOG =
+      LoggerFactory.getLogger(ImpalaTypeSystemImpl.class.getName());
+  private static final int MAX_BINARY_PRECISION = Integer.MAX_VALUE;
+  // TIMESTAMP precision can go up to nanos
+  private static final int MAX_TIMESTAMP_PRECISION = 15;
+  private static final int MAX_TIMESTAMP_WITH_LOCAL_TIME_ZONE_PRECISION = 15; // nanos
+  // The precisions here match the number of digits that can be held by the type.
+  // For example, the maximum value of TINYINT is 127, which is 3 digits.
+  // The float and double precisions also match the number of total digits in the number.
+  // Note: The FLOAT precision here is different from the precision used in the
+  // Calcite RelDataTypeSystem file. Calcite treats its floats the same as doubles.
+  // Also note that the precision sizes match the values in HiveTypeSystemImpl
+  // in the Hive GitHub code base.
+  private static final int DEFAULT_TINYINT_PRECISION = 3;
+  private static final int DEFAULT_SMALLINT_PRECISION = 5;
+  private static final int DEFAULT_INTEGER_PRECISION = 10;
+  private static final int DEFAULT_BIGINT_PRECISION = 19;
+  private static final int DEFAULT_FLOAT_PRECISION = 7;
+  private static final int DEFAULT_DOUBLE_PRECISION = 15;
+
+  @Override
+  public int getMaxScale(SqlTypeName typeName) {
+    switch (typeName) {
+      case DECIMAL:
+        return getMaxNumericScale();
+      case INTERVAL_YEAR:
+      case INTERVAL_MONTH:
+      case INTERVAL_YEAR_MONTH:
+      case INTERVAL_DAY:
+      case INTERVAL_DAY_HOUR:
+      case INTERVAL_DAY_MINUTE:
+      case INTERVAL_DAY_SECOND:
+      case INTERVAL_HOUR:
+      case INTERVAL_HOUR_MINUTE:
+      case INTERVAL_HOUR_SECOND:
+      case INTERVAL_MINUTE:
+      case INTERVAL_MINUTE_SECOND:
+      case INTERVAL_SECOND:
+        return SqlTypeName.MAX_INTERVAL_FRACTIONAL_SECOND_PRECISION;
+      default:
+        return -1;
+    }
+  }
+
+  @Override
+  public int getDefaultPrecision(SqlTypeName typeName) {
+    switch (typeName) {
+      // Binary doesn't need any sizes; Decimal has the default of 10.
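+      // For the parameterized types below (BINARY, VARBINARY, CHAR, VARCHAR, DECIMAL)
+      // no default precision is supplied here (PRECISION_NOT_SPECIFIED); interval
+      // types use DEFAULT_INTERVAL_START_PRECISION, and everything else falls
+      // through to getMaxPrecision().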
+ case BINARY: + case VARBINARY: + return RelDataType.PRECISION_NOT_SPECIFIED; + case CHAR: + return RelDataType.PRECISION_NOT_SPECIFIED; + case VARCHAR: + return RelDataType.PRECISION_NOT_SPECIFIED; + case DECIMAL: + return RelDataType.PRECISION_NOT_SPECIFIED; + case INTERVAL_YEAR: + case INTERVAL_MONTH: + case INTERVAL_YEAR_MONTH: + case INTERVAL_DAY: + case INTERVAL_DAY_HOUR: + case INTERVAL_DAY_MINUTE: + case INTERVAL_DAY_SECOND: + case INTERVAL_HOUR: + case INTERVAL_HOUR_MINUTE: + case INTERVAL_HOUR_SECOND: + case INTERVAL_MINUTE: + case INTERVAL_MINUTE_SECOND: + case INTERVAL_SECOND: + return SqlTypeName.DEFAULT_INTERVAL_START_PRECISION; + default: + return getMaxPrecision(typeName); + } + } + + @Override + public int getMaxPrecision(SqlTypeName typeName) { + switch (typeName) { + case BINARY: + case VARBINARY: + return MAX_BINARY_PRECISION; + case TIME: + case TIMESTAMP: + return MAX_TIMESTAMP_PRECISION; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return MAX_TIMESTAMP_WITH_LOCAL_TIME_ZONE_PRECISION; + case CHAR: + return ScalarType.MAX_CHAR_LENGTH; + case VARCHAR: + return Integer.MAX_VALUE; + case DECIMAL: + return getMaxNumericPrecision(); + case INTERVAL_YEAR: + case INTERVAL_MONTH: + case INTERVAL_YEAR_MONTH: + case INTERVAL_DAY: + case INTERVAL_DAY_HOUR: + case INTERVAL_DAY_MINUTE: + case INTERVAL_DAY_SECOND: + case INTERVAL_HOUR: + case INTERVAL_HOUR_MINUTE: + case INTERVAL_HOUR_SECOND: + case INTERVAL_MINUTE: + case INTERVAL_MINUTE_SECOND: + case INTERVAL_SECOND: + return SqlTypeName.MAX_INTERVAL_START_PRECISION; + case TINYINT: + return DEFAULT_TINYINT_PRECISION; + case SMALLINT: + return DEFAULT_SMALLINT_PRECISION; + case INTEGER: + return DEFAULT_INTEGER_PRECISION; + case BIGINT: + return DEFAULT_BIGINT_PRECISION; + case FLOAT: + return DEFAULT_FLOAT_PRECISION; + case DOUBLE: + return DEFAULT_DOUBLE_PRECISION; + default: + return -1; + } + } + + @Override + public int getMaxNumericScale() { + return ScalarType.MAX_SCALE; + } + + @Override + public int getMaxNumericPrecision() { + return ScalarType.MAX_PRECISION; + } + + @Override + public boolean isSchemaCaseSensitive() { + return false; + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/validate/ImpalaConformance.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/validate/ImpalaConformance.java new file mode 100644 index 000000000..4f1000919 --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/validate/ImpalaConformance.java @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.calcite.validate; + +import org.apache.calcite.sql.fun.SqlLibrary; +import org.apache.calcite.sql.validate.SqlConformance; + +/** + * ImpalaConformance describes supported Calcite features. + * For more info on the description of these methods, see: + * https://calcite.apache.org/javadocAggregate/org/apache/calcite/sql/validate/SqlConformance.html + */ +public class ImpalaConformance implements SqlConformance { + + public static final SqlConformance INSTANCE = new ImpalaConformance(); + + @Override public boolean isLiberal() { + return true; + } + + @Override public boolean allowCharLiteralAlias() { + return true; + } + + + @Override public boolean isGroupByAlias() { + return true; + } + + @Override public boolean isGroupByOrdinal() { + return true; + } + + @Override public boolean isHavingAlias() { + return true; + } + + @Override public boolean isSortByOrdinal() { + return true; + } + + @Override public boolean isSortByAlias() { + return true; + } + + @Override public boolean isSortByAliasObscures() { + return false; + } + + @Override public boolean isFromRequired() { + return false; + } + + @Override public boolean splitQuotedTableName() { + return false; + } + + @Override public boolean allowHyphenInUnquotedTableName() { + return false; + } + + @Override public boolean isBangEqualAllowed() { + return true; + } + + @Override public boolean isMinusAllowed() { + return true; + } + + @Override public boolean isRegexReplaceCaptureGroupDollarIndexed() { + return true; + } + + @Override public boolean isPercentRemainderAllowed() { + return true; + } + + @Override public boolean isApplyAllowed() { + return false; + } + + @Override public boolean isInsertSubsetColumnsAllowed() { + return false; + } + + @Override public boolean allowNiladicParentheses() { + return true; + } + + @Override public boolean allowExplicitRowValueConstructor() { + return false; + } + + @Override public boolean allowExtend() { + return false; + } + + @Override public boolean isLimitStartCountAllowed() { + return false; + } + + @Override public boolean isOffsetLimitAllowed() { + return false; + } + + @Override public boolean allowGeometry() { + return false; + } + + @Override public boolean shouldConvertRaggedUnionTypesToVarying() { + return false; + } + + @Override public boolean allowExtendedTrim() { + return false; + } + + @Override public boolean allowPluralTimeUnits() { + return false; + } + + @Override public boolean allowQualifyingCommonColumn() { + return true; + } + + @Override public boolean allowAliasUnnestItems() { + return false; + } + + @Override public boolean isValueAllowed() { + return true; + } + + @Override public SqlLibrary semantics() { + return SqlLibrary.STANDARD; + } + + @Override public boolean allowLenientCoercion() { + return false; + } +} diff --git a/java/pom.xml b/java/pom.xml index e7a2d3d72..9e4697e58 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -385,6 +385,7 @@ under the License. 
     <module>ext-data-source</module>
     <module>../fe</module>
     <module>external-frontend</module>
+    <module>calcite-planner</module>
     <module>query-event-hook-api</module>
     <module>shaded-deps/hive-exec</module>
     <module>shaded-deps/s3a-aws-sdk</module>
diff --git a/testdata/workloads/functional-query/queries/QueryTest/calcite.test b/testdata/workloads/functional-query/queries/QueryTest/calcite.test
new file mode 100644
index 000000000..b05f3089a
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/calcite.test
@@ -0,0 +1,117 @@
+====
+---- QUERY
+-- this is a test for a comment line above a blank line
+-- we do not care about the results, just the comment
+
+select * from functional.alltypestiny;
+---- RUNTIME_PROFILE
+row_regex: .*PlannerType: CalcitePlanner.*
+====
+---- QUERY
+create table calcite_alltypes as select * from functional.alltypes order by id limit 5;
+---- RUNTIME_PROFILE
+row_regex: .*PlannerType: OriginalPlanner.*
+====
+---- QUERY
+select * from calcite_alltypes;
+---- RESULTS
+0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
+1,false,1,1,1,10,1.100000023841858,10.1,'01/01/09','1',2009-01-01 00:01:00,2009,1
+2,true,2,2,2,20,2.200000047683716,20.2,'01/01/09','2',2009-01-01 00:02:00.100000000,2009,1
+3,false,3,3,3,30,3.299999952316284,30.3,'01/01/09','3',2009-01-01 00:03:00.300000000,2009,1
+4,true,4,4,4,40,4.400000095367432,40.4,'01/01/09','4',2009-01-01 00:04:00.600000000,2009,1
+---- TYPES
+int,boolean,tinyint,smallint,int,bigint,float,double,string,string,timestamp,int,int
+---- RUNTIME_PROFILE
+row_regex: .*PlannerType: CalcitePlanner.*
+====
+---- QUERY
+select string_col, tinyint_col from calcite_alltypes;
+---- RUNTIME_PROFILE
+row_regex: .*PlannerType: CalcitePlanner.*
+---- RESULTS
+'0',0
+'1',1
+'2',2
+'3',3
+'4',4
+---- TYPES
+string,tinyint
+====
+---- QUERY
+select d1,d2,d3,d4,d5,d6 from functional.decimal_tbl;
+---- RESULTS
+1234,2222,1.2345678900,0.12345678900000000000000000000000000000,12345.78900,1
+12345,333,123.4567890000,0.12345678900000000000000000000000000000,11.22000,1
+12345,333,1234.5678900000,0.12345678900000000000000000000000000000,0.10000,1
+132842,333,12345.6789000000,0.12345678900000000000000000000000000000,0.77889,1
+2345,111,12.3456789000,0.12345678900000000000000000000000000000,3.14100,1
+---- TYPES
+decimal,decimal,decimal,decimal,decimal,decimal
+---- RUNTIME_PROFILE
+row_regex: .*PlannerType: CalcitePlanner.*
+====
+---- QUERY
+select * from functional.chars_tiny;
+---- RESULTS
+'1aaaa','1bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb','1cccc'
+'2aaaa','2bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb','2cccccc'
+'3aaa ','3bbbbb ','3ccc'
+'4aa ','4bbbb ','4cc'
+'5a ','5bbb ','5c'
+'6a ','6b ','6c'
+'6a ','6b ','6c'
+'NULL','NULL','NULL'
+'a ','b ','c'
+---- TYPES
+# varchar shows up as string, just as it does in the chars.test file
+char,char,string
+---- RUNTIME_PROFILE
+row_regex: .*PlannerType: CalcitePlanner.*
+====
+---- QUERY
+select * from functional.date_tbl;
+---- RESULTS
+0,0001-01-01,0001-01-01
+1,0001-12-31,0001-01-01
+10,2017-11-28,1399-06-27
+11,NULL,1399-06-27
+12,2018-12-31,1399-06-27
+2,0002-01-01,0001-01-01
+20,0001-06-21,2017-11-27
+21,0001-06-22,2017-11-27
+22,0001-06-23,2017-11-27
+23,0001-06-24,2017-11-27
+24,0001-06-25,2017-11-27
+25,0001-06-26,2017-11-27
+26,0001-06-27,2017-11-27
+27,0001-06-28,2017-11-27
+28,0001-06-29,2017-11-27
+29,2017-11-28,2017-11-27
+3,1399-12-31,0001-01-01
+30,9999-12-01,9999-12-31
+31,9999-12-31,9999-12-31 +4,2017-11-28,0001-01-01 +5,9999-12-31,0001-01-01 +6,NULL,0001-01-01 +---- TYPES +int,date,date +---- RUNTIME_PROFILE +row_regex: .*PlannerType: CalcitePlanner.* +==== +---- QUERY +# creating a new table. We cannot use any functions at this point to +# manipulate the binary data (as the binary test currently does), so +# this just grabs the rows that can be checked. +create table ascii_binary as select * from functional.binary_tbl where id <= 4; +select * from ascii_binary; +---- RESULTS +1,'ascii','binary1' +2,'ascii','binary2' +3,'null','NULL' +4,'empty','' +---- TYPES +int,string,binary +---- RUNTIME_PROFILE +row_regex: .*PlannerType: CalcitePlanner.* +==== diff --git a/tests/custom_cluster/test_calcite_planner.py b/tests/custom_cluster/test_calcite_planner.py new file mode 100644 index 000000000..1b49df791 --- /dev/null +++ b/tests/custom_cluster/test_calcite_planner.py @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import absolute_import, division, print_function +import logging +import pytest + +from tests.common.custom_cluster_test_suite import CustomClusterTestSuite + +LOG = logging.getLogger(__name__) + + +class TestCalcitePlanner(CustomClusterTestSuite): + + @classmethod + def setup_class(cls): + super(TestCalcitePlanner, cls).setup_class() + + @classmethod + def get_workload(cls): + return 'functional-query' + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args(start_args="--use_calcite_planner=true") + def test_calcite_frontend(self, vector, unique_database): + self.run_test_case('QueryTest/calcite', vector, use_db=unique_database) diff --git a/tests/util/workload_management.py b/tests/util/workload_management.py index 4cdacfe54..c092f85c5 100644 --- a/tests/util/workload_management.py +++ b/tests/util/workload_management.py @@ -417,7 +417,7 @@ def assert_query(query_tbl, client, expected_cluster_id, raw_profile=None, impal index += 1 assert sql_results.column_labels[index] == EXECUTOR_GROUPS ret_data[EXECUTOR_GROUPS] = data[index] - exec_groups = re.search(r'\n\s+(Executor group \d+:.*?)\n\s+ImpalaServer', profile_text, + exec_groups = re.search(r'\n\s+(Executor group \d+:.*?)\n\s+PlannerInfo', profile_text, re.DOTALL) if query_state_value == "EXCEPTION": assert exec_groups is None, "executor groups should not have been found"