IMPALA-14108: Add support for SHOW FILES IN table PARTITION for Iceberg tables

This patch implements partition filtering support for the SHOW FILES
statement on Iceberg tables, based on the functionality added in
IMPALA-12243. Prior to this change, the syntax resulted in a
NullPointerException.

Key changes:
- Added ShowFilesStmt.analyzeIceberg() to validate and transform
  partition expressions using IcebergPartitionExpressionRewriter and
  IcebergPartitionPredicateConverter. After that, it collects matching
  file paths using IcebergUtil.planFiles().
- Added FeIcebergTable.Utils.getIcebergTableFilesFromPaths() to accept
  pre-filtered file lists from the analysis phase.
- Enhanced the TShowFilesParams thrift struct with an optional
  selected_files field to pass pre-filtered file paths from the
  frontend to the backend.

Testing:
- Analyzer tests for negative cases: non-existent partitions, invalid
  expressions, non-partition columns, unsupported transforms.
- Analyzer tests for positive cases: all transform types, complex
  expressions.
- Authorization tests for non-filtered and filtered syntaxes.
- E2E tests covering every partition transform type with various
  predicates.
- Schema evolution and rollback scenarios.

The implementation follows AlterTableDropPartition's pattern, where the
analysis phase performs validation and metadata retrieval and the
execution phase handles result formatting and display.

Change-Id: Ibb9913e078e6842861bdbb004ed5d67286bd3152
Reviewed-on: http://gerrit.cloudera.org:8080/23455
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
committed by Impala Public Jenkins
parent 275f03f10d
commit 087b715a2b
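
For orientation before the diff: the syntax this patch enables, and the analyzer behavior around it, can be condensed into a few FrontendTestBase-style checks (a sketch distilled from the AnalyzeDDLTest additions further down; all table, column, and transform names come from those tests):

    // Identity-partition predicate on an Iceberg table.
    AnalyzesOk("show files in functional_parquet.iceberg_partitioned"
        + " partition (action = 'click')");
    // Predicate over a partition transform (HOUR on a timestamp column).
    AnalyzesOk("show files in functional_parquet.iceberg_partitioned"
        + " partition (hour(event_time) > '2020-01-01-01')");
    // Filtering on a non-partition column is rejected during analysis.
    AnalysisError("show files in functional_parquet.iceberg_partitioned"
        + " partition (user = 'Alan')",
        "Partition exprs cannot contain non-partition column(s): `user`");

Before this patch, every PARTITION variant above failed with a NullPointerException; the unfiltered SHOW FILES form is unchanged.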
fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java

@@ -240,7 +240,7 @@ public class AnalysisContext {
   public boolean isSingleColumnPrivStmt() {
     return isDescribeTableStmt() || isResetMetadataStmt() || isUseStmt()
         || isShowTablesStmt() || isShowMetadataTablesStmt() || isShowViewsStmt()
-        || isAlterTableStmt() || isShowFunctionsStmt();
+        || isAlterTableStmt() || isShowFunctionsStmt() || isShowFilesStmt();
   }

   public boolean isConvertTableToIcebergStmt() {
fe/src/main/java/org/apache/impala/analysis/ShowFilesStmt.java

@@ -17,15 +17,31 @@

 package org.apache.impala.analysis;

+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;

+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.impala.authorization.Privilege;
+import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeTable;
+import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.paimon.FeShowFileStmtSupport;
 import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.IcebergPartitionPredicateConverter;
+import org.apache.impala.common.IcebergPredicateConverter;
+import org.apache.impala.common.ImpalaException;

 import org.apache.impala.thrift.TShowFilesParams;
 import org.apache.impala.thrift.TTableName;
+import org.apache.impala.util.IcebergUtil;

 import com.google.common.base.Preconditions;

@@ -45,6 +61,9 @@ public class ShowFilesStmt extends StatementBase implements SingleTableStmt {
   // Set during analysis.
   protected FeTable table_;

+  // File paths selected by Iceberg's partition filtering
+  private List<String> icebergFilePaths_;
+
   public ShowFilesStmt(TableName tableName, PartitionSet partitionSet) {
     tableName_ = Preconditions.checkNotNull(tableName);
     partitionSet_ = partitionSet;
@@ -72,10 +91,13 @@ public class ShowFilesStmt extends StatementBase implements SingleTableStmt {
     // and to allow us to evaluate partition predicates.
     TableRef tableRef = new TableRef(tableName_.toPath(), null, Privilege.VIEW_METADATA);
     tableRef = analyzer.resolveTableRef(tableRef);
-    if (tableRef instanceof InlineViewRef ||
-        tableRef instanceof CollectionTableRef) {
+    if (tableRef instanceof InlineViewRef) {
       throw new AnalysisException(String.format(
-          "SHOW FILES not applicable to a non hdfs table: %s", tableName_));
+          "SHOW FILES not allowed on a view: %s", tableName_));
+    }
+    if (tableRef instanceof CollectionTableRef) {
+      throw new AnalysisException(String.format(
+          "SHOW FILES not allowed on a nested collection: %s", tableName_));
     }
     table_ = tableRef.getTable();
     Preconditions.checkNotNull(table_);
@@ -101,14 +123,85 @@ public class ShowFilesStmt extends StatementBase implements SingleTableStmt {
       partitionSet_.setPrivilegeRequirement(Privilege.VIEW_METADATA);
       partitionSet_.analyze(analyzer);
     }

+    if (table_ instanceof FeIcebergTable) { analyzeIceberg(analyzer); }
   }
+
+  public void analyzeIceberg(Analyzer analyzer) throws AnalysisException {
+    if (partitionSet_ == null) {
+      icebergFilePaths_ = null;
+      return;
+    }
+
+    FeIcebergTable table = (FeIcebergTable) table_;
+    // To rewrite transforms and column references
+    IcebergPartitionExpressionRewriter rewriter =
+        new IcebergPartitionExpressionRewriter(analyzer,
+            table.getIcebergApiTable().spec());
+    // For Impala expression to Iceberg expression conversion
+    IcebergPredicateConverter converter =
+        new IcebergPartitionPredicateConverter(table.getIcebergSchema(), analyzer);
+
+    List<Expression> icebergPartitionExprs = new ArrayList<>();
+    for (Expr expr : partitionSet_.getPartitionExprs()) {
+      expr = rewriter.rewrite(expr);
+      expr.analyze(analyzer);
+      analyzer.getConstantFolder().rewrite(expr, analyzer);
+      try {
+        icebergPartitionExprs.add(converter.convert(expr));
+      } catch (ImpalaException e) {
+        throw new AnalysisException(
+            "Invalid partition filtering expression: " + expr.toSql());
+      }
+    }
+
+    try (CloseableIterable<FileScanTask> fileScanTasks = IcebergUtil.planFiles(table,
+        icebergPartitionExprs, null, null)) {
+      // Collect file paths without sorting - sorting will be done in execution phase
+      List<String> filePaths = new ArrayList<>();
+      Set<String> uniquePaths = new HashSet<>();
+
+      for (FileScanTask fileScanTask : fileScanTasks) {
+        if (fileScanTask.residual().isEquivalentTo(Expressions.alwaysTrue())) {
+          // Add delete files
+          for (DeleteFile deleteFile : fileScanTask.deletes()) {
+            String path = deleteFile.path().toString();
+            if (uniquePaths.add(path)) {
+              filePaths.add(path);
+            }
+          }
+
+          // Add data file
+          String dataFilePath = fileScanTask.file().path().toString();
+          if (uniquePaths.add(dataFilePath)) {
+            filePaths.add(dataFilePath);
+          }
+        }
+      }
+
+      // Store unsorted file paths - lexicographic sorting will be applied in execution
+      icebergFilePaths_ = filePaths;
+
+      if (icebergFilePaths_.isEmpty()) {
+        throw new AnalysisException("No matching partition(s) found.");
+      }
+    } catch (IOException | TableLoadingException e) {
+      throw new AnalysisException("Error loading metadata for Iceberg table", e);
+    }
+  }

   public TShowFilesParams toThrift() {
     TShowFilesParams params = new TShowFilesParams();
     params.setTable_name(new TTableName(table_.getDb().getName(), table_.getName()));
-    if (partitionSet_ != null) {
+    // For Iceberg tables, we use selected_files instead of partition_set
+    if (partitionSet_ != null && !(table_ instanceof FeIcebergTable)) {
       params.setPartition_set(partitionSet_.toThrift());
     }
+    // For Iceberg tables: icebergFilePaths_ is null for unfiltered queries (no PARTITION
+    // clause) or contains the list of matching file paths for filtered queries.
+    if (table_ instanceof FeIcebergTable && icebergFilePaths_ != null) {
+      params.setSelected_files(icebergFilePaths_);
+    }
     return params;
   }
 }
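
The heavy lifting in analyzeIceberg() above is delegated to Iceberg's scan planning through IcebergUtil.planFiles(). As a minimal sketch of the same idea written against the plain Iceberg API (the table handle and the predicate are hypothetical stand-ins, not the Impala code path itself):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;

    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.io.CloseableIterable;

    class PlanFilesSketch {
      // Collects the data and delete file paths that fall entirely within the
      // partitions matching action = 'click', deduplicated in scan order.
      static List<String> matchingFilePaths(Table table) throws IOException {
        Expression filter = Expressions.equal("action", "click");
        Set<String> unique = new LinkedHashSet<>();
        try (CloseableIterable<FileScanTask> tasks =
            table.newScan().filter(filter).planFiles()) {
          for (FileScanTask task : tasks) {
            // A residual of alwaysTrue() means the predicate is fully satisfied
            // by the file's partition metadata, so every row in the file matches.
            if (task.residual().isEquivalentTo(Expressions.alwaysTrue())) {
              for (DeleteFile delete : task.deletes()) {
                unique.add(delete.path().toString());
              }
              unique.add(task.file().path().toString());
            }
          }
        }
        return new ArrayList<>(unique);
      }
    }

The residual check is what gives the statement its partition semantics: a file whose residual is not trivially true only partially overlaps the predicate and is skipped, and if nothing survives, analyzeIceberg() raises "No matching partition(s) found."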
fe/src/main/java/org/apache/impala/catalog/FeFsTable.java

@@ -648,7 +648,7 @@ public interface FeFsTable extends FeTable {
     * @return partition file info, ordered by partition
     */
    public static TResultSet getFiles(FeFsTable table,
-       List<List<TPartitionKeyValue>> partitionSet) {
+       List<List<TPartitionKeyValue>> partitionSet, List<String> icebergFiles) {
      TResultSet result = new TResultSet();
      TResultSetMetadata resultSchema = new TResultSetMetadata();
      result.setSchema(resultSchema);
@@ -659,7 +659,8 @@ public interface FeFsTable extends FeTable {
      result.setRows(new ArrayList<>());

      if (table instanceof FeIcebergTable) {
-       return FeIcebergTable.Utils.getIcebergTableFiles((FeIcebergTable) table, result);
+       return FeIcebergTable.Utils.getIcebergTableFiles(
+           (FeIcebergTable) table, result, icebergFiles);
      }

      List<? extends FeFsPartition> orderedPartitions;
fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java

@@ -457,8 +457,14 @@ public interface FeIcebergTable extends FeFsTable {
    /**
     * Get file info for the given fe iceberg table.
     */
-   public static TResultSet getIcebergTableFiles(FeIcebergTable table,
-       TResultSet result) {
+   public static TResultSet getIcebergTableFiles(
+       FeIcebergTable table, TResultSet result, List<String> icebergFiles) {
+     // If icebergFiles is provided (not null), use it for filtered results
+     // Note: Empty lists are handled in frontend analysis and converted to errors
+     if (icebergFiles != null) {
+       return getIcebergTableFilesFromPaths(result, icebergFiles, table);
+     }
+     // No partition filter was specified, show all files
      List<FileDescriptor> orderedFds = Lists.newArrayList(
          table.getContentFileStore().getAllFiles());
      Collections.sort(orderedFds);
@@ -474,6 +480,40 @@ public interface FeIcebergTable extends FeFsTable {
      return result;
    }

+   // Method used when file paths are pre-collected during analysis
+   public static TResultSet getIcebergTableFilesFromPaths(
+       TResultSet result, List<String> filePaths, FeIcebergTable table) {
+     // Sort the file paths for consistent output
+     List<String> sortedFiles = new ArrayList<>(filePaths);
+     Collections.sort(sortedFiles);
+
+     IcebergContentFileStore contentFileStore = table.getContentFileStore();
+
+     for (String filePath : sortedFiles) {
+       TResultRowBuilder rowBuilder = new TResultRowBuilder();
+       rowBuilder.add(filePath);
+
+       // Try to get actual file size from content file store
+       String pathHash = IcebergUtil.getFilePathHash(filePath);
+       FileDescriptor fd = contentFileStore.getDataFileDescriptor(pathHash);
+       if (fd == null) {
+         fd = contentFileStore.getDeleteFileDescriptor(pathHash);
+       }
+
+       if (fd != null) {
+         rowBuilder.add(PrintUtils.printBytes(fd.getFileLength()));
+       } else {
+         rowBuilder.add("Unknown");
+       }
+
+       rowBuilder.add(""); // Partition - empty for now
+       rowBuilder.add(FileSystemUtil.getErasureCodingPolicy(new Path(filePath)));
+       result.addToRows(rowBuilder.get());
+     }
+
+     return result;
+   }
+
    /**
     * Get partition stats for the given fe iceberg table.
     */
fe/src/main/java/org/apache/impala/service/Frontend.java

@@ -3466,10 +3466,13 @@ public class Frontend {
     if (table instanceof FeShowFileStmtSupport) {
       return ((FeShowFileStmtSupport) table).doGetTableFiles(request);
     } else if (table instanceof FeFsTable) {
-      return FeFsTable.Utils.getFiles((FeFsTable)table, request.getPartition_set());
+      List<String> icebergFiles =
+          request.isSetSelected_files() ? request.getSelected_files() : null;
+      return FeFsTable.Utils.getFiles(
+          (FeFsTable) table, request.getPartition_set(), icebergFiles);
     } else {
-      throw new InternalException("SHOW FILES only supports Hdfs table. " +
-          "Unsupported table class: " + table.getClass());
+      throw new InternalException("SHOW FILES only supports Hdfs and Iceberg tables. "
+          + "Unsupported table class: " + table.getClass());
     }
   }

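
Taken together with the optional selected_files field added to TShowFilesParams, the dispatch above encodes a simple contract: toThrift() sets selected_files (and skips partition_set) for filtered Iceberg queries, while HDFS tables keep using partition_set. A condensed sketch of that contract, using the Impala-internal types from the hunks above (not runnable standalone):

    // request is the TShowFilesParams built by ShowFilesStmt.toThrift().
    List<String> icebergFiles =
        request.isSetSelected_files() ? request.getSelected_files() : null;
    // icebergFiles == null -> no Iceberg partition filter was analyzed;
    //                         getFiles() lists every file in the table.
    // icebergFiles != null -> list exactly these pre-filtered paths, which
    //                         getIcebergTableFilesFromPaths() sorts for display.
    TResultSet result = FeFsTable.Utils.getFiles(
        (FeFsTable) table, request.getPartition_set(), icebergFiles);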
fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java

@@ -4399,10 +4399,10 @@ public class AnalyzeDDLTest extends FrontendTestBase {
       // Cannot show files on a non hdfs table.
       AnalysisError(String.format("show files in functional.alltypes_view %s",
           partition),
-          "SHOW FILES not applicable to a non hdfs table: functional.alltypes_view");
+          "SHOW FILES not allowed on a view: functional.alltypes_view");
       AnalysisError(String.format("show files in allcomplextypes.int_array_col %s",
           partition), createAnalysisCtx("functional"),
-          "SHOW FILES not applicable to a non hdfs table: allcomplextypes.int_array_col");
+          "SHOW FILES not allowed on a nested collection: allcomplextypes.int_array_col");
     }

     // Not a partition column.
@@ -4417,6 +4417,63 @@
     // Partition spec does not exist
     AnalysisError("show files in functional.alltypes partition(year=2010,month=NULL)",
         "No matching partition(s) found.");
+
+    // Iceberg SHOW FILES tests
+    String icebergPartitioned =
+        "show files in functional_parquet.iceberg_partitioned";
+    String icebergEvolution =
+        "show files in functional_parquet.iceberg_partition_evolution";
+    String icebergNonPartitioned =
+        "show files in functional_parquet.iceberg_non_partitioned";
+
+    // Non-partitioned Iceberg table
+    AnalysisError(icebergNonPartitioned + " partition (user = 'Alan')",
+        "Table is not partitioned: functional_parquet.iceberg_non_partitioned");
+
+    // Valid Iceberg partition filters
+    AnalyzesOk(icebergPartitioned + " partition (action = 'click')");
+    AnalyzesOk(
+        icebergPartitioned + " partition (action = 'click' or action = 'download')");
+    AnalyzesOk(icebergPartitioned + " partition (action in ('click', 'download'))");
+    AnalyzesOk(icebergPartitioned + " partition (hour(event_time) = '2020-01-01-9')");
+    AnalyzesOk(icebergPartitioned + " partition (hour(event_time) > '2020-01-01-01')");
+    AnalyzesOk(icebergPartitioned + " partition (hour(event_time) < '2020-02-01-01')");
+    AnalyzesOk(icebergPartitioned
+        + " partition (hour(event_time) in ('2020-01-01-9', '2020-01-01-1'))");
+    AnalyzesOk(icebergPartitioned
+        + " partition (hour(event_time) = '2020-01-01-9', action = 'click')");
+    AnalyzesOk(icebergEvolution + " partition (truncate(4,date_string_col,4) = '1231')");
+    AnalyzesOk(icebergEvolution + " partition (month = 12)");
+
+    // Error cases for Iceberg partition filters
+    // Non-existent partition should throw error (consistent with regular tables)
+    AnalysisError(icebergPartitioned + " partition (action = 'Foo')",
+        "No matching partition(s) found.");
+    AnalysisError(icebergPartitioned + " partition (user = 'Alan')",
+        "Partition exprs cannot contain non-partition column(s): `user`");
+    AnalysisError(
+        icebergPartitioned + " partition (user = 'Alan' or user = 'Lisa' and id > 10)",
+        "Partition exprs cannot contain non-partition column(s): `user`");
+    AnalysisError(icebergPartitioned + " partition (void(action) = 'click')",
+        "VOID transform is not supported for partition selection");
+    AnalysisError(icebergPartitioned + " partition (day(action) = 'Alan')",
+        "Can't filter column 'action' with transform type: 'DAY'");
+    AnalysisError(icebergPartitioned + " partition (action = action)",
+        "Invalid partition filtering expression: action = action");
+    AnalysisError(icebergPartitioned + " partition (action = action and action = 'click' "
+        + "or hour(event_time) > '2020-01-01-01')",
+        "Invalid partition filtering expression: "
+        + "action = action AND action = 'click' OR HOUR(event_time) > 438289");
+    AnalysisError(icebergPartitioned + " partition (action)",
+        "Invalid partition filtering expression: action");
+    AnalysisError(icebergPartitioned + " partition (2)",
+        "Invalid partition filtering expression: 2");
+    AnalysisError(icebergPartitioned + " partition (truncate(action))",
+        "BUCKET and TRUNCATE partition transforms should have a parameter");
+    AnalysisError(icebergPartitioned + " partition (truncate('string', action))",
+        "Invalid transform parameter value: string");
+    AnalysisError(icebergPartitioned + " partition (truncate(1, 2, action))",
+        "Invalid partition predicate: truncate(1, 2, action)");
   }

   @Test
fe/src/test/java/org/apache/impala/authorization/AuthorizationStmtTest.java

@@ -1346,6 +1346,30 @@ public class AuthorizationStmtTest extends AuthorizationTestBase {
       authzTest.error(accessError("functional"));
     }

+    // Show files for Iceberg tables
+    AuthzTest authzTestIce =
+        authorize("show files in functional_parquet.iceberg_partitioned");
+    for (TPrivilegeLevel privilege : viewMetadataPrivileges()) {
+      authzTestIce.ok(onServer(privilege))
+          .ok(onDatabase("functional_parquet", privilege))
+          .ok(onTable("functional_parquet", "iceberg_partitioned", privilege));
+    }
+    authzTestIce.error(accessError("functional_parquet"));
+
+    // Show files for Iceberg tables with partition filter; more restrictive
+    for (AuthzTest authzTest: new AuthzTest[]{
+        authorize("show files in functional_parquet.iceberg_partitioned " +
+            "partition(action='view')"),
+        authorize("show files in functional_parquet.iceberg_partitioned " +
+            "partition(action='click')")}) {
+      for (TPrivilegeLevel privilege : new TPrivilegeLevel[] {
+          TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER, TPrivilegeLevel.SELECT}) {
+        authzTest.ok(onServer(privilege))
+            .ok(onDatabase("functional_parquet", privilege))
+            .ok(onTable("functional_parquet", "iceberg_partitioned", privilege));
+      }
+    }
+
     // Show current roles should always be allowed.
     authorize("show current roles").ok();

@@ -2644,9 +2668,7 @@
           .error(accessError(true, "nodb"), onServer(true, allExcept(
               TPrivilegeLevel.ALL, TPrivilegeLevel.OWNER)));
       }
-    } finally {
-      authzCatalog_.removeRole("foo");
-    }
+    } finally { authzCatalog_.removeRole("foo"); }
     boolean exceptionThrown = false;
     try {
       parseAndAnalyze("alter database functional set owner role foo",