IMPALA-14080: Support LocalFsTable table types in Calcite planner.

IMPALA-13947 changes the use_local_catalog default to true. This causes
failures when the use_calcite_planner query option is set to true.

The Calcite planner was only handling HdfsTable table types. It will
now handle LocalFsTable table types as well.

Currently, if the number of rows is missing from the table, the Calcite
planner will load all partitions and iterate over them to estimate it.
This is inefficient in local catalog mode and ideally should happen later,
after partition pruning. Follow-up work is needed to improve this.

Testing:
Re-enable local catalog mode in:
- TestCalcitePlanner.test_calcite_frontend
- TestWorkloadManagementSQLDetailsCalcite.test_tpcds_8_decimal

Co-authored-by: Riza Suminto

Change-Id: Ic855779aa64d11b7a8b19dd261c0164e65604e44
Reviewed-on: http://gerrit.cloudera.org:8080/23341
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Steve Carlin
2025-08-22 10:50:13 -07:00
committed by Impala Public Jenkins
parent fd9de7a2e1
commit 048b5689fd
6 changed files with 57 additions and 62 deletions

View File

@@ -27,8 +27,8 @@ import org.apache.calcite.util.ImmutableBitSet;
import org.apache.impala.analysis.Analyzer; import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.BaseTableRef; import org.apache.impala.analysis.BaseTableRef;
import org.apache.impala.analysis.Expr; import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.Path;
import org.apache.impala.analysis.ExprSubstitutionMap; import org.apache.impala.analysis.ExprSubstitutionMap;
import org.apache.impala.analysis.Path;
import org.apache.impala.analysis.SlotDescriptor; import org.apache.impala.analysis.SlotDescriptor;
import org.apache.impala.analysis.SlotRef; import org.apache.impala.analysis.SlotRef;
import org.apache.impala.analysis.TupleDescriptor; import org.apache.impala.analysis.TupleDescriptor;
@@ -39,22 +39,21 @@ import org.apache.impala.calcite.schema.CalciteTable;
import org.apache.impala.calcite.util.SimplifiedAnalyzer; import org.apache.impala.calcite.util.SimplifiedAnalyzer;
import org.apache.impala.catalog.Column; import org.apache.impala.catalog.Column;
import org.apache.impala.catalog.FeFsPartition; import org.apache.impala.catalog.FeFsPartition;
import org.apache.impala.catalog.HdfsTable; import org.apache.impala.catalog.FeFsTable;
import org.apache.impala.catalog.Type; import org.apache.impala.catalog.Type;
import org.apache.impala.common.ImpalaException; import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.UnsupportedFeatureException; import org.apache.impala.common.UnsupportedFeatureException;
import org.apache.impala.planner.PlanNode; import org.apache.impala.planner.PlanNode;
import org.apache.impala.planner.PlanNodeId; import org.apache.impala.planner.PlanNodeId;
import org.apache.impala.planner.SingleNodePlanner;
import org.apache.impala.planner.ScanNode; import org.apache.impala.planner.ScanNode;
import org.apache.impala.planner.SingleNodePlanner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* ImpalaHdfsScanRel. Calcite RelNode which maps to an Impala TableScan node. * ImpalaHdfsScanRel. Calcite RelNode which maps to an Impala TableScan node.
*/ */
@@ -138,7 +137,7 @@ public class ImpalaHdfsScanRel extends TableScan
private List<Expr> createScanOutputExprs(List<SlotDescriptor> slotDescs) private List<Expr> createScanOutputExprs(List<SlotDescriptor> slotDescs)
throws ImpalaException { throws ImpalaException {
CalciteTable calciteTable = (CalciteTable) getTable(); CalciteTable calciteTable = (CalciteTable) getTable();
HdfsTable table = calciteTable.getHdfsTable(); FeFsTable table = calciteTable.getFeFsTable();
// IMPALA-12961: The output expressions are contained in a list which // IMPALA-12961: The output expressions are contained in a list which
// may have holes in it (if the table scan column is not in the output). // may have holes in it (if the table scan column is not in the output).
// The width of the list must include all columns, including the acid ones, // The width of the list must include all columns, including the acid ones,

View File

@@ -17,24 +17,25 @@
package org.apache.impala.calcite.schema; package org.apache.impala.calcite.schema;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.prepare.CalciteCatalogReader; import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeImpl; import org.apache.calcite.rel.type.RelDataTypeImpl;
import org.apache.calcite.schema.Table; import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.ViewTable;
import org.apache.calcite.schema.impl.AbstractSchema; import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.schema.impl.ViewTable;
import org.apache.impala.analysis.Analyzer; import org.apache.impala.analysis.Analyzer;
import org.apache.impala.calcite.type.ImpalaTypeSystemImpl; import org.apache.impala.calcite.type.ImpalaTypeSystemImpl;
import org.apache.impala.catalog.FeTable; import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.FeView;
import org.apache.impala.catalog.HdfsTable; import org.apache.impala.catalog.HdfsTable;
import org.apache.impala.catalog.View; import org.apache.impala.catalog.local.LocalFsTable;
import org.apache.impala.common.ImpalaException; import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.UnsupportedFeatureException; import org.apache.impala.common.UnsupportedFeatureException;
import com.google.common.collect.ImmutableList;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@@ -65,32 +66,33 @@ public class CalciteDb extends AbstractSchema {
Analyzer analyzer) throws ImpalaException { Analyzer analyzer) throws ImpalaException {
if (tableMap_.containsKey(tableName)) return this; if (tableMap_.containsKey(tableName)) return this;
if (table instanceof HdfsTable) { if (table instanceof LocalFsTable || table instanceof HdfsTable) {
tableMap_.put(tableName.toLowerCase(), tableMap_.put(
new CalciteTable(table, reader_, analyzer)); tableName.toLowerCase(), new CalciteTable(table, reader_, analyzer));
return this; return this;
} }
if (table instanceof View) { if (table instanceof FeView) {
tableMap_.put(tableName.toLowerCase(), createViewTable(table)); tableMap_.put(tableName.toLowerCase(), createViewTable(table));
return this; return this;
} }
throw new UnsupportedFeatureException( throw new UnsupportedFeatureException("Table " + table.getFullName()
"Table " + table.getFullName() + " has unsupported type " + + " has unsupported type " + table.getClass().getSimpleName()
table.getClass().getSimpleName() + ". The Calcite planner only supports " + + ". The Calcite planner only supports "
"HdfsTable's and View's."); + "HdfsTable's and FeView's.");
} }
private static ViewTable createViewTable(FeTable feTable) throws ImpalaException { private static ViewTable createViewTable(FeTable feTable) throws ImpalaException {
RelDataType rowType = CalciteTable.buildColumnsForRelDataType(feTable); RelDataType rowType = CalciteTable.buildColumnsForRelDataType(feTable);
JavaTypeFactory typeFactory = (JavaTypeFactory) ImpalaTypeSystemImpl.TYPE_FACTORY; JavaTypeFactory typeFactory = (JavaTypeFactory) ImpalaTypeSystemImpl.TYPE_FACTORY;
Type elementType = typeFactory.getJavaClass(rowType); Type elementType = typeFactory.getJavaClass(rowType);
return new ViewTable(elementType, return new ViewTable(elementType, RelDataTypeImpl.proto(rowType),
RelDataTypeImpl.proto(rowType), ((View) feTable).getQueryStmt().toSql(), ((FeView) feTable).getQueryStmt().toSql(),
/* schemaPath */ ImmutableList.of(), /* schemaPath */ ImmutableList.of(),
/* viewPath */ ImmutableList.of(feTable.getDb().getName().toLowerCase(), /* viewPath */
feTable.getName().toLowerCase())); ImmutableList.of(
feTable.getDb().getName().toLowerCase(), feTable.getName().toLowerCase()));
} }
public CalciteDb build() { public CalciteDb build() {

View File

@@ -17,6 +17,9 @@
package org.apache.impala.calcite.schema; package org.apache.impala.calcite.schema;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.config.CalciteConnectionConfig; import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl; import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.plan.RelOptAbstractTable; import org.apache.calcite.plan.RelOptAbstractTable;
@@ -47,39 +50,36 @@ import org.apache.impala.analysis.SlotDescriptor;
import org.apache.impala.analysis.SlotRef; import org.apache.impala.analysis.SlotRef;
import org.apache.impala.analysis.TableRef; import org.apache.impala.analysis.TableRef;
import org.apache.impala.analysis.TupleDescriptor; import org.apache.impala.analysis.TupleDescriptor;
import org.apache.impala.calcite.rel.util.ImpalaBaseTableRef;
import org.apache.impala.calcite.type.ImpalaTypeConverter;
import org.apache.impala.calcite.type.ImpalaTypeSystemImpl;
import org.apache.impala.calcite.util.SimplifiedAnalyzer;
import org.apache.impala.catalog.Column; import org.apache.impala.catalog.Column;
import org.apache.impala.catalog.FeFsPartition; import org.apache.impala.catalog.FeFsPartition;
import org.apache.impala.catalog.FeFsTable;
import org.apache.impala.catalog.FeTable; import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.FeView; import org.apache.impala.catalog.FeView;
import org.apache.impala.catalog.HdfsFileFormat; import org.apache.impala.catalog.HdfsFileFormat;
import org.apache.impala.catalog.HdfsTable; import org.apache.impala.catalog.HdfsTable;
import org.apache.impala.catalog.IcebergTable; import org.apache.impala.catalog.IcebergTable;
import org.apache.impala.calcite.rel.util.ImpalaBaseTableRef;
import org.apache.impala.calcite.type.ImpalaTypeConverter;
import org.apache.impala.calcite.type.ImpalaTypeSystemImpl;
import org.apache.impala.calcite.util.SimplifiedAnalyzer;
import org.apache.impala.planner.HdfsEstimatedMissingTableStats;
import org.apache.impala.planner.HdfsPartitionPruner;
import org.apache.impala.common.AnalysisException; import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.ImpalaException; import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.Pair; import org.apache.impala.common.Pair;
import org.apache.impala.common.UnsupportedFeatureException; import org.apache.impala.common.UnsupportedFeatureException;
import org.apache.impala.planner.HdfsEstimatedMissingTableStats;
import org.apache.impala.planner.HdfsPartitionPruner;
import org.apache.impala.util.AcidUtils; import org.apache.impala.util.AcidUtils;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
public class CalciteTable extends RelOptAbstractTable public class CalciteTable extends RelOptAbstractTable
implements Table, Prepare.PreparingTable { implements Table, Prepare.PreparingTable {
private final FeFsTable table_;
private final HdfsTable table_;
private final Map<Integer, Integer> impalaPositionMap_; private final Map<Integer, Integer> impalaPositionMap_;
@@ -94,15 +94,18 @@ public class CalciteTable extends RelOptAbstractTable
public CalciteTable(FeTable table, CalciteCatalogReader reader, public CalciteTable(FeTable table, CalciteCatalogReader reader,
Analyzer analyzer) throws ImpalaException { Analyzer analyzer) throws ImpalaException {
super(reader, table.getName(), buildColumnsForRelDataType(table)); super(reader, table.getName(), buildColumnsForRelDataType(table));
this.table_ = (HdfsTable) table; this.table_ = (FeFsTable) table;
this.qualifiedTableName_ = table.getTableName().toPath(); this.qualifiedTableName_ = table.getTableName().toPath();
this.columns_ = table.getColumnsInHiveOrder(); this.columns_ = table.getColumnsInHiveOrder();
this.impalaPositionMap_ = buildPositionMap(); this.impalaPositionMap_ = buildPositionMap();
this.analyzer_ = (SimplifiedAnalyzer) analyzer; this.analyzer_ = (SimplifiedAnalyzer) analyzer;
estimatedMissingStats_ = table_.getNumRows() < 0 // TODO: If table_.getNumRows() is unknown (-1), this logic will load all partitions
? new HdfsEstimatedMissingTableStats(analyzer.getQueryOptions(), table_, // to compute estimation using HdfsEstimatedMissingTableStats. This is potentially
table_.getPartitions(), -1) // expensive and should be avoided in local catalog mode.
: null; estimatedMissingStats_ = table_.getNumRows() < 0 ?
new HdfsEstimatedMissingTableStats(
analyzer.getQueryOptions(), table_, table_.loadAllPartitions(), -1) :
null;
checkIfTableIsSupported(table); checkIfTableIsSupported(table);
} }
@@ -131,11 +134,10 @@ public class CalciteTable extends RelOptAbstractTable
throw new UnsupportedFeatureException("Views are not supported yet."); throw new UnsupportedFeatureException("Views are not supported yet.");
} }
if (!(table instanceof HdfsTable)) { if (!(table instanceof FeFsTable)) {
String tableType = table.getClass().getSimpleName().replace("Table", ""); String tableType = table.getClass().getSimpleName().replace("Table", "");
throw new UnsupportedFeatureException(tableType + " tables are not supported yet."); throw new UnsupportedFeatureException(tableType + " tables are not supported yet.");
} }
} }
public BaseTableRef createBaseTableRef(SimplifiedAnalyzer analyzer public BaseTableRef createBaseTableRef(SimplifiedAnalyzer analyzer
@@ -165,9 +167,7 @@ public class CalciteTable extends RelOptAbstractTable
return impalaPair.first; return impalaPair.first;
} }
public HdfsTable getHdfsTable() { public FeFsTable getFeFsTable() { return table_; }
return table_;
}
@Override @Override
public List<String> getQualifiedName() { public List<String> getQualifiedName() {
@@ -247,7 +247,6 @@ public class CalciteTable extends RelOptAbstractTable
} }
public Column getColumn(int i) { public Column getColumn(int i) {
HdfsTable feFsTable = (HdfsTable) table_;
return columns_.get(i); return columns_.get(i);
} }

View File

@@ -17,12 +17,14 @@
package org.apache.impala.calcite.service; package org.apache.impala.calcite.service;
import org.apache.impala.calcite.rel.node.NodeWithExprs;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.apache.impala.analysis.Analyzer; import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.Expr; import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.JoinOperator; import org.apache.impala.analysis.JoinOperator;
import org.apache.impala.calcite.rel.node.NodeWithExprs;
import org.apache.impala.catalog.FeFsTable;
import org.apache.impala.catalog.FeTable; import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.HdfsTable; import org.apache.impala.catalog.HdfsTable;
import org.apache.impala.common.ImpalaException; import org.apache.impala.common.ImpalaException;
@@ -33,9 +35,9 @@ import org.apache.impala.planner.NestedLoopJoinNode;
import org.apache.impala.planner.ParallelPlanner; import org.apache.impala.planner.ParallelPlanner;
import org.apache.impala.planner.PlanFragment; import org.apache.impala.planner.PlanFragment;
import org.apache.impala.planner.PlanNode; import org.apache.impala.planner.PlanNode;
import org.apache.impala.planner.PlanRootSink;
import org.apache.impala.planner.Planner; import org.apache.impala.planner.Planner;
import org.apache.impala.planner.PlannerContext; import org.apache.impala.planner.PlannerContext;
import org.apache.impala.planner.PlanRootSink;
import org.apache.impala.planner.RuntimeFilterGenerator; import org.apache.impala.planner.RuntimeFilterGenerator;
import org.apache.impala.planner.SingleNodePlanner; import org.apache.impala.planner.SingleNodePlanner;
import org.apache.impala.planner.SingularRowSrcNode; import org.apache.impala.planner.SingularRowSrcNode;
@@ -55,19 +57,17 @@ import org.apache.impala.thrift.TRuntimeFilterMode;
import org.apache.impala.thrift.TRuntimeProfileNode; import org.apache.impala.thrift.TRuntimeProfileNode;
import org.apache.impala.thrift.TStmtType; import org.apache.impala.thrift.TStmtType;
import org.apache.impala.util.EventSequence; import org.apache.impala.util.EventSequence;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
/** /**
* ExecRequestCreator. Responsible for taking a PlanNode and the output Expr list * ExecRequestCreator. Responsible for taking a PlanNode and the output Expr list
* from the top level PlanNode and convert it into a TExecRequest thrift object * from the top level PlanNode and convert it into a TExecRequest thrift object
@@ -329,8 +329,8 @@ public class ExecRequestCreator implements CompilerStep {
private List<TNetworkAddress> getHostLocations(Collection<FeTable> tables) { private List<TNetworkAddress> getHostLocations(Collection<FeTable> tables) {
Set<TNetworkAddress> hostLocations = new HashSet<>(); Set<TNetworkAddress> hostLocations = new HashSet<>();
for (FeTable table : tables) { for (FeTable table : tables) {
if (table instanceof HdfsTable) { if (table instanceof FeFsTable) {
hostLocations.addAll(((HdfsTable) table).getHostIndex().getList()); hostLocations.addAll(((FeFsTable) table).getHostIndex().getList());
} }
} }
return new ArrayList<>(hostLocations); return new ArrayList<>(hostLocations);

View File

@@ -37,10 +37,7 @@ class TestCalcitePlanner(CustomClusterTestSuite):
add_mandatory_exec_option(cls, 'use_calcite_planner', 'true') add_mandatory_exec_option(cls, 'use_calcite_planner', 'true')
@pytest.mark.execute_serially @pytest.mark.execute_serially
@CustomClusterTestSuite.with_args( @CustomClusterTestSuite.with_args(start_args="--env_vars=USE_CALCITE_PLANNER=true")
start_args="--env_vars=USE_CALCITE_PLANNER=true",
impalad_args="--use_local_catalog=false",
catalogd_args="--catalog_topic_mode=full")
def test_calcite_frontend(self, vector, unique_database): def test_calcite_frontend(self, vector, unique_database):
"""Calcite planner does not work in local catalog mode yet.""" """Calcite planner does not work in local catalog mode yet."""
self.run_test_case('QueryTest/calcite', vector, use_db=unique_database) self.run_test_case('QueryTest/calcite', vector, use_db=unique_database)

View File

@@ -429,8 +429,6 @@ class TestWorkloadManagementSQLDetailsCalcite(WorkloadManagementTestSuite):
@CustomClusterTestSuite.with_args( @CustomClusterTestSuite.with_args(
start_args="--use_calcite_planner=true", start_args="--use_calcite_planner=true",
impalad_args="--use_local_catalog=false",
catalogd_args="--catalog_topic_mode=full",
cluster_size=1, workload_mgmt=True) cluster_size=1, workload_mgmt=True)
def test_tpcds_8_decimal(self, vector): def test_tpcds_8_decimal(self, vector):
"""Runs the tpcds-decimal_v2-q8 query using the calcite planner and asserts the query """Runs the tpcds-decimal_v2-q8 query using the calcite planner and asserts the query