IMPALA-13725: Add Iceberg table repair functionality

In some cases users delete files directly from storage without
going through the Iceberg API, e.g. they remove old partitions.

This corrupts the table and makes queries that try to read the
missing files fail.
This change introduces a repair statement that removes the
dangling references to missing files from the metadata.
Note that the table cannot be repaired if there are missing
delete files, because Iceberg's DeleteFiles API, which is used
to execute the operation, only allows removing data files.
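
Example usage (ice_tbl stands for any Iceberg table):

    ALTER TABLE ice_tbl EXECUTE REPAIR_METADATA();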

Testing:
 - E2E
   - HDFS
   - S3, Ozone
 - analysis

Change-Id: I514403acaa3b8c0a7b2581d676b82474d846d38e
Reviewed-on: http://gerrit.cloudera.org:8080/23512
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author: Noemi Pap-Takacs
Date: 2025-10-07 16:45:31 +02:00
parent 2ac5a24dc0
commit fdad9d3204
8 changed files with 283 additions and 10 deletions

common/thrift/JniCatalog.thrift

@@ -449,6 +449,10 @@ struct TAlterTableExecuteRemoveOrphanFilesParams {
   1: required i64 older_than_millis
 }
 
+// Parameters for ALTER TABLE EXECUTE REPAIR_METADATA operations.
+struct TAlterTableExecuteRepairMetadataParams {
+}
+
 // Parameters for ALTER TABLE EXECUTE ... operations.
 struct TAlterTableExecuteParams {
   // Parameters for ALTER TABLE EXECUTE EXPIRE_SNAPSHOTS
@@ -459,6 +463,9 @@ struct TAlterTableExecuteParams {
   // Parameters for ALTER TABLE EXECUTE REMOVE_ORPHAN_FILES
   3: optional TAlterTableExecuteRemoveOrphanFilesParams remove_orphan_files_params
+
+  // Set iff this is an ALTER TABLE EXECUTE REPAIR_METADATA statement.
+  4: optional TAlterTableExecuteRepairMetadataParams repair_metadata_params
 }
 
 // Parameters for all ALTER TABLE commands.

fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteRepairMetadataStmt.java

@@ -0,0 +1,63 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.impala.analysis;

import org.apache.impala.catalog.FeIcebergTable;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.thrift.TAlterTableExecuteParams;
import org.apache.impala.thrift.TAlterTableExecuteRepairMetadataParams;
import org.apache.impala.thrift.TAlterTableParams;
import org.apache.impala.thrift.TAlterTableType;

public class AlterTableExecuteRepairMetadataStmt extends AlterTableExecuteStmt {
  protected final static String USAGE = "EXECUTE REPAIR_METADATA()";

  protected AlterTableExecuteRepairMetadataStmt(TableName tableName, Expr fnCallExpr) {
    super(tableName, fnCallExpr);
  }

  @Override
  public String getOperation() { return "EXECUTE REPAIR_METADATA"; }

  @Override
  public void analyze(Analyzer analyzer) throws AnalysisException {
    super.analyze(analyzer);
    FeTable table = getTargetTable();
    if (!(table instanceof FeIcebergTable)) {
      throw new AnalysisException(
          "ALTER TABLE EXECUTE REPAIR_METADATA is only supported "
          + "for Iceberg tables: " + table.getTableName());
    }
    analyzeFunctionCallExpr(analyzer, USAGE);
  }

  @Override
  public TAlterTableParams toThrift() {
    TAlterTableParams params = super.toThrift();
    params.setAlter_type(TAlterTableType.EXECUTE);
    TAlterTableExecuteParams executeParams = new TAlterTableExecuteParams();
    TAlterTableExecuteRepairMetadataParams repairMetadataParams =
        new TAlterTableExecuteRepairMetadataParams();
    executeParams.setRepair_metadata_params(repairMetadataParams);
    params.setSet_execute_params(executeParams);
    return params;
  }
}

fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java

@@ -19,7 +19,6 @@ package org.apache.impala.analysis;
 import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.impala.common.AnalysisException;
 
 /**
@@ -70,12 +69,15 @@ public class AlterTableExecuteStmt extends AlterTableStmt {
       case "ROLLBACK": return new AlterTableExecuteRollbackStmt(tableName, fnCallExpr);
       case "REMOVE_ORPHAN_FILES":
         return new AlterTableExecuteRemoveOrphanFilesStmt(tableName, fnCallExpr);
+      case "REPAIR_METADATA":
+        return new AlterTableExecuteRepairMetadataStmt(tableName, fnCallExpr);
       default:
         throw new AnalysisException(String.format("'%s' is not supported by ALTER "
             + "TABLE <table> EXECUTE. Supported operations are: "
             + "EXPIRE_SNAPSHOTS(<expression>), "
             + "ROLLBACK(<expression>), "
-            + "REMOVE_ORPHAN_FILES(<expression>).",
+            + "REMOVE_ORPHAN_FILES(<expression>), "
+            + "REPAIR_METADATA().",
             functionNameOrig));
     }
   }
@@ -84,14 +86,26 @@ public class AlterTableExecuteStmt extends AlterTableStmt {
       throws AnalysisException {
     // fnCallExpr_ analyzed here manually, because it is not an actual function but a
     // catalog operation.
-    String fnName = fnCallExpr_.getFnName().toString();
-    Preconditions.checkState(StringUtils.equalsAnyIgnoreCase(
-        fnName, "EXPIRE_SNAPSHOTS", "ROLLBACK", "REMOVE_ORPHAN_FILES"));
-    if (fnCallExpr_.getParams().size() != 1) {
-      throw new AnalysisException(
-          usage + " must have one parameter: " + fnCallExpr_.toSql());
+    String fnName = fnCallExpr_.getFnName().toString().toUpperCase();
+    switch (fnName) {
+      case "EXPIRE_SNAPSHOTS":
+      case "ROLLBACK":
+      case "REMOVE_ORPHAN_FILES":
+        if (fnCallExpr_.getParams().size() != 1) {
+          throw new AnalysisException(
+              usage + " must have one parameter: " + fnCallExpr_.toSql());
+        }
+        fnParamValue_ = fnCallExpr_.getParams().exprs().get(0);
+        break;
+      case "REPAIR_METADATA":
+        if (fnCallExpr_.getParams().size() != 0) {
+          throw new AnalysisException(
+              usage + " should have no parameter: " + fnCallExpr_.toSql());
+        }
+        break;
+      default:
+        Preconditions.checkState(false, "Invalid function call in ALTER TABLE EXECUTE.");
     }
-    fnParamValue_ = fnCallExpr_.getParams().exprs().get(0);
   }
 }

fe/src/main/java/org/apache/impala/catalog/iceberg/ImpalaRepairIcebergTable.java

@@ -0,0 +1,111 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.impala.catalog.iceberg;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.Transaction;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.FeIcebergTable;
import org.apache.impala.catalog.IcebergContentFileStore;
import org.apache.impala.service.BackendConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;

/**
 * Repairs an Iceberg table by removing the manifest entries that reference data files
 * no longer present on the file system.
 */
public class ImpalaRepairIcebergTable {
  private static final Logger LOG =
      LoggerFactory.getLogger(ImpalaRepairIcebergTable.class);

  private final FeIcebergTable table_;

  public ImpalaRepairIcebergTable(FeIcebergTable table) {
    table_ = table;
  }

  public int execute(Transaction iceTxn) throws CatalogException {
    Collection<String> missingFiles;
    IcebergContentFileStore fileStore = table_.getContentFileStore();
    if (fileStore.hasMissingFile()) {
      // Missing files were already detected earlier, reuse the recorded list.
      missingFiles = fileStore.getMissingFiles();
    } else {
      // Probe the file system to collect the dangling references.
      missingFiles = getMissingFiles(fileStore);
    }
    // Delete all files from the missing files collection.
    int numRemovedReferences = missingFiles.size();
    if (numRemovedReferences > 0) {
      DeleteFiles deleteFiles = iceTxn.newDelete();
      for (String path : missingFiles) {
        deleteFiles.deleteFile(path);
      }
      deleteFiles.commit();
      LOG.info("Removed {} files during table repair: {}",
          numRemovedReferences, getFilesLog(missingFiles));
    } else {
      LOG.info("No files were removed during table repair.");
    }
    return numRemovedReferences;
  }

  private List<String> getMissingFiles(IcebergContentFileStore fileStore)
      throws CatalogException {
    // Check all data files in parallel and create a list of dangling references.
    List<String> missingFiles;
    ForkJoinPool forkJoinPool = null;
    try {
      forkJoinPool = new ForkJoinPool(BackendConfig.INSTANCE.icebergCatalogNumThreads());
      missingFiles = forkJoinPool.submit(() ->
          StreamSupport.stream(
              fileStore.getAllDataFiles().spliterator(), /*parallel=*/true)
              .map(fileDesc -> fileDesc.getAbsolutePath(table_.getLocation()))
              .filter(path -> !table_.getIcebergApiTable().io().newInputFile(path).exists())
              .collect(Collectors.toList())
      ).get();
    } catch (InterruptedException | ExecutionException e) {
      throw new CatalogException(e.getMessage(), e);
    } finally {
      if (forkJoinPool != null) {
        forkJoinPool.shutdown();
      }
    }
    return missingFiles;
  }

  private static String getFilesLog(Collection<String> missingFiles) {
    // Log only a few paths at INFO level; log many more when TRACE is enabled.
    int numPathsToLog = 3;
    if (LOG.isTraceEnabled()) {
      numPathsToLog = 1000;
    }
    int numRemoved = missingFiles.size();
    String fileNames = Joiner.on(", ").join(
        Iterables.limit(missingFiles, numPathsToLog));
    if (numRemoved > numPathsToLog) {
      int remaining = numRemoved - numPathsToLog;
      fileNames += String.format(", and %d more.", remaining);
    }
    return fileNames;
  }
}
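
getMissingFiles() bounds the parallelism of the blocking existence checks with a
standard Java idiom: when a parallel stream's terminal operation runs inside a task
submitted to a dedicated ForkJoinPool, the stream draws its worker threads from that
pool instead of the common pool. A minimal, self-contained sketch of the idiom (the
class name, paths, and pool size are made up for illustration):

import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class ParallelExistsCheckSketch {
  public static void main(String[] args) throws Exception {
    List<String> paths = Arrays.asList("/tmp/a", "/tmp/b", "/tmp/c");
    ForkJoinPool pool = new ForkJoinPool(2);  // bound the checks to 2 threads
    try {
      // Because the task runs inside 'pool', the parallel stream below uses
      // this pool's worker threads rather than ForkJoinPool.commonPool().
      List<String> missing = pool.submit(() ->
          paths.parallelStream()
              .filter(p -> !new File(p).exists())  // blocking file system probe
              .collect(Collectors.toList())).get();
      System.out.println("Missing files: " + missing);
    } finally {
      pool.shutdown();
    }
  }
}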

fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java

@@ -1651,7 +1651,18 @@ public class CatalogOpExecutor {
     } else if (setExecuteParams.isSetRemove_orphan_files_params()) {
       throw new IllegalStateException(
           "Alter table execute REMOVE_ORPHAN_FILES should not use "
           + "Iceberg Transaction.");
+    } else if (setExecuteParams.isSetRepair_metadata_params()) {
+      int numRemovedReferences =
+          IcebergCatalogOpExecutor.alterTableExecuteRepair(tbl, iceTxn);
+      // Do not commit an empty transaction if there were no missing files to remove.
+      if (numRemovedReferences == 0) {
+        addSummary(response, "No missing data files detected.");
+        catalogTimeline.markEvent("Abandoned empty Iceberg transaction");
+        return false;
+      }
+      addSummary(response, "Iceberg table repaired by deleting "
+          + numRemovedReferences + " manifest entries of missing data files.");
     } else {
       // Cannot happen, but throw just in case.
       throw new IllegalStateException(

fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java

@@ -60,6 +60,7 @@ import org.apache.impala.catalog.iceberg.GroupedContentFiles;
 import org.apache.impala.catalog.iceberg.IcebergCatalog;
 import org.apache.impala.catalog.iceberg.IcebergHiveCatalog;
 import org.apache.impala.catalog.iceberg.ImpalaIcebergDeleteOrphanFiles;
+import org.apache.impala.catalog.iceberg.ImpalaRepairIcebergTable;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.fb.FbIcebergColumnStats;
 import org.apache.impala.fb.FbIcebergDataFile;
@@ -268,6 +269,16 @@ public class IcebergCatalogOpExecutor {
     return "Remove orphan files executed.";
   }
 
+  /**
+   * Executes an ALTER TABLE EXECUTE REPAIR_METADATA.
+   * @return The number of removed references to missing data files.
+   */
+  public static int alterTableExecuteRepair(FeIcebergTable tbl, Transaction iceTxn)
+      throws CatalogException {
+    ImpalaRepairIcebergTable repair = new ImpalaRepairIcebergTable(tbl);
+    return repair.execute(iceTxn);
+  }
+
   /**
    * Deletes files related to specific set of partitions
    */

fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java

@@ -4883,6 +4883,22 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         + "'1111' cannot be converted to a TIMESTAMP");
   }
 
+  @Test
+  public void TestAlterExecuteRepairMetadata() {
+    AnalyzesOk("alter table functional_parquet.iceberg_partitioned execute "
+        + "repair_metadata();");
+    // Negative tests
+    AnalysisError("alter table nodb.alltypes execute repair_metadata();",
+        "Could not resolve table reference: 'nodb.alltypes'");
+    AnalysisError("alter table functional.alltypes execute repair_metadata();",
+        "ALTER TABLE EXECUTE REPAIR_METADATA is only supported for Iceberg tables: "
+        + "functional.alltypes");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute "
+        + "repair_metadata('2024-02-11 10:00:00');",
+        "EXECUTE REPAIR_METADATA() should have no parameter");
+  }
+
   private static String buildLongOwnerName() {
     StringBuilder comment = new StringBuilder();
     for (int i = 0; i < MetaStoreUtil.MAX_OWNER_LENGTH + 5; i++) {

tests/query_test/test_iceberg.py

@@ -2254,6 +2254,46 @@ class TestIcebergV2Table(IcebergTestSuite):
         tbl_name, second_snapshot.get_snapshot_id()))
     assert "partitions=2/unknown" in selective_time_travel_data.runtime_profile
 
+  def test_table_repair(self, unique_database):
+    tbl_name = 'tbl_with_removed_files'
+    db_tbl = unique_database + "." + tbl_name
+    repair_query = "alter table {0} execute repair_metadata()"
+    with self.create_impala_client() as impalad_client:
+      impalad_client.execute(
+          "create table {0} (i int) stored as iceberg tblproperties('format-version'='2')"
+          .format(db_tbl))
+      insert_q = "insert into {0} values ({1})"
+      self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 1))
+      self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 2))
+      self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 3))
+      self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 4))
+      self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 5))
+      result = impalad_client.execute('select i from {0} order by i'.format(db_tbl))
+      assert result.data == ['1', '2', '3', '4', '5']
+      TABLE_PATH = '{0}/{1}.db/{2}'.format(WAREHOUSE, unique_database, tbl_name)
+      DATA_PATH = os.path.join(TABLE_PATH, "data")
+      # Check that the table remains intact if there are no missing files.
+      result = self.execute_query_expect_success(
+          impalad_client, repair_query.format(db_tbl))
+      assert result.data[0] == "No missing data files detected."
+      result = impalad_client.execute('select i from {0} order by i'.format(db_tbl))
+      assert result.data == ['1', '2', '3', '4', '5']
+      # Delete 2 data files directly from the file system to corrupt the table.
+      data_files = self.filesystem_client.ls(DATA_PATH)
+      self.filesystem_client.delete_file_dir(DATA_PATH + "/" + data_files[0])
+      self.filesystem_client.delete_file_dir(DATA_PATH + "/" + data_files[1])
+      self.execute_query_expect_success(impalad_client, "invalidate metadata")
+      result = self.execute_query_expect_success(
+          impalad_client, repair_query.format(db_tbl))
+      assert result.data[0] == \
+          "Iceberg table repaired by deleting 2 manifest entries of missing data files."
+      result = impalad_client.execute('select * from {0} order by i'.format(db_tbl))
+      assert len(result.data) == 3
+
 # Tests to exercise the DIRECTED distribution mode for V2 Iceberg tables. Note that most
 # of the test coverage is in TestIcebergV2Table.test_read_position_deletes but since it
 # runs also with the V2 optimizations setting turned off, some tests were moved here.