diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index c41762a6f..56398eaf0 100755
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -414,12 +414,38 @@ struct TAlterTableSetPartitionSpecParams {
   1: required CatalogObjects.TIcebergPartitionSpec partition_spec
 }
 
-// Parameters for ALTER TABLE EXECUTE operations.
-struct TAlterTableExecuteParams {
-  // The parameter of the ExpireSnapshot.expireOlderThan(timestampMillis) Iceberg call.
+// Parameters for ALTER TABLE EXECUTE EXPIRE_SNAPSHOTS operations.
+struct TAlterTableExecuteExpireSnapshotsParams {
   1: required i64 older_than_millis
 }
 
+// ALTER TABLE EXECUTE ROLLBACK can roll back to a date or to a snapshot id.
+enum TRollbackType {
+  TIME_ID = 0
+  VERSION_ID = 1
+}
+
+// Parameters for ALTER TABLE EXECUTE ROLLBACK operations.
+struct TAlterTableExecuteRollbackParams {
+  // Whether the rollback is to a date or to a snapshot id.
+  1: required TRollbackType kind
+
+  // If kind is TIME_ID, this is the date to roll back to.
+  2: optional i64 timestamp_millis
+
+  // If kind is VERSION_ID, this is the snapshot id to roll back to.
+  3: optional i64 snapshot_id
+}
+
+// Parameters for ALTER TABLE EXECUTE ... operations.
+struct TAlterTableExecuteParams {
+  // Parameters for ALTER TABLE EXECUTE EXPIRE_SNAPSHOTS
+  1: optional TAlterTableExecuteExpireSnapshotsParams expire_snapshots_params
+
+  // Parameters for ALTER TABLE EXECUTE ROLLBACK
+  2: optional TAlterTableExecuteRollbackParams execute_rollback_params
+}
+
 // Parameters for all ALTER TABLE commands.
 struct TAlterTableParams {
   1: required TAlterTableType alter_type
@@ -478,7 +504,7 @@ struct TAlterTableParams {
   // Parameters for ALTER TABLE SET PARTITION SPEC
   19: optional TAlterTableSetPartitionSpecParams set_partition_spec_params
 
-  // Parameters for ALTER TABLE EXECUTE
+  // Parameters for ALTER TABLE EXECUTE operations
   20: optional TAlterTableExecuteParams set_execute_params
 }
 
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index dcf8eba5c..b136a4631 100755
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -37,6 +37,7 @@ import org.apache.impala.analysis.SetOperationStmt.SetOperator;
 import org.apache.impala.analysis.RangePartition;
 import org.apache.impala.analysis.TableSampleClause;
 import org.apache.impala.analysis.AlterTableAddDropRangePartitionStmt.Operation;
+import org.apache.impala.analysis.AlterTableExecuteStmt;
 import org.apache.impala.analysis.IcebergPartitionSpec;
 import org.apache.impala.analysis.IcebergPartitionField;
 import org.apache.impala.analysis.IcebergPartitionTransform;
@@ -1361,7 +1362,7 @@ alter_tbl_stmt ::=
   :}
   | KW_ALTER KW_TABLE table_name:table KW_EXECUTE function_call_expr:expr
   {:
-    RESULT = new AlterTableExecuteStmt(table, expr);
+    RESULT = AlterTableExecuteStmt.createExecuteStmt(table, expr);
   :}
   ;
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteExpireSnapshotsStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteExpireSnapshotsStmt.java
new file mode 100644
index 000000000..16d8a3753
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteExpireSnapshotsStmt.java
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.Type;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.InternalException;
+import org.apache.impala.thrift.TAlterTableExecuteExpireSnapshotsParams;
+import org.apache.impala.thrift.TAlterTableExecuteParams;
+import org.apache.impala.thrift.TAlterTableParams;
+import org.apache.impala.thrift.TAlterTableType;
+import org.apache.impala.util.ExprUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents an ALTER TABLE EXECUTE EXPIRE_SNAPSHOTS() statement on
+ * Iceberg tables; the parameter is a timestamp expression.
+ */
+public class AlterTableExecuteExpireSnapshotsStmt extends AlterTableExecuteStmt {
+  private final static Logger LOG =
+      LoggerFactory.getLogger(AlterTableExecuteExpireSnapshotsStmt.class);
+
+  protected final static String USAGE = "EXPIRE_SNAPSHOTS()";
+
+  protected AlterTableExecuteExpireSnapshotsStmt(TableName tableName, Expr fnCallExpr) {
+    super(tableName, fnCallExpr);
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    super.analyze(analyzer);
+    Preconditions.checkState(getTargetTable() instanceof FeIcebergTable);
+    analyzeFunctionCallExpr(analyzer, USAGE);
+    analyzeOlderThan(analyzer);
+  }
+
+  @Override
+  public String toSql(ToSqlOptions options) {
+    return fnCallExpr_.toSql();
+  }
+
+  @Override
+  public TAlterTableParams toThrift() {
+    TAlterTableParams params = super.toThrift();
+    params.setAlter_type(TAlterTableType.EXECUTE);
+    TAlterTableExecuteParams executeParams = new TAlterTableExecuteParams();
+    TAlterTableExecuteExpireSnapshotsParams executeExpireSnapshotsParams =
+        new TAlterTableExecuteExpireSnapshotsParams();
+    executeParams.setExpire_snapshots_params(executeExpireSnapshotsParams);
+    executeExpireSnapshotsParams.setOlder_than_millis(olderThanMillis_);
+    params.setSet_execute_params(executeParams);
+    return params;
+  }
+
+  protected void analyzeOlderThan(Analyzer analyzer) throws AnalysisException {
+    Preconditions.checkNotNull(fnParamValue_);
+    fnParamValue_.analyze(analyzer);
+    if (!fnParamValue_.isConstant()) {
+      throw new AnalysisException(USAGE + " must be a constant expression: " + toSql());
+    }
+    if (fnParamValue_.getType().isStringType()) {
+      fnParamValue_ = new CastExpr(Type.TIMESTAMP, fnParamValue_);
+    }
+    if (!fnParamValue_.getType().isTimestamp()) {
+      throw new AnalysisException(USAGE + " must be a timestamp type but is '" +
+          fnParamValue_.getType() + "': " + fnParamValue_.toSql());
+    }
+    try {
+      olderThanMillis_ =
+          ExprUtil.localTimestampToUnixTimeMicros(analyzer, fnParamValue_) / 1000;
+      LOG.debug(USAGE + " millis: " + olderThanMillis_);
+    } catch (InternalException ie) {
+      throw new AnalysisException("Invalid TIMESTAMP expression has been given to " +
+          USAGE + ": " + ie.getMessage(),
+          ie);
+    }
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteRollbackStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteRollbackStmt.java
new file mode 100644
index 000000000..ea4da0b67
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteRollbackStmt.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.FeTable;
+import org.apache.impala.catalog.Type;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.InternalException;
+import org.apache.impala.thrift.TAlterTableExecuteParams;
+import org.apache.impala.thrift.TAlterTableExecuteRollbackParams;
+import org.apache.impala.thrift.TAlterTableParams;
+import org.apache.impala.thrift.TAlterTableType;
+import org.apache.impala.thrift.TRollbackType;
+import org.apache.impala.util.ExprUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents an ALTER TABLE EXECUTE ROLLBACK() statement.
+ * The parameter can be a snapshot id, or a timestamp.
+ * A rollback to a snapshot id causes a new snapshot to be
+ * created with the same snapshot id, but with a new creation timestamp.
+ * A rollback to a timestamp rolls back to the latest snapshot
+ * that has a creation timestamp that is older than the specified
+ * timestamp.
+ */
+public class AlterTableExecuteRollbackStmt extends AlterTableExecuteStmt {
+  public static final String USAGE = "EXECUTE ROLLBACK():";
+  private final static Logger LOG =
+      LoggerFactory.getLogger(AlterTableExecuteRollbackStmt.class);
+  private long snapshotVersion_;
+  private TRollbackType kind_;
+
+  public AlterTableExecuteRollbackStmt(TableName tableName, FunctionCallExpr fnCallExpr) {
+    super(tableName, fnCallExpr);
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    super.analyze(analyzer);
+    FeTable table = getTargetTable();
+    if (!(table instanceof FeIcebergTable)) {
+      throw new AnalysisException("ALTER TABLE EXECUTE ROLLBACK is only supported " +
+          "for Iceberg tables: " + table.getTableName());
+    }
+    analyzeFunctionCallExpr(analyzer, USAGE);
+    analyzeParameter(analyzer);
+  }
+
+  private void analyzeParameter(Analyzer analyzer) throws AnalysisException {
+    Preconditions.checkNotNull(fnParamValue_);
+    fnParamValue_.analyze(analyzer);
+
+    if (!fnParamValue_.isConstant()) {
+      throw new AnalysisException(
+          USAGE + " must be a constant expression: EXECUTE " + toSql());
+    }
+    if ((fnParamValue_ instanceof LiteralExpr)
+        && (fnParamValue_.getType().isIntegerType())) {
+      // Parameter is a snapshot id
+      kind_ = TRollbackType.VERSION_ID;
+      snapshotVersion_ = fnParamValue_.evalToInteger(analyzer, USAGE);
+      if (snapshotVersion_ < 0) {
+        throw new AnalysisException("Invalid version number has been given to " + USAGE +
+            ": " + snapshotVersion_);
+      }
+      LOG.debug(USAGE + " version: " + snapshotVersion_);
+    } else {
+      Expr timestampExpr = getParamConvertibleToTimestamp();
+      if (timestampExpr != null) {
+        // Parameter is a timestamp.
+        kind_ = TRollbackType.TIME_ID;
+        try {
+          olderThanMillis_ =
+              ExprUtil.localTimestampToUnixTimeMicros(analyzer, timestampExpr) / 1000;
+          LOG.debug(USAGE + " millis: " + olderThanMillis_);
+        } catch (InternalException ie) {
+          throw new AnalysisException("An invalid TIMESTAMP expression has been given " +
+              "to " + USAGE + " the expression " + fnParamValue_.toSql() +
+              " cannot be converted to a TIMESTAMP",
+              ie);
+        }
+      } else {
+        throw new AnalysisException(USAGE +
+            " must be an integer type or a timestamp, but is '" +
+            fnParamValue_.getType() + "': EXECUTE " + toSql());
+      }
+    }
+  }
+
+  /**
+   * If field fnParamValue_ is a Timestamp, or can be cast to a Timestamp,
+   * then return an Expr for the Timestamp.
+   * @return null if the fnParamValue_ cannot be converted to a Timestamp.
+   */
+  private Expr getParamConvertibleToTimestamp() {
+    Expr timestampExpr = fnParamValue_;
+    if (timestampExpr.getType().isStringType()) {
+      timestampExpr = new CastExpr(Type.TIMESTAMP, fnParamValue_);
+    }
+    if (timestampExpr.getType().isTimestamp()) {
+      return timestampExpr;
+    }
+    return null;
+  }
+
+  @Override
+  public String toSql(ToSqlOptions options) {
+    return fnCallExpr_.toSql();
+  }
+
+  @Override
+  public TAlterTableParams toThrift() {
+    TAlterTableParams params = super.toThrift();
+    params.setAlter_type(TAlterTableType.EXECUTE);
+    TAlterTableExecuteParams executeParams = new TAlterTableExecuteParams();
+    TAlterTableExecuteRollbackParams executeRollbackParams =
+        new TAlterTableExecuteRollbackParams();
+    executeParams.setExecute_rollback_params(executeRollbackParams);
+    executeRollbackParams.setKind(kind_);
+    switch (kind_) {
+      case TIME_ID: executeRollbackParams.setTimestamp_millis(olderThanMillis_); break;
+      case VERSION_ID: executeRollbackParams.setSnapshot_id(snapshotVersion_); break;
+      default: throw new IllegalStateException("Bad kind of execute rollback " + kind_);
+    }
+    params.setSet_execute_params(executeParams);
+    return params;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java
index 141aa386d..0a0e33206 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableExecuteStmt.java
@@ -17,104 +17,63 @@
 
 package org.apache.impala.analysis;
 
-import org.apache.impala.catalog.FeIcebergTable;
-import org.apache.impala.catalog.Type;
-import org.apache.impala.common.AnalysisException;
-import org.apache.impala.common.InternalException;
-import org.apache.impala.thrift.TAlterTableExecuteParams;
-import org.apache.impala.thrift.TAlterTableParams;
-import org.apache.impala.thrift.TAlterTableType;
-import org.apache.impala.util.ExprUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.google.common.base.Preconditions;
 
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.impala.common.AnalysisException;
+
 /**
  * Represents an ALTER TABLE EXECUTE <operation> statement on Iceberg
- * tables, supported operations:
- * - expire_snapshots(): uses the ExpireSnapshot API to expire snaphosts,
- *   calls the ExpireSnapshot.expireOlderThan(timestampMillis) method.
- *   TableProperties.MIN_SNAPSHOTS_TO_KEEP table property manages how many snapshots
- *   should be retained even when all snapshots are selected by expireOlderThan().
+ * tables. For supported operations see the subclasses.
  */
 public class AlterTableExecuteStmt extends AlterTableStmt {
-  private final static Logger LOG = LoggerFactory.getLogger(AlterTableExecuteStmt.class);
-
-  private final static String USAGE = "EXPIRE_SNAPSHOTS()";
 
   // Expression of the function call after EXECUTE keyword. Parsed into an operation and
   // a value of that operation.
-  private FunctionCallExpr fnCallExpr_;
-
+  protected FunctionCallExpr fnCallExpr_;
   // Value expression from fnCallExpr_.
-  private Expr fnParamValue_;
-
+  protected Expr fnParamValue_;
   // The value extracted from the fnParamValue_ expression.
-  private long olderThanMillis_ = -1;
+  protected long olderThanMillis_ = -1;
 
   protected AlterTableExecuteStmt(TableName tableName, Expr fnCallExpr) {
     super(tableName);
-    fnCallExpr_ = (FunctionCallExpr)fnCallExpr;
+    fnCallExpr_ = (FunctionCallExpr) fnCallExpr;
   }
 
-  @Override
-  public void analyze(Analyzer analyzer) throws AnalysisException {
-    super.analyze(analyzer);
-    Preconditions.checkState(getTargetTable() instanceof FeIcebergTable);
-    analyzeFunctionCallExpr(analyzer);
-    analyzeOlderThan(analyzer);
+  /**
+   * Return an instance of a subclass of AlterTableExecuteStmt that can analyze the
+   * execute statement for the function call expression in 'expr'.
+   */
+  public static AlterTableStmt createExecuteStmt(TableName tableName, Expr expr)
+      throws AnalysisException {
+    FunctionCallExpr fnCallExpr = (FunctionCallExpr) expr;
+    String functionNameOrig = fnCallExpr.getFnName().toString();
+    String functionName = functionNameOrig.toUpperCase();
+    switch (functionName) {
+      case "EXPIRE_SNAPSHOTS":
+        return new AlterTableExecuteExpireSnapshotsStmt(tableName, fnCallExpr);
+      case "ROLLBACK": return new AlterTableExecuteRollbackStmt(tableName, fnCallExpr);
+      default:
+        throw new AnalysisException(String.format("'%s' is not supported by ALTER " +
+            "TABLE EXECUTE. Supported operations are: " +
+            "EXPIRE_SNAPSHOTS(), " +
+            "ROLLBACK().",
+            functionNameOrig));
+    }
   }
 
-  private void analyzeFunctionCallExpr(Analyzer analyzer) throws AnalysisException {
+  protected void analyzeFunctionCallExpr(Analyzer ignoredAnalyzer, String usage)
+      throws AnalysisException {
     // fnCallExpr_ analyzed here manually, because it is not an actual function but a
     // catalog operation.
     String fnName = fnCallExpr_.getFnName().toString();
-    if (!fnName.toUpperCase().equals("EXPIRE_SNAPSHOTS")) {
-      throw new AnalysisException(String.format("'%s' is not supported by ALTER " +
-          "TABLE EXECUTE. Supported operation is %s.", fnName, USAGE));
-    }
+    Preconditions.checkState(
+        StringUtils.equalsAnyIgnoreCase(fnName, "EXPIRE_SNAPSHOTS", "ROLLBACK"));
     if (fnCallExpr_.getParams().size() != 1) {
-      throw new AnalysisException(USAGE + " must have one parameter: " + toSql());
+      throw new AnalysisException(usage + " must have one parameter: " + toSql());
     }
     fnParamValue_ = fnCallExpr_.getParams().exprs().get(0);
   }
 
-  private void analyzeOlderThan(Analyzer analyzer) throws AnalysisException {
-    Preconditions.checkNotNull(fnParamValue_);
-    fnParamValue_.analyze(analyzer);
-    if (!fnParamValue_.isConstant()) {
-      throw new AnalysisException(USAGE + " must be a constant expression: " + toSql());
-    }
-    if (fnParamValue_.getType().isStringType()) {
-      fnParamValue_ = new CastExpr(Type.TIMESTAMP, fnParamValue_);
-    }
-    if (!fnParamValue_.getType().isTimestamp()) {
-      throw new AnalysisException(USAGE + " must be a timestamp type but is '" +
-          fnParamValue_.getType() + "': " + fnParamValue_.toSql());
-    }
-    try {
-      olderThanMillis_ =
-          ExprUtil.localTimestampToUnixTimeMicros(analyzer, fnParamValue_) / 1000;
-      LOG.debug(USAGE + " millis: " + String.valueOf(olderThanMillis_));
-    } catch (InternalException ie) {
-      throw new AnalysisException("Invalid TIMESTAMP expression has been given to " +
-          USAGE + ": " + ie.getMessage(), ie);
-    }
-  }
-
-  @Override
-  public String toSql(ToSqlOptions options) {
-    return fnCallExpr_.toSql();
-  }
-
-  @Override
-  public TAlterTableParams toThrift() {
-    TAlterTableParams params = super.toThrift();
-    params.setAlter_type(TAlterTableType.EXECUTE);
-    TAlterTableExecuteParams executeParams = new TAlterTableExecuteParams();
-    executeParams.setOlder_than_millis(olderThanMillis_);
-    params.setSet_execute_params(executeParams);
-    return params;
-  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index 28a82ac03..e2fa4e713 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -178,7 +178,7 @@
   Map getPartitionMap();
 
   /**
-   * @param the index of the target partitioning column
+   * @param col the index of the target partitioning column
    * @return a map from value to a set of partitions for which column 'col'
    * has that value.
    */
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 3830ffa7e..6e2a3f478 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -165,6 +165,7 @@ import org.apache.impala.thrift.TAlterTableAddPartitionParams;
 import org.apache.impala.thrift.TAlterTableAlterColParams;
 import org.apache.impala.thrift.TAlterTableDropColParams;
 import org.apache.impala.thrift.TAlterTableDropPartitionParams;
+import org.apache.impala.thrift.TAlterTableExecuteParams;
 import org.apache.impala.thrift.TAlterTableOrViewSetOwnerParams;
 import org.apache.impala.thrift.TAlterTableParams;
 import org.apache.impala.thrift.TAlterTableReplaceColsParams;
@@ -1367,9 +1368,23 @@ public class CatalogOpExecutor {
         break;
       case EXECUTE:
         Preconditions.checkState(params.isSetSet_execute_params());
-        String summary = IcebergCatalogOpExecutor.alterTableExecute(iceTxn,
-            params.getSet_execute_params());
-        addSummary(response, summary);
+        // All the EXECUTE functions operate only on Iceberg data.
+        needsToUpdateHms = false;
+        TAlterTableExecuteParams setExecuteParams = params.getSet_execute_params();
+        if (setExecuteParams.isSetExecute_rollback_params()) {
+          String rollbackSummary = IcebergCatalogOpExecutor.alterTableExecuteRollback(
+              iceTxn, tbl, setExecuteParams.getExecute_rollback_params());
+          addSummary(response, rollbackSummary);
+        } else if (setExecuteParams.isSetExpire_snapshots_params()) {
+          String expireSummary =
+              IcebergCatalogOpExecutor.alterTableExecuteExpireSnapshots(
+                  iceTxn, setExecuteParams.getExpire_snapshots_params());
+          addSummary(response, expireSummary);
+        } else {
+          // Cannot happen, but throw just in case.
+          throw new IllegalStateException(
+              "Alter table execute statement is not implemented.");
+        }
         break;
       case SET_PARTITION_SPEC:
         // Set partition spec uses 'TableOperations', not transactions.
@@ -1393,7 +1408,7 @@
         break;
       case REPLACE_COLUMNS:
         // It doesn't make sense to replace all the columns of an Iceberg table as it
-        // would basically make all existing data unaccessible.
+        // would basically make all existing data inaccessible.
       default:
         throw new UnsupportedOperationException(
             "Unsupported ALTER TABLE operation for Iceberg tables: " +
@@ -1414,7 +1429,7 @@
 
     if (!needsToUpdateHms) {
       // We don't need to update HMS because either it is already done by Iceberg's
-      // HiveCatalog, or we modified the PARTITION SPEC which is not stored in HMS.
+      // HiveCatalog, or we modified the Iceberg data which is not stored in HMS.
       loadTableMetadata(tbl, newCatalogVersion, true, true, null,
           "ALTER Iceberg TABLE " + params.getAlter_type().name());
       catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion);
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index 75aa6e9a0..10d19f7c5 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -23,16 +23,17 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.BaseReplacePartitions;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DeleteFiles;
 import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.ManageSnapshots;
 import org.apache.iceberg.Metrics;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotManager;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
@@ -42,32 +43,36 @@ import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expressions;
-import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.TableNotFoundException;
+import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
 import org.apache.impala.catalog.iceberg.IcebergCatalog;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.fb.FbIcebergColumnStats;
 import org.apache.impala.fb.FbIcebergDataFile;
-import org.apache.impala.thrift.TAlterTableExecuteParams;
+import org.apache.impala.thrift.TAlterTableExecuteExpireSnapshotsParams;
+import org.apache.impala.thrift.TAlterTableExecuteRollbackParams;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TIcebergCatalog;
 import org.apache.impala.thrift.TIcebergOperationParam;
 import org.apache.impala.thrift.TIcebergPartitionSpec;
+import org.apache.impala.thrift.TRollbackType;
 import org.apache.impala.util.IcebergSchemaConverter;
 import org.apache.impala.util.IcebergUtil;
-import org.apache.log4j.Logger;
 
 import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This is a helper for the CatalogOpExecutor to provide Iceberg related DDL functionality
 * such as creating and dropping tables from Iceberg.
 */
 public class IcebergCatalogOpExecutor {
-  public static final Logger LOG = Logger.getLogger(IcebergCatalogOpExecutor.class);
+  public static final Logger LOG =
+      LoggerFactory.getLogger(IcebergCatalogOpExecutor.class);
 
   /**
    * Create Iceberg table by Iceberg api
@@ -177,14 +182,49 @@ public class IcebergCatalogOpExecutor {
     tableOp.commit(metadata, newMetadata);
   }
 
-  public static String alterTableExecute(Transaction txn,
-      TAlterTableExecuteParams params) {
+  /**
+   * Use the ExpireSnapshot API to expire snapshots by calling the
+   * ExpireSnapshot.expireOlderThan(timestampMillis) method.
+   * TableProperties.MIN_SNAPSHOTS_TO_KEEP table property manages how many snapshots
+   * should be retained even when all snapshots are selected by expireOlderThan().
+   */
+  public static String alterTableExecuteExpireSnapshots(
+      Transaction txn, TAlterTableExecuteExpireSnapshotsParams params) {
     ExpireSnapshots expireApi = txn.expireSnapshots();
+    Preconditions.checkState(params.isSetOlder_than_millis());
     expireApi.expireOlderThan(params.older_than_millis);
     expireApi.commit();
     return "Snapshots have been expired.";
   }
 
+  /**
+   * Executes an ALTER TABLE EXECUTE ROLLBACK.
+   */
+  public static String alterTableExecuteRollback(
+      Transaction iceTxn, FeIcebergTable tbl, TAlterTableExecuteRollbackParams params) {
+    TRollbackType kind = params.getKind();
+    ManageSnapshots manageSnapshots = iceTxn.manageSnapshots();
+    switch (kind) {
+      case TIME_ID:
+        Preconditions.checkState(params.isSetTimestamp_millis());
+        long timestampMillis = params.getTimestamp_millis();
+        LOG.info("Rollback iceberg table to snapshot before timestamp {}",
+            timestampMillis);
+        manageSnapshots.rollbackToTime(timestampMillis);
+        break;
+      case VERSION_ID:
+        Preconditions.checkState(params.isSetSnapshot_id());
+        long snapshotId = params.getSnapshot_id();
+        LOG.info("Rollback iceberg table to snapshot id {}", snapshotId);
+        manageSnapshots.rollbackTo(snapshotId);
+        break;
+      default: throw new IllegalStateException("Bad kind of execute rollback " + kind);
+    }
+    // Commit the update.
+    manageSnapshots.commit();
+    return "Rollback executed.";
+  }
+
   /**
    * Drops a column from an Iceberg table.
   */
@@ -370,4 +410,4 @@ public class IcebergCatalogOpExecutor {
         String.valueOf(version));
     updateProps.commit();
   }
-}
+}
\ No newline at end of file
diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex
index 6a96ef100..418334be1 100644
--- a/fe/src/main/jflex/sql-scanner.flex
+++ b/fe/src/main/jflex/sql-scanner.flex
@@ -408,7 +408,8 @@ import org.apache.impala.thrift.TReservedWordsVersion;
         "year", "month", "day", "hour", "minute", "second",
         "begin", "call", "check", "classifier", "close", "identity", "language",
         "localtime", "member", "module", "new", "nullif", "old", "open", "parameter",
-        "period", "result", "return", "sql", "start", "system", "time", "user", "value"
+        "period", "result", "return", "rollback", "sql", "start", "system", "time",
+        "user", "value"
     }));
   }
 
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 5986fb1c8..c4f656d3b 100755
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -4281,8 +4281,8 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     // Negative tests
     AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
        "unsupported_operation(123456789);", "'unsupported_operation' is not supported " +
-        "by ALTER TABLE EXECUTE. Supported operation is " +
-        "EXPIRE_SNAPSHOTS()");
+        "by ALTER TABLE EXECUTE. Supported operations are: " +
+        "EXPIRE_SNAPSHOTS(), ROLLBACK()");
     AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
         "expire_snapshots(now(), 3);", "EXPIRE_SNAPSHOTS() must have one " +
         "parameter: expire_snapshots(now(), 3)");
@@ -4297,6 +4297,49 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         " been given to EXPIRE_SNAPSHOTS()");
   }
 
+  @Test
+  public void TestAlterExecuteRollback() {
+    AnalyzesOk("alter table functional_parquet.iceberg_partitioned execute " +
+        "rollback('2022-01-04 10:00:00');");
+    AnalyzesOk("alter table functional_parquet.iceberg_partitioned execute " +
+        "rollback(123456);");
+    // Timestamp can be an expression.
+    AnalyzesOk("alter table functional_parquet.iceberg_partitioned execute " +
+        "rollback(cast('2021-08-09 15:52:45' as timestamp) - interval 2 days + " +
+        "interval 3 hours);");
+
+    // Negative tests
+    AnalysisError("alter table nodb.alltypes execute " +
+        "rollback('2022-01-04 10:00:00');",
+        "Could not resolve table reference: 'nodb.alltypes'");
+    AnalysisError("alter table functional.alltypes execute " +
+        "rollback('2022-01-04 10:00:00');",
+        "ALTER TABLE EXECUTE ROLLBACK is only supported for Iceberg tables: " +
+        "functional.alltypes");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+        "rollback(id);", "EXECUTE ROLLBACK(): " +
+        "must be a constant expression: EXECUTE rollback(id)");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+        "rollback(3.14);", "EXECUTE ROLLBACK(): " +
+        "must be an integer type or a timestamp, but is 'DECIMAL(3,2)': " +
+        "EXECUTE rollback(3.14)");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+        "rollback('2021-02-32 15:52:45');", "An invalid TIMESTAMP expression has been " +
+        "given to EXECUTE ROLLBACK(): the expression " +
+        "'2021-02-32 15:52:45' cannot be converted to a TIMESTAMP");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+        "rollback('the beginning');", "An invalid TIMESTAMP expression has been " +
+        "given to EXECUTE ROLLBACK(): the expression " +
+        "'the beginning' cannot be converted to a TIMESTAMP");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+        "rollback(1111,2222);",
+        "EXECUTE ROLLBACK(): must have one parameter");
+    AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
+        "rollback('1111');", "An invalid TIMESTAMP expression has been " +
+        "given to EXECUTE ROLLBACK(): the expression " +
+        "'1111' cannot be converted to a TIMESTAMP");
+  }
+
   private static String buildLongOwnerName() {
     StringBuilder comment = new StringBuilder();
     for (int i = 0; i < MetaStoreUtil.MAX_OWNER_LENGTH + 5; i++) {
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-rollback-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-rollback-negative.test
new file mode 100644
index 000000000..4c2c3cd18
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-rollback-negative.test
@@ -0,0 +1,27 @@
+====
+---- QUERY
+# EXECUTE ROLLBACK with an invalid snapshot id on a partitioned Iceberg table.
+ALTER TABLE functional_parquet.iceberg_partitioned EXECUTE ROLLBACK(1)
+---- CATCH
+Cannot roll back to unknown snapshot id: 1
+====
+---- QUERY
+# EXECUTE ROLLBACK to a date older than any snapshot of a partitioned Iceberg table.
+set timezone=CET;
+ALTER TABLE functional_parquet.iceberg_partitioned EXECUTE ROLLBACK('2020-08-31 07:58:00')
+---- CATCH
+Cannot roll back, no valid snapshot older than: 1598853480000
+====
+---- QUERY
+# EXECUTE ROLLBACK with an invalid timestamp expression on a partitioned Iceberg table.
+set timezone=CET;
+ALTER TABLE functional_parquet.iceberg_partitioned EXECUTE ROLLBACK('1111');
+---- CATCH
+An invalid TIMESTAMP expression has been given to EXECUTE ROLLBACK(): the expression '1111' cannot be converted to a TIMESTAMP
+====
+---- QUERY
+# EXECUTE ROLLBACK fails on a non-Iceberg table.
+ALTER TABLE functional_parquet.alltypestiny EXECUTE ROLLBACK(1111111)
+---- CATCH
+ALTER TABLE EXECUTE ROLLBACK is only supported for Iceberg tables
+====
\ No newline at end of file
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index dc7ac51c9..0a53c31f7 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -28,6 +28,7 @@ from tests.common.skip import SkipIfFS
 from tests.util.hive_utils import HiveDbWrapper
 from tests.util.event_processor_utils import EventProcessorUtils
 from tests.util.filesystem_utils import WAREHOUSE
+from tests.util.iceberg_util import IcebergCatalogs
 
 
 @SkipIfFS.hive
@@ -482,6 +483,7 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
   def test_iceberg_self_events(self, unique_database):
     """This test checks that Impala doesn't refresh Iceberg tables on self events."""
     tbl_name = unique_database + ".test_iceberg_events"
+    iceberg_catalogs = IcebergCatalogs(unique_database)
 
     def check_self_events(query, skips_events=True):
       tbls_refreshed_before, partitions_refreshed_before, \
@@ -495,19 +497,8 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
       if skips_events:
         assert events_skipped_after > events_skipped_before
 
-    hadoop_tables = "'iceberg.catalog'='hadoop.tables'"
-    hadoop_catalog = ("'iceberg.catalog'='hadoop.catalog', " +
-        "'iceberg.catalog_location'='/test-warehouse/{0}/hadoop_catalog_test/'".format(
-          unique_database))
-    hive_catalog = "'iceberg.catalog'='hive.catalog'"
-    hive_catalogs = "'iceberg.catalog'='ice_hive_cat'"
-    hadoop_catalogs = "'iceberg.catalog'='ice_hadoop_cat'"
-
-    all_catalogs = [hadoop_tables, hadoop_catalog, hive_catalog, hive_catalogs,
-        hadoop_catalogs]
-
-    for catalog in all_catalogs:
-      is_hive_catalog = catalog == hive_catalog or catalog == hive_catalogs
+    for catalog in iceberg_catalogs.get_iceberg_catalog_properties():
+      is_hive_catalog = iceberg_catalogs.is_a_hive_catalog(catalog)
       self.client.execute("""
          CREATE TABLE {0} (i int)
          STORED AS ICEBERG
          TBLPROPERTIES ({1})""".format(tbl_name, catalog))
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 6f64ced0e..a8e05baf5 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -22,6 +22,8 @@ import logging
 import os
 import pytest
 import random
+
+import re
 import time
 
 from subprocess import check_call
@@ -33,14 +35,14 @@ import json
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.iceberg_test_suite import IcebergTestSuite
-from tests.common.skip import SkipIf, SkipIfDockerizedCluster, SkipIfLocal
+from tests.common.skip import SkipIf, SkipIfDockerizedCluster
 from tests.common.file_utils import (
   create_iceberg_table_from_directory,
   create_table_from_parquet)
 from tests.shell.util import run_impala_shell_cmd
 from tests.util.filesystem_utils import get_fs_path, IS_HDFS
 from tests.util.get_parquet_metadata import get_parquet_metadata
-from tests.util.iceberg_util import cast_ts, quote, parse_timestamp
+from tests.util.iceberg_util import cast_ts, quote, get_snapshots, IcebergCatalogs
 
 LOG = logging.getLogger(__name__)
 
@@ -69,58 +71,61 @@ class TestIcebergTable(IcebergTestSuite):
 
   def test_expire_snapshots(self, unique_database):
     tbl_name = unique_database + ".expire_snapshots"
+    iceberg_catalogs = IcebergCatalogs(unique_database)
+    for catalog_properties in iceberg_catalogs.get_iceberg_catalog_properties():
+      # We are setting the TIMEZONE query option in this test, so let's create a local
+      # impala client.
+      with self.create_impala_client() as impalad_client:
+        # Iceberg doesn't create a snapshot entry for the initial empty table
+        impalad_client.execute("""
+            create table {0} (i int) stored as iceberg
+            TBLPROPERTIES ({1})""".format(tbl_name, catalog_properties))
+        ts_0 = datetime.datetime.now()
+        insert_q = "insert into {0} values (1)".format(tbl_name)
+        ts_1 = self.execute_query_ts(impalad_client, insert_q)
+        time.sleep(5)
+        impalad_client.execute(insert_q)
+        time.sleep(5)
+        ts_2 = self.execute_query_ts(impalad_client, insert_q)
+        impalad_client.execute(insert_q)
-    # We are setting the TIMEZONE query option in this test, so let's create a local
-    # impala client.
-    with self.create_impala_client() as impalad_client:
-      # Iceberg doesn't create a snapshot entry for the initial empty table
-      impalad_client.execute("create table {0} (i int) stored as iceberg"
-          .format(tbl_name))
-      ts_0 = datetime.datetime.now()
-      insert_q = "insert into {0} values (1)".format(tbl_name)
-      ts_1 = self.execute_query_ts(impalad_client, insert_q)
-      time.sleep(5)
-      impalad_client.execute(insert_q)
-      time.sleep(5)
-      ts_2 = self.execute_query_ts(impalad_client, insert_q)
-      impalad_client.execute(insert_q)
+        # There should be 4 snapshots initially
+        self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 4)
+        # Expire the oldest snapshot and test that the oldest one was expired
+        expire_q = "alter table {0} execute expire_snapshots({1})"
+        impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_1)))
+        self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3)
+        self.expect_num_snapshots_from(impalad_client, tbl_name, ts_1, 3)
-      # There should be 4 snapshots initially
-      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 4)
-      # Expire the oldest snapshot and test that the oldest one was expired
-      expire_q = "alter table {0} execute expire_snapshots({1})"
-      impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_1)))
-      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3)
-      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_1, 3)
+        # Expire with a timestamp in which the interval does not touch existing snapshot
+        impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_1)))
+        self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3)
-      # Expire with a timestamp in which the interval does not touch existing snapshot
-      impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_1)))
-      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3)
+        # Expire all, but retain 1
+        impalad_client.execute(expire_q.format(tbl_name,
+            cast_ts(datetime.datetime.now())))
+        self.expect_num_snapshots_from(impalad_client, tbl_name, ts_2, 1)
-      # Expire all, but retain 1
-      impalad_client.execute(expire_q.format(tbl_name,
-          cast_ts(datetime.datetime.now())))
-      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_2, 1)
+        # Change number of retained snapshots, then expire all
+        impalad_client.execute("""alter table {0} set tblproperties
+            ('history.expire.min-snapshots-to-keep' = '2')""".format(tbl_name))
+        impalad_client.execute(insert_q)
+        impalad_client.execute(insert_q)
+        impalad_client.execute(expire_q.format(tbl_name,
+            cast_ts(datetime.datetime.now())))
+        self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 2)
-      # Change number of retained snapshots, then expire all
-      impalad_client.execute("""alter table {0} set tblproperties
-          ('history.expire.min-snapshots-to-keep' = '2')""".format(tbl_name))
-      impalad_client.execute(insert_q)
-      impalad_client.execute(insert_q)
-      impalad_client.execute(expire_q.format(tbl_name,
-          cast_ts(datetime.datetime.now())))
-      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 2)
-
-      # Check that timezone is interpreted in local timezone controlled by query option
-      # TIMEZONE.
-      impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
-      impalad_client.execute(insert_q)
-      ts_tokyo = self.impala_now(impalad_client)
-      impalad_client.execute("SET TIMEZONE='Europe/Budapest'")
-      impalad_client.execute(insert_q)
-      impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
-      impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_tokyo)))
-      self.expect_num_snapshots_from(impalad_client, tbl_name, ts_tokyo, 1)
+        # Check that timezone is interpreted in local timezone controlled by query option
+        # TIMEZONE.
+        impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
+        impalad_client.execute(insert_q)
+        ts_tokyo = self.impala_now(impalad_client)
+        impalad_client.execute("SET TIMEZONE='Europe/Budapest'")
+        impalad_client.execute(insert_q)
+        impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
+        impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_tokyo)))
+        self.expect_num_snapshots_from(impalad_client, tbl_name, ts_tokyo, 1)
+        impalad_client.execute("DROP TABLE {0}".format(tbl_name))
 
   def test_truncate_iceberg_tables(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-truncate', vector, use_db=unique_database)
@@ -128,7 +133,7 @@ class TestIcebergTable(IcebergTestSuite):
   # With IMPALA-11429 there is an extra "ALTER TABLE SET OWNER" right after executing
   # "CREATE TABLE". As a result dropping the table location right after CREATE TABLE will
   # trigger a known bug: IMPALA-11509. Hence, turning this test off until there is a fix
-  # for this issue. Note, we could add a sleep righ after table creation that could
+  # for this issue. Note, we could add a sleep right after table creation that could
   # workaround the above mentioned bug but then we would hit another issue: IMPALA-11502.
   @SkipIf.not_dfs
   def test_drop_incomplete_table(self, vector, unique_database):
@@ -231,31 +236,127 @@ class TestIcebergTable(IcebergTestSuite):
         tblproperties('iceberg.catalog'='hadoop.tables')""".format(tbl_name))
     self.client.execute("INSERT INTO {0} VALUES (1)".format(tbl_name))
     self.client.execute("INSERT INTO {0} VALUES (2)".format(tbl_name))
-    result = self.client.execute("DESCRIBE HISTORY {0}".format(tbl_name))
-    assert(len(result.data) == 2)
-    first_snapshot = result.data[0].split("\t")
-    second_snapshot = result.data[1].split("\t")
+    snapshots = get_snapshots(self.client, tbl_name, expected_result_size=2)
+    first_snapshot = snapshots[0]
+    second_snapshot = snapshots[1]
     # Check that first snapshot is older than the second snapshot.
-    assert(first_snapshot[0] < second_snapshot[0])
+    assert(first_snapshot.get_creation_time() < second_snapshot.get_creation_time())
     # Check that second snapshot's parent ID is the snapshot ID of the first snapshot.
-    assert(first_snapshot[1] == second_snapshot[2])
+    assert(first_snapshot.get_snapshot_id() == second_snapshot.get_parent_id())
     # The first snapshot has no parent snapshot ID.
-    assert(first_snapshot[2] == "NULL")
+    assert(first_snapshot.get_parent_id() is None)
     # Check "is_current_ancestor" column.
-    assert(first_snapshot[3] == "TRUE" and second_snapshot[3] == "TRUE")
+    assert(first_snapshot.is_current_ancestor())
+    assert(second_snapshot.is_current_ancestor())
+
+  def test_execute_rollback_negative(self, vector):
+    """Negative test for EXECUTE ROLLBACK."""
+    self.run_test_case('QueryTest/iceberg-rollback-negative', vector)
+
+  def test_execute_rollback(self, unique_database):
+    """Test for EXECUTE ROLLBACK."""
+    iceberg_catalogs = IcebergCatalogs(unique_database)
+    for catalog_properties in iceberg_catalogs.get_iceberg_catalog_properties():
+      # Create a table with multiple snapshots.
+      tbl_name = unique_database + ".iceberg_execute_rollback"
+      # We are setting the TIMEZONE query option in this test, so let's create a local
+      # impala client.
+      with self.create_impala_client() as impalad_client:
+        impalad_client.execute("""
+            create table {0} (i int) stored as iceberg
+            TBLPROPERTIES ({1})""".format(tbl_name, catalog_properties))
+        initial_snapshots = 3
+        for i in range(initial_snapshots):
+          impalad_client.execute("INSERT INTO {0} VALUES ({1})".format(tbl_name, i))
+        snapshots = get_snapshots(impalad_client, tbl_name,
+            expected_result_size=initial_snapshots)
+
+        output = self.rollback_to_id(tbl_name, snapshots[1].get_snapshot_id())
+        LOG.info("success output={0}".format(output))
+
+        # We rolled back, but that creates a new snapshot, so now there are 4.
+        snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=4)
+        # The new snapshot has the same id (and parent id) as the snapshot we rolled back
+        # to, but it has a different creation time.
+        assert snapshots[1].get_snapshot_id() == snapshots[3].get_snapshot_id()
+        assert snapshots[1].get_parent_id() == snapshots[3].get_parent_id()
+        assert snapshots[1].get_creation_time() < snapshots[3].get_creation_time()
+        # The "orphaned" snapshot is now not a current ancestor.
+        assert not snapshots[2].is_current_ancestor()
+
+        # We cannot roll back to a snapshot that is not a current ancestor.
+        output = self.rollback_to_id_expect_failure(tbl_name,
+            snapshots[2].get_snapshot_id(),
+            expected_text="Cannot roll back to snapshot, not an ancestor of the current "
+            "state")
+
+        # Create another snapshot.
+        before_insert = datetime.datetime.now()
+        impalad_client.execute("INSERT INTO {0} VALUES ({1})".format(tbl_name, 4))
+        snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=5)
+
+        # Roll back to before the last insert.
+        self.rollback_to_ts(impalad_client, tbl_name, before_insert)
+        # This creates another snapshot.
+        snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=6)
+        # The snapshot id is the same; the dates differ.
+        assert snapshots[3].get_snapshot_id() == snapshots[5].get_snapshot_id()
+        assert snapshots[3].get_creation_time() < snapshots[5].get_creation_time()
+        assert not snapshots[4].is_current_ancestor()
+
+        # Show that the EXECUTE ROLLBACK is respecting the current timezone.
+        # To do this we try to roll back to a time for which there is no
+        # snapshot; this will fail with an error message that includes the specified
+        # time. We parse out that time. By doing this in two timezones we can see
+        # that the parameter being used was affected by the current timezone.
+        one_hour_ago = before_insert - datetime.timedelta(hours=1)
+        # We use timezones from Japan and Iceland to avoid any DST complexities.
+        impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
+        japan_ts = self.get_snapshot_ts_from_failed_rollback(
+            impalad_client, tbl_name, one_hour_ago)
+        impalad_client.execute("SET TIMEZONE='Iceland'")
+        iceland_ts = self.get_snapshot_ts_from_failed_rollback(
+            impalad_client, tbl_name, one_hour_ago)
+        diff_hours = (iceland_ts - japan_ts) / (1000 * 60 * 60)
+        assert diff_hours == 9
+
+        impalad_client.execute("DROP TABLE {0}".format(tbl_name))
+
+  def get_snapshot_ts_from_failed_rollback(self, client, tbl_name, ts):
+    """Run an EXECUTE ROLLBACK which is expected to fail.
+    Parse the error message to extract the timestamp for which there
+    was no snapshot, and convert the string to an integer."""
+    try:
+      self.rollback_to_ts(client, tbl_name, ts)
+      assert False, "Query should have failed"
+    except ImpalaBeeswaxException as e:
+      result = re.search(r".*no valid snapshot older than: (\d+)", str(e))
+      time_str = result.group(1)
+      snapshot_ts = int(time_str)
+      assert snapshot_ts > 0, "did not decode snapshot ts from {0}".format(result)
+      return snapshot_ts
+
+  def rollback_to_ts(self, client, tbl_name, ts):
+    """Roll back a table to a snapshot timestamp."""
+    query = "ALTER TABLE {0} EXECUTE ROLLBACK ('{1}');".format(tbl_name, ts.isoformat())
+    return self.execute_query_expect_success(client, query)
+
+  def rollback_to_id(self, tbl_name, id):
+    """Roll back a table to a snapshot id."""
+    query = "ALTER TABLE {0} EXECUTE ROLLBACK ({1});".format(tbl_name, id)
+    return self.execute_query_expect_success(self.client, query)
+
+  def rollback_to_id_expect_failure(self, tbl_name, id, expected_text=None):
+    """Attempt to roll back a table to a snapshot id, expecting a failure."""
+    query = "ALTER TABLE {0} EXECUTE ROLLBACK ({1});".format(tbl_name, id)
+    output = self.execute_query_expect_failure(self.client, query)
+    if expected_text:
+      assert expected_text in str(output)
+    return output
 
   def test_describe_history_params(self, unique_database):
     tbl_name = unique_database + ".describe_history"
 
-    def expect_results_between(ts_start, ts_end, expected_result_size):
-      query = "DESCRIBE HISTORY {0} BETWEEN {1} AND {2};".format(
-          tbl_name, cast_ts(ts_start), cast_ts(ts_end))
-      data = impalad_client.execute(query)
-      assert len(data.data) == expected_result_size
-      for i in range(len(data.data)):
-        result_ts_dt = parse_timestamp(data.data[i].split('\t')[0])
-        assert result_ts_dt >= ts_start and result_ts_dt <= ts_end
-
     # We are setting the TIMEZONE query option in this test, so let's create a local
     # impala client.
     with self.create_impala_client() as impalad_client:
@@ -279,10 +380,11 @@ class TestIcebergTable(IcebergTestSuite):
       self.expect_num_snapshots_from(impalad_client, tbl_name, ts_3, 0)
 
       # Describe history with BETWEEN AND predicate
-      expect_results_between(ts_1, ts_2, 1)
-      expect_results_between(ts_1 - datetime.timedelta(hours=1), ts_2, 2)
-      expect_results_between(ts_1 - datetime.timedelta(hours=1), ts_2 +
-          datetime.timedelta(hours=1), 3)
+      self.expect_results_between(impalad_client, tbl_name, ts_1, ts_2, 1)
+      self.expect_results_between(impalad_client, tbl_name,
+          ts_1 - datetime.timedelta(hours=1), ts_2, 2)
+      self.expect_results_between(impalad_client, tbl_name,
+          ts_1 - datetime.timedelta(hours=1), ts_2 + datetime.timedelta(hours=1), 3)
 
       # Check that timezone is interpreted in local timezone controlled by query option
       # TIMEZONE. Persist the local times first and create a new snapshot.
@@ -341,14 +443,6 @@ class TestIcebergTable(IcebergTestSuite):
           tbl_name, snapshot_id), expected)
 
-    def get_snapshots():
-      data = impalad_client.execute("describe history {0}".format(tbl_name))
-      ret = list()
-      for row in data.data:
-        fields = row.split('\t')
-        ret.append(fields[1])
-      return ret
-
     def impala_now():
       now_data = impalad_client.execute("select now()")
       return now_data.data[0]
@@ -409,12 +503,12 @@ class TestIcebergTable(IcebergTestSuite):
         ij_cols)
 
     # Query table as of snapshot IDs.
-    snapshots = get_snapshots()
-    expect_results_v(snapshots[0], ['1'], i_cols)
-    expect_results_v(snapshots[1], ['1', '2'], i_cols)
-    expect_results_v(snapshots[2], [], i_cols)
-    expect_results_v(snapshots[3], ['100'], i_cols)
-    expect_results_v(snapshots[4], ['100\tNULL', '3\t103'], ij_cols)
+    snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=5)
+    expect_results_v(snapshots[0].get_snapshot_id(), ['1'], i_cols)
+    expect_results_v(snapshots[1].get_snapshot_id(), ['1', '2'], i_cols)
+    expect_results_v(snapshots[2].get_snapshot_id(), [], i_cols)
+    expect_results_v(snapshots[3].get_snapshot_id(), ['100'], i_cols)
+    expect_results_v(snapshots[4].get_snapshot_id(), ['100\tNULL', '3\t103'], ij_cols)
 
     # Test of plain count star optimization
     # 'NumRowGroups' and 'NumFileMetadataRead' should not appear in profile
@@ -427,11 +521,11 @@ class TestIcebergTable(IcebergTestSuite):
     expect_for_count_star_t(cast_ts(ts_4) + " - interval 5 seconds", '0')
     expect_for_count_star_t(cast_ts(ts_5), '2')
     expect_for_count_star_t(cast_ts(ts_5) + " + interval 1 hours", '2')
-    expect_for_count_star_v(snapshots[0], '1')
-    expect_for_count_star_v(snapshots[1], '2')
-    expect_for_count_star_v(snapshots[2], '0')
-    expect_for_count_star_v(snapshots[3], '1')
-    expect_for_count_star_v(snapshots[4], '2')
+    expect_for_count_star_v(snapshots[0].get_snapshot_id(), '1')
+    expect_for_count_star_v(snapshots[1].get_snapshot_id(), '2')
+    expect_for_count_star_v(snapshots[2].get_snapshot_id(), '0')
+    expect_for_count_star_v(snapshots[3].get_snapshot_id(), '1')
+    expect_for_count_star_v(snapshots[4].get_snapshot_id(), '2')
 
     # SELECT diff
     expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}'
@@ -442,18 +536,19 @@ class TestIcebergTable(IcebergTestSuite):
     expect_results("""SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_new}
                       MINUS
                      SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_old}""".format(
-        tbl=tbl_name, v_new=snapshots[1], v_old=snapshots[0]),
+        tbl=tbl_name, v_new=snapshots[1].get_snapshot_id(),
+        v_old=snapshots[0].get_snapshot_id()),
        ['2'], i_cols)
 
     # Mix SYSTEM_TIME and SYSTEM_VERSION
     expect_results("""SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_new}
                       MINUS
                      SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_old}'""".format(
-        tbl=tbl_name, v_new=snapshots[1], ts_old=ts_1),
+        tbl=tbl_name, v_new=snapshots[1].get_snapshot_id(), ts_old=ts_1),
        ['2'], i_cols)
     expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}'
                       MINUS
                      SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_old}""".format(
-        tbl=tbl_name, ts_new=ts_2, v_old=snapshots[0]),
+        tbl=tbl_name, ts_new=ts_2, v_old=snapshots[0].get_snapshot_id()),
        ['2'], i_cols)
     expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}'
                       MINUS
diff --git a/tests/util/iceberg_util.py b/tests/util/iceberg_util.py
index 924b5e249..21703acfd 100644
--- a/tests/util/iceberg_util.py
+++ b/tests/util/iceberg_util.py
@@ -106,3 +106,34 @@ def get_snapshots(impalad_client, tbl_name, ts_start=None, ts_end=None,
   for snapshot_str in rows.data:
     results.append(Snapshot(snapshot_str))
   return results
+
+
+class IcebergCatalogs:
+  """Utility class to generate TBLPROPERTIES corresponding to various Iceberg catalogs."""
+
+  def __init__(self, database):
+    """Create an IcebergCatalogs object parameterized by the database name."""
+    self.database = database
+
+  hive_catalog = "'iceberg.catalog'='hive.catalog'"
+  hive_catalogs = "'iceberg.catalog'='ice_hive_cat'"
+  hadoop_tables = "'iceberg.catalog'='hadoop.tables'"
+  hadoop_catalogs = "'iceberg.catalog'='ice_hadoop_cat'"
+
+  def get_iceberg_catalog_properties(self):
+    """Return a list containing TBLPROPERTIES corresponding to various Iceberg catalogs.
+    The TBLPROPERTIES can be used to create tables."""
+    hadoop_catalog = ("'iceberg.catalog'='hadoop.catalog', " +
+        "'iceberg.catalog_location'='/test-warehouse/{0}/hadoop_catalog_test/'".format(
+          self.database))
+    return [
+      self.hadoop_tables,
+      self.hive_catalog,
+      self.hive_catalogs,
+      self.hadoop_catalogs,
+      hadoop_catalog
+    ]
+
+  def is_a_hive_catalog(self, catalog):
+    """Return True if the catalog property is for a Hive catalog."""
+    return catalog == self.hive_catalog or catalog == self.hive_catalogs
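
--
Usage sketch (not part of the patch; the table name ice_tbl is hypothetical).
EXPIRE_SNAPSHOTS takes a constant timestamp expression; ROLLBACK takes either a
snapshot id or a timestamp, with timestamps interpreted in the session's TIMEZONE:

  ALTER TABLE ice_tbl EXECUTE EXPIRE_SNAPSHOTS(now() - interval 10 days);
  ALTER TABLE ice_tbl EXECUTE ROLLBACK(123456);                -- to a snapshot id
  ALTER TABLE ice_tbl EXECUTE ROLLBACK('2022-01-04 10:00:00'); -- to the latest snapshot older than this time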