diff --git a/bin/set-classpath.sh b/bin/set-classpath.sh index bbdc214bd..abfe54e43 100644 --- a/bin/set-classpath.sh +++ b/bin/set-classpath.sh @@ -48,6 +48,22 @@ fi CLASSPATH=$(cat $FE_CP_FILE):"$CLASSPATH" +# Currently the Calcite planner is experimental and not included by default. +# If the USE_CALCITE_PLANNER is explicitly set, then the jar dependencies +# are added to the CLASSPATH +USE_CALCITE_PLANNER=${USE_CALCITE_PLANNER:-false} +if [ "true" = "$USE_CALCITE_PLANNER" ]; then + + CALCITE_CP_FILE="$IMPALA_HOME/java/calcite-planner/target/calcite-build-classpath.txt" + if [ ! -s "$CALCITE_CP_FILE" ]; then + >&2 echo Calcite front-end classpath file $CALCITE_CP_FILE missing. + >&2 echo Build the Calcite front-end first. + return 1 + fi + CLASSPATH="$CLASSPATH":$(cat $CALCITE_CP_FILE):\ +"$IMPALA_HOME"/java/calcite-planner/target/calcite-planner-${IMPALA_VERSION}.jar: +fi + if [[ "${1:-notest}" = "test" ]]; then FE_TEST_CP_FILE="$IMPALA_HOME/fe/target/test-classpath.txt" CLASSPATH=$(cat $FE_TEST_CP_FILE):"$CLASSPATH" diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py index 5efe2188d..9d2ad99af 100755 --- a/bin/start-impala-cluster.py +++ b/bin/start-impala-cluster.py @@ -163,7 +163,7 @@ parser.add_option("--enable_catalogd_ha", dest="enable_catalogd_ha", help="If true, enables CatalogD HA - the cluster will be launched " "with two catalogd instances as Active-Passive HA pair.") parser.add_option("--jni_frontend_class", dest="jni_frontend_class", - action="store", default="org/apache/impala/service/JniFrontend", + action="store", default="", help="Use a custom java frontend interface.") parser.add_option("--enable_statestored_ha", dest="enable_statestored_ha", action="store_true", default=False, @@ -187,6 +187,10 @@ parser.add_option("--tuple_cache_capacity", dest="tuple_cache_capacity", parser.add_option("--tuple_cache_eviction_policy", dest="tuple_cache_eviction_policy", default="LRU", help="This specifies the cache eviction policy to use " 
"for the tuple cache.") +parser.add_option("--use_calcite_planner", default="False", type="choice", + choices=["true", "True", "false", "False"], + help="If true, use the Calcite planner for query optimization " + "instead of the Impala planner") # For testing: list of comma-separated delays, in milliseconds, that delay impalad catalog # replica initialization. The ith delay is applied to the ith impalad. @@ -649,7 +653,7 @@ def build_impalad_arg_lists(cluster_size, num_coordinators, use_exclusive_coordi args = "{args} -geospatial_library={geospatial_library}".format( args=args, geospatial_library=options.geospatial_library) - if options.jni_frontend_class: + if options.jni_frontend_class != "": args = "-jni_frontend_class={jni_frontend_class} {args}".format( jni_frontend_class=options.jni_frontend_class, args=args) @@ -658,6 +662,12 @@ def build_impalad_arg_lists(cluster_size, num_coordinators, use_exclusive_coordi else: args = "-allow_tuple_caching=true {args}".format(args=args) + if options.use_calcite_planner.lower() == 'true': + args = "-jni_frontend_class={jni_frontend_class} {args}".format( + jni_frontend_class="org/apache/impala/calcite/service/CalciteJniFrontend", + args=args) + os.environ["USE_CALCITE_PLANNER"] = "true" + # Appended at the end so they can override previous args. if i < len(per_impalad_args): args = "{args} {per_impalad_args}".format( diff --git a/fe/src/main/java/org/apache/impala/analysis/TableName.java b/fe/src/main/java/org/apache/impala/analysis/TableName.java index 1f0efc67e..864b48577 100644 --- a/fe/src/main/java/org/apache/impala/analysis/TableName.java +++ b/fe/src/main/java/org/apache/impala/analysis/TableName.java @@ -66,9 +66,8 @@ public class TableName { // Avoid "db1." and ".tbl1" being treated as the same. We resolve ".tbl1" as // "default.tbl1". But we reject "db1." since it only gives the database name. 
if (fullName == null || fullName.trim().endsWith(".")) return null; - // TODO: upgrade Guava to 15+ to use splitToList instead List parts = Lists.newArrayList(Splitter.on('.').trimResults() - .omitEmptyStrings().split(fullName.toLowerCase())); + .omitEmptyStrings().splitToList(fullName.toLowerCase())); if (parts.size() == 1) { return new TableName(Catalog.DEFAULT_DB, parts.get(0)); } diff --git a/fe/src/main/java/org/apache/impala/planner/PlannerContext.java b/fe/src/main/java/org/apache/impala/planner/PlannerContext.java index a54e3ae4d..0102def36 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlannerContext.java +++ b/fe/src/main/java/org/apache/impala/planner/PlannerContext.java @@ -68,6 +68,7 @@ public class PlannerContext { private final EventSequence timeline_; private final TQueryCtx queryCtx_; private final QueryStmt queryStmt_; + private final Analyzer rootAnalyzer_; public PlannerContext (AnalysisResult analysisResult, TQueryCtx queryCtx, EventSequence timeline) { @@ -85,14 +86,20 @@ public class PlannerContext { } else { queryStmt_ = analysisResult.getQueryStmt(); } + rootAnalyzer_ = analysisResult.getAnalyzer(); } // Constructor useful for an external planner module public PlannerContext(TQueryCtx queryCtx, EventSequence timeline) { + this((Analyzer) null, queryCtx, timeline); + } + + public PlannerContext(Analyzer analyzer, TQueryCtx queryCtx, EventSequence timeline) { queryCtx_ = queryCtx; timeline_ = timeline; analysisResult_ = null; queryStmt_ = null; + rootAnalyzer_ = analyzer; } public QueryStmt getQueryStmt() { return queryStmt_; } @@ -100,7 +107,7 @@ public class PlannerContext { public TQueryOptions getQueryOptions() { return getRootAnalyzer().getQueryOptions(); } public AnalysisResult getAnalysisResult() { return analysisResult_; } public EventSequence getTimeline() { return timeline_; } - public Analyzer getRootAnalyzer() { return analysisResult_.getAnalyzer(); } + public Analyzer getRootAnalyzer() { return rootAnalyzer_; } public 
boolean isSingleNodeExec() { return getQueryOptions().num_nodes == 1; } public PlanNodeId getNextNodeId() { return nodeIdGenerator_.getNextId(); } public PlanFragmentId getNextFragmentId() { return fragmentIdGenerator_.getNextId(); } diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index cd7363ebb..c82c269a7 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -273,6 +273,12 @@ public class Frontend { private static final String AVG_ADMISSION_SLOTS_PER_EXECUTOR = "AvgAdmissionSlotsPerExecutor"; + // info about the planner used. In this code, we will always use the Original planner, + // but other planners may set their own planner values + public static final String PLANNER_PROFILE = "PlannerInfo"; + public static final String PLANNER_TYPE = "PlannerType"; + private static final String PLANNER = "OriginalPlanner"; + /** * Plan-time context that allows capturing various artifacts created * during the process. 
@@ -2135,6 +2141,7 @@ public class Frontend { private TExecRequest getTExecRequest(PlanCtx planCtx, EventSequence timeline) throws ImpalaException { TQueryCtx queryCtx = planCtx.getQueryContext(); + addPlannerToProfile(PLANNER); LOG.info("Analyzing query: " + queryCtx.client_request.stmt + " db: " + queryCtx.session.database); @@ -2500,6 +2507,12 @@ public class Frontend { } } + public static void addPlannerToProfile(String planner) { + TRuntimeProfileNode profile = createTRuntimeProfileNode(PLANNER_PROFILE); + addInfoString(profile, PLANNER_TYPE, planner); + FrontendProfile.getCurrent().addChildrenProfile(profile); + } + private TExecRequest doCreateExecRequest(PlanCtx planCtx, EventSequence timeline) throws ImpalaException { TQueryCtx queryCtx = planCtx.getQueryContext(); diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java index 5f08f0ab3..31caa3bf4 100644 --- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java +++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java @@ -1046,4 +1046,8 @@ public class JniFrontend { } return ""; } + + public Frontend getFrontend() { + return frontend_; + } } diff --git a/java/calcite-planner/pom.xml b/java/calcite-planner/pom.xml new file mode 100644 index 000000000..9ec53d8b6 --- /dev/null +++ b/java/calcite-planner/pom.xml @@ -0,0 +1,108 @@ + + + + + org.apache.impala + impala-parent + 4.4.0-SNAPSHOT + + 4.0.0 + + calcite-planner + 4.4.0-SNAPSHOT + jar + + calcite-planner + + + + org.apache.impala + impala-frontend + 4.4.0-SNAPSHOT + + + org.apache.calcite + calcite-core + 1.36.0 + + + org.apache.calcite.avatica + avatica-core + 1.23.0 + + + + + + + + org.apache.maven.plugins + maven-resources-plugin + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.11.0 + + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-dependency-plugin + 3.5.0 + + + copy-dependencies + package + + copy-dependencies + + + pom + runtime + 
true + + + + + write-classpath + + build-classpath + + + ${project.build.directory}/calcite-build-classpath.txt + runtime + pom + + + + + + + diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ConvertToImpalaRelRules.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ConvertToImpalaRelRules.java new file mode 100644 index 000000000..fa6390d9a --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ConvertToImpalaRelRules.java @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.rel.node; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rel.logical.LogicalTableScan; + +/** + * ConvertToImpalaRelRules. Contains the rules used to change the Calcite RelNodes + * to Impala RelNodes. These Impala RelNodes are responsible for creating the + * physical PlanNode plan. The Calcite RelNode and Impala RelNodes map one to one + * with each other. 
+ */ +public class ConvertToImpalaRelRules { + + public static class ImpalaProjectRule extends RelOptRule { + public ImpalaProjectRule() { + super(operand(LogicalProject.class, any())); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final LogicalProject project = call.rel(0); + call.transformTo(new ImpalaProjectRel(project)); + } + } + + public static class ImpalaScanRule extends RelOptRule { + + public ImpalaScanRule() { + super(operand(LogicalTableScan.class, none())); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final LogicalTableScan scan = call.rel(0); + call.transformTo(new ImpalaHdfsScanRel(scan)); + } + } + +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaHdfsScanRel.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaHdfsScanRel.java new file mode 100644 index 000000000..70fabba3a --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaHdfsScanRel.java @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.calcite.rel.node; + +import com.google.common.base.Preconditions; + +import org.apache.calcite.rel.core.TableScan; +import org.apache.impala.analysis.BaseTableRef; +import org.apache.impala.analysis.Expr; +import org.apache.impala.analysis.SlotDescriptor; +import org.apache.impala.analysis.SlotRef; +import org.apache.impala.analysis.TupleDescriptor; +import org.apache.impala.calcite.rel.phys.ImpalaHdfsScanNode; +import org.apache.impala.calcite.schema.CalciteTable; +import org.apache.impala.catalog.FeFsPartition; +import org.apache.impala.catalog.HdfsTable; +import org.apache.impala.common.ImpalaException; +import org.apache.impala.planner.PlanNode; +import org.apache.impala.planner.PlanNodeId; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * ImpalaHdfsScanRel. Calcite RelNode which maps to an Impala TableScan node. + */ +public class ImpalaHdfsScanRel extends TableScan + implements ImpalaPlanRel { + + public ImpalaHdfsScanRel(TableScan scan) { + super(scan.getCluster(), scan.getTraitSet(), scan.getHints(), scan.getTable()); + } + + @Override + public NodeWithExprs getPlanNode(ParentPlanRelContext context) throws ImpalaException { + + CalciteTable table = (CalciteTable) getTable(); + + BaseTableRef baseTblRef = + table.createBaseTableRef(context.ctx_.getRootAnalyzer()); + + // Create the Tuple Descriptor which will contain only the relevant columns + // from the table needed for the query. 
+ TupleDescriptor tupleDesc = table.createTupleAndSlotDesc(baseTblRef, + getInputRefFieldNames(context), context.ctx_.getRootAnalyzer()); + + // outputExprs will contain all the needed columns from the table + List outputExprs = createScanOutputExprs(tupleDesc.getSlots()); + + List impalaPartitions = table.getPrunedPartitions( + context.ctx_.getRootAnalyzer(), tupleDesc); + + // TODO: filters are not handled yet, nor are partitions + List filterConjuncts = new ArrayList<>(); + List partitionConjuncts = new ArrayList<>(); + + PlanNodeId nodeId = context.ctx_.getNextNodeId(); + + PlanNode physicalNode = new ImpalaHdfsScanNode(nodeId, tupleDesc, impalaPartitions, + baseTblRef, null, partitionConjuncts, filterConjuncts); + physicalNode.init(context.ctx_.getRootAnalyzer()); + + return new NodeWithExprs(physicalNode, outputExprs); + } + + /** + * Return a list of (SlotRef) expressions for the scan node. The list will + * be the size of the # of columns in the table. We initialize the list to + * contain null values for all elements. + * + * If a column isn't projected out by the parent of the scan node, the array + * location for the column will remain null. + */ + private List createScanOutputExprs(List slotDescs) { + int totalCols = getRowType().getFieldNames().size(); + + // Initialize all fields to null. + // TODO: Should this be a map instead of a list? See IMPALA-12961 for details. + List scanOutputExprs = new ArrayList<>(Collections.nCopies(totalCols, null)); + + HdfsTable table = ((CalciteTable) getTable()).getHdfsTable(); + Preconditions.checkState(totalCols == table.getColumns().size()); + int nonPartitionedCols = totalCols - table.getNumClusteringCols(); + for (SlotDescriptor slotDesc : slotDescs) { + // On a "select *" with partitioned columns, the partition columns occur after the + // nonpartitioned columns. But Impala displays the partition columns first. The + // modular arithmetic provides the correct position number for Impala. 
+ int position = + (slotDesc.getColumn().getPosition() + nonPartitionedCols) % totalCols; + scanOutputExprs.set(position, new SlotRef(slotDesc)); + } + return scanOutputExprs; + } + + + private List getInputRefFieldNames(ParentPlanRelContext context) { + // If the parent context didn't pass in input refs, we will select all the + // columns from the table. + if (context.inputRefs_ == null) { + return getRowType().getFieldNames(); + } + + List inputRefFieldNames = new ArrayList<>(); + for (Integer i : context.inputRefs_) { + inputRefFieldNames.add(getRowType().getFieldNames().get(i)); + } + return inputRefFieldNames; + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaPlanRel.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaPlanRel.java new file mode 100644 index 000000000..aca02da40 --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaPlanRel.java @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.rel.node; + +import org.apache.impala.common.ImpalaException; + + +/** + * ImpalaPlanRel. 
Interface used for all Impala intermediary RelNodes + */ +public interface ImpalaPlanRel { + + /** + * getPlanNode returns a NodeWithExprs object, a thin structure containing + * the plan node along with the output expressions generated by the plan node. + */ + public NodeWithExprs getPlanNode(ParentPlanRelContext context) throws ImpalaException; + +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaProjectRel.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaProjectRel.java new file mode 100644 index 000000000..fc9706fe8 --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaProjectRel.java @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.calcite.rel.node; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexNode; +import org.apache.impala.analysis.Analyzer; +import org.apache.impala.analysis.Expr; +import org.apache.impala.calcite.rel.util.CreateExprVisitor; +import org.apache.impala.common.ImpalaException; + +import java.util.List; + + +/** + * ImpalaProjectRel is the Impala specific RelNode corresponding to + * Project. + * + * There is no PlanNode equivalent for ProjectRel in Impala. The output expressions + * are generated and passed to its parent PlanNode (or the final output exprs if the + * Project is at the top of the tree). The input references are passed into the child + * PlanNode for pruning purposes (HdfsScan would only need to select out the columns + * that are being used). 
+ */ +public class ImpalaProjectRel extends Project + implements ImpalaPlanRel { + + public ImpalaProjectRel(Project project) { + super(project.getCluster(), project.getTraitSet(), project.getInput(), + project.getProjects(), project.getRowType()); + } + + // Needed for Calcite framework + private ImpalaProjectRel(RelOptCluster cluster, RelTraitSet traits, + RelNode input, List projects, RelDataType rowType) { + super(cluster, traits, input, projects, rowType); + } + + // Needed for Calcite framework + @Override + public Project copy(RelTraitSet traitSet, RelNode input, List projects, + RelDataType rowType) { + return new ImpalaProjectRel(getCluster(), traitSet, input, projects, rowType); + } + + @Override + public NodeWithExprs getPlanNode(ParentPlanRelContext context) throws ImpalaException { + + NodeWithExprs inputWithExprs = getChildPlanNode(context); + + // get the output exprs for this node that are needed by the parent node. + List outputExprs = + createProjectExprs(context.ctx_.getRootAnalyzer(), inputWithExprs); + + // There is no Impala Plan Node mapped to Project, so we just return the child + // PlanNode. However, the outputExprs change with the Project. + return new NodeWithExprs(inputWithExprs.planNode_, outputExprs); + } + + /** + * Translate the RexNode expressions in the Project to Impala Exprs. 
+ */ + private List createProjectExprs(Analyzer basicAnalyzer, + NodeWithExprs inputNodeWithExprs) + throws ImpalaException { + ImpalaPlanRel inputRel = (ImpalaPlanRel) getInput(0); + + CreateExprVisitor visitor = + new CreateExprVisitor(inputNodeWithExprs.outputExprs_); + + ImmutableList.Builder builder = new ImmutableList.Builder(); + for (RexNode rexNode : getProjects()) { + Expr projectExpr = CreateExprVisitor.getExpr(visitor, rexNode); + Preconditions.checkNotNull(projectExpr, + "Visitor returned null Impala expr for RexNode %s", rexNode); + builder.add(projectExpr); + } + return builder.build(); + } + + private NodeWithExprs getChildPlanNode(ParentPlanRelContext context + ) throws ImpalaException { + ImpalaPlanRel relInput = (ImpalaPlanRel) getInput(0); + ParentPlanRelContext.Builder builder = + new ParentPlanRelContext.Builder(context, this); + builder.setInputRefs(RelOptUtil.InputFinder.bits(getProjects(), null)); + return relInput.getPlanNode(builder.build()); + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/NodeWithExprs.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/NodeWithExprs.java new file mode 100644 index 000000000..88f724479 --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/NodeWithExprs.java @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.rel.node; + +import org.apache.impala.analysis.Expr; +import org.apache.impala.planner.PlanNode; + +import java.util.List; + +/** + * NodeWithExprs: A return class that contains a PlanNode and its output exprs + * In the case of the Calcite Project RelNode, the PlanNode stays the same, but the + * output expressions change. This is why the outputExprs cannot be a member of + * PlanNode + */ +public class NodeWithExprs { + public final PlanNode planNode_; + public final List outputExprs_; + + public NodeWithExprs(PlanNode planNode, List outputExprs) { + this.planNode_ = planNode; + this.outputExprs_ = outputExprs; + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ParentPlanRelContext.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ParentPlanRelContext.java new file mode 100644 index 000000000..1ef6591fc --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ParentPlanRelContext.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.rel.node; + +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.impala.analysis.Analyzer; +import org.apache.impala.planner.PlannerContext; + +/** + * ParentPlanRelContext is passed into each layer of the Impala + * RelNodes so the child can make certain decisions based on its + * parent. + */ +public class ParentPlanRelContext { + + // ctx: This doesn't change throughout the tree + public final PlannerContext ctx_; + + // The input refs used by the parent PlanRel Node + public final ImmutableBitSet inputRefs_; + + private ParentPlanRelContext(Builder builder) { + this.ctx_ = builder.context_; + this.inputRefs_ = builder.inputRefs_; + } + + public static class Builder { + private PlannerContext context_; + private ImmutableBitSet inputRefs_; + + public Builder(PlannerContext plannerContext) { + this.context_ = plannerContext; + } + + public Builder(ParentPlanRelContext planRelContext, ImpalaPlanRel planRel) { + this.context_ = planRelContext.ctx_; + } + + public void setInputRefs(ImmutableBitSet inputRefs) { + this.inputRefs_ = inputRefs; + } + + public ParentPlanRelContext build() { + return new ParentPlanRelContext(this); + } + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/phys/ImpalaHdfsScanNode.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/phys/ImpalaHdfsScanNode.java new file mode 100644 index 000000000..c23041e51 --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/phys/ImpalaHdfsScanNode.java @@ 
-0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.rel.phys; + +import org.apache.impala.analysis.Analyzer; +import org.apache.impala.analysis.Expr; +import org.apache.impala.analysis.MultiAggregateInfo; +import org.apache.impala.analysis.TableRef; +import org.apache.impala.analysis.TupleDescriptor; +import org.apache.impala.catalog.FeFsPartition; +import org.apache.impala.planner.HdfsScanNode; +import org.apache.impala.planner.PlanNodeId; + +import java.util.List; + +/** + * ImpalaHdfsScanNode. Extends the HdfsScanNode to bypass processing of the + * assignedConjuncts. 
+ */ +public class ImpalaHdfsScanNode extends HdfsScanNode { + + private final List assignedConjuncts_; + + public ImpalaHdfsScanNode(PlanNodeId id, TupleDescriptor tupleDesc, + List partitions, + TableRef hdfsTblRef, MultiAggregateInfo aggInfo, List partConjuncts, + List assignedConjuncts) { + super(id, tupleDesc, assignedConjuncts, partitions, hdfsTblRef, aggInfo, + partConjuncts, false); + this.assignedConjuncts_ = assignedConjuncts; + } + + @Override + public void assignConjuncts(Analyzer analyzer) { + // ignore analyzer and retrieve the processed Calcite conjuncts + this.conjuncts_ = assignedConjuncts_; + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/util/CreateExprVisitor.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/util/CreateExprVisitor.java new file mode 100644 index 000000000..e8884741e --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/util/CreateExprVisitor.java @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.calcite.rel.util; + +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexCorrelVariable; +import org.apache.calcite.rex.RexDynamicParam; +import org.apache.calcite.rex.RexFieldAccess; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexLocalRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexOver; +import org.apache.calcite.rex.RexPatternFieldRef; +import org.apache.calcite.rex.RexRangeRef; +import org.apache.calcite.rex.RexSubQuery; +import org.apache.calcite.rex.RexTableInputRef; +import org.apache.calcite.rex.RexVisitorImpl; +import org.apache.impala.analysis.Expr; +import org.apache.impala.common.AnalysisException; +import org.apache.impala.common.ImpalaException; + +import java.util.List; + +/** + * CreateExprVisitor is a Calcite RexNode visitor that produces Impala Expr objects. + * TODO: Only input refs are handled so far; calls, literals, etc. throw. + */ +public class CreateExprVisitor extends RexVisitorImpl { + + private final List inputExprs_; + + public CreateExprVisitor(List inputExprs) { + super(false); + this.inputExprs_ = inputExprs; + } + + @Override + public Expr visitInputRef(RexInputRef rexInputRef) { + return inputExprs_.get(rexInputRef.getIndex()); + } + + @Override + public Expr visitCall(RexCall rexCall) { + throw new RuntimeException("Not supported"); + } + + @Override + public Expr visitLiteral(RexLiteral rexLiteral) { + throw new RuntimeException("Not supported"); + } + + @Override + public Expr visitLocalRef(RexLocalRef localRef) { + throw new RuntimeException("Not supported"); + } + + @Override + public Expr visitOver(RexOver over) { + throw new RuntimeException("Not supported"); + } + + @Override + public Expr visitCorrelVariable(RexCorrelVariable correlVariable) { + throw new RuntimeException("Not supported"); + } + + @Override + public Expr visitDynamicParam(RexDynamicParam dynamicParam) { + throw new
RuntimeException("Not supported"); + } + + @Override + public Expr visitRangeRef(RexRangeRef rangeRef) { + throw new RuntimeException("Not supported"); + } + + @Override + public Expr visitFieldAccess(RexFieldAccess fieldAccess) { + throw new RuntimeException("Not supported"); + } + + @Override + public Expr visitSubQuery(RexSubQuery subQuery) { + throw new RuntimeException("Not supported"); + } + + @Override + public Expr visitTableInputRef(RexTableInputRef fieldRef) { + throw new RuntimeException("Not supported"); + } + + @Override + public Expr visitPatternFieldRef(RexPatternFieldRef fieldRef) { + throw new RuntimeException("Not supported"); + } + + /** + * Runs the visitor on the operand. Any exception thrown while visiting (such as the + * unchecked RuntimeException above) is rethrown wrapped in an AnalysisException. + */ + public static Expr getExpr(CreateExprVisitor visitor, RexNode operand) + throws ImpalaException { + try { + return operand.accept(visitor); + } catch (Exception e) { + throw new AnalysisException(e); + } + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteDb.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteDb.java new file mode 100644 index 000000000..9e4890a63 --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteDb.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.schema; + +import com.google.common.collect.ImmutableMap; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.impl.AbstractSchema; +import org.apache.impala.catalog.FeTable; + +import java.util.HashMap; +import java.util.Map; +// CalciteDb exposes an immutable name-to-table map as a Calcite schema. +public class CalciteDb extends AbstractSchema { + + private final Map tableMap_; + + private CalciteDb(Map tableMap) { + this.tableMap_ = tableMap; + } + + @Override + protected Map getTableMap() { + return tableMap_; + } + + public static class Builder { + private CalciteCatalogReader reader_; + + private final Map tableMap_ = new HashMap<>(); + + public Builder(CalciteCatalogReader reader) { + this.reader_ = reader; + } + // Adds the table under its lower-cased name; an already registered name is kept. + public Builder addTable(String tableName, FeTable table) { + if (!tableMap_.containsKey(tableName.toLowerCase())) { + tableMap_.put(tableName.toLowerCase(), new CalciteTable(table, reader_)); + } + return this; + } + + public CalciteDb build() { + return new CalciteDb(ImmutableMap.copyOf(tableMap_)); + } + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteTable.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteTable.java new file mode 100644 index 000000000..0fa849f84 --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/CalciteTable.java @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.schema; + +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.plan.RelOptAbstractTable; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.prepare.Prepare; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelDistribution; +import org.apache.calcite.rel.RelReferentialConstraint; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.schema.ColumnStrategy; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.Statistic; +import org.apache.calcite.schema.Table; +import org.apache.calcite.sql.SqlAccessType; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.validate.SqlModality; +import org.apache.calcite.sql.validate.SqlMonotonicity; +import org.apache.calcite.sql2rel.InitializerContext; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.impala.analysis.Analyzer; +import org.apache.impala.analysis.BaseTableRef; +import org.apache.impala.analysis.Expr; +import 
org.apache.impala.analysis.Path; +import org.apache.impala.analysis.SlotDescriptor; +import org.apache.impala.analysis.SlotRef; +import org.apache.impala.analysis.TableRef; +import org.apache.impala.analysis.TupleDescriptor; +import org.apache.impala.catalog.Column; +import org.apache.impala.catalog.FeFsPartition; +import org.apache.impala.catalog.FeTable; +import org.apache.impala.catalog.HdfsTable; +import org.apache.impala.calcite.type.ImpalaTypeConverter; +import org.apache.impala.calcite.type.ImpalaTypeSystemImpl; +import org.apache.impala.planner.HdfsPartitionPruner; +import org.apache.impala.common.AnalysisException; +import org.apache.impala.common.ImpalaException; +import org.apache.impala.common.Pair; + +import com.google.common.collect.ImmutableList; + +import java.util.ArrayList; +import java.util.List; + +// CalciteTable exposes an Impala HdfsTable through Calcite's table interfaces. +public class CalciteTable extends RelOptAbstractTable + implements Table, Prepare.PreparingTable { + + private final HdfsTable table_; + + + private final List qualifiedTableName_; + + public CalciteTable(FeTable table, CalciteCatalogReader reader) { + super(reader, table.getName(), buildColumnsForRelDataType(table)); + this.table_ = (HdfsTable) table; // the cast fails for any non-HdfsTable input + this.qualifiedTableName_ = table.getTableName().toPath(); + } + + private static RelDataType buildColumnsForRelDataType(FeTable table) { + RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl(new ImpalaTypeSystemImpl()); + + RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory); + // columns are added in the order getColumnsInHiveOrder() returns them + for (Column column : table.getColumnsInHiveOrder()) { + RelDataType type = + ImpalaTypeConverter.createRelDataType(typeFactory, column.getType()); + builder.add(column.getName(), type); + } + return builder.build(); + } + + public BaseTableRef createBaseTableRef(Analyzer analyzer + ) throws ImpalaException { + + TableRef tblRef = new TableRef(qualifiedTableName_, null); + + Path resolvedPath = analyzer.resolvePath(tblRef.getPath(),
Path.PathType.TABLE_REF); + + BaseTableRef baseTblRef = new BaseTableRef(tblRef, resolvedPath); + baseTblRef.analyze(analyzer); + return baseTblRef; + } + + // Create tuple and slot descriptors for this base table + public TupleDescriptor createTupleAndSlotDesc(BaseTableRef baseTblRef, + List fieldNames, Analyzer analyzer) throws ImpalaException { + // create the slot descriptors corresponding to this tuple descriptor + // by supplying the field names from Calcite's output schema for this node + for (int i = 0; i < fieldNames.size(); i++) { + String fieldName = fieldNames.get(i); + SlotRef slotref = + new SlotRef(Path.createRawPath(baseTblRef.getUniqueAlias(), fieldName)); + slotref.analyze(analyzer); + SlotDescriptor slotDesc = slotref.getDesc(); + if (slotDesc.getType().isCollectionType()) { + throw new AnalysisException(String.format( + "%s is a complex type (array/map/struct) column. " + + "This is not currently supported.", fieldName)); + } + slotDesc.setIsMaterialized(true); + } + TupleDescriptor tupleDesc = baseTblRef.getDesc(); + return tupleDesc; + } + + /** + * Return the pruned partitions + * TODO: Currently all partitions are returned since filters aren't yet supported. + */ + public List getPrunedPartitions(Analyzer analyzer, + TupleDescriptor tupleDesc) throws ImpalaException { + HdfsPartitionPruner pruner = new HdfsPartitionPruner(tupleDesc); + // TODO: pass in the conjuncts needed. An empty conjunct will return all partitions.
+ List conjuncts = new ArrayList<>(); + Pair, List> impalaPair = + pruner.prunePartitions(analyzer, conjuncts, true, + null); + return impalaPair.first; + } + + public HdfsTable getHdfsTable() { + return table_; + } + + @Override + public List getQualifiedName() { + return qualifiedTableName_; + } + + @Override + public boolean rolledUpColumnValidInsideAgg(String column, + SqlCall call, SqlNode parent, CalciteConnectionConfig config) { + return true; + } + + @Override + public Schema.TableType getJdbcTableType() { + return Schema.TableType.TABLE; + } + + @Override + public boolean isRolledUp(String column) { + return false; + } + + @Override + public Statistic getStatistic() { + return null; + } + + @Override + public RelDataType getRowType(final RelDataTypeFactory typeFactory) { + return getRowType(); + } + + @Override + public T unwrap(Class arg0) { + // Generic unwrap needed by the Calcite framework to process the table. + return arg0.isInstance(this) ? arg0.cast(this) : null; + } + + @Override + public boolean columnHasDefaultValue(RelDataType rowType, int ordinal, + InitializerContext initializerContext) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isTemporal() { + return false; + } + + @Override + public boolean supportsModality(SqlModality modality) { + return true; + } + + @Override + public SqlAccessType getAllowedAccess() { + return SqlAccessType.ALL; + } + + @Override + public SqlMonotonicity getMonotonicity(String columnName) { + return SqlMonotonicity.NOT_MONOTONIC; + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/ImpalaCalciteCatalogReader.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/ImpalaCalciteCatalogReader.java new file mode 100644 index 000000000..dc7cb5b9c --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/schema/ImpalaCalciteCatalogReader.java @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation
(ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.schema; + +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.impala.analysis.StmtMetadataLoader; +import org.apache.impala.thrift.TQueryCtx; + +import java.util.List; +// A CalciteCatalogReader that also holds the query's TQueryCtx and StmtTableCache. +public class ImpalaCalciteCatalogReader extends CalciteCatalogReader { + private final TQueryCtx queryCtx_; + private final StmtMetadataLoader.StmtTableCache stmtTableCache_; + // The first four arguments are passed through to CalciteCatalogReader. + public ImpalaCalciteCatalogReader(CalciteSchema rootSchema, List defaultSchema, + RelDataTypeFactory typeFactory, CalciteConnectionConfig config, TQueryCtx queryCtx, + StmtMetadataLoader.StmtTableCache stmtTableCache) { + super(rootSchema, defaultSchema, typeFactory, config); + this.queryCtx_ = queryCtx; + this.stmtTableCache_ = stmtTableCache; + } + + public TQueryCtx getTQueryCtx() { + return queryCtx_; + } + + public StmtMetadataLoader.StmtTableCache getStmtTableCache() { + return stmtTableCache_; + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteJniFrontend.java
b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteJniFrontend.java new file mode 100644 index 000000000..af5c8225f --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteJniFrontend.java @@ -0,0 +1,241 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.calcite.service; + +import org.apache.impala.util.EventSequence; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider; +import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.sql.SqlNode; +import org.apache.impala.calcite.rel.node.NodeWithExprs; +import org.apache.impala.calcite.rel.node.ImpalaPlanRel; +import org.apache.impala.common.ImpalaException; +import org.apache.impala.common.InternalException; +import org.apache.impala.common.JniUtil; +import org.apache.impala.service.Frontend; +import org.apache.impala.service.FrontendProfile; +import org.apache.impala.service.JniFrontend; +import org.apache.impala.thrift.TExecRequest; +import org.apache.impala.thrift.TQueryCtx; +import org.apache.impala.thrift.TQueryOptions; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; + +import java.util.List; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * CalciteJniFrontend. This is a frontend that uses the Calcite code + * to walk through all the steps of compiling the query (e.g. parsing, validating, + * etc.) to generate a TExecRequest that can be used by the execution engine.
+ */ +public class CalciteJniFrontend extends JniFrontend { + + protected static final Logger LOG = + LoggerFactory.getLogger(CalciteJniFrontend.class.getName()); + + private final static TBinaryProtocol.Factory protocolFactory_ = + new TBinaryProtocol.Factory(); + + private static Pattern SEMI_JOIN = Pattern.compile("\\bsemi\\sjoin\\b", + Pattern.CASE_INSENSITIVE); + + private static Pattern ANTI_JOIN = Pattern.compile("\\banti\\sjoin\\b", + Pattern.CASE_INSENSITIVE); + + public CalciteJniFrontend(byte[] thriftBackendConfig, boolean isBackendTest) + throws ImpalaException, TException { + super(thriftBackendConfig, isBackendTest); + } + + /** + * Jni wrapper that plans the query with Calcite when possible. Accepts a serialized + * TQueryCtx; on the Calcite path returns a serialized TExecRequest. + */ + @Override + public byte[] createExecRequest(byte[] thriftQueryContext) + throws ImpalaException { + // Needed for Calcite's JaninoRelMetadataProvider + Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader()); + + QueryContext queryCtx = new QueryContext(thriftQueryContext, getFrontend()); + if (!canStmtBePlannedThroughCalcite(queryCtx)) { + return runThroughOriginalPlanner(thriftQueryContext, queryCtx); + } + + try (FrontendProfile.Scope scope = FrontendProfile.createNewWithScope()) { + LOG.info("Using Calcite Planner for the following query: " + queryCtx.getStmt()); + + // Parse the query + RelMetadataQuery.THREAD_PROVIDERS.set( + JaninoRelMetadataProvider.of(DefaultRelMetadataProvider.INSTANCE)); + CalciteQueryParser queryParser = new CalciteQueryParser(queryCtx); + SqlNode parsedSqlNode = queryParser.parse(); + markEvent(queryParser, parsedSqlNode, queryCtx, "Parsed query"); + + // Make sure the metadata cache has all the info for the query.
+ CalciteMetadataHandler mdHandler = + new CalciteMetadataHandler(parsedSqlNode, queryCtx); + markEvent(mdHandler, null, queryCtx, "Loaded tables"); + + // Validate the parsed query + CalciteValidator validator = new CalciteValidator(mdHandler, queryCtx); + SqlNode validatedNode = validator.validate(parsedSqlNode); + markEvent(mdHandler, validatedNode, queryCtx, "Validated query"); + + // Convert the query to RelNodes which can be optimized + CalciteRelNodeConverter relNodeConverter = new CalciteRelNodeConverter(validator); + RelNode logicalPlan = relNodeConverter.convert(validatedNode); + markEvent(mdHandler, logicalPlan, queryCtx, "Created initial logical plan"); + + // Optimize the query + CalciteOptimizer optimizer = new CalciteOptimizer(validator); + ImpalaPlanRel optimizedPlan = optimizer.optimize(logicalPlan); + markEvent(mdHandler, optimizedPlan, queryCtx, "Optimized logical plan"); + + // Create Physical Impala PlanNodes + CalcitePhysPlanCreator physPlanCreator = + new CalcitePhysPlanCreator(mdHandler, queryCtx); + NodeWithExprs rootNode = physPlanCreator.create(optimizedPlan); + markEvent(mdHandler, rootNode, queryCtx, "Created physical plan"); + + // Create exec request for the server + ExecRequestCreator execRequestCreator = + new ExecRequestCreator(physPlanCreator, queryCtx, mdHandler); + TExecRequest execRequest = execRequestCreator.create(rootNode); + markEvent(mdHandler, execRequest, queryCtx, "Created exec request"); + + TSerializer serializer = new TSerializer(protocolFactory_); + byte[] serializedRequest = serializer.serialize(execRequest); + queryCtx.getTimeline().markEvent("Serialized request"); + + return serializedRequest; + } catch (Exception e) { + // Log the failure with its stack trace and rethrow it as an InternalException. + LOG.info("Calcite planner failed."); + LOG.info("Exception: " + e); + LOG.info("Stack Trace:" + ExceptionUtils.getStackTrace(e)); + throw new InternalException(e.getMessage()); + } + } + + /** + * Use information about the query syntax to
see if this can be handled + * by Calcite + */ + private boolean canStmtBePlannedThroughCalcite(QueryContext queryCtx) { + String stringWithFirstRealWord = queryCtx.getStmt(); + String[] lines = stringWithFirstRealWord.split("\n"); + // Get rid of comments and blank lines which start the query. We need to find + // the first real word. + // TODO: IMPALA-12976: need to make this more generic. Certain patterns aren't caught + // here like /* */ + for (String line : lines) { + if (line.trim().startsWith("--") || line.trim().equals("")) { + // The line is literal text, so quote it before using it as a regex. + stringWithFirstRealWord = + stringWithFirstRealWord.replaceFirst(Pattern.quote(line + "\n"), ""); + } else { + break; + } + } + stringWithFirstRealWord = stringWithFirstRealWord.trim(); + String beforeStripString; + do { + beforeStripString = stringWithFirstRealWord; + stringWithFirstRealWord = StringUtils.stripStart(stringWithFirstRealWord, "("); + stringWithFirstRealWord = StringUtils.stripStart(stringWithFirstRealWord, null); + } while (!stringWithFirstRealWord.equals(beforeStripString)); + return StringUtils.startsWithIgnoreCase(stringWithFirstRealWord, "select") || + StringUtils.startsWithIgnoreCase(stringWithFirstRealWord, "values") || + StringUtils.startsWithIgnoreCase(stringWithFirstRealWord, "with"); + } + + /** + * Fallback planner method + */ + public byte[] runThroughOriginalPlanner(byte[] thriftQueryContext, + QueryContext queryCtx) throws ImpalaException { + LOG.info("Using Impala Planner for the following query: " + queryCtx.getStmt()); + return super.createExecRequest(thriftQueryContext); + } + + private void markEvent(CompilerStep compilerStep, Object stepResult, + QueryContext queryCtx, String stepMessage) { + LOG.info(stepMessage); + queryCtx.getTimeline().markEvent(stepMessage); + if (LOG.isDebugEnabled()) { + compilerStep.logDebug(stepResult); + } + } + + public static class QueryContext { + private final TQueryCtx queryCtx_; + private final String stmt_; + private final String currentDb_; + private final Frontend frontend_; +
private final EventSequence timeline_; + + public QueryContext(byte[] thriftQueryContext, + Frontend frontend) throws ImpalaException { + this.queryCtx_ = new TQueryCtx(); + JniUtil.deserializeThrift(protocolFactory_, queryCtx_, thriftQueryContext); + + // hack to match the code in Frontend.java: + // If unset, set MT_DOP to 0 to simplify the rest of the code. + if (queryCtx_.getClient_request() != null && + queryCtx_.getClient_request().getQuery_options() != null) { + if (!queryCtx_.getClient_request().getQuery_options().isSetMt_dop()) { + queryCtx_.getClient_request().getQuery_options().setMt_dop(0); + } + } + + this.frontend_ = frontend; + this.stmt_ = queryCtx_.getClient_request().getStmt(); + this.currentDb_ = queryCtx_.getSession().getDatabase(); + this.timeline_ = new EventSequence("Frontend Timeline (Calcite Planner)"); + } + + public TQueryCtx getTQueryCtx() { + return queryCtx_; + } + + public Frontend getFrontend() { + return frontend_; + } + + public String getStmt() { + return stmt_; + } + + public String getCurrentDb() { + return currentDb_; + } + + public EventSequence getTimeline() { + return timeline_; + } + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteMetadataHandler.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteMetadataHandler.java new file mode 100644 index 000000000..6e24eb1a7 --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteMetadataHandler.java @@ -0,0 +1,224 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.service; + +import com.google.common.base.Splitter; +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.config.CalciteConnectionConfigImpl; +import org.apache.calcite.config.CalciteConnectionProperty; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.sql.SqlBasicCall; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlJoin; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlSelect; +import org.apache.calcite.sql.util.SqlBasicVisitor; +import org.apache.impala.analysis.StmtMetadataLoader; +import org.apache.impala.analysis.TableName; +import org.apache.impala.calcite.schema.CalciteDb; +import org.apache.impala.calcite.schema.CalciteTable; +import org.apache.impala.calcite.schema.ImpalaCalciteCatalogReader; +import org.apache.impala.calcite.type.ImpalaTypeSystemImpl; +import org.apache.impala.catalog.Column; +import org.apache.impala.catalog.FeCatalog; +import org.apache.impala.catalog.FeDb; +import org.apache.impala.catalog.FeTable; +import org.apache.impala.catalog.FeView; +import org.apache.impala.catalog.HdfsTable; +import org.apache.impala.common.ImpalaException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import 
java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * CalciteMetadataHandler. Responsible for loading the tables for a query + * from catalogd into the coordinator and populating the Calcite schema with + * these tables. + */ +public class CalciteMetadataHandler implements CompilerStep { + + protected static final Logger LOG = + LoggerFactory.getLogger(CalciteMetadataHandler.class.getName()); + + // StmtTableCache needed by Impala's Analyzer class at planning time. + private final StmtMetadataLoader.StmtTableCache stmtTableCache_; + + // CalciteCatalogReader is a context class that holds global information that + // may be needed by the CalciteTable object + private final CalciteCatalogReader reader_; + + public CalciteMetadataHandler(SqlNode parsedNode, + CalciteJniFrontend.QueryContext queryCtx) throws ImpalaException { + + StmtMetadataLoader stmtMetadataLoader = new StmtMetadataLoader( + queryCtx.getFrontend(), queryCtx.getCurrentDb(), queryCtx.getTimeline()); + + // retrieve all the tablenames in the query, will be in tableVisitor.tableNames_ + TableVisitor tableVisitor = new TableVisitor(queryCtx.getCurrentDb()); + parsedNode.accept(tableVisitor); + + // load the relevant tables in the query from catalogd + this.stmtTableCache_ = stmtMetadataLoader.loadTables(tableVisitor.tableNames_); + + this.reader_ = createCalciteCatalogReader(queryCtx, stmtTableCache_); + + // populate calcite schema. This step needs to be done after the loader because the + // schema needs to contain the columns in the table for validation, which cannot + // be done when it's an IncompleteTable + populateCalciteSchema(reader_, queryCtx.getFrontend().getCatalog(), + tableVisitor.tableNames_); + } + + /** + * Creates CalciteCatalogReader object which will contain information about the schema.
+ * Since the individual Table objects have reference to the Schema, this also serves + * as a way to give the tables Context information about the general query. + */ + private CalciteCatalogReader createCalciteCatalogReader( + CalciteJniFrontend.QueryContext queryCtx, + StmtMetadataLoader.StmtTableCache stmtTableCache) { + RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl(new ImpalaTypeSystemImpl()); + Properties props = new Properties(); + props.setProperty(CalciteConnectionProperty.CASE_SENSITIVE.camelName(), "false"); + CalciteConnectionConfig config = new CalciteConnectionConfigImpl(props); + CalciteSchema rootSchema = CalciteSchema.createRootSchema(true); + return new ImpalaCalciteCatalogReader(rootSchema, + Collections.singletonList(queryCtx.getCurrentDb()), + typeFactory, config, queryCtx.getTQueryCtx(), stmtTableCache); + } + + /** + * Populate the CalciteSchema with tables being used by this query. + */ + private void populateCalciteSchema(CalciteCatalogReader reader, + FeCatalog catalog, Set tableNames) { + CalciteSchema rootSchema = reader.getRootSchema(); + Map dbSchemas = new HashMap<>(); + for (TableName tableName : tableNames) { + FeDb db = catalog.getDb(tableName.getDb()); + // db is not found, this will probably fail in the validation step + if (db == null) { + continue; + } + + // table is not found, this will probably fail in the validation step + FeTable feTable = db.getTable(tableName.getTbl()); + if (feTable == null) { + continue; + } + + // populate the dbschema with its table, creating the dbschema if it's the + // first instance seen in the query. Builders are keyed by lower-cased db name.
+ String dbKey = tableName.getDb().toLowerCase(); + CalciteDb.Builder dbBuilder = + dbSchemas.computeIfAbsent(dbKey, k -> new CalciteDb.Builder(reader)); + dbBuilder.addTable(tableName.getTbl().toLowerCase(), feTable); + } + + // add all databases to the root schema + for (String dbName : dbSchemas.keySet()) { + rootSchema.add(dbName, dbSchemas.get(dbName.toLowerCase()).build()); + } + } + + public StmtMetadataLoader.StmtTableCache getStmtTableCache() { + return stmtTableCache_; + } + + public CalciteCatalogReader getCalciteCatalogReader() { + return reader_; + } + + /** + * TableVisitor walks through the AST and places all the tables into + * tableNames_ + */ + private static class TableVisitor extends SqlBasicVisitor { + private final String currentDb_; + public final Set tableNames_ = new HashSet<>(); + + public TableVisitor(String currentDb) { + this.currentDb_ = currentDb.toLowerCase(); + } + + @Override + public Void visit(SqlCall call) { + if (call.getKind() == SqlKind.SELECT) { + SqlSelect select = (SqlSelect) call; + if (select.getFrom() != null) { + tableNames_.addAll(getTableNames(select.getFrom())); + } + } + return super.visit(call); + } + + private List getTableNames(SqlNode fromNode) { + List localTableNames = new ArrayList<>(); + if (fromNode instanceof SqlIdentifier) { + String tableName = fromNode.toString(); + List parts = Splitter.on('.').splitToList(tableName); + // TODO: 'complex' tables ignored for now + if (parts.size() == 1) { + localTableNames.add(new TableName( + currentDb_.toLowerCase(), parts.get(0).toLowerCase())); + } else if (parts.size() == 2) { + localTableNames.add( + new TableName(parts.get(0).toLowerCase(), parts.get(1).toLowerCase())); + } + } + + // Join node has the tables in the left and right node.
+ if (fromNode instanceof SqlJoin) { + localTableNames.addAll(getTableNames(((SqlJoin) fromNode).getLeft())); + localTableNames.addAll(getTableNames(((SqlJoin) fromNode).getRight())); + } + + // Put references in the schema too + if (fromNode instanceof SqlBasicCall) { + SqlBasicCall basicCall = (SqlBasicCall) fromNode; + if (basicCall.getKind().equals(SqlKind.AS)) { + localTableNames.addAll(getTableNames(basicCall.operand(0))); + } + } + return localTableNames; + } + } + + @Override + public void logDebug(Object resultObject) { + LOG.debug("Loaded tables: " + stmtTableCache_.tables.values().stream() + .map(feTable -> feTable.getName().toString()) + .collect(Collectors.joining( ", " ))); + } +} + diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteOptimizer.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteOptimizer.java new file mode 100644 index 000000000..7ab8c1174 --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteOptimizer.java @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +package org.apache.impala.calcite.service; + +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.RelOptCostImpl; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.hep.HepPlanner; +import org.apache.calcite.plan.hep.HepProgramBuilder; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider; +import org.apache.calcite.sql.SqlExplainFormat; +import org.apache.calcite.sql.SqlExplainLevel; +import org.apache.impala.calcite.rel.node.ConvertToImpalaRelRules; +import org.apache.impala.calcite.rel.node.ImpalaPlanRel; +import org.apache.impala.common.ImpalaException; +import org.apache.impala.common.InternalException; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * CalciteOptimizer. Responsible for optimizing the plan into its final + * Calcite form. The final Calcite form will be an ImpalaPlanRel node which + * will contain code that maps the node into a physical Impala PlanNode. 
+ */ +public class CalciteOptimizer implements CompilerStep { + protected static final Logger LOG = + LoggerFactory.getLogger(CalciteOptimizer.class.getName()); + + private final CalciteValidator validator_; + + public CalciteOptimizer(CalciteValidator validator) { + this.validator_ = validator; + } + + public ImpalaPlanRel optimize(RelNode logPlan) throws ImpalaException { + HepProgramBuilder builder = new HepProgramBuilder(); + + // rules to convert Calcite nodes into ImpalaPlanRel nodes + builder.addRuleCollection( + ImmutableList.of( + new ConvertToImpalaRelRules.ImpalaScanRule(), + new ConvertToImpalaRelRules.ImpalaProjectRule())); + + HepPlanner planner = new HepPlanner(builder.build(), + logPlan.getCluster().getPlanner().getContext(), + false, null, RelOptCostImpl.FACTORY); + logPlan.getCluster().setMetadataProvider(JaninoRelMetadataProvider.DEFAULT); + planner.setRoot(logPlan); + RelNode optimizedPlan = planner.findBestExp(); + if (!(optimizedPlan instanceof ImpalaPlanRel)) { + throw new InternalException("Could not generate Impala RelNode plan. 
Plan " + + "is \n" + getDebugString(optimizedPlan)); + } + return (ImpalaPlanRel) optimizedPlan; + } + + public String getDebugString(Object optimizedPlan) { + return RelOptUtil.dumpPlan("[Impala plan]", (RelNode) optimizedPlan, + SqlExplainFormat.TEXT, SqlExplainLevel.NON_COST_ATTRIBUTES); + } + + @Override + public void logDebug(Object resultObject) { + if (!(resultObject instanceof RelNode)) { + LOG.debug("Finished optimizer step, but unknown result: " + resultObject); + return; + } + LOG.debug(getDebugString(resultObject)); + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalcitePhysPlanCreator.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalcitePhysPlanCreator.java new file mode 100644 index 000000000..0c3d459ab --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalcitePhysPlanCreator.java @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.calcite.service; + +import org.apache.impala.calcite.rel.node.NodeWithExprs; +import org.apache.impala.calcite.rel.node.ImpalaPlanRel; +import org.apache.impala.authorization.AuthorizationFactory; +import org.apache.impala.analysis.Analyzer; +import org.apache.impala.analysis.Expr; +import org.apache.impala.calcite.rel.node.ParentPlanRelContext; +import org.apache.impala.common.ImpalaException; +import org.apache.impala.planner.PlannerContext; +import org.apache.impala.planner.PlanNode; +import org.apache.impala.service.BackendConfig; +import org.apache.impala.util.AuthorizationUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * CalcitePhysPlanCreator. This class is responsible for turning an ImpalaPlanRel + * Calcite plan into the Impala physical PlanNode plan for a single node. + */ +public class CalcitePhysPlanCreator implements CompilerStep { + protected static final Logger LOG = + LoggerFactory.getLogger(CalcitePhysPlanCreator.class.getName()); + + private final CalciteJniFrontend.QueryContext queryCtx_; + private final Analyzer analyzer_; + private final PlannerContext plannerContext_; + + public CalcitePhysPlanCreator(CalciteMetadataHandler mdHandler, + CalciteJniFrontend.QueryContext queryCtx) throws ImpalaException { + this.queryCtx_ = queryCtx; + // TODO: IMPALA-13011: Awkward call for authorization here. Authorization + // will be done at validation time, but this is needed here for the Analyzer + // instantiation. + AuthorizationFactory authzFactory = + AuthorizationUtil.authzFactoryFrom(BackendConfig.INSTANCE); + this.analyzer_ = new Analyzer(mdHandler.getStmtTableCache(), + queryCtx_.getTQueryCtx(), authzFactory, null); + this.plannerContext_ = + new PlannerContext(analyzer_, queryCtx_.getTQueryCtx(), queryCtx_.getTimeline()); + + } + + /** + * returns the root plan node along with its output expressions. 
+ */ + public NodeWithExprs create(ImpalaPlanRel optimizedPlan) throws ImpalaException { + ParentPlanRelContext.Builder builder = + new ParentPlanRelContext.Builder(plannerContext_); + NodeWithExprs rootNodeWithExprs = optimizedPlan.getPlanNode(builder.build()); + if (LOG.isDebugEnabled()) { + LOG.debug("Printing PlanNode tree..."); + printPlanNodeTree(rootNodeWithExprs.planNode_, ""); + } + return rootNodeWithExprs; + } + + public void printPlanNodeTree(PlanNode node, String prefix) { + LOG.debug(prefix + node.getClass()); + for (PlanNode child : node.getChildren()) { + printPlanNodeTree(child, " " + prefix); + } + } + + public Analyzer getAnalyzer() { + return analyzer_; + } + + public PlannerContext getPlannerContext() { + return plannerContext_; + } + + @Override + public void logDebug(Object resultObject) { + if (!(resultObject instanceof NodeWithExprs)) { + LOG.debug("Finished physical plan step, but unknown result: " + resultObject); + return; + } + LOG.debug("Physical Plan: " + resultObject); + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteQueryParser.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteQueryParser.java new file mode 100644 index 000000000..b3e97cfec --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteQueryParser.java @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.service; + +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.parser.SqlParseException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * CalciteQueryParser. Responsible for turning a String query statement into + * a Calcite SqlNode. + */ +public class CalciteQueryParser implements CompilerStep { + protected static final Logger LOG = + LoggerFactory.getLogger(CalciteQueryParser.class.getName()); + + private final CalciteJniFrontend.QueryContext queryCtx_; + + public CalciteQueryParser(CalciteJniFrontend.QueryContext queryCtx) { + this.queryCtx_ = queryCtx; + } + + public SqlNode parse() throws SqlParseException { + // Create an SQL parser + SqlParser parser = SqlParser.create(queryCtx_.getStmt()); + + // Parse the query into an AST + SqlNode sqlNode = parser.parseQuery(); + return sqlNode; + } + + public void logDebug(Object resultObject) { + if (!(resultObject instanceof SqlNode)) { + LOG.debug("Parser produced an unknown output: " + resultObject); + return; + } + LOG.debug("Parsed node: " + resultObject); + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteRelNodeConverter.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteRelNodeConverter.java new file mode 100644 index 000000000..7f10c51d3 --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteRelNodeConverter.java @@ -0,0 +1,96 @@ +// Licensed to the 
Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.service; + +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.volcano.VolcanoPlanner; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.sql.SqlExplainFormat; +import org.apache.calcite.sql.SqlExplainLevel; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql2rel.SqlToRelConverter; +import org.apache.calcite.sql2rel.StandardConvertletTable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * CalciteRelNodeConverter. Responsible for converting a Calcite AST SqlNode into + * a logical (pre-optimized) plan. 
+ */ +public class CalciteRelNodeConverter implements CompilerStep { + protected static final Logger LOG = + LoggerFactory.getLogger(CalciteRelNodeConverter.class.getName()); + + private static final RelOptTable.ViewExpander NOOP_EXPANDER = + (type, query, schema, path) -> null; + + private final CalciteValidator validator_; + + private final RelOptCluster cluster_; + + private final RelOptPlanner planner_; + + public CalciteRelNodeConverter(CalciteValidator validator) { + this.validator_ = validator; + this.planner_ = new VolcanoPlanner(); + planner_.addRelTraitDef(ConventionTraitDef.INSTANCE); + cluster_ = + RelOptCluster.create(planner_, new RexBuilder(validator_.getTypeFactory())); + } + + public RelNode convert(SqlNode validatedNode) { + SqlToRelConverter relConverter = new SqlToRelConverter( + NOOP_EXPANDER, + validator_.getSqlValidator(), + validator_.getCatalogReader(), + cluster_, + StandardConvertletTable.INSTANCE, + SqlToRelConverter.config()); + + // Convert the valid AST into a logical plan + RelRoot root = relConverter.convertQuery(validatedNode, false, true); + RelNode relNode = root.project(); + logDebug(relNode); + return relNode; + } + + public RelOptCluster getCluster() { + return cluster_; + } + + public CalciteValidator getValidator() { + return validator_; + } + + @Override + public void logDebug(Object resultObject) { + if (!(resultObject instanceof RelNode)) { + LOG.debug("RelNodeConverter produced an unknown output: " + resultObject); + return; + } + LOG.info(RelOptUtil.dumpPlan("[Logical plan]", (RelNode) resultObject, + SqlExplainFormat.TEXT, SqlExplainLevel.NON_COST_ATTRIBUTES)); + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteValidator.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteValidator.java new file mode 100644 index 000000000..7c455f97e --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CalciteValidator.java 
@@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.service; + +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.validate.SqlValidator; +import org.apache.calcite.sql.validate.SqlValidatorUtil; +import org.apache.impala.calcite.type.ImpalaTypeSystemImpl; +import org.apache.impala.calcite.validate.ImpalaConformance; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * SqlValidator. Responsible for validating the parsed SQL AST. 
+ */ +public class CalciteValidator implements CompilerStep { + protected static final Logger LOG = + LoggerFactory.getLogger(CalciteValidator.class.getName()); + + private final CalciteMetadataHandler mdHandler; + private final CalciteJniFrontend.QueryContext queryCtx; + private final RelDataTypeFactory typeFactory; + private final CalciteCatalogReader catalogReader; + private final SqlValidator sqlValidator; + + public CalciteValidator(CalciteMetadataHandler mdHandler, + CalciteJniFrontend.QueryContext queryCtx) { + this.mdHandler = mdHandler; + this.queryCtx = queryCtx; + this.typeFactory = new JavaTypeFactoryImpl(new ImpalaTypeSystemImpl()); + this.catalogReader = mdHandler.getCalciteCatalogReader(); + + this.sqlValidator = SqlValidatorUtil.newValidator( + SqlStdOperatorTable.instance(), + catalogReader, typeFactory, + SqlValidator.Config.DEFAULT + .withConformance(ImpalaConformance.INSTANCE) + ); + } + + public SqlNode validate(SqlNode parsedNode) { + // Validate the initial AST + SqlNode node = sqlValidator.validate(parsedNode); + return node; + } + + public RelDataTypeFactory getTypeFactory() { + return typeFactory; + } + + public SqlValidator getSqlValidator() { + return sqlValidator; + } + + public CalciteCatalogReader getCatalogReader() { + return catalogReader; + } + + @Override + public void logDebug(Object resultObject) { + if (!(resultObject instanceof SqlNode)) { + LOG.debug("Finished validator step, but unknown result: " + resultObject); + return; + } + LOG.debug("Validated node: " + resultObject); + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CompilerStep.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CompilerStep.java new file mode 100644 index 000000000..7bc5d126e --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/service/CompilerStep.java @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor 
/**
 * CompilerStep. An interface to be used by all top level actions run by the
 * Calcite compiler.
 */
public interface CompilerStep {
  /**
   * Logs a description of the result produced by this step.
   *
   * The parameter is an Object because the steps produce different result
   * types. Implementations are expected to check the runtime type themselves
   * and log a fallback message when it is not the one they produce.
   *
   * @param resultObject the value the step produced
   */
  void logDebug(Object resultObject);
}
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.service; + +import org.apache.impala.calcite.rel.node.NodeWithExprs; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.impala.analysis.Analyzer; +import org.apache.impala.analysis.Expr; +import org.apache.impala.analysis.JoinOperator; +import org.apache.impala.catalog.FeTable; +import org.apache.impala.catalog.HdfsTable; +import org.apache.impala.common.ImpalaException; +import org.apache.impala.planner.DataPartition; +import org.apache.impala.planner.DistributedPlanner; +import org.apache.impala.planner.JoinNode; +import org.apache.impala.planner.NestedLoopJoinNode; +import org.apache.impala.planner.ParallelPlanner; +import org.apache.impala.planner.PlanFragment; +import org.apache.impala.planner.PlanNode; +import org.apache.impala.planner.Planner; +import org.apache.impala.planner.PlannerContext; +import org.apache.impala.planner.PlanRootSink; +import org.apache.impala.planner.RuntimeFilterGenerator; +import org.apache.impala.planner.SingleNodePlanner; +import org.apache.impala.planner.SingularRowSrcNode; +import org.apache.impala.planner.SubplanNode; +import org.apache.impala.service.Frontend; +import org.apache.impala.service.FrontendProfile; +import org.apache.impala.thrift.TColumn; +import org.apache.impala.thrift.TExecRequest; +import org.apache.impala.thrift.TExplainLevel; +import org.apache.impala.thrift.TNetworkAddress; +import org.apache.impala.thrift.TPlanExecInfo; +import org.apache.impala.thrift.TPlanFragment; +import 
org.apache.impala.thrift.TQueryCtx; +import org.apache.impala.thrift.TQueryExecRequest; +import org.apache.impala.thrift.TResultSetMetadata; +import org.apache.impala.thrift.TRuntimeFilterMode; +import org.apache.impala.thrift.TRuntimeProfileNode; +import org.apache.impala.thrift.TStmtType; +import org.apache.impala.util.EventSequence; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + + +/** + * ExecRequestCreator. Responsible for taking a PlanNode and the output Expr list + * from the top level PlanNode and convert it into a TExecRequest thrift object + * which is needed by the backend executor code. The input PlanNode tree is + * an optimized logical tree based on the Calcite rules. This class is also + * responsible for creating physical node optimizations which are located in + * the SingleNodePlanner and DistributedPlanner. + * + * TODO: This class is very similar to the Frontend.createExecRequest method and + * contains duplicate code. Accordingly, This class and that method should be + * refactored to prevent this duplication. 
+ **/ +public class ExecRequestCreator implements CompilerStep { + protected static final Logger LOG = + LoggerFactory.getLogger(ExecRequestCreator.class.getName()); + + private final CalcitePhysPlanCreator physPlanCreator; + private final CalciteJniFrontend.QueryContext queryCtx; + private final CalciteMetadataHandler mdHandler; + + public ExecRequestCreator(CalcitePhysPlanCreator physPlanCreator, + CalciteJniFrontend.QueryContext queryCtx, CalciteMetadataHandler mdHandler) { + this.physPlanCreator = physPlanCreator; + this.queryCtx = queryCtx; + this.mdHandler = mdHandler; + } + + /** + * create() is the main public method responsible for taking the NodeWithExprs + * object containing the PlanNode and output Expr list and returning the TExecRequest. + */ + public TExecRequest create(NodeWithExprs nodeWithExprs) throws ImpalaException { + TExecRequest request = createExecRequest(nodeWithExprs.planNode_, + queryCtx.getTQueryCtx(), physPlanCreator.getPlannerContext(), + physPlanCreator.getAnalyzer(), nodeWithExprs.outputExprs_, + mdHandler.getStmtTableCache().tables.values()); + + return request; + } + + /** + * Create an exec request for Impala to execute based on the supplied plan. This + * method is similar to the Frontend.createExecRequest method and needs to be + * refactored. 
+ */ + private TExecRequest createExecRequest(PlanNode planNodeRoot, TQueryCtx queryCtx, + PlannerContext plannerContext, Analyzer analyzer, List outputExprs, + Collection tables) throws ImpalaException { + List fragments = + createPlans(planNodeRoot, analyzer, plannerContext, outputExprs); + PlanFragment planFragmentRoot = fragments.get(0); + + TQueryExecRequest queryExecRequest = new TQueryExecRequest(); + TExecRequest result = createExecRequest(queryCtx, planFragmentRoot, + queryExecRequest); + queryExecRequest.setHost_list(getHostLocations(tables)); + queryExecRequest.setCores_required(-1); + + // compute resource requirements of the final plan + Planner.computeResourceReqs(fragments, queryCtx, queryExecRequest, + plannerContext, true /*isQuery*/); + + // create the plan's exec-info and assign fragment idx + int idx = 0; + for (PlanFragment planRoot : fragments) { + TPlanExecInfo tPlanExecInfo = Frontend.createPlanExecInfo(planRoot, queryCtx); + queryExecRequest.addToPlan_exec_info(tPlanExecInfo); + for (TPlanFragment fragment : tPlanExecInfo.fragments) { + fragment.setIdx(idx++); + } + } + + // create EXPLAIN output after setting everything else + queryExecRequest.setQuery_ctx(queryCtx); // needed by getExplainString() + + List allFragments = planFragmentRoot.getNodesPreOrder(); + // to mimic the original planner behavior, use EXTENDED mode explain except for + // EXPLAIN statements. + // TODO: support explain plans + // TExplainLevel explainLevel = + // isExplain ? 
plannerContext.getQueryOptions().getExplain_level() : + // TExplainLevel.EXTENDED; + TExplainLevel explainLevel = TExplainLevel.EXTENDED; + // if (isExplain) { + // result.setStmt_type(TStmtType.EXPLAIN); + // } + String explainString = getExplainString(allFragments, explainLevel, plannerContext); + queryExecRequest.setQuery_plan(explainString); + + + queryCtx.setDesc_tbl_serialized( + plannerContext.getRootAnalyzer().getDescTbl().toSerializedThrift()); + + plannerContext.getTimeline().markEvent("Execution request created"); + EventSequence eventSequence = plannerContext.getTimeline(); + result.setTimeline(eventSequence.toThrift()); + + TRuntimeProfileNode calciteProfile = + this.queryCtx.getFrontend().createTRuntimeProfileNode(Frontend.PLANNER_PROFILE); + this.queryCtx.getFrontend().addPlannerToProfile("CalcitePlanner"); + result.setProfile(FrontendProfile.getCurrent().emitAsThrift()); + result.setProfile_children(FrontendProfile.getCurrent().emitChildrenAsThrift()); + return result; + } + + List createPlans(PlanNode planNodeRoot, Analyzer analyzer, + PlannerContext ctx, List outputExprs) throws ImpalaException { + // Create the values transfer graph in the Analyzer. Note that Calcite plans + // don't register equijoin predicates in the Analyzer's GlobalState since + // Calcite should have already done the predicate inferencing analysis. + // Hence, the GlobalState's registeredValueTransfers will be empty. It is + // still necessary to instantiate the graph because otherwise + // RuntimeFilterGenerator tries to de-reference it and encounters NPE. + analyzer.computeValueTransferGraph(); + Planner.checkForSmallQueryOptimization(planNodeRoot, ctx); + + // Although the Calcite plan creates the relative order among different + // joins, currently it does not swap left and right inputs if the right + // input has higher estimated cardinality. Do this through Impala's method + // since we are using Impala's cardinality estimates in the physical planning. 
+ invertJoins(planNodeRoot, ctx.isSingleNodeExec(), ctx.getRootAnalyzer()); + SingleNodePlanner.validatePlan(ctx, planNodeRoot); + + List fragments = + createPlanFragments(planNodeRoot, ctx, analyzer, outputExprs); + PlanFragment planFragmentRoot = fragments.get(0); + List rootFragments; + if (Planner.useParallelPlan(ctx)) { + ParallelPlanner parallelPlanner = new ParallelPlanner(ctx); + // The rootFragmentList contains the 'root' fragments of each of + // the parallel plans + rootFragments = parallelPlanner.createPlans(planFragmentRoot); + ctx.getTimeline().markEvent("Parallel plans created"); + } else { + rootFragments = new ArrayList(Arrays.asList(planFragmentRoot)); + } + return rootFragments; + } + + /** + * Create one or more plan fragments corresponding to the supplied single node physical + * plan. This function calls Impala's DistributedPlanner to create the plan fragments + * and does some post-processing. It is loosely based on Impala's Planner.createPlan() + * function. + */ + private List createPlanFragments(PlanNode planNodeRoot, + PlannerContext ctx, Analyzer analyzer, + List outputExprs) throws ImpalaException { + + DistributedPlanner distributedPlanner = new DistributedPlanner(ctx); + List fragments; + + if (ctx.isSingleNodeExec()) { + // create one fragment containing the entire single-node plan tree + fragments = Lists.newArrayList(new PlanFragment( + ctx.getNextFragmentId(), planNodeRoot, DataPartition.UNPARTITIONED)); + } else { + fragments = new ArrayList<>(); + // Create distributed plan. For insert/CTAS without limit, + // isPartitioned should be true. + // TODO: only query statements are currently supported + // final boolean isPartitioned = stmtType_ == + // TStmtType.DML && !planNodeRoot.hasLimit(); + boolean isPartitioned = false; + distributedPlanner.createPlanFragments(planNodeRoot, isPartitioned, fragments); + } + + PlanFragment rootFragment = fragments.get(fragments.size() - 1); + // Create runtime filters. 
+ if (ctx.getQueryOptions().getRuntime_filter_mode() != TRuntimeFilterMode.OFF) { + RuntimeFilterGenerator.generateRuntimeFilters(ctx, rootFragment.getPlanRoot()); + ctx.getTimeline().markEvent("Runtime filters computed"); + } + + rootFragment.verifyTree(); + + List resultExprs = outputExprs; + rootFragment.setSink(new PlanRootSink(resultExprs)); + + Planner.checkForDisableCodegen(rootFragment.getPlanRoot(), ctx); + // finalize exchanges: this ensures that for hash partitioned joins, the partitioning + // keys on both sides of the join have compatible data types + for (PlanFragment fragment: fragments) { + fragment.finalizeExchanges(analyzer); + } + + Collections.reverse(fragments); + ctx.getTimeline().markEvent("Distributed plan created"); + return fragments; + } + + private TExecRequest createExecRequest(TQueryCtx queryCtx, + PlanFragment planFragmentRoot, TQueryExecRequest queryExecRequest) { + TExecRequest result = new TExecRequest(); + // NOTE: the below 4 are mandatory fields + result.setQuery_options(queryCtx.getClient_request().getQuery_options()); + + // TODO: Need to populate these 3 fields + result.setAccess_events(new ArrayList<>()); + result.setAnalysis_warnings(new ArrayList<>()); + result.setUser_has_profile_access(true); + + result.setQuery_exec_request(queryExecRequest); + + // TODO: only query currently supported + // result.setStmt_type(stmtType_); + // result.getQuery_exec_request().setStmt_type(stmtType_); + result.setStmt_type(TStmtType.QUERY); + result.getQuery_exec_request().setStmt_type(TStmtType.QUERY); + + // fill in the metadata using the root fragment's PlanRootSink + Preconditions.checkState(planFragmentRoot.hasSink()); + List outputExprs = new ArrayList<>(); + + planFragmentRoot.getSink().collectExprs(outputExprs); + result.setResult_set_metadata(createQueryResultSetMetadata(outputExprs)); + + return result; + } + + // TODO: Refactor and share Impala's getExplainString() + private String getExplainString(List fragments, + 
TExplainLevel explainLevel, PlannerContext ctx) { + if (explainLevel.ordinal() < TExplainLevel.VERBOSE.ordinal()) { + // Print the non-fragmented parallel plan. + return fragments.get(0).getExplainString(ctx.getQueryOptions(), explainLevel); + } + + StringBuffer sb = new StringBuffer(); + // Print the fragmented parallel plan. + for (int i = 0; i < fragments.size(); ++i) { + PlanFragment fragment = fragments.get(i); + sb.append(fragment.getExplainString(ctx.getQueryOptions(), explainLevel)); + if (i < fragments.size() - 1) { + sb.append("\n"); + } + } + return sb.toString(); + } + + private TResultSetMetadata createQueryResultSetMetadata(List outputExprs) { + TResultSetMetadata metadata = new TResultSetMetadata(); + int colCnt = outputExprs.size(); + for (int i = 0; i < colCnt; ++i) { + TColumn colDesc = new TColumn(outputExprs.get(i).toString(), + outputExprs.get(i).getType().toThrift()); + metadata.addToColumns(colDesc); + } + return metadata; + } + + private List getHostLocations(Collection tables) { + Set hostLocations = new HashSet<>(); + for (FeTable table : tables) { + if (table instanceof HdfsTable) { + hostLocations.addAll(((HdfsTable) table).getHostIndex().getList()); + } + } + return new ArrayList<>(hostLocations); + } + + /** + * Traverses the plan tree rooted at 'root' and inverts joins in the following + * situations: + * 1. If the left-hand side is a SingularRowSrcNode then we invert the join because + * then the build side is guaranteed to have only a single row. + * 2. There is no backend support for distributed non-equi right outer/semi joins, + * so we invert them (any distributed left semi/outer join is ok). + * 3. If we estimate that the inverted join is cheaper (see isInvertedJoinCheaper()). + * Do not invert if relevant stats are missing. + * The first two inversion rules are independent of the presence/absence of stats. + * Left Null Aware Anti Joins are never inverted due to lack of backend support. 
+ * Joins that originate from query blocks with a straight join hint are not inverted. + * The 'isLocalPlan' parameter indicates whether the plan tree rooted at 'root' + * will be executed locally within one machine, i.e., without any data exchanges. + * Return true if any join in the plan rooted at 'root' was inverted. + * + * TODO: This should be replaced once we conclude the changes contained in this method + * are safe to be pushed to Planner.invertJoins, i.e., they do not cause any + * performance regressions with Impala FE. A couple of differences in this version + * of invertJoins include: + * - The computeStats is done here. The Calcite planner will eventually have + * the computeStats built in during optimization time, but the join stats here + * are specific for Impala. So we need an explicit computeStats call only for + * Impala here. + * - We only need to do the computeStats when the plan changes via inversion, so + * the method returns a boolean whenever the inversion happens. + * - A Jira has been filed for this: IMPALA-12958. Probably the best fix for this + * is to put the inversion inside a Calcite rule. + * + **/ + private static boolean invertJoins(PlanNode root, boolean isLocalPlan, + Analyzer analyzer) { + boolean inverted = false; + if (root instanceof SubplanNode) { + inverted |= invertJoins(root.getChild(0), isLocalPlan, analyzer); + inverted |= invertJoins(root.getChild(1), true, analyzer); + } else { + for (PlanNode child: root.getChildren()) { + inverted |= invertJoins(child, isLocalPlan, analyzer); + } + } + + if (root instanceof JoinNode) { + JoinNode joinNode = (JoinNode) root; + JoinOperator joinOp = joinNode.getJoinOp(); + + if (!joinNode.isInvertible(isLocalPlan)) { + if (inverted) { + // Re-compute tuple ids since their order must correspond to the order + // of children. + root.computeTupleIds(); + // Re-compute stats since PK-FK inference and cardinality may have changed after + // inversion. 
+ root.computeStats(analyzer); + } + return inverted; + } + + if (joinNode.getChild(0) instanceof SingularRowSrcNode) { + // Always place a singular row src on the build side because it + // only produces a single row. + joinNode.invertJoin(); + inverted = true; + } else if (!isLocalPlan && joinNode instanceof NestedLoopJoinNode && + (joinOp.isRightSemiJoin() || joinOp.isRightOuterJoin())) { + // The current join is a distributed non-equi right outer or semi join + // which has no backend support. Invert the join to make it executable. + joinNode.invertJoin(); + inverted = true; + } else if (Planner.isInvertedJoinCheaper(joinNode, isLocalPlan)) { + joinNode.invertJoin(); + inverted = true; + } + // Re-compute the numNodes and numInstances based on the new input order + joinNode.recomputeNodes(); + } + + if (inverted) { + // Re-compute tuple ids because the backend assumes that their order corresponds to + // the order of children. + root.computeTupleIds(); + // Re-compute stats since PK-FK inference and cardinality may have changed after + // inversion. + root.computeStats(analyzer); + } + return inverted; + } + + /** + * Logs the result of the create-exec-request step at debug level. + * A result that is not a TExecRequest is reported once as an unknown result and is + * not additionally labelled as an exec request. + */ + @Override + public void logDebug(Object resultObject) { + if (!(resultObject instanceof TExecRequest)) { + LOG.debug("Finished create exec request step, but unknown result: " + resultObject); + // Do not fall through: the message below would mislabel this object. + return; + } + LOG.debug("Exec request: " + resultObject); + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/type/ImpalaTypeConverter.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/type/ImpalaTypeConverter.java new file mode 100644 index 000000000..8a93f6541 --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/type/ImpalaTypeConverter.java @@ -0,0 +1,166 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.type; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlCollation; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.ConversionUtil; +import org.apache.impala.catalog.PrimitiveType; +import org.apache.impala.catalog.ScalarType; +import org.apache.impala.catalog.Type; +import org.apache.impala.thrift.TPrimitiveType; + +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A utility class that holds methods that convert impala types to Calcite + * types and vice versa. + * + * One important distinction is the different classes used here. On the Impala side, + * there are: + * - Normalized Type names (e.g. Type.BOOLEAN). These are pre-created types. This becomes + * important for types with precisions like decimal, char, and varchar. The Type.DECIMAL + * (and char and varchar) do not have any precision (or scale) associated with them. 
In + * the function signatures, all precisions are allowed, so this type is used when + * describing them. + * - types with precisions. These also use Types (also of type ScalarType), but the + * precision (and scale) is included with the datatype. + * + * On the Calcite side, there are: + * - Normalized RelDataTypes. While theoretically we should have been able to use + * SqlTypeName to have the same purpose as the Impala default datatypes, there is no + * SqlTypeName.STRING. Therefore, we needed to resort to RelDataTypes for this purpose. + * - types with precisions. The normal RelDataType is used. + */ +public class ImpalaTypeConverter { + + // Maps Impala default types to Calcite default types. Every Calcite type stored + // here is created as nullable. + private static Map impalaToCalciteMap; + + static { + RexBuilder rexBuilder = + new RexBuilder(new JavaTypeFactoryImpl(new ImpalaTypeSystemImpl())); + RelDataTypeFactory factory = rexBuilder.getTypeFactory(); + Map map = new HashMap<>(); + map.put(Type.BOOLEAN, factory.createSqlType(SqlTypeName.BOOLEAN)); + map.put(Type.TINYINT, factory.createSqlType(SqlTypeName.TINYINT)); + map.put(Type.SMALLINT, factory.createSqlType(SqlTypeName.SMALLINT)); + map.put(Type.INT, factory.createSqlType(SqlTypeName.INTEGER)); + map.put(Type.BIGINT, factory.createSqlType(SqlTypeName.BIGINT)); + map.put(Type.FLOAT, factory.createSqlType(SqlTypeName.FLOAT)); + map.put(Type.DOUBLE, factory.createSqlType(SqlTypeName.DOUBLE)); + map.put(Type.TIMESTAMP, factory.createSqlType(SqlTypeName.TIMESTAMP)); + map.put(Type.DATE, factory.createSqlType(SqlTypeName.DATE)); + map.put(Type.BINARY, factory.createSqlType(SqlTypeName.BINARY)); + map.put(Type.STRING, factory.createSqlType(SqlTypeName.VARCHAR, Integer.MAX_VALUE)); + map.put(Type.NULL, factory.createSqlType(SqlTypeName.NULL)); + + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (Type t : map.keySet()) { + RelDataType r = map.get(t); + builder.put(t, factory.createTypeWithNullability(r, true)); + } + impalaToCalciteMap = builder.build();
+ } + + /** + * Create a new RelDataType given the Impala type. + * Returns null when 'impalaType' is null. Decimal, varchar and char types are built + * from the precision/scale or length of the given type; every other type is looked + * up in impalaToCalciteMap. All returned types are nullable. + * The given type is cast to ScalarType. + * NOTE(review): the default branch returns null for a primitive type that has no + * entry in impalaToCalciteMap - confirm that callers handle this. + */ + public static RelDataType createRelDataType(RelDataTypeFactory factory, + Type impalaType) { + if (impalaType == null) { + return null; + } + TPrimitiveType primitiveType = impalaType.getPrimitiveType().toThrift(); + ScalarType scalarType = (ScalarType) impalaType; + switch (primitiveType) { + case DECIMAL: + RelDataType decimalDefinedRetType = factory.createSqlType(SqlTypeName.DECIMAL, + scalarType.decimalPrecision(), scalarType.decimalScale()); + return factory.createTypeWithNullability(decimalDefinedRetType, true); + case VARCHAR: + RelDataType varcharType = factory.createSqlType(SqlTypeName.VARCHAR, + scalarType.getLength()); + return factory.createTypeWithNullability(varcharType, true); + case CHAR: + RelDataType charType = factory.createSqlType(SqlTypeName.CHAR, + scalarType.getLength()); + return factory.createTypeWithNullability(charType, true); + default: + Type normalizedImpalaType = getImpalaType(primitiveType); + return impalaToCalciteMap.get(normalizedImpalaType); + } + } + + /** + * Create Impala types given primitive types. + * Primitive types should not be exposed outside of this class. + */ + public static Type getImpalaType(TPrimitiveType argType) { + // Char, varchar and decimal contain precisions and need to be treated separately + // from the rest. The precisions for this case are unknown though, as we are only + // given a "primitive type".
+ switch (argType) { + case CHAR: + return Type.CHAR; + case VARCHAR: + return Type.VARCHAR; + case DECIMAL: + return Type.DECIMAL; + case BOOLEAN: + return Type.BOOLEAN; + case TINYINT: + return Type.TINYINT; + case SMALLINT: + return Type.SMALLINT; + case INT: + return Type.INT; + case BIGINT: + return Type.BIGINT; + case FLOAT: + return Type.FLOAT; + case DOUBLE: + return Type.DOUBLE; + case TIMESTAMP: + return Type.TIMESTAMP; + case DATE: + return Type.DATE; + case STRING: + return Type.STRING; + case FIXED_UDA_INTERMEDIATE: + return Type.FIXED_UDA_INTERMEDIATE; + case NULL_TYPE: + return Type.NULL; + case BINARY: + return Type.BINARY; + default: + throw new RuntimeException("Unknown type " + argType); + } + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/type/ImpalaTypeSystemImpl.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/type/ImpalaTypeSystemImpl.java new file mode 100644 index 000000000..c812b461f --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/type/ImpalaTypeSystemImpl.java @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.calcite.type; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeSystemImpl; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.type.SqlTypeUtil; +import org.apache.impala.analysis.ArithmeticExpr; +import org.apache.impala.analysis.TypesUtil; +import org.apache.impala.catalog.ScalarType; +import org.apache.impala.catalog.Type; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.commons.lang3.exception.ExceptionUtils; + +/** + * ImpalaTypeSystemImpl contains constants that are specific + * to Impala datatypes that are used by Calcite. + * Many of these constants were copied from the Hive repository + * in the HiveTypeSystemImpl file. These are not fully tested + * at this point, but since Hive datatypes are similar to Impala + * datatypes, these definitions probably make sense. This may + * change later as more code gets added if it turns out some of these + * definitions do not make sense. + */ +public class ImpalaTypeSystemImpl extends RelDataTypeSystemImpl { + protected static final Logger LOG = + LoggerFactory.getLogger(ImpalaTypeSystemImpl.class.getName()); + private static final int MAX_BINARY_PRECISION = Integer.MAX_VALUE; + // TIMESTAMP precision can go up to nanos + private static final int MAX_TIMESTAMP_PRECISION = 15; + private static final int MAX_TIMESTAMP_WITH_LOCAL_TIME_ZONE_PRECISION = 15; // nanos + // The precisions here match the number of digits that can be held by the type. + // For example, the maximum value of TINYINT is 127, which is 3 digits. + // The float and double precisions also match the number of total digits in the number. + // Note: The FLOAT precision here is different from the precision used in the + // Calcite RelDataTypeSystem file. Calcite treats its floats the same as doubles. 
+ // Also note that the precision sizes match the values existing in + // HiveTypeSystemImpl in the Hive github code base. + private static final int DEFAULT_TINYINT_PRECISION = 3; + private static final int DEFAULT_SMALLINT_PRECISION = 5; + private static final int DEFAULT_INTEGER_PRECISION = 10; + private static final int DEFAULT_BIGINT_PRECISION = 19; + private static final int DEFAULT_FLOAT_PRECISION = 7; + private static final int DEFAULT_DOUBLE_PRECISION = 15; + + + /** + * Returns the maximum scale for 'typeName': the maximum numeric scale for DECIMAL, + * the maximum fractional second precision for interval types, and -1 otherwise. + */ + @Override + public int getMaxScale(SqlTypeName typeName) { + switch (typeName) { + case DECIMAL: + return getMaxNumericScale(); + case INTERVAL_YEAR: + case INTERVAL_MONTH: + case INTERVAL_YEAR_MONTH: + case INTERVAL_DAY: + case INTERVAL_DAY_HOUR: + case INTERVAL_DAY_MINUTE: + case INTERVAL_DAY_SECOND: + case INTERVAL_HOUR: + case INTERVAL_HOUR_MINUTE: + case INTERVAL_HOUR_SECOND: + case INTERVAL_MINUTE: + case INTERVAL_MINUTE_SECOND: + case INTERVAL_SECOND: + return SqlTypeName.MAX_INTERVAL_FRACTIONAL_SECOND_PRECISION; + default: + return -1; + } + } + + @Override + public int getDefaultPrecision(SqlTypeName typeName) { + switch (typeName) { + // Binary, char, varchar and decimal have no default precision.
+ case BINARY: + case VARBINARY: + return RelDataType.PRECISION_NOT_SPECIFIED; + case CHAR: + return RelDataType.PRECISION_NOT_SPECIFIED; + case VARCHAR: + return RelDataType.PRECISION_NOT_SPECIFIED; + case DECIMAL: + return RelDataType.PRECISION_NOT_SPECIFIED; + case INTERVAL_YEAR: + case INTERVAL_MONTH: + case INTERVAL_YEAR_MONTH: + case INTERVAL_DAY: + case INTERVAL_DAY_HOUR: + case INTERVAL_DAY_MINUTE: + case INTERVAL_DAY_SECOND: + case INTERVAL_HOUR: + case INTERVAL_HOUR_MINUTE: + case INTERVAL_HOUR_SECOND: + case INTERVAL_MINUTE: + case INTERVAL_MINUTE_SECOND: + case INTERVAL_SECOND: + return SqlTypeName.DEFAULT_INTERVAL_START_PRECISION; + default: + return getMaxPrecision(typeName); + } + } + + @Override + public int getMaxPrecision(SqlTypeName typeName) { + switch (typeName) { + case BINARY: + case VARBINARY: + return MAX_BINARY_PRECISION; + case TIME: + case TIMESTAMP: + return MAX_TIMESTAMP_PRECISION; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return MAX_TIMESTAMP_WITH_LOCAL_TIME_ZONE_PRECISION; + case CHAR: + return ScalarType.MAX_CHAR_LENGTH; + case VARCHAR: + return Integer.MAX_VALUE; + case DECIMAL: + return getMaxNumericPrecision(); + case INTERVAL_YEAR: + case INTERVAL_MONTH: + case INTERVAL_YEAR_MONTH: + case INTERVAL_DAY: + case INTERVAL_DAY_HOUR: + case INTERVAL_DAY_MINUTE: + case INTERVAL_DAY_SECOND: + case INTERVAL_HOUR: + case INTERVAL_HOUR_MINUTE: + case INTERVAL_HOUR_SECOND: + case INTERVAL_MINUTE: + case INTERVAL_MINUTE_SECOND: + case INTERVAL_SECOND: + return SqlTypeName.MAX_INTERVAL_START_PRECISION; + case TINYINT: + return DEFAULT_TINYINT_PRECISION; + case SMALLINT: + return DEFAULT_SMALLINT_PRECISION; + case INTEGER: + return DEFAULT_INTEGER_PRECISION; + case BIGINT: + return DEFAULT_BIGINT_PRECISION; + case FLOAT: + return DEFAULT_FLOAT_PRECISION; + case DOUBLE: + return DEFAULT_DOUBLE_PRECISION; + default: + return -1; + } + } + + @Override + public int getMaxNumericScale() { + return ScalarType.MAX_SCALE; + } + + @Override + public int 
getMaxNumericPrecision() { + return ScalarType.MAX_PRECISION; + } + + @Override + public boolean isSchemaCaseSensitive() { + return false; + } +} diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/validate/ImpalaConformance.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/validate/ImpalaConformance.java new file mode 100644 index 000000000..4f1000919 --- /dev/null +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/validate/ImpalaConformance.java @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.calcite.validate; + +import org.apache.calcite.sql.fun.SqlLibrary; +import org.apache.calcite.sql.validate.SqlConformance; + +/** + * ImpalaConformance describes supported Calcite features. 
+ * For more info on the description of these methods, see: + * https://calcite.apache.org/javadocAggregate/org/apache/calcite/sql/validate/SqlConformance.html + */ +public class ImpalaConformance implements SqlConformance { + + public static final SqlConformance INSTANCE = new ImpalaConformance(); + + @Override public boolean isLiberal() { + return true; + } + + @Override public boolean allowCharLiteralAlias() { + return true; + } + + + @Override public boolean isGroupByAlias() { + return true; + } + + @Override public boolean isGroupByOrdinal() { + return true; + } + + @Override public boolean isHavingAlias() { + return true; + } + + @Override public boolean isSortByOrdinal() { + return true; + } + + @Override public boolean isSortByAlias() { + return true; + } + + @Override public boolean isSortByAliasObscures() { + return false; + } + + @Override public boolean isFromRequired() { + return false; + } + + @Override public boolean splitQuotedTableName() { + return false; + } + + @Override public boolean allowHyphenInUnquotedTableName() { + return false; + } + + @Override public boolean isBangEqualAllowed() { + return true; + } + + @Override public boolean isMinusAllowed() { + return true; + } + + @Override public boolean isRegexReplaceCaptureGroupDollarIndexed() { + return true; + } + + @Override public boolean isPercentRemainderAllowed() { + return true; + } + + @Override public boolean isApplyAllowed() { + return false; + } + + @Override public boolean isInsertSubsetColumnsAllowed() { + return false; + } + + @Override public boolean allowNiladicParentheses() { + return true; + } + + @Override public boolean allowExplicitRowValueConstructor() { + return false; + } + + @Override public boolean allowExtend() { + return false; + } + + @Override public boolean isLimitStartCountAllowed() { + return false; + } + + @Override public boolean isOffsetLimitAllowed() { + return false; + } + + @Override public boolean allowGeometry() { + return false; + } + + @Override public 
boolean shouldConvertRaggedUnionTypesToVarying() { + return false; + } + + @Override public boolean allowExtendedTrim() { + return false; + } + + @Override public boolean allowPluralTimeUnits() { + return false; + } + + @Override public boolean allowQualifyingCommonColumn() { + return true; + } + + @Override public boolean allowAliasUnnestItems() { + return false; + } + + @Override public boolean isValueAllowed() { + return true; + } + + @Override public SqlLibrary semantics() { + return SqlLibrary.STANDARD; + } + + @Override public boolean allowLenientCoercion() { + return false; + } +} diff --git a/java/pom.xml b/java/pom.xml index e7a2d3d72..9e4697e58 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -385,6 +385,7 @@ under the License. ext-data-source ../fe external-frontend + calcite-planner query-event-hook-api shaded-deps/hive-exec shaded-deps/s3a-aws-sdk diff --git a/testdata/workloads/functional-query/queries/QueryTest/calcite.test b/testdata/workloads/functional-query/queries/QueryTest/calcite.test new file mode 100644 index 000000000..b05f3089a --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/calcite.test @@ -0,0 +1,117 @@ +==== +---- QUERY +-- this is a test for a comment line above a blank line +-- we do not care about the results, just the comment + +select * from functional.alltypestiny; +---- RUNTIME_PROFILE +row_regex: .*PlannerType: CalcitePlanner.* +==== +---- QUERY +create table calcite_alltypes as select * from functional.alltypes order by id limit 5; +---- RUNTIME_PROFILE +row_regex: .*PlannerType: OriginalPlanner.* +==== +---- QUERY +select * from calcite_alltypes; +---- RESULTS +0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1 +1,false,1,1,1,10,1.100000023841858,10.1,'01/01/09','1',2009-01-01 00:01:00,2009,1 +2,true,2,2,2,20,2.200000047683716,20.2,'01/01/09','2',2009-01-01 00:02:00.100000000,2009,1 +3,false,3,3,3,30,3.299999952316284,30.3,'01/01/09','3',2009-01-01 00:03:00.300000000,2009,1 
+4,true,4,4,4,40,4.400000095367432,40.4,'01/01/09','4',2009-01-01 00:04:00.600000000,2009,1 +---- TYPES +int,boolean,tinyint,smallint,int,bigint,float,double,string,string,timestamp,int,int +---- RUNTIME_PROFILE +row_regex: .*PlannerType: CalcitePlanner.* +==== +---- QUERY +select string_col, tinyint_col from calcite_alltypes; +---- RUNTIME_PROFILE +row_regex: .*PlannerType: CalcitePlanner.* +---- RESULTS +'0',0 +'1',1 +'2',2 +'3',3 +'4',4 +---- TYPES +string,tinyint +==== +---- QUERY +select d1,d2,d3,d4,d5,d6 from functional.decimal_tbl; +---- RESULTS +1234,2222,1.2345678900,0.12345678900000000000000000000000000000,12345.78900,1 +12345,333,123.4567890000,0.12345678900000000000000000000000000000,11.22000,1 +12345,333,1234.5678900000,0.12345678900000000000000000000000000000,0.10000,1 +132842,333,12345.6789000000,0.12345678900000000000000000000000000000,0.77889,1 +2345,111,12.3456789000,0.12345678900000000000000000000000000000,3.14100,1 +---- TYPES +decimal,decimal,decimal,decimal,decimal,decimal +---- RUNTIME_PROFILE +row_regex: .*PlannerType: CalcitePlanner.* +==== +---- QUERY +select * from functional.chars_tiny; +---- RESULTS +'1aaaa','1bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb','1cccc' +'2aaaa','2bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb','2cccccc' +'3aaa ','3bbbbb ','3ccc' +'4aa ','4bbbb ','4cc' +'5a ','5bbb ','5c' +'6a ','6b ','6c' +'6a ','6b ','6c' +'NULL','NULL','NULL' +'a ','b ','c' +---- TYPES +# varchar shows up as string, just as it does in the chars.test file +char,char,string +---- RUNTIME_PROFILE +row_regex: .*PlannerType: CalcitePlanner.* +==== +---- QUERY +select * from functional.date_tbl; +---- RESULTS +0,0001-01-01,0001-01-01 +1,0001-12-31,0001-01-01 +10,2017-11-28,1399-06-27 +11,NULL,1399-06-27 +12,2018-12-31,1399-06-27 +2,0002-01-01,0001-01-01 
+20,0001-06-21,2017-11-27 +21,0001-06-22,2017-11-27 +22,0001-06-23,2017-11-27 +23,0001-06-24,2017-11-27 +24,0001-06-25,2017-11-27 +25,0001-06-26,2017-11-27 +26,0001-06-27,2017-11-27 +27,0001-06-28,2017-11-27 +28,0001-06-29,2017-11-27 +29,2017-11-28,2017-11-27 +3,1399-12-31,0001-01-01 +30,9999-12-01,9999-12-31 +31,9999-12-31,9999-12-31 +4,2017-11-28,0001-01-01 +5,9999-12-31,0001-01-01 +6,NULL,0001-01-01 +---- TYPES +int,date,date +---- RUNTIME_PROFILE +row_regex: .*PlannerType: CalcitePlanner.* +==== +---- QUERY +# creating a new table. We cannot use any functions at this point to +# manipulate the binary data (as the binary test currently does), so +# this just grabs the rows that can be checked. +create table ascii_binary as select * from functional.binary_tbl where id <= 4; +select * from ascii_binary; +---- RESULTS +1,'ascii','binary1' +2,'ascii','binary2' +3,'null','NULL' +4,'empty','' +---- TYPES +int,string,binary +---- RUNTIME_PROFILE +row_regex: .*PlannerType: CalcitePlanner.* +==== diff --git a/tests/custom_cluster/test_calcite_planner.py b/tests/custom_cluster/test_calcite_planner.py new file mode 100644 index 000000000..1b49df791 --- /dev/null +++ b/tests/custom_cluster/test_calcite_planner.py @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import absolute_import, division, print_function +import logging +import pytest + +from tests.common.custom_cluster_test_suite import CustomClusterTestSuite + +LOG = logging.getLogger(__name__) + + +class TestCalcitePlanner(CustomClusterTestSuite): + + @classmethod + def setup_class(cls): + super(TestCalcitePlanner, cls).setup_class() + + @classmethod + def get_workload(cls): + return 'functional-query' + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args(start_args="--use_calcite_planner=true") + def test_calcite_frontend(self, vector, unique_database): + self.run_test_case('QueryTest/calcite', vector, use_db=unique_database) diff --git a/tests/util/workload_management.py b/tests/util/workload_management.py index 4cdacfe54..c092f85c5 100644 --- a/tests/util/workload_management.py +++ b/tests/util/workload_management.py @@ -417,7 +417,7 @@ def assert_query(query_tbl, client, expected_cluster_id, raw_profile=None, impal index += 1 assert sql_results.column_labels[index] == EXECUTOR_GROUPS ret_data[EXECUTOR_GROUPS] = data[index] - exec_groups = re.search(r'\n\s+(Executor group \d+:.*?)\n\s+ImpalaServer', profile_text, + exec_groups = re.search(r'\n\s+(Executor group \d+:.*?)\n\s+PlannerInfo', profile_text, re.DOTALL) if query_state_value == "EXCEPTION": assert exec_groups is None, "executor groups should not have been found"