diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index 2fa27cd18..766f8cd11 100644
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -449,6 +449,10 @@ struct TAlterTableExecuteRemoveOrphanFilesParams {
   1: required i64 older_than_millis
 }
 
+// Parameters for ALTER TABLE EXECUTE REPAIR_METADATA operations.
+struct TAlterTableExecuteRepairMetadataParams {
+}
+
 // Parameters for ALTER TABLE EXECUTE ... operations.
 struct TAlterTableExecuteParams {
   // Parameters for ALTER TABLE EXECUTE EXPIRE_SNAPSHOTS
@@ -459,6 +463,9 @@ struct TAlterTableExecuteParams {
 
   // Parameters for ALTER TABLE EXECUTE REMOVE_ORPHAN_FILES
   3: optional TAlterTableExecuteRemoveOrphanFilesParams remove_orphan_files_params
+
+  // Parameters for ALTER TABLE EXECUTE REPAIR_METADATA
+  4: optional TAlterTableExecuteRepairMetadataParams repair_metadata_params
 }
 
 // Parameters for all ALTER TABLE commands.
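For context, the new parameter struct is carried by the parameterless EXECUTE call shown below; the table name is illustrative and not part of the patch:

    ALTER TABLE ice_db.ice_tbl EXECUTE REPAIR_METADATA();
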
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteRepairMetadataStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteRepairMetadataStmt.java
new file mode 100644
index 000000000..10102d9a4
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteRepairMetadataStmt.java
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.FeTable;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.thrift.TAlterTableExecuteParams;
+import org.apache.impala.thrift.TAlterTableExecuteRepairMetadataParams;
+import org.apache.impala.thrift.TAlterTableParams;
+import org.apache.impala.thrift.TAlterTableType;
+
+public class AlterTableExecuteRepairMetadataStmt extends AlterTableExecuteStmt {
+
+  protected final static String USAGE = "EXECUTE REPAIR_METADATA()";
+
+  protected AlterTableExecuteRepairMetadataStmt(TableName tableName, Expr fnCallExpr) {
+    super(tableName, fnCallExpr);
+  }
+
+  @Override
+  public String getOperation() { return "EXECUTE REPAIR_METADATA"; }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    super.analyze(analyzer);
+    FeTable table = getTargetTable();
+    if (!(table instanceof FeIcebergTable)) {
+      throw new AnalysisException(
+          "ALTER TABLE EXECUTE REPAIR_METADATA is only supported "
+          + "for Iceberg tables: " + table.getTableName());
+    }
+    analyzeFunctionCallExpr(analyzer, USAGE);
+  }
+
+  @Override
+  public TAlterTableParams toThrift() {
+    TAlterTableParams params = super.toThrift();
+    params.setAlter_type(TAlterTableType.EXECUTE);
+    TAlterTableExecuteParams executeParams = new TAlterTableExecuteParams();
+    TAlterTableExecuteRepairMetadataParams repairMetadataParams =
+        new TAlterTableExecuteRepairMetadataParams();
+    executeParams.setRepair_metadata_params(repairMetadataParams);
+    params.setSet_execute_params(executeParams);
+    return params;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java
index 9a65dddcc..501aded9f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java
@@ -19,7 +19,6 @@
 package org.apache.impala.analysis;
 
 import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.impala.common.AnalysisException;
 
 /**
@@ -70,12 +69,15 @@ public class AlterTableExecuteStmt extends AlterTableStmt {
       case "ROLLBACK": return new AlterTableExecuteRollbackStmt(tableName, fnCallExpr);
       case "REMOVE_ORPHAN_FILES":
         return new AlterTableExecuteRemoveOrphanFilesStmt(tableName, fnCallExpr);
+      case "REPAIR_METADATA":
+        return new AlterTableExecuteRepairMetadataStmt(tableName, fnCallExpr);
       default:
         throw new AnalysisException(String.format("'%s' is not supported by ALTER "
             + "TABLE EXECUTE. Supported operations are: "
             + "EXPIRE_SNAPSHOTS(), "
             + "ROLLBACK(), "
-            + "REMOVE_ORPHAN_FILES().",
+            + "REMOVE_ORPHAN_FILES(), "
+            + "REPAIR_METADATA().",
             functionNameOrig));
     }
   }
@@ -84,14 +86,26 @@ public class AlterTableExecuteStmt extends AlterTableStmt {
       throws AnalysisException {
     // fnCallExpr_ analyzed here manually, because it is not an actual function but a
     // catalog operation.
-    String fnName = fnCallExpr_.getFnName().toString();
-    Preconditions.checkState(StringUtils.equalsAnyIgnoreCase(
-        fnName, "EXPIRE_SNAPSHOTS", "ROLLBACK", "REMOVE_ORPHAN_FILES"));
-    if (fnCallExpr_.getParams().size() != 1) {
-      throw new AnalysisException(
-          usage + " must have one parameter: " + fnCallExpr_.toSql());
+    String fnName = fnCallExpr_.getFnName().toString().toUpperCase();
+    switch (fnName) {
+      case "EXPIRE_SNAPSHOTS":
+      case "ROLLBACK":
+      case "REMOVE_ORPHAN_FILES":
+        if (fnCallExpr_.getParams().size() != 1) {
+          throw new AnalysisException(
+              usage + " must have one parameter: " + fnCallExpr_.toSql());
+        }
+        fnParamValue_ = fnCallExpr_.getParams().exprs().get(0);
+        break;
+      case "REPAIR_METADATA":
+        if (fnCallExpr_.getParams().size() != 0) {
+          throw new AnalysisException(
+              usage + " should have no parameter: " + fnCallExpr_.toSql());
+        }
+        break;
+      default:
+        Preconditions.checkState(false, "Invalid function call in ALTER TABLE EXECUTE.");
     }
-    fnParamValue_ = fnCallExpr_.getParams().exprs().get(0);
   }
 }
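To illustrate the argument-count check added above (illustrative table name; the error string is the one produced by the new REPAIR_METADATA branch and asserted in the tests later in this patch):

    ALTER TABLE ice_db.ice_tbl EXECUTE REPAIR_METADATA();                       -- accepted
    ALTER TABLE ice_db.ice_tbl EXECUTE REPAIR_METADATA('2024-02-11 10:00:00');  -- rejected: "EXECUTE REPAIR_METADATA() should have no parameter"
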
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/ImpalaRepairIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/ImpalaRepairIcebergTable.java
new file mode 100644
index 000000000..4ed950cb3
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/ImpalaRepairIcebergTable.java
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.iceberg;
+
+import org.apache.iceberg.DeleteFiles;
+import org.apache.iceberg.Transaction;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.IcebergContentFileStore;
+import org.apache.impala.service.BackendConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+
+/**
+ * Implements ALTER TABLE EXECUTE REPAIR_METADATA for Iceberg tables: removes the
+ * manifest entries that reference data files no longer present on the file system.
+ */
+public class ImpalaRepairIcebergTable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ImpalaRepairIcebergTable.class);
+
+  private final FeIcebergTable table_;
+
+  public ImpalaRepairIcebergTable(FeIcebergTable table) {
+    table_ = table;
+  }
+
+  public int execute(Transaction iceTxn) throws CatalogException {
+    Collection<String> missingFiles;
+    IcebergContentFileStore fileStore = table_.getContentFileStore();
+    if (fileStore.hasMissingFile()) {
+      missingFiles = fileStore.getMissingFiles();
+    } else {
+      missingFiles = getMissingFiles(fileStore);
+    }
+    // Remove the references to all missing files from the Iceberg metadata.
+    int numRemovedReferences = missingFiles.size();
+    if (numRemovedReferences > 0) {
+      DeleteFiles deleteFiles = iceTxn.newDelete();
+      for (String path : missingFiles) {
+        deleteFiles.deleteFile(path);
+      }
+      deleteFiles.commit();
+      LOG.info("Removed {} files during table repair: {}",
+          numRemovedReferences, getFilesLog(missingFiles));
+    } else {
+      LOG.info("No files were removed during table repair.");
+    }
+    return numRemovedReferences;
+  }
+
+  private List<String> getMissingFiles(IcebergContentFileStore fileStore)
+      throws CatalogException {
+    // Check all data files in parallel and create a list of dangling references.
+    List<String> missingFiles;
+    ForkJoinPool forkJoinPool = null;
+    try {
+      forkJoinPool = new ForkJoinPool(BackendConfig.INSTANCE.icebergCatalogNumThreads());
+      missingFiles = forkJoinPool.submit(() ->
+          StreamSupport.stream(
+              fileStore.getAllDataFiles().spliterator(), /*parallel=*/true)
+              .map(fileDesc -> fileDesc.getAbsolutePath(table_.getLocation()))
+              .filter(path -> !table_.getIcebergApiTable().io().newInputFile(path).exists())
+              .collect(Collectors.toList())
+          ).get();
+    } catch (InterruptedException | ExecutionException e) {
+      throw new CatalogException(e.getMessage(), e);
+    } finally {
+      if (forkJoinPool != null) {
+        forkJoinPool.shutdown();
+      }
+    }
+    return missingFiles;
+  }
+
+  private static String getFilesLog(Collection<String> missingFiles) {
+    // Log only a few paths at INFO level, more when TRACE logging is enabled.
+    int numPathsToLog = 3;
+    if (LOG.isTraceEnabled()) {
+      numPathsToLog = 1000;
+    }
+    int numRemoved = missingFiles.size();
+    String fileNames = Joiner.on(", ").join(
+        Iterables.limit(missingFiles, numPathsToLog));
+    if (numRemoved > numPathsToLog) {
+      int remaining = numRemoved - numPathsToLog;
+      fileNames += String.format(", and %d more.", remaining);
+    }
+    return fileNames;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 163cca702..8cf929946 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -1651,7 +1651,18 @@ public class CatalogOpExecutor {
     } else if (setExecuteParams.isSetRemove_orphan_files_params()) {
       throw new IllegalStateException(
           "Alter table execute REMOVE_ORPHAN_FILES should not use "
-          + "Iceberg Transaction.");
+          + "Iceberg Transaction.");
+    } else if (setExecuteParams.isSetRepair_metadata_params()) {
+      int numRemovedReferences =
+          IcebergCatalogOpExecutor.alterTableExecuteRepair(tbl, iceTxn);
+      // Do not commit an empty transaction if there were no missing files to remove.
+      if (numRemovedReferences == 0) {
+        addSummary(response, "No missing data files detected.");
+        catalogTimeline.markEvent("Abandoned empty Iceberg transaction");
+        return false;
+      }
+      addSummary(response, "Iceberg table repaired by deleting "
+          + numRemovedReferences + " manifest entries of missing data files.");
     } else {
       // Cannot happen, but throw just in case.
       throw new IllegalStateException(
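As a sketch of the two outcomes of the new branch above (the summary strings are the ones added here; the count of 2 mirrors the Python test at the end of this patch):

    ALTER TABLE ice_db.ice_tbl EXECUTE REPAIR_METADATA();
    -- when no dangling references exist: "No missing data files detected."
    -- after two data files were deleted from storage:
    --   "Iceberg table repaired by deleting 2 manifest entries of missing data files."
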
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index 90e159bf9..3904dc5c0 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -60,6 +60,7 @@ import org.apache.impala.catalog.iceberg.GroupedContentFiles;
 import org.apache.impala.catalog.iceberg.IcebergCatalog;
 import org.apache.impala.catalog.iceberg.IcebergHiveCatalog;
 import org.apache.impala.catalog.iceberg.ImpalaIcebergDeleteOrphanFiles;
+import org.apache.impala.catalog.iceberg.ImpalaRepairIcebergTable;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.fb.FbIcebergColumnStats;
 import org.apache.impala.fb.FbIcebergDataFile;
@@ -268,6 +269,16 @@ public class IcebergCatalogOpExecutor {
     return "Remove orphan files executed.";
   }
 
+  /**
+   * Executes an ALTER TABLE EXECUTE REPAIR_METADATA operation.
+   * @return The number of removed references to missing data files.
+   */
+  public static int alterTableExecuteRepair(FeIcebergTable tbl, Transaction iceTxn)
+      throws CatalogException {
+    ImpalaRepairIcebergTable repair = new ImpalaRepairIcebergTable(tbl);
+    return repair.execute(iceTxn);
+  }
+
   /**
    * Deletes files related to specific set of partitions
    */
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 876265e3b..6fac45273 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -4883,6 +4883,22 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         + "'1111' cannot be converted to a TIMESTAMP");
   }
 
+  @Test
+  public void TestAlterExecuteRepairMetadata() {
+    AnalyzesOk("alter table functional_parquet.iceberg_partitioned execute "
+        + "repair_metadata();");
+
+    // Negative tests
+    AnalysisError("alter table nodb.alltypes execute repair_metadata();",
+        "Could not resolve table reference: 'nodb.alltypes'");
+    AnalysisError("alter table functional.alltypes execute repair_metadata();",
+        "ALTER TABLE EXECUTE REPAIR_METADATA is only supported for Iceberg tables: "
+        + "functional.alltypes");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute "
+        + "repair_metadata('2024-02-11 10:00:00');",
+        "EXECUTE REPAIR_METADATA() should have no parameter");
+  }
+
   private static String buildLongOwnerName() {
     StringBuilder comment = new StringBuilder();
     for (int i = 0; i < MetaStoreUtil.MAX_OWNER_LENGTH + 5; i++) {
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index d5683198c..762fb4d50 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -2254,6 +2254,46 @@ class TestIcebergV2Table(IcebergTestSuite):
       tbl_name, second_snapshot.get_snapshot_id()))
     assert "partitions=2/unknown" in selective_time_travel_data.runtime_profile
 
+  def test_table_repair(self, unique_database):
+    tbl_name = 'tbl_with_removed_files'
+    db_tbl = unique_database + "." + tbl_name
+    repair_query = "alter table {0} execute repair_metadata()"
+    with self.create_impala_client() as impalad_client:
+      impalad_client.execute(
+          "create table {0} (i int) stored as iceberg tblproperties('format-version'='2')"
+          .format(db_tbl))
+      insert_q = "insert into {0} values ({1})"
+      self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 1))
+      self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 2))
+      self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 3))
+      self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 4))
+      self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 5))
+      result = impalad_client.execute('select i from {0} order by i'.format(db_tbl))
+      assert result.data == ['1', '2', '3', '4', '5']
+
+      TABLE_PATH = '{0}/{1}.db/{2}'.format(WAREHOUSE, unique_database, tbl_name)
+      DATA_PATH = os.path.join(TABLE_PATH, "data")
+
+      # Check that the table remains intact if there are no missing files.
+      result = self.execute_query_expect_success(
+          impalad_client, repair_query.format(db_tbl))
+      assert result.data[0] == "No missing data files detected."
+      result = impalad_client.execute('select i from {0} order by i'.format(db_tbl))
+      assert result.data == ['1', '2', '3', '4', '5']
+
+      # Delete 2 data files from the file system directly to corrupt the table.
+      data_files = self.filesystem_client.ls(DATA_PATH)
+      self.filesystem_client.delete_file_dir(DATA_PATH + "/" + data_files[0])
+      self.filesystem_client.delete_file_dir(DATA_PATH + "/" + data_files[1])
+      self.execute_query_expect_success(impalad_client, "invalidate metadata")
+      result = self.execute_query_expect_success(
+          impalad_client, repair_query.format(db_tbl))
+      assert result.data[0] == \
+          "Iceberg table repaired by deleting 2 manifest entries of missing data files."
+      result = impalad_client.execute('select * from {0} order by i'.format(db_tbl))
+      assert len(result.data) == 3
+
+
 # Tests to exercise the DIRECTED distribution mode for V2 Iceberg tables. Note, that most
 # of the test coverage is in TestIcebergV2Table.test_read_position_deletes but since it
 # runs also with the V2 optimizations setting turned off, some tests were moved here.