IMPALA-11482: Alter Table Execute Rollback for Iceberg tables.

Modifications to an Iceberg table create new table snapshots; the
older snapshots continue to represent earlier versions of the table.
The Iceberg API provides a way to roll back a table to a previous
snapshot.

This change adds the ability to execute a rollback on Iceberg tables
using the following statements:

- ALTER TABLE <tbl> EXECUTE ROLLBACK(<snapshot id>)
- ALTER TABLE <tbl> EXECUTE ROLLBACK('<timestamp>')

The latter form of the command rolls back to the most recent snapshot
whose creation timestamp is older than the specified timestamp.

Note that when a table is rolled back to a snapshot, a new snapshot is
created with the same snapshot id, but with a new creation timestamp.
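
An example, using a hypothetical table and snapshot id: assuming the
most recent snapshot of table 't' created before 2022-01-04 10:00:00
has id 123456, either statement returns the table to that snapshot:

- ALTER TABLE t EXECUTE ROLLBACK(123456)
- ALTER TABLE t EXECUTE ROLLBACK('2022-01-04 10:00:00')

The timestamp is interpreted in the timezone set by the TIMEZONE query
option.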

Testing:
 - Added analysis unit tests.
 - Added e2e tests.
 - Converted test_time_travel to use get_snapshots() from iceberg_util.
 - Added a utility class to allow pytests to create tables with
   various Iceberg catalogs.

Change-Id: Ic74913d3b81103949ffb5eef7cc936303494f8b9
Reviewed-on: http://gerrit.cloudera.org:8080/19002
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author: Andrew Sherman
Date: 2022-09-13 18:29:57 -07:00
Committed-by: Impala Public Jenkins
Parent: 0c7c6a335e
Commit: 29586d6631
14 changed files with 675 additions and 202 deletions


@@ -414,12 +414,38 @@ struct TAlterTableSetPartitionSpecParams {
1: required CatalogObjects.TIcebergPartitionSpec partition_spec
}
// Parameters for ALTER TABLE EXECUTE operations.
struct TAlterTableExecuteParams {
// The parameter of the ExpireSnapshots.expireOlderThan(timestampMillis) Iceberg call.
// Parameters for ALTER TABLE EXECUTE EXPIRE_SNAPSHOTS operations.
struct TAlterTableExecuteExpireSnapshotsParams {
1: required i64 older_than_millis
}
// ALTER TABLE EXECUTE ROLLBACK can be to a date or snapshot id.
enum TRollbackType {
TIME_ID = 0
VERSION_ID = 1
}
// Parameters for ALTER TABLE EXECUTE ROLLBACK operations.
struct TAlterTableExecuteRollbackParams {
// Whether the rollback is to a date or a snapshot id.
1: required TRollbackType kind
// If kind is TIME_ID, this is the date to roll back to.
2: optional i64 timestamp_millis
// If kind is VERSION_ID, this is the snapshot id to roll back to.
3: optional i64 snapshot_id
}
// Parameters for ALTER TABLE EXECUTE ... operations.
struct TAlterTableExecuteParams {
// Parameters for ALTER TABLE EXECUTE EXPIRE_SNAPSHOTS
1: optional TAlterTableExecuteExpireSnapshotsParams expire_snapshots_params
// Parameters for ALTER TABLE EXECUTE ROLLBACK
2: optional TAlterTableExecuteRollbackParams execute_rollback_params
}
// Parameters for all ALTER TABLE commands.
struct TAlterTableParams {
1: required TAlterTableType alter_type
@@ -478,7 +504,7 @@ struct TAlterTableParams {
// Parameters for ALTER TABLE SET PARTITION SPEC
19: optional TAlterTableSetPartitionSpecParams set_partition_spec_params
// Parameters for ALTER TABLE EXECUTE
// Parameters for ALTER TABLE EXECUTE operations
20: optional TAlterTableExecuteParams set_execute_params
}


@@ -37,6 +37,7 @@ import org.apache.impala.analysis.SetOperationStmt.SetOperator;
import org.apache.impala.analysis.RangePartition;
import org.apache.impala.analysis.TableSampleClause;
import org.apache.impala.analysis.AlterTableAddDropRangePartitionStmt.Operation;
import org.apache.impala.analysis.AlterTableExecuteStmt;
import org.apache.impala.analysis.IcebergPartitionSpec;
import org.apache.impala.analysis.IcebergPartitionField;
import org.apache.impala.analysis.IcebergPartitionTransform;
@@ -1361,7 +1362,7 @@ alter_tbl_stmt ::=
:}
| KW_ALTER KW_TABLE table_name:table KW_EXECUTE function_call_expr:expr
{:
RESULT = new AlterTableExecuteStmt(table, expr);
RESULT = AlterTableExecuteStmt.createExecuteStmt(table, expr);
:}
;


@@ -0,0 +1,97 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.analysis;
import com.google.common.base.Preconditions;
import org.apache.impala.catalog.FeIcebergTable;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.InternalException;
import org.apache.impala.thrift.TAlterTableExecuteExpireSnapshotsParams;
import org.apache.impala.thrift.TAlterTableExecuteParams;
import org.apache.impala.thrift.TAlterTableParams;
import org.apache.impala.thrift.TAlterTableType;
import org.apache.impala.util.ExprUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Represents an ALTER TABLE <tbl> EXECUTE EXPIRE_SNAPSHOTS(<parameters>) statement on
* Iceberg tables; the parameter is a <timestamp>.
*/
public class AlterTableExecuteExpireSnapshotsStmt extends AlterTableExecuteStmt {
private final static Logger LOG =
LoggerFactory.getLogger(AlterTableExecuteExpireSnapshotsStmt.class);
protected final static String USAGE = "EXPIRE_SNAPSHOTS(<expression>)";
protected AlterTableExecuteExpireSnapshotsStmt(TableName tableName, Expr fnCallExpr) {
super(tableName, fnCallExpr);
}
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
super.analyze(analyzer);
Preconditions.checkState(getTargetTable() instanceof FeIcebergTable);
analyzeFunctionCallExpr(analyzer, USAGE);
analyzeOlderThan(analyzer);
}
@Override
public String toSql(ToSqlOptions options) {
return fnCallExpr_.toSql();
}
@Override
public TAlterTableParams toThrift() {
TAlterTableParams params = super.toThrift();
params.setAlter_type(TAlterTableType.EXECUTE);
TAlterTableExecuteParams executeParams = new TAlterTableExecuteParams();
TAlterTableExecuteExpireSnapshotsParams executeExpireSnapshotsParams =
new TAlterTableExecuteExpireSnapshotsParams();
executeParams.setExpire_snapshots_params(executeExpireSnapshotsParams);
executeExpireSnapshotsParams.setOlder_than_millis(olderThanMillis_);
params.setSet_execute_params(executeParams);
return params;
}
protected void analyzeOlderThan(Analyzer analyzer) throws AnalysisException {
Preconditions.checkNotNull(fnParamValue_);
fnParamValue_.analyze(analyzer);
if (!fnParamValue_.isConstant()) {
throw new AnalysisException(USAGE + " must be a constant expression: " + toSql());
}
if (fnParamValue_.getType().isStringType()) {
fnParamValue_ = new CastExpr(Type.TIMESTAMP, fnParamValue_);
}
if (!fnParamValue_.getType().isTimestamp()) {
throw new AnalysisException(USAGE + " must be a timestamp type but is '"
+ fnParamValue_.getType() + "': " + fnParamValue_.toSql());
}
try {
olderThanMillis_ =
ExprUtil.localTimestampToUnixTimeMicros(analyzer, fnParamValue_) / 1000;
LOG.debug(USAGE + " millis: " + olderThanMillis_);
} catch (InternalException ie) {
throw new AnalysisException("Invalid TIMESTAMP expression has been given to "
+ USAGE + ": " + ie.getMessage(),
ie);
}
}
}


@@ -0,0 +1,147 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.analysis;
import com.google.common.base.Preconditions;
import org.apache.impala.catalog.FeIcebergTable;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.InternalException;
import org.apache.impala.thrift.TAlterTableExecuteParams;
import org.apache.impala.thrift.TAlterTableExecuteRollbackParams;
import org.apache.impala.thrift.TAlterTableParams;
import org.apache.impala.thrift.TAlterTableType;
import org.apache.impala.thrift.TRollbackType;
import org.apache.impala.util.ExprUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Represents an ALTER TABLE EXECUTE ROLLBACK(<parameter>) statement.
* The parameter can be a snapshot id or a timestamp.
* A rollback to a snapshot id causes a new snapshot to be
* created with the same snapshot id, but with a new creation timestamp.
* A rollback to a timestamp rolls back to the latest snapshot
* whose creation timestamp is older than the specified timestamp.
*/
public class AlterTableExecuteRollbackStmt extends AlterTableExecuteStmt {
public static final String USAGE = "EXECUTE ROLLBACK(<expression>):";
private final static Logger LOG =
LoggerFactory.getLogger(AlterTableExecuteRollbackStmt.class);
private long snapshotVersion_;
private TRollbackType kind_;
public AlterTableExecuteRollbackStmt(TableName tableName, FunctionCallExpr fnCallExpr) {
super(tableName, fnCallExpr);
}
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
super.analyze(analyzer);
FeTable table = getTargetTable();
if (!(table instanceof FeIcebergTable)) {
throw new AnalysisException("ALTER TABLE EXECUTE ROLLBACK is only supported "
+ "for Iceberg tables: " + table.getTableName());
}
analyzeFunctionCallExpr(analyzer, USAGE);
analyzeParameter(analyzer);
}
private void analyzeParameter(Analyzer analyzer) throws AnalysisException {
Preconditions.checkNotNull(fnParamValue_);
fnParamValue_.analyze(analyzer);
if (!fnParamValue_.isConstant()) {
throw new AnalysisException(
USAGE + " <expression> must be a constant expression: EXECUTE " + toSql());
}
if ((fnParamValue_ instanceof LiteralExpr)
&& (fnParamValue_.getType().isIntegerType())) {
// Parameter is a snapshot id
kind_ = TRollbackType.VERSION_ID;
snapshotVersion_ = fnParamValue_.evalToInteger(analyzer, USAGE);
if (snapshotVersion_ < 0) {
throw new AnalysisException("Invalid version number has been given to " + USAGE
+ ": " + snapshotVersion_);
}
LOG.debug(USAGE + " version: " + snapshotVersion_);
} else {
Expr timestampExpr = getParamConvertibleToTimestamp();
if (timestampExpr != null) {
// Parameter is a timestamp.
kind_ = TRollbackType.TIME_ID;
try {
olderThanMillis_ =
ExprUtil.localTimestampToUnixTimeMicros(analyzer, timestampExpr) / 1000;
LOG.debug(USAGE + " millis: " + olderThanMillis_);
} catch (InternalException ie) {
throw new AnalysisException("An invalid TIMESTAMP expression has been given "
+ "to " + USAGE + " the expression " + fnParamValue_.toSql()
+ " cannot be converted to a TIMESTAMP",
ie);
}
} else {
throw new AnalysisException(USAGE
+ " <expression> must be an integer type or a timestamp, but is '"
+ fnParamValue_.getType() + "': EXECUTE " + toSql());
}
}
}
/**
* If field fnParamValue_ is a Timestamp, or can be cast to a Timestamp,
* then return an Expr for the Timestamp.
* @return null if the fnParamValue_ cannot be converted to a Timestamp.
*/
private Expr getParamConvertibleToTimestamp() {
Expr timestampExpr = fnParamValue_;
if (timestampExpr.getType().isStringType()) {
timestampExpr = new CastExpr(Type.TIMESTAMP, fnParamValue_);
}
if (timestampExpr.getType().isTimestamp()) {
return timestampExpr;
}
return null;
}
@Override
public String toSql(ToSqlOptions options) {
return fnCallExpr_.toSql();
}
@Override
public TAlterTableParams toThrift() {
TAlterTableParams params = super.toThrift();
params.setAlter_type(TAlterTableType.EXECUTE);
TAlterTableExecuteParams executeParams = new TAlterTableExecuteParams();
TAlterTableExecuteRollbackParams executeRollbackParams =
new TAlterTableExecuteRollbackParams();
executeParams.setExecute_rollback_params(executeRollbackParams);
executeRollbackParams.setKind(kind_);
switch (kind_) {
case TIME_ID: executeRollbackParams.setTimestamp_millis(olderThanMillis_); break;
case VERSION_ID: executeRollbackParams.setSnapshot_id(snapshotVersion_); break;
default: throw new IllegalStateException("Bad kind of execute rollback " + kind_);
}
params.setSet_execute_params(executeParams);
return params;
}
}


@@ -17,104 +17,63 @@
package org.apache.impala.analysis;
import org.apache.impala.catalog.FeIcebergTable;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.InternalException;
import org.apache.impala.thrift.TAlterTableExecuteParams;
import org.apache.impala.thrift.TAlterTableParams;
import org.apache.impala.thrift.TAlterTableType;
import org.apache.impala.util.ExprUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.apache.impala.common.AnalysisException;
/**
* Represents an ALTER TABLE <tbl> EXECUTE <operation>(<parameters>) statement on Iceberg
* tables, supported operations:
* - expire_snapshots(<timestamp>): uses the ExpireSnapshots API to expire snapshots,
* calls the ExpireSnapshots.expireOlderThan(timestampMillis) method.
* TableProperties.MIN_SNAPSHOTS_TO_KEEP table property manages how many snapshots
* should be retained even when all snapshots are selected by expireOlderThan().
* tables. For supported operations see the subclasses.
*/
public class AlterTableExecuteStmt extends AlterTableStmt {
private final static Logger LOG = LoggerFactory.getLogger(AlterTableExecuteStmt.class);
private final static String USAGE = "EXPIRE_SNAPSHOTS(<expression>)";
// Expression of the function call after EXECUTE keyword. Parsed into an operation and
// a value of that operation.
private FunctionCallExpr fnCallExpr_;
protected FunctionCallExpr fnCallExpr_;
// Value expression from fnCallExpr_.
private Expr fnParamValue_;
protected Expr fnParamValue_;
// The value extracted from the fnParamValue_ expression.
private long olderThanMillis_ = -1;
protected long olderThanMillis_ = -1;
protected AlterTableExecuteStmt(TableName tableName, Expr fnCallExpr) {
super(tableName);
fnCallExpr_ = (FunctionCallExpr)fnCallExpr;
fnCallExpr_ = (FunctionCallExpr) fnCallExpr;
}
@Override
public void analyze(Analyzer analyzer) throws AnalysisException {
super.analyze(analyzer);
Preconditions.checkState(getTargetTable() instanceof FeIcebergTable);
analyzeFunctionCallExpr(analyzer);
analyzeOlderThan(analyzer);
/**
* Return an instance of a subclass of AlterTableExecuteStmt that can analyze the
* execute statement for the function call expression in 'expr'.
*/
public static AlterTableStmt createExecuteStmt(TableName tableName, Expr expr)
throws AnalysisException {
FunctionCallExpr fnCallExpr = (FunctionCallExpr) expr;
String functionNameOrig = fnCallExpr.getFnName().toString();
String functionName = functionNameOrig.toUpperCase();
switch (functionName) {
case "EXPIRE_SNAPSHOTS":
return new AlterTableExecuteExpireSnapshotsStmt(tableName, fnCallExpr);
case "ROLLBACK": return new AlterTableExecuteRollbackStmt(tableName, fnCallExpr);
default:
throw new AnalysisException(String.format("'%s' is not supported by ALTER "
+ "TABLE <table> EXECUTE. Supported operations are: "
+ "EXPIRE_SNAPSHOTS(<expression>), "
+ "ROLLBACK(<expression>).",
functionNameOrig));
}
}
private void analyzeFunctionCallExpr(Analyzer analyzer) throws AnalysisException {
protected void analyzeFunctionCallExpr(Analyzer ignoredAnalyzer, String usage)
throws AnalysisException {
// fnCallExpr_ is analyzed manually here, because it is not an actual function but
// a catalog operation.
String fnName = fnCallExpr_.getFnName().toString();
if (!fnName.toUpperCase().equals("EXPIRE_SNAPSHOTS")) {
throw new AnalysisException(String.format("'%s' is not supported by ALTER " +
"TABLE <table> EXECUTE. Supported operation is %s.", fnName, USAGE));
}
Preconditions.checkState(
StringUtils.equalsAnyIgnoreCase(fnName, "EXPIRE_SNAPSHOTS", "ROLLBACK"));
if (fnCallExpr_.getParams().size() != 1) {
throw new AnalysisException(USAGE + " must have one parameter: " + toSql());
throw new AnalysisException(usage + " must have one parameter: " + toSql());
}
fnParamValue_ = fnCallExpr_.getParams().exprs().get(0);
}
private void analyzeOlderThan(Analyzer analyzer) throws AnalysisException {
Preconditions.checkNotNull(fnParamValue_);
fnParamValue_.analyze(analyzer);
if (!fnParamValue_.isConstant()) {
throw new AnalysisException(USAGE + " must be a constant expression: " + toSql());
}
if (fnParamValue_.getType().isStringType()) {
fnParamValue_ = new CastExpr(Type.TIMESTAMP, fnParamValue_);
}
if (!fnParamValue_.getType().isTimestamp()) {
throw new AnalysisException(USAGE + " must be a timestamp type but is '" +
fnParamValue_.getType() + "': " + fnParamValue_.toSql());
}
try {
olderThanMillis_ =
ExprUtil.localTimestampToUnixTimeMicros(analyzer, fnParamValue_) / 1000;
LOG.debug(USAGE + " millis: " + String.valueOf(olderThanMillis_));
} catch (InternalException ie) {
throw new AnalysisException("Invalid TIMESTAMP expression has been given to " +
USAGE + ": " + ie.getMessage(), ie);
}
}
@Override
public String toSql(ToSqlOptions options) {
return fnCallExpr_.toSql();
}
@Override
public TAlterTableParams toThrift() {
TAlterTableParams params = super.toThrift();
params.setAlter_type(TAlterTableType.EXECUTE);
TAlterTableExecuteParams executeParams = new TAlterTableExecuteParams();
executeParams.setOlder_than_millis(olderThanMillis_);
params.setSet_execute_params(executeParams);
return params;
}
}


@@ -178,7 +178,7 @@ public interface FeFsTable extends FeTable {
Map<Long, ? extends PrunablePartition> getPartitionMap();
/**
* @param the index of the target partitioning column
* @param col the index of the target partitioning column
* @return a map from value to a set of partitions for which column 'col'
* has that value.
*/


@@ -165,6 +165,7 @@ import org.apache.impala.thrift.TAlterTableAddPartitionParams;
import org.apache.impala.thrift.TAlterTableAlterColParams;
import org.apache.impala.thrift.TAlterTableDropColParams;
import org.apache.impala.thrift.TAlterTableDropPartitionParams;
import org.apache.impala.thrift.TAlterTableExecuteParams;
import org.apache.impala.thrift.TAlterTableOrViewSetOwnerParams;
import org.apache.impala.thrift.TAlterTableParams;
import org.apache.impala.thrift.TAlterTableReplaceColsParams;
@@ -1367,9 +1368,23 @@ public class CatalogOpExecutor {
break;
case EXECUTE:
Preconditions.checkState(params.isSetSet_execute_params());
String summary = IcebergCatalogOpExecutor.alterTableExecute(iceTxn,
params.getSet_execute_params());
addSummary(response, summary);
// All the EXECUTE functions operate only on Iceberg data.
needsToUpdateHms = false;
TAlterTableExecuteParams setExecuteParams = params.getSet_execute_params();
if (setExecuteParams.isSetExecute_rollback_params()) {
String rollbackSummary = IcebergCatalogOpExecutor.alterTableExecuteRollback(
iceTxn, tbl, setExecuteParams.getExecute_rollback_params());
addSummary(response, rollbackSummary);
} else if (setExecuteParams.isSetExpire_snapshots_params()) {
String expireSummary =
IcebergCatalogOpExecutor.alterTableExecuteExpireSnapshots(
iceTxn, setExecuteParams.getExpire_snapshots_params());
addSummary(response, expireSummary);
} else {
// Cannot happen, but throw just in case.
throw new IllegalStateException(
"Alter table execute statement is not implemented.");
}
break;
case SET_PARTITION_SPEC:
// Set partition spec uses 'TableOperations', not transactions.
@@ -1393,7 +1408,7 @@ public class CatalogOpExecutor {
break;
case REPLACE_COLUMNS:
// It doesn't make sense to replace all the columns of an Iceberg table as it
// would basically make all existing data unaccessible.
// would basically make all existing data inaccessible.
default:
throw new UnsupportedOperationException(
"Unsupported ALTER TABLE operation for Iceberg tables: " +
@@ -1414,7 +1429,7 @@ public class CatalogOpExecutor {
if (!needsToUpdateHms) {
// We don't need to update HMS because either it is already done by Iceberg's
// HiveCatalog, or we modified the PARTITION SPEC which is not stored in HMS.
// HiveCatalog, or we modified the Iceberg data which is not stored in HMS.
loadTableMetadata(tbl, newCatalogVersion, true, true, null, "ALTER Iceberg TABLE " +
params.getAlter_type().name());
catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion);


@@ -23,16 +23,17 @@ import java.util.List;
import java.util.Map;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.BaseReplacePartitions;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.ManageSnapshots;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SnapshotManager;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
@@ -42,32 +43,36 @@ import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
import org.apache.impala.catalog.FeIcebergTable;
import org.apache.impala.catalog.IcebergTable;
import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.catalog.TableNotFoundException;
import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
import org.apache.impala.catalog.iceberg.IcebergCatalog;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.fb.FbIcebergColumnStats;
import org.apache.impala.fb.FbIcebergDataFile;
import org.apache.impala.thrift.TAlterTableExecuteParams;
import org.apache.impala.thrift.TAlterTableExecuteExpireSnapshotsParams;
import org.apache.impala.thrift.TAlterTableExecuteRollbackParams;
import org.apache.impala.thrift.TColumn;
import org.apache.impala.thrift.TIcebergCatalog;
import org.apache.impala.thrift.TIcebergOperationParam;
import org.apache.impala.thrift.TIcebergPartitionSpec;
import org.apache.impala.thrift.TRollbackType;
import org.apache.impala.util.IcebergSchemaConverter;
import org.apache.impala.util.IcebergUtil;
import org.apache.log4j.Logger;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This is a helper for the CatalogOpExecutor to provide Iceberg related DDL functionality
* such as creating and dropping tables from Iceberg.
*/
public class IcebergCatalogOpExecutor {
public static final Logger LOG = Logger.getLogger(IcebergCatalogOpExecutor.class);
public static final Logger LOG =
LoggerFactory.getLogger(IcebergCatalogOpExecutor.class);
/**
* Creates an Iceberg table via the Iceberg API
@@ -177,14 +182,49 @@ public class IcebergCatalogOpExecutor {
tableOp.commit(metadata, newMetadata);
}
public static String alterTableExecute(Transaction txn,
TAlterTableExecuteParams params) {
/**
* Uses the ExpireSnapshots API to expire snapshots by calling the
* ExpireSnapshots.expireOlderThan(timestampMillis) method. The
* TableProperties.MIN_SNAPSHOTS_TO_KEEP table property manages how many snapshots
* should be retained even when all snapshots are selected by expireOlderThan().
*/
public static String alterTableExecuteExpireSnapshots(
Transaction txn, TAlterTableExecuteExpireSnapshotsParams params) {
ExpireSnapshots expireApi = txn.expireSnapshots();
Preconditions.checkState(params.isSetOlder_than_millis());
expireApi.expireOlderThan(params.older_than_millis);
expireApi.commit();
return "Snapshots have been expired.";
}
/**
* Executes an ALTER TABLE EXECUTE ROLLBACK.
*/
public static String alterTableExecuteRollback(
Transaction iceTxn, FeIcebergTable tbl, TAlterTableExecuteRollbackParams params) {
TRollbackType kind = params.getKind();
ManageSnapshots manageSnapshots = iceTxn.manageSnapshots();
switch (kind) {
case TIME_ID:
Preconditions.checkState(params.isSetTimestamp_millis());
long timestampMillis = params.getTimestamp_millis();
LOG.info("Rollback iceberg table to snapshot before timestamp {}",
timestampMillis);
manageSnapshots.rollbackToTime(timestampMillis);
break;
case VERSION_ID:
Preconditions.checkState(params.isSetSnapshot_id());
long snapshotId = params.getSnapshot_id();
LOG.info("Rollback iceberg table to snapshot id {}", snapshotId);
manageSnapshots.rollbackTo(snapshotId);
break;
default: throw new IllegalStateException("Bad kind of execute rollback " + kind);
}
// Commit the update.
manageSnapshots.commit();
return "Rollback executed.";
}
/**
* Drops a column from an Iceberg table.
*/
@@ -370,4 +410,4 @@ public class IcebergCatalogOpExecutor {
String.valueOf(version));
updateProps.commit();
}
}
}
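
For reference, a minimal standalone sketch of the Iceberg snapshot-management
calls used above, outside of Impala's catalog transaction (it assumes an
org.apache.iceberg.Table handle obtained elsewhere; the wrapper class and
method names are illustrative, not part of this change):

import org.apache.iceberg.ManageSnapshots;
import org.apache.iceberg.Table;

public class RollbackSketch {
  // Rolls back 'table' to the snapshot with the given id. The snapshot must
  // be an ancestor of the current table state; otherwise Iceberg fails with
  // "Cannot roll back to snapshot, not an ancestor of the current state".
  static void rollbackToId(Table table, long snapshotId) {
    ManageSnapshots ms = table.manageSnapshots();
    ms.rollbackTo(snapshotId);
    ms.commit(); // nothing changes until commit()
  }

  // Rolls back 'table' to the most recent snapshot whose creation time is
  // before 'timestampMillis' (milliseconds since the Unix epoch).
  static void rollbackToTime(Table table, long timestampMillis) {
    ManageSnapshots ms = table.manageSnapshots();
    ms.rollbackToTime(timestampMillis);
    ms.commit();
  }
}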


@@ -408,7 +408,8 @@ import org.apache.impala.thrift.TReservedWordsVersion;
"year", "month", "day", "hour", "minute", "second",
"begin", "call", "check", "classifier", "close", "identity", "language",
"localtime", "member", "module", "new", "nullif", "old", "open", "parameter",
"period", "result", "return", "sql", "start", "system", "time", "user", "value"
"period", "result", "return", "rollback", "sql", "start", "system", "time",
"user", "value"
}));
}


@@ -4281,8 +4281,8 @@ public class AnalyzeDDLTest extends FrontendTestBase {
// Negative tests
AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
"unsupported_operation(123456789);", "'unsupported_operation' is not supported " +
"by ALTER TABLE <table> EXECUTE. Supported operation is " +
"EXPIRE_SNAPSHOTS(<expression>)");
"by ALTER TABLE <table> EXECUTE. Supported operations are: " +
"EXPIRE_SNAPSHOTS(<expression>), ROLLBACK(<expression>)");
AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
"expire_snapshots(now(), 3);", "EXPIRE_SNAPSHOTS(<expression>) must have one " +
"parameter: expire_snapshots(now(), 3)");
@@ -4297,6 +4297,49 @@ public class AnalyzeDDLTest extends FrontendTestBase {
" been given to EXPIRE_SNAPSHOTS(<expression>)");
}
@Test
public void TestAlterExecuteRollback() {
AnalyzesOk("alter table functional_parquet.iceberg_partitioned execute " +
"rollback('2022-01-04 10:00:00');");
AnalyzesOk("alter table functional_parquet.iceberg_partitioned execute " +
"rollback(123456);");
// Timestamp can be an expression.
AnalyzesOk("alter table functional_parquet.iceberg_partitioned execute " +
"rollback(cast('2021-08-09 15:52:45' as timestamp) - interval 2 days + " +
"interval 3 hours);");
// Negative tests
AnalysisError("alter table nodb.alltypes execute " +
"rollback('2022-01-04 10:00:00');",
"Could not resolve table reference: 'nodb.alltypes'");
AnalysisError("alter table functional.alltypes execute " +
"rollback('2022-01-04 10:00:00');",
"ALTER TABLE EXECUTE ROLLBACK is only supported for Iceberg tables: " +
"functional.alltypes");
AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
"rollback(id);", "EXECUTE ROLLBACK(<expression>): " +
"<expression> must be a constant expression: EXECUTE rollback(id)");
AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
"rollback(3.14);", "EXECUTE ROLLBACK(<expression>): <expression> " +
"must be an integer type or a timestamp, but is 'DECIMAL(3,2)': " +
"EXECUTE rollback(3.14)");
AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
"rollback('2021-02-32 15:52:45');", "An invalid TIMESTAMP expression has been " +
"given to EXECUTE ROLLBACK(<expression>): the expression " +
"'2021-02-32 15:52:45' cannot be converted to a TIMESTAMP");
AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
"rollback('the beginning');", "An invalid TIMESTAMP expression has been " +
"given to EXECUTE ROLLBACK(<expression>): the expression " +
"'the beginning' cannot be converted to a TIMESTAMP");
AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
"rollback(1111,2222);",
"EXECUTE ROLLBACK(<expression>): must have one parameter");
AnalysisError("alter table functional_parquet.iceberg_partitioned execute " +
"rollback('1111');", "An invalid TIMESTAMP expression has been " +
"given to EXECUTE ROLLBACK(<expression>): the expression " +
"'1111' cannot be converted to a TIMESTAMP");
}
private static String buildLongOwnerName() {
StringBuilder comment = new StringBuilder();
for (int i = 0; i < MetaStoreUtil.MAX_OWNER_LENGTH + 5; i++) {


@@ -0,0 +1,27 @@
====
---- QUERY
# EXECUTE ROLLBACK with an invalid snapshot id on a partitioned Iceberg table.
ALTER TABLE functional_parquet.iceberg_partitioned EXECUTE ROLLBACK(1)
---- CATCH
Cannot roll back to unknown snapshot id: 1
====
---- QUERY
# EXECUTE ROLLBACK to a too old date on a partitioned Iceberg table.
set timezone=CET;
ALTER TABLE functional_parquet.iceberg_partitioned EXECUTE ROLLBACK('2020-08-31 07:58:00')
---- CATCH
Cannot roll back, no valid snapshot older than: 1598853480000
====
---- QUERY
# EXECUTE ROLLBACK with an invalid timestamp expression on a partitioned Iceberg table.
set timezone=CET;
ALTER TABLE functional_parquet.iceberg_partitioned EXECUTE ROLLBACK('1111');
---- CATCH
An invalid TIMESTAMP expression has been given to EXECUTE ROLLBACK(<expression>): the expression '1111' cannot be converted to a TIMESTAMP
====
---- QUERY
# EXECUTE ROLLBACK fails on a non-Iceberg table.
ALTER TABLE functional_parquet.alltypestiny EXECUTE ROLLBACK(1111111)
---- CATCH
ALTER TABLE EXECUTE ROLLBACK is only supported for Iceberg tables
====


@@ -28,6 +28,7 @@ from tests.common.skip import SkipIfFS
from tests.util.hive_utils import HiveDbWrapper
from tests.util.event_processor_utils import EventProcessorUtils
from tests.util.filesystem_utils import WAREHOUSE
from tests.util.iceberg_util import IcebergCatalogs
@SkipIfFS.hive
@@ -482,6 +483,7 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
def test_iceberg_self_events(self, unique_database):
"""This test checks that Impala doesn't refresh Iceberg tables on self events."""
tbl_name = unique_database + ".test_iceberg_events"
iceberg_catalogs = IcebergCatalogs(unique_database)
def check_self_events(query, skips_events=True):
tbls_refreshed_before, partitions_refreshed_before, \
@@ -495,19 +497,8 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
if skips_events:
assert events_skipped_after > events_skipped_before
hadoop_tables = "'iceberg.catalog'='hadoop.tables'"
hadoop_catalog = ("'iceberg.catalog'='hadoop.catalog', " +
"'iceberg.catalog_location'='/test-warehouse/{0}/hadoop_catalog_test/'".format(
unique_database))
hive_catalog = "'iceberg.catalog'='hive.catalog'"
hive_catalogs = "'iceberg.catalog'='ice_hive_cat'"
hadoop_catalogs = "'iceberg.catalog'='ice_hadoop_cat'"
all_catalogs = [hadoop_tables, hadoop_catalog, hive_catalog, hive_catalogs,
hadoop_catalogs]
for catalog in all_catalogs:
is_hive_catalog = catalog == hive_catalog or catalog == hive_catalogs
for catalog in iceberg_catalogs.get_iceberg_catalog_properties():
is_hive_catalog = iceberg_catalogs.is_a_hive_catalog(catalog)
self.client.execute("""
CREATE TABLE {0} (i int) STORED AS ICEBERG
TBLPROPERTIES ({1})""".format(tbl_name, catalog))


@@ -22,6 +22,8 @@ import logging
import os
import pytest
import random
import re
import time
from subprocess import check_call
@@ -33,14 +35,14 @@ import json
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
from tests.common.iceberg_test_suite import IcebergTestSuite
from tests.common.skip import SkipIf, SkipIfDockerizedCluster, SkipIfLocal
from tests.common.skip import SkipIf, SkipIfDockerizedCluster
from tests.common.file_utils import (
create_iceberg_table_from_directory,
create_table_from_parquet)
from tests.shell.util import run_impala_shell_cmd
from tests.util.filesystem_utils import get_fs_path, IS_HDFS
from tests.util.get_parquet_metadata import get_parquet_metadata
from tests.util.iceberg_util import cast_ts, quote, parse_timestamp
from tests.util.iceberg_util import cast_ts, quote, get_snapshots, IcebergCatalogs
LOG = logging.getLogger(__name__)
@@ -69,58 +71,61 @@ class TestIcebergTable(IcebergTestSuite):
def test_expire_snapshots(self, unique_database):
tbl_name = unique_database + ".expire_snapshots"
iceberg_catalogs = IcebergCatalogs(unique_database)
for catalog_properties in iceberg_catalogs.get_iceberg_catalog_properties():
# We are setting the TIMEZONE query option in this test, so let's create a local
# impala client.
with self.create_impala_client() as impalad_client:
# Iceberg doesn't create a snapshot entry for the initial empty table
impalad_client.execute("""
create table {0} (i int) stored as iceberg
TBLPROPERTIES ({1})""".format(tbl_name, catalog_properties))
ts_0 = datetime.datetime.now()
insert_q = "insert into {0} values (1)".format(tbl_name)
ts_1 = self.execute_query_ts(impalad_client, insert_q)
time.sleep(5)
impalad_client.execute(insert_q)
time.sleep(5)
ts_2 = self.execute_query_ts(impalad_client, insert_q)
impalad_client.execute(insert_q)
impalad_client.execute("create table {0} (i int) stored as iceberg"
.format(tbl_name))
# There should be 4 snapshots initially
self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 4)
# Expire the oldest snapshot and test that the oldest one was expired
expire_q = "alter table {0} execute expire_snapshots({1})"
impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_1)))
self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3)
self.expect_num_snapshots_from(impalad_client, tbl_name, ts_1, 3)
# Expire with a timestamp in which the interval does not touch existing snapshot
impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_1)))
self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 3)
# Expire all, but retain 1
impalad_client.execute(expire_q.format(tbl_name,
cast_ts(datetime.datetime.now())))
self.expect_num_snapshots_from(impalad_client, tbl_name, ts_2, 1)
# Change number of retained snapshots, then expire all
impalad_client.execute("""alter table {0} set tblproperties
('history.expire.min-snapshots-to-keep' = '2')""".format(tbl_name))
impalad_client.execute(insert_q)
impalad_client.execute(insert_q)
impalad_client.execute(expire_q.format(tbl_name,
cast_ts(datetime.datetime.now())))
self.expect_num_snapshots_from(impalad_client, tbl_name, ts_0, 2)
# Check that timezone is interpreted in local timezone controlled by query option
# TIMEZONE.
impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
impalad_client.execute(insert_q)
ts_tokyo = self.impala_now(impalad_client)
impalad_client.execute("SET TIMEZONE='Europe/Budapest'")
impalad_client.execute(insert_q)
impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
impalad_client.execute(expire_q.format(tbl_name, cast_ts(ts_tokyo)))
self.expect_num_snapshots_from(impalad_client, tbl_name, ts_tokyo, 1)
impalad_client.execute("DROP TABLE {0}".format(tbl_name))
def test_truncate_iceberg_tables(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-truncate', vector, use_db=unique_database)
@@ -128,7 +133,7 @@ class TestIcebergTable(IcebergTestSuite):
# With IMPALA-11429 there is an extra "ALTER TABLE SET OWNER" right after executing
# "CREATE TABLE". As a result dropping the table location right after CREATE TABLE will
# trigger a known bug: IMPALA-11509. Hence, turning this test off until there is a fix
# for this issue. Note, we could add a sleep righ after table creation that could
# for this issue. Note, we could add a sleep right after table creation that could
# workaround the above mentioned bug but then we would hit another issue: IMPALA-11502.
@SkipIf.not_dfs
def test_drop_incomplete_table(self, vector, unique_database):
@@ -231,31 +236,127 @@ class TestIcebergTable(IcebergTestSuite):
tblproperties('iceberg.catalog'='hadoop.tables')""".format(tbl_name))
self.client.execute("INSERT INTO {0} VALUES (1)".format(tbl_name))
self.client.execute("INSERT INTO {0} VALUES (2)".format(tbl_name))
result = self.client.execute("DESCRIBE HISTORY {0}".format(tbl_name))
assert(len(result.data) == 2)
first_snapshot = result.data[0].split("\t")
second_snapshot = result.data[1].split("\t")
snapshots = get_snapshots(self.client, tbl_name, expected_result_size=2)
first_snapshot = snapshots[0]
second_snapshot = snapshots[1]
# Check that first snapshot is older than the second snapshot.
assert(first_snapshot[0] < second_snapshot[0])
assert(first_snapshot.get_creation_time() < second_snapshot.get_creation_time())
# Check that second snapshot's parent ID is the snapshot ID of the first snapshot.
assert(first_snapshot[1] == second_snapshot[2])
assert(first_snapshot.get_snapshot_id() == second_snapshot.get_parent_id())
# The first snapshot has no parent snapshot ID.
assert(first_snapshot[2] == "NULL")
assert(first_snapshot.get_parent_id() is None)
# Check "is_current_ancestor" column.
assert(first_snapshot[3] == "TRUE" and second_snapshot[3] == "TRUE")
assert(first_snapshot.is_current_ancestor())
assert(second_snapshot.is_current_ancestor())
def test_execute_rollback_negative(self, vector):
"""Negative test for EXECUTE ROLLBACK."""
self.run_test_case('QueryTest/iceberg-rollback-negative', vector)
def test_execute_rollback(self, unique_database):
"""Test for EXECUTE ROLLBACK."""
iceberg_catalogs = IcebergCatalogs(unique_database)
for catalog_properties in iceberg_catalogs.get_iceberg_catalog_properties():
# Create a table with multiple snapshots.
tbl_name = unique_database + ".iceberg_execute_rollback"
# We are setting the TIMEZONE query option in this test, so let's create a local
# impala client.
with self.create_impala_client() as impalad_client:
impalad_client.execute("""
create table {0} (i int) stored as iceberg
TBLPROPERTIES ({1})""".format(tbl_name, catalog_properties))
initial_snapshots = 3
for i in range(initial_snapshots):
impalad_client.execute("INSERT INTO {0} VALUES ({1})".format(tbl_name, i))
snapshots = get_snapshots(impalad_client, tbl_name,
expected_result_size=initial_snapshots)
output = self.rollback_to_id(tbl_name, snapshots[1].get_snapshot_id())
LOG.info("success output={0}".format(output))
# We rolled back, but that creates a new snapshot, so now there are 4.
snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=4)
# The new snapshot has the same id (and parent id) as the snapshot we rolled back
# to, but it has a different creation time.
assert snapshots[1].get_snapshot_id() == snapshots[3].get_snapshot_id()
assert snapshots[1].get_parent_id() == snapshots[3].get_parent_id()
assert snapshots[1].get_creation_time() < snapshots[3].get_creation_time()
# The "orphaned" snapshot is now not a current ancestor.
assert not snapshots[2].is_current_ancestor()
# We cannot roll back to a snapshot that is not a current ancestor.
output = self.rollback_to_id_expect_failure(tbl_name,
snapshots[2].get_snapshot_id(),
expected_text="Cannot roll back to snapshot, not an ancestor of the current "
"state")
# Create another snapshot.
before_insert = datetime.datetime.now()
impalad_client.execute("INSERT INTO {0} VALUES ({1})".format(tbl_name, 4))
snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=5)
# Rollback to before the last insert.
self.rollback_to_ts(impalad_client, tbl_name, before_insert)
# This creates another snapshot.
snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=6)
# The snapshot id is the same, but the creation times differ
assert snapshots[3].get_snapshot_id() == snapshots[5].get_snapshot_id()
assert snapshots[3].get_creation_time() < snapshots[5].get_creation_time()
assert not snapshots[4].is_current_ancestor()
# Show that EXECUTE ROLLBACK respects the current timezone.
# To do this we try to roll back to a time for which there is no
# snapshot; this fails with an error message that includes the specified
# time. We parse out that time. By doing this in two timezones we can see
# that the parameter used was affected by the current timezone.
one_hour_ago = before_insert - datetime.timedelta(hours=1)
# We use timezones from Japan and Iceland to avoid any DST complexities.
impalad_client.execute("SET TIMEZONE='Asia/Tokyo'")
japan_ts = self.get_snapshot_ts_from_failed_rollback(
impalad_client, tbl_name, one_hour_ago)
impalad_client.execute("SET TIMEZONE='Iceland'")
iceland_ts = self.get_snapshot_ts_from_failed_rollback(
impalad_client, tbl_name, one_hour_ago)
diff_hours = (iceland_ts - japan_ts) / (1000 * 60 * 60)
assert diff_hours == 9
impalad_client.execute("DROP TABLE {0}".format(tbl_name))
def get_snapshot_ts_from_failed_rollback(self, client, tbl_name, ts):
"""Run an EXECUTE ROLLBACK which is expected to fail.
Parse the error message to extract the timestamp for which there
was no snapshot, and convert the string to an integer"""
try:
self.rollback_to_ts(client, tbl_name, ts)
assert False, "Query should have failed"
except ImpalaBeeswaxException as e:
result = re.search(r".*no valid snapshot older than: (\d+)", str(e))
time_str = result.group(1)
snapshot_ts = int(time_str)
assert snapshot_ts > 0, "did not decode snapshot ts from {0}".format(result)
return snapshot_ts
def rollback_to_ts(self, client, tbl_name, ts):
"""Rollback a table to a snapshot timestamp."""
query = "ALTER TABLE {0} EXECUTE ROLLBACK ('{1}');".format(tbl_name, ts.isoformat())
return self.execute_query_expect_success(client, query)
def rollback_to_id(self, tbl_name, id):
"""Rollback a table to a snapshot id."""
query = "ALTER TABLE {0} EXECUTE ROLLBACK ({1});".format(tbl_name, id)
return self.execute_query_expect_success(self.client, query)
def rollback_to_id_expect_failure(self, tbl_name, id, expected_text=None):
"""Attempt to roll back a table to a snapshot id, expecting a failure."""
query = "ALTER TABLE {0} EXECUTE ROLLBACK ({1});".format(tbl_name, id)
output = self.execute_query_expect_failure(self.client, query)
if expected_text:
assert expected_text in str(output)
return output
def test_describe_history_params(self, unique_database):
tbl_name = unique_database + ".describe_history"
def expect_results_between(ts_start, ts_end, expected_result_size):
query = "DESCRIBE HISTORY {0} BETWEEN {1} AND {2};".format(
tbl_name, cast_ts(ts_start), cast_ts(ts_end))
data = impalad_client.execute(query)
assert len(data.data) == expected_result_size
for i in range(len(data.data)):
result_ts_dt = parse_timestamp(data.data[i].split('\t')[0])
assert result_ts_dt >= ts_start and result_ts_dt <= ts_end
# We are setting the TIMEZONE query option in this test, so let's create a local
# impala client.
with self.create_impala_client() as impalad_client:
@@ -279,10 +380,11 @@ class TestIcebergTable(IcebergTestSuite):
self.expect_num_snapshots_from(impalad_client, tbl_name, ts_3, 0)
# Describe history with BETWEEN <ts> AND <ts> predicate
expect_results_between(ts_1, ts_2, 1)
expect_results_between(ts_1 - datetime.timedelta(hours=1), ts_2, 2)
expect_results_between(ts_1 - datetime.timedelta(hours=1), ts_2 +
datetime.timedelta(hours=1), 3)
self.expect_results_between(impalad_client, tbl_name, ts_1, ts_2, 1)
self.expect_results_between(impalad_client, tbl_name,
ts_1 - datetime.timedelta(hours=1), ts_2, 2)
self.expect_results_between(impalad_client, tbl_name,
ts_1 - datetime.timedelta(hours=1), ts_2 + datetime.timedelta(hours=1), 3)
# Check that timezone is interpreted in local timezone controlled by query option
# TIMEZONE. Persist the local times first and create a new snapshot.
@@ -341,14 +443,6 @@ class TestIcebergTable(IcebergTestSuite):
tbl_name, snapshot_id),
expected)
def get_snapshots():
data = impalad_client.execute("describe history {0}".format(tbl_name))
ret = list()
for row in data.data:
fields = row.split('\t')
ret.append(fields[1])
return ret
def impala_now():
now_data = impalad_client.execute("select now()")
return now_data.data[0]
@@ -409,12 +503,12 @@ class TestIcebergTable(IcebergTestSuite):
ij_cols)
# Query table as of snapshot IDs.
snapshots = get_snapshots()
expect_results_v(snapshots[0], ['1'], i_cols)
expect_results_v(snapshots[1], ['1', '2'], i_cols)
expect_results_v(snapshots[2], [], i_cols)
expect_results_v(snapshots[3], ['100'], i_cols)
expect_results_v(snapshots[4], ['100\tNULL', '3\t103'], ij_cols)
snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=5)
expect_results_v(snapshots[0].get_snapshot_id(), ['1'], i_cols)
expect_results_v(snapshots[1].get_snapshot_id(), ['1', '2'], i_cols)
expect_results_v(snapshots[2].get_snapshot_id(), [], i_cols)
expect_results_v(snapshots[3].get_snapshot_id(), ['100'], i_cols)
expect_results_v(snapshots[4].get_snapshot_id(), ['100\tNULL', '3\t103'], ij_cols)
# Test of plain count star optimization
# 'NumRowGroups' and 'NumFileMetadataRead' should not appear in profile
@@ -427,11 +521,11 @@ class TestIcebergTable(IcebergTestSuite):
expect_for_count_star_t(cast_ts(ts_4) + " - interval 5 seconds", '0')
expect_for_count_star_t(cast_ts(ts_5), '2')
expect_for_count_star_t(cast_ts(ts_5) + " + interval 1 hours", '2')
expect_for_count_star_v(snapshots[0], '1')
expect_for_count_star_v(snapshots[1], '2')
expect_for_count_star_v(snapshots[2], '0')
expect_for_count_star_v(snapshots[3], '1')
expect_for_count_star_v(snapshots[4], '2')
expect_for_count_star_v(snapshots[0].get_snapshot_id(), '1')
expect_for_count_star_v(snapshots[1].get_snapshot_id(), '2')
expect_for_count_star_v(snapshots[2].get_snapshot_id(), '0')
expect_for_count_star_v(snapshots[3].get_snapshot_id(), '1')
expect_for_count_star_v(snapshots[4].get_snapshot_id(), '2')
# SELECT diff
expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}'
@@ -442,18 +536,19 @@ class TestIcebergTable(IcebergTestSuite):
expect_results("""SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_new}
MINUS
SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_old}""".format(
tbl=tbl_name, v_new=snapshots[1], v_old=snapshots[0]),
tbl=tbl_name, v_new=snapshots[1].get_snapshot_id(),
v_old=snapshots[0].get_snapshot_id()),
['2'], i_cols)
# Mix SYSTEM_TIME and SYSTEM_VERSION
expect_results("""SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_new}
MINUS
SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_old}'""".format(
tbl=tbl_name, v_new=snapshots[1], ts_old=ts_1),
tbl=tbl_name, v_new=snapshots[1].get_snapshot_id(), ts_old=ts_1),
['2'], i_cols)
expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}'
MINUS
SELECT * FROM {tbl} FOR SYSTEM_VERSION AS OF {v_old}""".format(
tbl=tbl_name, ts_new=ts_2, v_old=snapshots[0]),
tbl=tbl_name, ts_new=ts_2, v_old=snapshots[0].get_snapshot_id()),
['2'], i_cols)
expect_results("""SELECT * FROM {tbl} FOR SYSTEM_TIME AS OF '{ts_new}'
MINUS


@@ -106,3 +106,34 @@ def get_snapshots(impalad_client, tbl_name, ts_start=None, ts_end=None,
for snapshot_str in rows.data:
results.append(Snapshot(snapshot_str))
return results
class IcebergCatalogs:
"""Utility class to generate TBLPROPERTIES corresponding to various iceberg catalogs."""
def __init__(self, database):
"""Create a IcebergCatalogs object parameterized by database name."""
self.database = database
hive_catalog = "'iceberg.catalog'='hive.catalog'"
hive_catalogs = "'iceberg.catalog'='ice_hive_cat'"
hadoop_tables = "'iceberg.catalog'='hadoop.tables'"
hadoop_catalogs = "'iceberg.catalog'='ice_hadoop_cat'"
def get_iceberg_catalog_properties(self):
"""Return a list containing TBLPROPERTIES corresponding to various iceberg catalogs.
The TBLPROPERTIES can be used to create tables."""
hadoop_catalog = ("'iceberg.catalog'='hadoop.catalog', "
+ "'iceberg.catalog_location'='/test-warehouse/{0}/hadoop_catalog_test/'".format(
self.database))
return [
self.hadoop_tables,
self.hive_catalog,
self.hive_catalogs,
self.hadoop_catalogs,
hadoop_catalog
]
def is_a_hive_catalog(self, catalog):
"""Return true if the catalog property is for a Hive catalog."""
return catalog == self.hive_catalog or catalog == self.hive_catalogs
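
A minimal usage sketch of the class above (the database and table names are
hypothetical, and 'client' is assumed to be an Impala test client):

from tests.util.iceberg_util import IcebergCatalogs

catalogs = IcebergCatalogs("my_db")
for props in catalogs.get_iceberg_catalog_properties():
  # Each 'props' string is one TBLPROPERTIES payload selecting a catalog.
  client.execute("CREATE TABLE my_db.t (i int) STORED AS ICEBERG "
                 "TBLPROPERTIES ({0})".format(props))
  if catalogs.is_a_hive_catalog(props):
    # Tables in a Hive catalog are registered in HMS, so a test may need
    # HMS-specific handling here (as in test_iceberg_self_events).
    pass
  client.execute("DROP TABLE my_db.t")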