IMPALA-12940: Added filtering capability for Calcite planner

The Filter RelNode is now handled in the Calcite planner.

The parsing and analysis are done by Calcite, so no changes were
needed in that portion. The ImpalaFilterRel class was created to
handle the conversion of the Calcite LogicalFilter into a filter
condition within the Impala plan nodes.
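
The conversion itself is driven by a planner rule; a condensed
view of that rule (taken from the ConvertToImpalaRelRules diff
below) looks like this:

  // A LogicalFilter produced by Calcite is rewritten into an
  // ImpalaFilterRel during the HepPlanner pass.
  public static class ImpalaFilterRule extends RelOptRule {
    public ImpalaFilterRule() {
      super(operand(LogicalFilter.class, any()));
    }

    @Override
    public void onMatch(RelOptRuleCall call) {
      final LogicalFilter filter = call.rel(0);
      call.transformTo(new ImpalaFilterRel(filter));
    }
  }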

There is no explicit filter plan node in Impala. Instead, the
filter condition attaches itself to an existing plan node. The
filter condition is passed into the child plan nodes through
the ParentPlanRelContext, as sketched below.
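
A condensed view of how the condition is handed down (see
ImpalaFilterRel.getChildPlanNode() in the diff below); any
condition already pushed down by a parent is AND-composed with
this filter's condition:

  private NodeWithExprs getChildPlanNode(ParentPlanRelContext context)
      throws ImpalaException {
    ImpalaPlanRel relInput = (ImpalaPlanRel) getInput(0);
    ParentPlanRelContext.Builder builder =
        new ParentPlanRelContext.Builder(context, this);
    // AND this filter's condition with any condition a parent already set.
    builder.setFilterCondition(
        createNewCondition(context.filterCondition_, getCondition()));
    return relInput.getPlanNode(builder.build());
  }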

The ExprConjunctsConverter class is responsible for creating the
list of filter Exprs that is used. The list contains the separate
top-level AND conjuncts of the filter condition.
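
For example, a pushed-down condition such as a = 1 AND b = 2
arrives as a single AND RexCall with both operands at the top
level, so only the top-level operands need to be split. A usage
sketch mirroring the call site in ImpalaHdfsScanRel (the example
condition is illustrative):

  // Break the pushed-down condition into one Impala Expr per
  // top-level AND conjunct.
  ExprConjunctsConverter converter = new ExprConjunctsConverter(
      context.filterCondition_,     // RexNode from the parent context
      outputExprs,                  // Exprs that the input refs resolve to
      getCluster().getRexBuilder(),
      context.ctx_.getRootAnalyzer());
  // For "a = 1 AND b = 2" this returns two separate Exprs.
  List<Expr> filterConjuncts = converter.getImpalaConjuncts();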

Change-Id: If104bf1cd801d5ee92dd7e43d398a21a18be5d97
Reviewed-on: http://gerrit.cloudera.org:8080/21498
Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
Author: Steve Carlin (2024-03-25 16:17:44 -07:00)
Committer: Joe McDonnell
Parent: a99de990b0  Commit: a6db27850a
10 changed files with 275 additions and 13 deletions

View File

@@ -19,22 +19,18 @@ package org.apache.impala.calcite.functions;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlKind;
import org.apache.impala.analysis.FunctionName;
import org.apache.impala.calcite.type.ImpalaTypeConverter;
import org.apache.impala.catalog.BuiltinsDb;
import org.apache.impala.catalog.Db;
import org.apache.impala.catalog.Function;
import org.apache.impala.catalog.ScalarFunction;
import org.apache.impala.catalog.Type;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +44,20 @@ public class FunctionResolver {
protected static final Logger LOG =
LoggerFactory.getLogger(FunctionResolver.class.getName());
// Map of the Calcite Kind to an Impala function name
public static Map<SqlKind, String> CALCITE_KIND_TO_IMPALA_FUNC =
ImmutableMap.<SqlKind, String> builder()
.put(SqlKind.EQUALS, "eq")
.build();
public static Function getFunction(String name, SqlKind kind,
List<RelDataType> argTypes) {
String mappedName = CALCITE_KIND_TO_IMPALA_FUNC.get(kind);
return mappedName == null
? getFunction(name, argTypes)
: getFunction(mappedName, argTypes);
}
public static Function getFunction(String name, List<RelDataType> argTypes) {
String lowercaseName = name.toLowerCase();

View File

@@ -49,7 +49,7 @@ public class RexCallConverter {
String funcName = rexCall.getOperator().getName().toLowerCase();
Function fn = getFunction(funcName, rexCall.getOperands(), rexCall.getType());
Function fn = getFunction(rexCall);
Type impalaRetType = ImpalaTypeConverter.createImpalaType(fn.getReturnType(),
rexCall.getType().getPrecision(), rexCall.getType().getScale());
@@ -57,13 +57,13 @@ public class RexCallConverter {
return new AnalyzedFunctionCallExpr(fn, params, rexCall, impalaRetType, analyzer);
}
private static Function getFunction(String name, List<RexNode> args,
RelDataType retType) throws ImpalaException {
List<RelDataType> argTypes = Lists.transform(args, RexNode::getType);
Function fn = FunctionResolver.getFunction(name, argTypes);
private static Function getFunction(RexCall call) throws ImpalaException {
List<RelDataType> argTypes = Lists.transform(call.getOperands(), RexNode::getType);
String name = call.getOperator().getName();
Function fn = FunctionResolver.getFunction(name, call.getKind(), argTypes);
if (fn == null) {
throw new AnalysisException("Could not find function \"" + name + "\" in Impala "
+ "with args " + argTypes + " and return type " + retType);
+ "with args " + argTypes + " and return type " + call.getType());
}
return fn;
}

View File

@@ -20,6 +20,7 @@ package org.apache.impala.calcite.rel.node;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalTableScan;
@@ -43,6 +44,18 @@ public class ConvertToImpalaRelRules {
}
}
public static class ImpalaFilterRule extends RelOptRule {
public ImpalaFilterRule() {
super(operand(LogicalFilter.class, any()));
}
@Override
public void onMatch(RelOptRuleCall call) {
final LogicalFilter filter = call.rel(0);
call.transformTo(new ImpalaFilterRel(filter));
}
}
public static class ImpalaScanRule extends RelOptRule {
public ImpalaScanRule() {

View File

@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.impala.calcite.rel.node;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.impala.common.ImpalaException;
import org.apache.calcite.rel.core.Filter;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* ImpalaFilterRel: There is no Impala PlanNode that maps directly
* from this RelNode. When this RelNode gets hit in the tree, it passes
* its filter condition down into a RelNode that can handle the filter.
*/
public class ImpalaFilterRel extends Filter
implements ImpalaPlanRel {
protected static final Logger LOG =
LoggerFactory.getLogger(ImpalaFilterRel.class.getName());
public ImpalaFilterRel(Filter filter) {
super(filter.getCluster(), filter.getTraitSet(), filter.getInput(),
filter.getCondition());
}
private ImpalaFilterRel(RelOptCluster cluster, RelTraitSet traits,
RelNode child, RexNode condition) {
super(cluster, traits, child, condition);
}
@Override
public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
return new ImpalaFilterRel(getCluster(), traitSet, input, condition);
}
@Override
public NodeWithExprs getPlanNode(ParentPlanRelContext context) throws ImpalaException {
Preconditions.checkState(getInputs().size() == 1);
return getChildPlanNode(context);
}
private NodeWithExprs getChildPlanNode(ParentPlanRelContext context)
throws ImpalaException {
ImpalaPlanRel relInput = (ImpalaPlanRel) getInput(0);
ParentPlanRelContext.Builder builder =
new ParentPlanRelContext.Builder(context, this);
RexNode newFilterCondition =
createNewCondition(context.filterCondition_, getCondition());
builder.setFilterCondition(newFilterCondition);
// need to set the inputRefs. The HdfsScan RelNode needs to know which columns are
// needed from the table in order to implement the filter condition. The input ref
used here may or may not be projected out. So a union needs to be done with
// potential existing projected input refs from a parent RelNode.
// Note that if the parent RelNode hasn't set any input refs, it is assumed that all
// input refs are needed (the default case when inputRefs_ is null).
if (context.inputRefs_ != null) {
ImmutableBitSet inputRefs =
RelOptUtil.InputFinder.bits(Lists.newArrayList(getCondition()), null);
builder.setInputRefs(inputRefs.union(context.inputRefs_));
}
return relInput.getPlanNode(builder.build());
}
private RexNode createNewCondition(RexNode previousCondition, RexNode newCondition) {
if (previousCondition == null) {
return newCondition;
}
List<RexNode> conditions = ImmutableList.of(previousCondition, newCondition);
return RexUtil.composeConjunction(getCluster().getRexBuilder(), conditions);
}
}

View File

@@ -26,6 +26,7 @@ import org.apache.impala.analysis.SlotDescriptor;
import org.apache.impala.analysis.SlotRef;
import org.apache.impala.analysis.TupleDescriptor;
import org.apache.impala.calcite.rel.phys.ImpalaHdfsScanNode;
import org.apache.impala.calcite.rel.util.ExprConjunctsConverter;
import org.apache.impala.calcite.schema.CalciteTable;
import org.apache.impala.catalog.FeFsPartition;
import org.apache.impala.catalog.HdfsTable;
@@ -63,11 +64,17 @@ public class ImpalaHdfsScanRel extends TableScan
// outputExprs will contain all the needed columns from the table
List<Expr> outputExprs = createScanOutputExprs(tupleDesc.getSlots());
// break up the filter condition (if given) to ones that can be used for
// partition pruning and ones that cannot.
ExprConjunctsConverter converter = new ExprConjunctsConverter(
context.filterCondition_, outputExprs, getCluster().getRexBuilder(),
context.ctx_.getRootAnalyzer());
List<? extends FeFsPartition> impalaPartitions = table.getPrunedPartitions(
context.ctx_.getRootAnalyzer(), tupleDesc);
// TODO: filters are not handled yet, nor are partitions
List<Expr> filterConjuncts = new ArrayList<>();
// TODO: All conjuncts will be nonpartitioned conjuncts until the partition
// pruning feature is committed.
List<Expr> filterConjuncts = converter.getImpalaConjuncts();
List<Expr> partitionConjuncts = new ArrayList<>();
PlanNodeId nodeId = context.ctx_.getNextNodeId();

View File

@@ -102,6 +102,8 @@ public class ImpalaProjectRel extends Project
private NodeWithExprs getChildPlanNode(ParentPlanRelContext context
) throws ImpalaException {
Preconditions.checkState(context.filterCondition_ == null,
"Failure, Filter RelNode needs to be passed through the Project Rel Node.");
ImpalaPlanRel relInput = (ImpalaPlanRel) getInput(0);
ParentPlanRelContext.Builder builder =
new ParentPlanRelContext.Builder(context, this);

View File

@@ -17,6 +17,7 @@
package org.apache.impala.calcite.rel.node;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.planner.PlannerContext;
@@ -31,16 +32,21 @@ public class ParentPlanRelContext {
// ctx: This doesn't change throughout the tree
public final PlannerContext ctx_;
// filterCondition: A filter which can be used by the current node.
public final RexNode filterCondition_;
// The input refs used by the parent PlanRel Node
public final ImmutableBitSet inputRefs_;
private ParentPlanRelContext(Builder builder) {
this.ctx_ = builder.context_;
this.filterCondition_ = builder.filterCondition_;
this.inputRefs_ = builder.inputRefs_;
}
public static class Builder {
private PlannerContext context_;
private RexNode filterCondition_;
private ImmutableBitSet inputRefs_;
public Builder(PlannerContext plannerContext) {
@@ -49,6 +55,11 @@ public class ParentPlanRelContext {
public Builder(ParentPlanRelContext planRelContext, ImpalaPlanRel planRel) {
this.context_ = planRelContext.ctx_;
this.filterCondition_ = planRelContext.filterCondition_;
}
public void setFilterCondition(RexNode filterCondition) {
this.filterCondition_ = filterCondition;
}
public void setInputRefs(ImmutableBitSet inputRefs) {

View File

@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.impala.calcite.rel.util;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.rex.RexVisitor;
import org.apache.calcite.sql.SqlKind;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.Expr;
import org.apache.impala.common.ImpalaException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* ExprConjunctsConverter takes a RexNode conjunct and converts it
* into an Impala Expr object. The child node input refs are included
* as an input.
*/
public class ExprConjunctsConverter {
private static final Logger LOG = LoggerFactory.getLogger(ExprConjunctsConverter.class);
private final List<Expr> allConjuncts_;
public ExprConjunctsConverter(RexNode conjunct, List<Expr> inputExprs,
RexBuilder rexBuilder, Analyzer analyzer) throws ImpalaException {
ImmutableList.Builder<Expr> builder = new ImmutableList.Builder();
if (conjunct != null) {
CreateExprVisitor visitor =
new CreateExprVisitor(rexBuilder, inputExprs, analyzer);
List<RexNode> andOperands = getAndConjuncts(conjunct);
for (RexNode andOperand : andOperands) {
Expr convertedExpr = CreateExprVisitor.getExpr(visitor, andOperand);
builder.add(convertedExpr);
}
}
this.allConjuncts_ = builder.build();
}
public List<Expr> getImpalaConjuncts() {
return allConjuncts_;
}
/**
* Break the list up by its AND conjuncts. We only care about
* AND clauses on the first level. Calcite does not treat AND
* clauses as binary (e.g. <clause1> AND <clause2> AND <clause3>
* will have all 3 clauses on the first level), so we do not
* need to recursively search for clauses.
*/
private static List<RexNode> getAndConjuncts(RexNode conjunct) {
if (conjunct == null) {
return ImmutableList.of();
}
// If it's not a RexCall, there's no AND operator and we can
// just return the conjunct.
if (!(conjunct instanceof RexCall)) {
return ImmutableList.of(conjunct);
}
RexCall rexCallConjunct = (RexCall) conjunct;
if (rexCallConjunct.getKind() != SqlKind.AND) {
return ImmutableList.of(conjunct);
}
// If it's an AND conjunct, then all the operands represent individual
// AND clauses.
return rexCallConjunct.getOperands();
}
}

View File

@@ -58,6 +58,7 @@ public class CalciteOptimizer implements CompilerStep {
builder.addRuleCollection(
ImmutableList.of(
new ConvertToImpalaRelRules.ImpalaScanRule(),
new ConvertToImpalaRelRules.ImpalaFilterRule(),
new ConvertToImpalaRelRules.ImpalaProjectRule()));
HepPlanner planner = new HepPlanner(builder.build(),

View File

@@ -150,3 +150,18 @@ select cast(cast('2005-12-13 08:00:00' as string) AS TIMESTAMP) from functional
2005-12-13 08:00:00
---- TYPES
timestamp
====
---- QUERY
select * from calcite_alltypes where bigint_col = 20;
---- RESULTS
2,true,2,2,2,20,2.200000047683716,20.2,'01/01/09','2',2009-01-01 00:02:00.100000000,2009,1
---- TYPES
int,boolean,tinyint,smallint,int,bigint,float,double,string,string,timestamp,int,int
====
---- QUERY
select tinyint_col from calcite_alltypes where bigint_col = 20;
---- RESULTS
2
---- TYPES
tinyint
====