IMPALA-12337: Implement delete orphan files for Iceberg table
This patch implements the delete orphan files query for Iceberg tables.
The following statement becomes available for Iceberg tables:
 - ALTER TABLE <tbl> EXECUTE remove_orphan_files(<timestamp>)

The bulk of the implementation copies Hive's implementation of the
org.apache.iceberg.actions.DeleteOrphanFiles interface (HIVE-27906,
6b2e21a93ef3c1776b689a7953fc59dbf52e4be4), which this patch renames to
ImpalaIcebergDeleteOrphanFiles.java. Upon execute(), an
ImpalaIcebergDeleteOrphanFiles instance gathers the URIs of all valid
data files and Iceberg metadata files using the Iceberg API. These
valid URIs are then compared against a recursive file listing obtained
through the Hadoop FileSystem API under the table's 'data' and
'metadata' directories, respectively. Any URI from the FileSystem
listing that is unmatched and has a modification time earlier than the
'olderThanTimestamp' parameter is then removed via the Iceberg FileIO
API of the given Iceberg table.

Note that this is a destructive query that will wipe out any file
within the Iceberg table's 'data' and 'metadata' directories that is
not addressable by any valid snapshot.

The execution happens in CatalogD via
IcebergCatalogOpExecutor.alterTableExecuteRemoveOrphanFiles(). CatalogD
supplies CatalogOpExecutor.icebergExecutorService_ as the executor
service for the Iceberg planFiles API and the FileIO deletions.

This patch also fixes the toSql() implementation for all ALTER TABLE
EXECUTE queries.

Testing:
- Added FE and EE tests.

Change-Id: I5979cdf15048d5a2c4784918533f65f32e888de0
Reviewed-on: http://gerrit.cloudera.org:8080/23042
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Zoltan Borok-Nagy <boroknagyz@cloudera.com>
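For illustration, a minimal usage sketch of the new statement (the table
name 'ice_tbl' and the three-day cutoff are hypothetical; choose a cutoff
safely in the past so that files of in-progress operations are not removed):

    -- Remove files under the table's 'data' and 'metadata' directories that
    -- are not referenced by any valid snapshot and are older than the cutoff.
    ALTER TABLE ice_tbl EXECUTE remove_orphan_files(now() - interval 3 days);

    -- A timestamp literal works as well:
    ALTER TABLE ice_tbl EXECUTE remove_orphan_files('2025-01-01 10:00:00');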
@@ -444,6 +444,11 @@ struct TAlterTableExecuteRollbackParams {
   3: optional i64 snapshot_id
 }
 
+// Parameters for ALTER TABLE EXECUTE REMOVE_ORPHAN_FILES operations.
+struct TAlterTableExecuteRemoveOrphanFilesParams {
+  1: required i64 older_than_millis
+}
+
 // Parameters for ALTER TABLE EXECUTE ... operations.
 struct TAlterTableExecuteParams {
   // Parameters for ALTER TABLE EXECUTE EXPIRE_SNAPSHOTS
@@ -451,6 +456,9 @@ struct TAlterTableExecuteParams {
 
   // Parameters for ALTER TABLE EXECUTE ROLLBACK
   2: optional TAlterTableExecuteRollbackParams execute_rollback_params
+
+  // Parameters for ALTER TABLE EXECUTE REMOVE_ORPHAN_FILES
+  3: optional TAlterTableExecuteRemoveOrphanFilesParams remove_orphan_files_params
 }
 
 // Parameters for all ALTER TABLE commands.
@@ -56,11 +56,6 @@ public class AlterTableExecuteExpireSnapshotsStmt extends AlterTableExecuteStmt
     analyzeOlderThan(analyzer);
   }
 
-  @Override
-  public String toSql(ToSqlOptions options) {
-    return fnCallExpr_.toSql();
-  }
-
   @Override
   public TAlterTableParams toThrift() {
     TAlterTableParams params = super.toThrift();
@@ -78,7 +73,8 @@ public class AlterTableExecuteExpireSnapshotsStmt extends AlterTableExecuteStmt
     Preconditions.checkNotNull(fnParamValue_);
     fnParamValue_.analyze(analyzer);
     if (!fnParamValue_.isConstant()) {
-      throw new AnalysisException(USAGE + " must be a constant expression: " + toSql());
+      throw new AnalysisException(
+          USAGE + " must be a constant expression: " + fnCallExpr_.toSql());
     }
     if (fnParamValue_.getType().isStringType()) {
       fnParamValue_ = new CastExpr(Type.TIMESTAMP, fnParamValue_);
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.FeTable;
+import org.apache.impala.catalog.Type;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.InternalException;
+import org.apache.impala.thrift.TAlterTableExecuteParams;
+import org.apache.impala.thrift.TAlterTableExecuteRemoveOrphanFilesParams;
+import org.apache.impala.thrift.TAlterTableParams;
+import org.apache.impala.thrift.TAlterTableType;
+import org.apache.impala.util.ExprUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents an ALTER TABLE EXECUTE REMOVE_ORPHAN_FILES(<parameter>) statement.
+ * Valid <parameter> can either be:
+ * - Timestamp expression, such as (NOW() - interval 5 days).
+ * - Timestamp literal, such as '2025-01-01 10:00:00'.
+ * However, user must be careful to not pass NOW() as timestamp expression because
+ * it will remove files of in-progress operations.
+ */
+public class AlterTableExecuteRemoveOrphanFilesStmt extends AlterTableExecuteStmt {
+  public static final String USAGE = "EXECUTE REMOVE_ORPHAN_FILES(<expression>):";
+  private final static Logger LOG =
+      LoggerFactory.getLogger(AlterTableExecuteRemoveOrphanFilesStmt.class);
+
+  public AlterTableExecuteRemoveOrphanFilesStmt(
+      TableName tableName, FunctionCallExpr fnCallExpr) {
+    super(tableName, fnCallExpr);
+  }
+
+  @Override
+  public String getOperation() {
+    return "EXECUTE REMOVE_ORPHAN_FILES";
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    super.analyze(analyzer);
+    FeTable table = getTargetTable();
+    if (!(table instanceof FeIcebergTable)) {
+      throw new AnalysisException(
+          "ALTER TABLE EXECUTE REMOVE_ORPHAN_FILES is only supported "
+          + "for Iceberg tables: " + table.getTableName());
+    }
+    analyzeFunctionCallExpr(analyzer, USAGE);
+    analyzeParameter(analyzer);
+  }
+
+  private void analyzeParameter(Analyzer analyzer) throws AnalysisException {
+    Preconditions.checkNotNull(fnParamValue_);
+    fnParamValue_.analyze(analyzer);
+
+    if (!fnParamValue_.isConstant()) {
+      throw new AnalysisException(
+          USAGE + " <expression> must be a constant expression: " + fnCallExpr_.toSql());
+    }
+
+    Expr timestampExpr = getParamConvertibleToTimestamp();
+    if (timestampExpr == null) {
+      throw new AnalysisException(USAGE + " <expression> must be a timestamp, but is '"
+          + fnParamValue_.getType() + "': " + fnCallExpr_.toSql());
+    }
+
+    try {
+      olderThanMillis_ =
+          ExprUtil.localTimestampToUnixTimeMicros(analyzer, timestampExpr) / 1000;
+      LOG.debug(USAGE + " millis: " + olderThanMillis_);
+    } catch (InternalException ie) {
+      throw new AnalysisException("An invalid TIMESTAMP expression has been given "
+          + "to " + USAGE + " the expression " + fnParamValue_.toSql()
+          + " cannot be converted to a TIMESTAMP",
+          ie);
+    }
+  }
+
+  /**
+   * If field fnParamValue_ is a Timestamp, or can be cast to a Timestamp,
+   * then return an Expr for the Timestamp.
+   * @return null if the fnParamValue_ cannot be converted to a Timestamp.
+   */
+  private Expr getParamConvertibleToTimestamp() {
+    Preconditions.checkNotNull(fnParamValue_);
+    Expr timestampExpr = fnParamValue_;
+    if (timestampExpr.getType().isStringType()) {
+      timestampExpr = new CastExpr(Type.TIMESTAMP, fnParamValue_);
+    }
+    if (timestampExpr.getType().isTimestamp()) { return timestampExpr; }
+    return null;
+  }
+
+  @Override
+  public TAlterTableParams toThrift() {
+    TAlterTableParams params = super.toThrift();
+    params.setAlter_type(TAlterTableType.EXECUTE);
+    TAlterTableExecuteParams executeParams = new TAlterTableExecuteParams();
+    TAlterTableExecuteRemoveOrphanFilesParams removeOrphanParams =
+        new TAlterTableExecuteRemoveOrphanFilesParams();
+    executeParams.setRemove_orphan_files_params(removeOrphanParams);
+    removeOrphanParams.setOlder_than_millis(olderThanMillis_);
+    params.setSet_execute_params(executeParams);
+    return params;
+  }
+}
@@ -74,7 +74,7 @@ public class AlterTableExecuteRollbackStmt extends AlterTableExecuteStmt {
 
     if (!fnParamValue_.isConstant()) {
       throw new AnalysisException(
-          USAGE + " <expression> must be a constant expression: EXECUTE " + toSql());
+          USAGE + " <expression> must be a constant expression: " + fnCallExpr_.toSql());
     }
     if ((fnParamValue_ instanceof LiteralExpr)
         && (fnParamValue_.getType().isIntegerType())) {
@@ -104,7 +104,7 @@ public class AlterTableExecuteRollbackStmt extends AlterTableExecuteStmt {
       } else {
         throw new AnalysisException(USAGE
             + " <expression> must be an integer type or a timestamp, but is '"
-            + fnParamValue_.getType() + "': EXECUTE " + toSql());
+            + fnParamValue_.getType() + "': " + fnCallExpr_.toSql());
       }
     }
   }
@@ -125,11 +125,6 @@ public class AlterTableExecuteRollbackStmt extends AlterTableExecuteStmt {
     return null;
   }
 
-  @Override
-  public String toSql(ToSqlOptions options) {
-    return fnCallExpr_.toSql();
-  }
-
   @Override
   public TAlterTableParams toThrift() {
     TAlterTableParams params = super.toThrift();
@@ -44,6 +44,17 @@ public class AlterTableExecuteStmt extends AlterTableStmt {
   @Override
   public String getOperation() { return "EXECUTE"; }
 
+  @Override
+  public String toSql(ToSqlOptions options) {
+    StringBuilder sb = new StringBuilder("ALTER TABLE ")
+        .append(getDb())
+        .append(".")
+        .append(getTbl())
+        .append(" EXECUTE ")
+        .append(fnCallExpr_.toSql(options));
+    return sb.toString();
+  }
+
   /**
    * Return an instance of a subclass of AlterTableExecuteStmt that can analyze the
    * execute statement for the function call expression in 'expr'.
@@ -57,11 +68,14 @@ public class AlterTableExecuteStmt extends AlterTableStmt {
       case "EXPIRE_SNAPSHOTS":
         return new AlterTableExecuteExpireSnapshotsStmt(tableName, fnCallExpr);
       case "ROLLBACK": return new AlterTableExecuteRollbackStmt(tableName, fnCallExpr);
+      case "REMOVE_ORPHAN_FILES":
+        return new AlterTableExecuteRemoveOrphanFilesStmt(tableName, fnCallExpr);
       default:
         throw new AnalysisException(String.format("'%s' is not supported by ALTER "
             + "TABLE <table> EXECUTE. Supported operations are: "
             + "EXPIRE_SNAPSHOTS(<expression>), "
-            + "ROLLBACK(<expression>).",
+            + "ROLLBACK(<expression>), "
+            + "REMOVE_ORPHAN_FILES(<expression>).",
             functionNameOrig));
     }
   }
@@ -71,10 +85,11 @@ public class AlterTableExecuteStmt extends AlterTableStmt {
     // fnCallExpr_ analyzed here manually, because it is not an actual function but a
     // catalog operation.
     String fnName = fnCallExpr_.getFnName().toString();
-    Preconditions.checkState(
-        StringUtils.equalsAnyIgnoreCase(fnName, "EXPIRE_SNAPSHOTS", "ROLLBACK"));
+    Preconditions.checkState(StringUtils.equalsAnyIgnoreCase(
+        fnName, "EXPIRE_SNAPSHOTS", "ROLLBACK", "REMOVE_ORPHAN_FILES"));
     if (fnCallExpr_.getParams().size() != 1) {
-      throw new AnalysisException(usage + " must have one parameter: " + toSql());
+      throw new AnalysisException(
+          usage + " must have one parameter: " + fnCallExpr_.toSql());
     }
     fnParamValue_ = fnCallExpr_.getParams().exprs().get(0);
   }
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.impala.catalog.iceberg;
+
+import static org.apache.iceberg.MetadataTableType.ALL_ENTRIES;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.iceberg.DataTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.ReachableFileUtil;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.actions.DeleteOrphanFiles;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Implementation of DeleteOrphanFiles for Impala.
+ * This is copied from HiveIcebergDeleteOrphanFiles.java with minimal changes
+ * (HIVE-27906, 6b2e21a93ef3c1776b689a7953fc59dbf52e4be4).
+ *
+ * Upon execute(), this class instance will gather all URI of valid data files
+ * and Iceberg metadata files using Iceberg API. These valid URIs then will be
+ * compared to recursive file listing obtained through Hadoop FileSystem API
+ * under table's 'data' and 'metadata' directory accordingly. Any unmatched URI
+ * from FileSystem API listing that has modification time less than
+ * 'olderThanTimestamp' parameter will then be removed via Iceberg FileIO API
+ * of given Iceberg table.
+ */
+public class ImpalaIcebergDeleteOrphanFiles implements DeleteOrphanFiles {
+  public static final String METADATA_FOLDER_NAME = "metadata";
+  public static final String DATA_FOLDER_NAME = "data";
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ImpalaIcebergDeleteOrphanFiles.class);
+  private String tableLocation;
+  private long olderThanTimestamp =
+      System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
+  private Consumer<String> deleteFunc;
+  private ExecutorService deleteExecutorService =
+      MoreExecutors.newDirectExecutorService();
+
+  private final Configuration conf;
+  private final Table table;
+
+  public ImpalaIcebergDeleteOrphanFiles(Configuration conf, Table table) {
+    this.conf = conf;
+    this.table = table;
+    this.deleteFunc = file -> table.io().deleteFile(file);
+    this.tableLocation = table.location();
+  }
+
+  @Override
+  public ImpalaIcebergDeleteOrphanFiles location(String location) {
+    this.tableLocation = location;
+    return this;
+  }
+
+  @Override
+  public ImpalaIcebergDeleteOrphanFiles olderThan(long newOlderThanTimestamp) {
+    this.olderThanTimestamp = newOlderThanTimestamp;
+    return this;
+  }
+
+  // TODO: Implement later, if there is any use case.
+  @Override
+  public ImpalaIcebergDeleteOrphanFiles deleteWith(Consumer<String> newDeleteFunc) {
+    this.deleteFunc = newDeleteFunc;
+    return this;
+  }
+
+  @Override
+  public ImpalaIcebergDeleteOrphanFiles executeDeleteWith(
+      ExecutorService executorService) {
+    this.deleteExecutorService = executorService;
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    LOG.info("Cleaning orphan files for {}", table.name());
+    ImpalaIcebergDeleteOrphanFilesResult result =
+        new ImpalaIcebergDeleteOrphanFilesResult();
+    result.addDeletedFiles(cleanContentFiles(olderThanTimestamp));
+    result.addDeletedFiles(cleanMetadata(olderThanTimestamp));
+
+    LOG.debug("Deleting {} files while cleaning orphan files for {}",
+        result.deletedFiles.size(), table.name());
+    Tasks.foreach(result.deletedFiles)
+        .executeWith(deleteExecutorService)
+        .retry(3)
+        .stopRetryOn(FileNotFoundException.class)
+        .suppressFailureWhenFinished()
+        .onFailure((file, thrown) -> LOG.warn("Delete failed for file: {}", file, thrown))
+        .run(deleteFunc::accept);
+    return result;
+  }
+
+  private Set<String> cleanContentFiles(long lastTime) {
+    Set<String> validFiles =
+        Sets.union(getAllContentFilePath(), getAllStatisticsFilePath(table));
+    LOG.debug("Valid content file for {} are {}", table.name(), validFiles.size());
+    try {
+      Path dataPath = new Path(tableLocation, DATA_FOLDER_NAME);
+      return getFilesToBeDeleted(lastTime, validFiles, dataPath);
+    } catch (IOException ex) { throw new UncheckedIOException(ex); }
+  }
+
+  private Set<String> getAllContentFilePath() {
+    Set<String> validFilesPath = Sets.newHashSet();
+    Table metadataTable = getMetadataTable();
+
+    TableScan tableScan = metadataTable.newScan();
+    CloseableIterable<FileScanTask> manifestFileScanTasks =
+        tableScan.planWith(deleteExecutorService).planFiles();
+    CloseableIterable<StructLike> entries =
+        CloseableIterable.concat(entriesOfManifest(manifestFileScanTasks));
+
+    for (StructLike entry : entries) {
+      StructLike fileRecord = entry.get(4, StructLike.class);
+      String filePath = fileRecord.get(1, String.class);
+      validFilesPath.add(getUriPath(filePath));
+    }
+    return validFilesPath;
+  }
+
+  private Iterable<CloseableIterable<StructLike>> entriesOfManifest(
+      CloseableIterable<FileScanTask> fileScanTasks) {
+    return Iterables.transform(fileScanTasks, task -> {
+      assert task != null;
+      return ((DataTask) task).rows();
+    });
+  }
+
+  private static Set<String> getAllStatisticsFilePath(Table table) {
+    return ReachableFileUtil.statisticsFilesLocations(table)
+        .stream()
+        .map(ImpalaIcebergDeleteOrphanFiles::getUriPath)
+        .collect(Collectors.toSet());
+  }
+
+  protected Set<String> cleanMetadata(long lastTime) {
+    LOG.info("{} start clean metadata files", table.name());
+    try {
+      Set<String> validFiles = getValidMetadataFiles(table);
+      LOG.debug("Valid metadata files for {} are {}", table.name(), validFiles);
+      Path metadataLocation = new Path(tableLocation, METADATA_FOLDER_NAME);
+      return getFilesToBeDeleted(lastTime, validFiles, metadataLocation);
+    } catch (IOException ioe) { throw new UncheckedIOException(ioe); }
+  }
+
+  private Set<String> getFilesToBeDeleted(
+      long lastTime, Set<String> validFiles, Path location) throws IOException {
+    Set<String> filesToDelete = Sets.newHashSet();
+    FileSystem fs = location.getFileSystem(conf);
+    RemoteIterator<LocatedFileStatus> metadataLocations = fs.listFiles(location, true);
+    while (metadataLocations.hasNext()) {
+      LocatedFileStatus metadataFile = metadataLocations.next();
+      if (metadataFile.getModificationTime() < lastTime
+          && !validFiles.contains(getUriPath(metadataFile.getPath().toString()))) {
+        filesToDelete.add(metadataFile.getPath().toString());
+      }
+    }
+    return filesToDelete;
+  }
+
+  private Table getMetadataTable() {
+    return MetadataTableUtils.createMetadataTableInstance(
+        ((HasTableOperations) table).operations(), table.name(),
+        table.name() + "#" + ALL_ENTRIES.name(), ALL_ENTRIES);
+  }
+
+  private static Set<String> getValidMetadataFiles(Table icebergTable) {
+    Set<String> validFiles = Sets.newHashSet();
+    Iterable<Snapshot> snapshots = icebergTable.snapshots();
+    for (Snapshot snapshot : snapshots) {
+      String manifestListLocation = snapshot.manifestListLocation();
+      validFiles.add(getUriPath(manifestListLocation));
+
+      List<ManifestFile> manifestFiles = snapshot.allManifests(icebergTable.io());
+      for (ManifestFile manifestFile : manifestFiles) {
+        validFiles.add(getUriPath(manifestFile.path()));
+      }
+    }
+    Stream
+        .of(ReachableFileUtil.metadataFileLocations(icebergTable, false).stream(),
+            ReachableFileUtil.statisticsFilesLocations(icebergTable).stream(),
+            Stream.of(ReachableFileUtil.versionHintLocation(icebergTable)))
+        .reduce(Stream::concat)
+        .orElse(Stream.empty())
+        .map(ImpalaIcebergDeleteOrphanFiles::getUriPath)
+        .forEach(validFiles::add);
+
+    return validFiles;
+  }
+
+  private static String getUriPath(String path) { return URI.create(path).getPath(); }
+
+  public static class ImpalaIcebergDeleteOrphanFilesResult implements Result {
+    private final Set<String> deletedFiles = Sets.newHashSet();
+
+    @Override
+    public Iterable<String> orphanFileLocations() {
+      return deletedFiles;
+    }
+
+    public void addDeletedFiles(Set<String> files) { this.deletedFiles.addAll(files); }
+  }
+}
@@ -1583,6 +1583,21 @@ public class CatalogOpExecutor {
       EventSequence catalogTimeline, InProgressTableModification modification)
       throws ImpalaException {
     Preconditions.checkState(tbl.isWriteLockedByCurrentThread());
+
+    // Some operation does not require Iceberg transaction and HMS update.
+    if (params.getAlter_type() == TAlterTableType.EXECUTE) {
+      Preconditions.checkState(params.isSetSet_execute_params());
+      TAlterTableExecuteParams setExecuteParams = params.getSet_execute_params();
+      if (setExecuteParams.isSetRemove_orphan_files_params()) {
+        String removeOrphanSummary =
+            IcebergCatalogOpExecutor.alterTableExecuteRemoveOrphanFiles(tbl,
+                setExecuteParams.getRemove_orphan_files_params(),
+                icebergExecutorService_);
+        addSummary(response, removeOrphanSummary);
+        return false;
+      }
+    }
+
     boolean needsToUpdateHms = !IcebergUtil.isHiveCatalog(tbl.getMetaStoreTable());
     try {
       org.apache.iceberg.Transaction iceTxn = IcebergUtil.getIcebergTransaction(tbl);
@@ -1623,6 +1638,10 @@ public class CatalogOpExecutor {
               setExecuteParams.getExpire_snapshots_params(),
               icebergExecutorService_);
           addSummary(response, expireSummary);
+        } else if (setExecuteParams.isSetRemove_orphan_files_params()) {
+          throw new IllegalStateException(
+              "Alter table execute REMOVE_ORPHAN_FILES should not use "
+              + "Iceberg Transaction.");
         } else {
           // Cannot happen, but throw just in case.
          throw new IllegalStateException(
@@ -49,6 +49,8 @@ import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.Term;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.impala.analysis.IcebergPartitionSpec;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.TableLoadingException;
@@ -57,11 +59,13 @@ import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKe
 import org.apache.impala.catalog.iceberg.GroupedContentFiles;
 import org.apache.impala.catalog.iceberg.IcebergCatalog;
 import org.apache.impala.catalog.iceberg.IcebergHiveCatalog;
+import org.apache.impala.catalog.iceberg.ImpalaIcebergDeleteOrphanFiles;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.fb.FbIcebergColumnStats;
 import org.apache.impala.fb.FbIcebergDataFile;
 import org.apache.impala.thrift.TAlterTableDropPartitionParams;
 import org.apache.impala.thrift.TAlterTableExecuteExpireSnapshotsParams;
+import org.apache.impala.thrift.TAlterTableExecuteRemoveOrphanFilesParams;
 import org.apache.impala.thrift.TAlterTableExecuteRollbackParams;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TIcebergCatalog;
@@ -250,6 +254,20 @@ public class IcebergCatalogOpExecutor {
     return "Rollback executed.";
   }
 
+  /**
+   * Executes an ALTER TABLE EXECUTE REMOVE_ORPHAN_FILES.
+   */
+  public static String alterTableExecuteRemoveOrphanFiles(FeIcebergTable tbl,
+      TAlterTableExecuteRemoveOrphanFilesParams params, ExecutorService executors)
+      throws CatalogException {
+    ImpalaIcebergDeleteOrphanFiles deleteOrphan =
+        new ImpalaIcebergDeleteOrphanFiles(FeFsTable.CONF, tbl.getIcebergApiTable())
+            .olderThan(params.older_than_millis)
+            .executeDeleteWith(executors);
+    deleteOrphan.execute();
+    return "Remove orphan files executed.";
+  }
+
   /**
    * Deletes files related to specific set of partitions
    */
@@ -4559,13 +4559,15 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         "rollback('2022-01-04 10:00:00');",
         "ALTER TABLE EXECUTE ROLLBACK is only supported for Iceberg tables: " +
         "functional.alltypes");
-    AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
-        "rollback(id);", "EXECUTE ROLLBACK(<expression>): " +
-        "<expression> must be a constant expression: EXECUTE rollback(id)");
-    AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
-        "rollback(3.14);", "EXECUTE ROLLBACK(<expression>): <expression> " +
-        "must be an integer type or a timestamp, but is 'DECIMAL(3,2)': " +
-        "EXECUTE rollback(3.14)");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute "
+        + "rollback(id);",
+        "EXECUTE ROLLBACK(<expression>): "
+        + "<expression> must be a constant expression: rollback(id)");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute "
+        + "rollback(3.14);",
+        "EXECUTE ROLLBACK(<expression>): <expression> "
+        + "must be an integer type or a timestamp, but is 'DECIMAL(3,2)': "
+        + "rollback(3.14)");
     AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
         "rollback('2021-02-32 15:52:45');", "An invalid TIMESTAMP expression has been " +
         "given to EXECUTE ROLLBACK(<expression>): the expression " +
@@ -4583,6 +4585,55 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         "'1111' cannot be converted to a TIMESTAMP");
   }
 
+  @Test
+  public void TestAlterExecuteRemoveOrphanFilesSnapshots() {
+    AnalyzesOk("alter table functional_parquet.iceberg_partitioned execute "
+        + "remove_orphan_files(now());");
+    AnalyzesOk("alter table functional_parquet.iceberg_partitioned execute "
+        + "remove_orphan_files(now() - interval 20 years);");
+    AnalyzesOk("alter table functional_parquet.iceberg_partitioned execute "
+        + "remove_orphan_files('2022-01-04 10:00:00');");
+
+    // Negative tests
+    AnalysisError("alter table nodb.alltypes execute "
+        + "remove_orphan_files('2022-01-04 10:00:00');",
+        "Could not resolve table reference: 'nodb.alltypes'");
+    AnalysisError("alter table functional.alltypes execute "
+        + "remove_orphan_files('2022-01-04 10:00:00');",
+        "ALTER TABLE EXECUTE REMOVE_ORPHAN_FILES is only supported for Iceberg tables: "
+        + "functional.alltypes");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute "
+        + "remove_orphan_files();",
+        "EXECUTE REMOVE_ORPHAN_FILES(<expression>): must have one parameter");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute "
+        + "remove_orphan_files(id);",
+        "EXECUTE REMOVE_ORPHAN_FILES(<expression>): "
+        + "<expression> must be a constant expression: remove_orphan_files(id)");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute "
+        + "remove_orphan_files(3.14);",
+        "EXECUTE REMOVE_ORPHAN_FILES(<expression>): <expression> "
+        + "must be a timestamp, but is 'DECIMAL(3,2)': "
+        + "remove_orphan_files(3.14)");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute "
+        + "remove_orphan_files('2021-02-32 15:52:45');",
+        "An invalid TIMESTAMP expression has been "
+        + "given to EXECUTE REMOVE_ORPHAN_FILES(<expression>): the expression "
+        + "'2021-02-32 15:52:45' cannot be converted to a TIMESTAMP");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute "
+        + "remove_orphan_files('the beginning');",
+        "An invalid TIMESTAMP expression has been "
+        + "given to EXECUTE REMOVE_ORPHAN_FILES(<expression>): the expression "
+        + "'the beginning' cannot be converted to a TIMESTAMP");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute "
+        + "remove_orphan_files(1111,2222);",
+        "EXECUTE REMOVE_ORPHAN_FILES(<expression>): must have one parameter");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute "
+        + "remove_orphan_files('1111');",
+        "An invalid TIMESTAMP expression has been "
+        + "given to EXECUTE REMOVE_ORPHAN_FILES(<expression>): the expression "
+        + "'1111' cannot be converted to a TIMESTAMP");
+  }
+
   private static String buildLongOwnerName() {
     StringBuilder comment = new StringBuilder();
     for (int i = 0; i < MetaStoreUtil.MAX_OWNER_LENGTH + 5; i++) {
@@ -583,6 +583,28 @@ public class ToSqlTest extends FrontendTestBase {
         "SELECT * FROM functional.alltypes CROSS JOIN functional_parquet.alltypes");
   }
 
+  @Test
+  public void TestAlterTableExecute() throws AnalysisException {
+    testToSql(
+        "alter table iceberg_partitioned "
+            + "execute rollback('2022-01-04 10:00:00')",
+        "functional_parquet",
+        "ALTER TABLE functional_parquet.iceberg_partitioned "
+            + "EXECUTE rollback('2022-01-04 10:00:00')");
+    testToSql(
+        "alter table iceberg_partitioned "
+            + "execute expire_snapshots('2022-01-04 10:00:00')",
+        "functional_parquet",
+        "ALTER TABLE functional_parquet.iceberg_partitioned "
+            + "EXECUTE expire_snapshots('2022-01-04 10:00:00')");
+    testToSql(
+        "alter table iceberg_partitioned "
+            + "execute remove_orphan_files('2022-01-04 10:00:00')",
+        "functional_parquet",
+        "ALTER TABLE functional_parquet.iceberg_partitioned "
+            + "EXECUTE remove_orphan_files('2022-01-04 10:00:00')");
+  }
+
   @Test
   public void TestStructFields() throws AnalysisException {
     String[] tables = new String[] { "allcomplextypes", };
@@ -461,6 +461,74 @@ class TestIcebergTable(IcebergTestSuite):
     assert expected_text in str(output)
     return output
 
+  def test_execute_remove_orphan_files(self, unique_database):
+    tbl_name = 'tbl_with_orphan_files'
+    db_tbl = unique_database + ".tbl_with_orphan_files"
+    with self.create_impala_client() as impalad_client:
+      impalad_client.execute("create table {0} (i int) stored as iceberg"
+                             .format(db_tbl))
+      insert_q = "insert into {0} values ({1})"
+      self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 1))
+      self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 2))
+      self.execute_query_expect_success(impalad_client, insert_q.format(db_tbl, 3))
+      result = impalad_client.execute('select i from {} order by i'.format(db_tbl))
+      assert result.data == ['1', '2', '3']
+
+      # Add some junk files to data and metadata dir.
+      DATA_PATH = "/test-warehouse/{0}.db/{1}/data/".format(
+          unique_database, tbl_name)
+      METADATA_PATH = "/test-warehouse/{0}.db/{1}/metadata/".format(
+          unique_database, tbl_name)
+      SRC_DIR = os.path.join(
+          os.environ['IMPALA_HOME'],
+          "testdata/data/iceberg_test/iceberg_mixed_file_format_test/{0}/{1}")
+
+      # Copy first set of junk files.
+      file_parq1 = "00000-0-data-gfurnstahl_20220906113044_157fc172-f5d3-4c70-8653-" \
+          "fff150b6136a-job_16619542960420_0002-1-00001.parquet"
+      file_avro1 = "055baf62-de6d-4583-bf21-f187f9482343-m0.avro"
+      self.filesystem_client.copy_from_local(
+          SRC_DIR.format('data', file_parq1), DATA_PATH)
+      self.filesystem_client.copy_from_local(
+          SRC_DIR.format('metadata', file_avro1), METADATA_PATH)
+      assert self.filesystem_client.exists(DATA_PATH + file_parq1)
+      assert self.filesystem_client.exists(METADATA_PATH + file_avro1)
+      # Keep current time.
+      result = impalad_client.execute('select cast(now() as string)')
+      cp1_time = result.data[0]
+      time.sleep(1)
+
+      # Copy second set of junk files.
+      file_parq2 = "00000-0-data-gfurnstahl_20220906114830_907f72c7-36ac-4135-8315-" \
+          "27ff880faff0-job_16619542960420_0004-1-00001.parquet"
+      file_avro2 = "871d1473-8566-46c0-a530-a2256b3f396f-m0.avro"
+      self.filesystem_client.copy_from_local(
+          SRC_DIR.format('data', file_parq2), DATA_PATH)
+      self.filesystem_client.copy_from_local(
+          SRC_DIR.format('metadata', file_avro2), METADATA_PATH)
+      assert self.filesystem_client.exists(DATA_PATH + file_parq2)
+      assert self.filesystem_client.exists(METADATA_PATH + file_avro2)
+
+      # Execute REMOVE_ORPHAN_FILES at specific timestamp.
+      result = impalad_client.execute(
+          "ALTER TABLE {0} EXECUTE REMOVE_ORPHAN_FILES('{1}')".format(db_tbl, cp1_time))
+      assert result.data[0] == 'Remove orphan files executed.'
+      assert not self.filesystem_client.exists(DATA_PATH + file_parq1)
+      assert not self.filesystem_client.exists(METADATA_PATH + file_avro1)
+      assert self.filesystem_client.exists(DATA_PATH + file_parq2)
+      assert self.filesystem_client.exists(METADATA_PATH + file_avro2)
+
+      # Execute REMOVE_ORPHAN_FILES at now().
+      result = impalad_client.execute(
+          "ALTER TABLE {0} EXECUTE REMOVE_ORPHAN_FILES(now())".format(db_tbl))
+      assert result.data[0] == 'Remove orphan files executed.'
+      assert not self.filesystem_client.exists(DATA_PATH + file_parq2)
+      assert not self.filesystem_client.exists(METADATA_PATH + file_avro2)
+
+      # Assert table still queryable.
+      result = impalad_client.execute('select i from {} order by i'.format(db_tbl))
+      assert result.data == ['1', '2', '3']
+
   def test_describe_history_params(self, unique_database):
     tbl_name = unique_database + ".describe_history"
 