IMPALA-11672: Update 'transient_lastDdlTime' for Iceberg tables

The 'transient_lastDdlTime' table property was not updated for Iceberg
tables before this change. Now it is updated after DDL operations,
including DROP PARTITION.

Renaming an Iceberg table is an exception:
Iceberg does not keep track of the table name in the metadata files,
so there is no Iceberg transaction to change it.
The table name is a concept that exists only in the catalog.
If we rename the table, we only edit our catalog entry, but the metadata
stored on the file system - the table's state - does not change.
Therefore renaming an Iceberg table does not change the
'transient_lastDdlTime' table property: for Iceberg tables, rename is a
catalog-level operation, not a table-level one.
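
For illustration, here is a minimal sketch (not part of this change) of
how the behaviour can be observed from a client. 'hms_client' stands for
a Hive Metastore Thrift client and 'cursor' for an Impala connection;
both names are placeholders/assumptions.

  import time

  def last_ddl_time(hms_client, db, tbl):
    # 'transient_lastDdlTime' lives in the HMS table parameters (epoch seconds).
    return int(hms_client.get_table(db, tbl).parameters["transient_lastDdlTime"])

  before = last_ddl_time(hms_client, "test_db", "ice_tbl")
  time.sleep(2)  # the property has one-second resolution
  cursor.execute("alter table test_db.ice_tbl add column a boolean")
  # A DDL operation now bumps the property on the Iceberg table.
  assert last_ddl_time(hms_client, "test_db", "ice_tbl") > before

  before = last_ddl_time(hms_client, "test_db", "ice_tbl")
  cursor.execute("alter table test_db.ice_tbl rename to test_db.ice_tbl2")
  # Rename is catalog-level only, so the property stays the same.
  assert last_ddl_time(hms_client, "test_db", "ice_tbl2") == before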

Testing:
 - added managed and non-managed Iceberg table DDL tests to
   test_last_ddl_update.py

Change-Id: I7e5f63b50bd37c80faf482c4baf4221be857c54b
Reviewed-on: http://gerrit.cloudera.org:8080/22831
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>

@@ -1182,8 +1182,11 @@ public class CatalogOpExecutor {
/**
* Execute the ALTER TABLE command according to the TAlterTableParams and refresh the
* table metadata, except for RENAME, ADD PARTITION and DROP PARTITION. This call is
* thread-safe, i.e. concurrent operations on the same table are serialized.
* table metadata, except for:
* - RENAME for Iceberg tables
* - RENAME, ADD PARTITION and DROP PARTITION for HDFS tables.
* This call is thread-safe, i.e. concurrent operations on the same table are
* serialized.
*/
private void alterTable(TAlterTableParams params, @Nullable String debugAction,
boolean wantMinimalResult, TDdlExecResponse response, EventSequence catalogTimeline)
@@ -1299,14 +1302,23 @@ public class CatalogOpExecutor {
case DROP_PARTITION:
TAlterTableDropPartitionParams dropPartParams =
params.getDrop_partition_params();
// Drop the partition from the corresponding table. If "purge" option is
// specified partition data is purged by skipping Trash, if configured.
alterTableDropPartition(tbl, dropPartParams.getPartition_set(),
dropPartParams.isIf_exists(), dropPartParams.isPurge(),
numUpdatedPartitions, catalogTimeline, modification);
responseSummaryMsg =
"Dropped " + numUpdatedPartitions.getRef() + " partition(s).";
reloadMetadata = false;
if (tbl instanceof IcebergTable) {
// The partitions of Iceberg tables are not listed in HMS. DROP PARTITION is a
// metadata-only change for Iceberg tables.
updateHmsAfterIcebergOnlyModification(
(IcebergTable) tbl, catalogTimeline, modification);
} else {
// DROP PARTITION for HDFS tables is different, because partitions are listed
// in HMS.
// Drop the partition from the corresponding table. If "purge" option is
// specified partition data is purged by skipping Trash, if configured.
alterTableDropPartition(tbl, dropPartParams.getPartition_set(),
dropPartParams.isIf_exists(), dropPartParams.isPurge(),
numUpdatedPartitions, catalogTimeline, modification);
responseSummaryMsg =
"Dropped " + numUpdatedPartitions.getRef() + " partition(s).";
reloadMetadata = false;
}
break;
case RENAME_TABLE:
case RENAME_VIEW:
@@ -1420,6 +1432,13 @@ public class CatalogOpExecutor {
tbl, params.getSet_owner_params(), response, catalogTimeline, modification);
responseSummaryMsg = "Updated table/view.";
break;
// ALTER a non-managed Iceberg table.
case EXECUTE:
case SET_PARTITION_SPEC:
Preconditions.checkState(tbl instanceof IcebergTable);
updateHmsAfterIcebergOnlyModification(
(IcebergTable) tbl, catalogTimeline, modification);
break;
default:
throw new UnsupportedOperationException(
"Unknown ALTER TABLE operation type: " + params.getAlter_type());
@@ -1582,7 +1601,6 @@ public class CatalogOpExecutor {
case EXECUTE:
Preconditions.checkState(params.isSetSet_execute_params());
// All the EXECUTE functions operate only on Iceberg data.
needsToUpdateHms = false;
TAlterTableExecuteParams setExecuteParams = params.getSet_execute_params();
if (setExecuteParams.isSetExecute_rollback_params()) {
String rollbackSummary = IcebergCatalogOpExecutor.alterTableExecuteRollback(
@@ -1601,7 +1619,6 @@ public class CatalogOpExecutor {
break;
case SET_PARTITION_SPEC:
// Partition spec is not stored in HMS.
needsToUpdateHms = false;
TAlterTableSetPartitionSpecParams setPartSpecParams =
params.getSet_partition_spec_params();
IcebergCatalogOpExecutor.alterTableSetPartitionSpec(tbl,
@@ -1617,8 +1634,7 @@ public class CatalogOpExecutor {
addSummary(response, "Updated table.");
break;
case DROP_PARTITION:
// Metadata change only
needsToUpdateHms = false;
// Partitions are not stored in HMS, this is a metadata change only.
long droppedPartitions = IcebergCatalogOpExecutor.alterTableDropPartition(
iceTxn, params.getDrop_partition_params());
addSummary(
@@ -1633,6 +1649,14 @@ public class CatalogOpExecutor {
params.getAlter_type());
}
catalogTimeline.markEvent("Iceberg operations are prepared for commit");
// Modify "transient_lastDdlTime" table property through Iceberg.
// Non-managed Iceberg tables also need to update this property in HMS separately
// in 'alterTable()', regardless of whether the modification itself needs to update
// HMS or affects only Iceberg metadata stored on the file system.
Map<String, String> property = new HashMap<>();
property.put(Table.TBL_PROP_LAST_DDL_TIME,
Long.toString(System.currentTimeMillis() / 1000));
IcebergCatalogOpExecutor.setTblProperties(iceTxn, property);
if (!needsToUpdateHms) {
// registerInflightEvent() before committing transaction.
modification.registerInflightEvent();
@@ -1653,8 +1677,7 @@ public class CatalogOpExecutor {
}
if (!needsToUpdateHms) {
// We don't need to update HMS because either it is already done by Iceberg's
// HiveCatalog, or we modified the Iceberg data which is not stored in HMS.
// We don't need to update HMS because it is already done by Iceberg's HiveCatalog.
loadTableMetadata(tbl, modification.newVersionNumber(), true, true,
"ALTER Iceberg TABLE " + params.getAlter_type().name(), debugAction,
catalogTimeline);
@@ -1681,8 +1704,8 @@ public class CatalogOpExecutor {
/**
* Iceberg format from V2 supports row-level modifications. We set write modes to
* "merge-on-read" which is the write mode Impala will eventually
* support (IMPALA-11664). Unless the user specified otherwise in the table properties.
* "merge-on-read" which is the write mode Impala supports.
* Unless the user specified otherwise in the table properties.
*/
private void addMergeOnReadPropertiesIfNeeded(IcebergTable tbl,
Map<String, String> properties) {
@@ -1714,6 +1737,23 @@ public class CatalogOpExecutor {
return true;
}
/**
* Update HMS about an ALTER TABLE issued on a non-managed Iceberg table. In case the
* modification only affects Iceberg metadata, there would be no update to HMS, but
* 'transient_lastDdlTime' table property still needs to be updated in Hive Metastore.
*/
private void updateHmsAfterIcebergOnlyModification(IcebergTable tbl,
EventSequence catalogTimeline, InProgressTableModification modification)
throws ImpalaRuntimeException {
// Managed Iceberg tables in HMS are updated by Iceberg's HiveCatalog together with
// committing the Iceberg transaction. HMS should not be updated directly in parallel
// with that because of potential data loss.
Preconditions.checkState(!IcebergUtil.isHiveCatalog(tbl.getMetaStoreTable()));
Preconditions.checkState(tbl.isWriteLockedByCurrentThread());
org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable().deepCopy();
applyAlterAndInProgressTableModification(msTbl, catalogTimeline, modification);
}
/**
* Loads the metadata of a table 'tbl' and assigns a new catalog version.
* 'reloadFileMetadata' and 'reloadTableSchema'

@@ -39,6 +39,12 @@ class TestLastDdlTimeUpdate(ImpalaTestSuite):
# to regress here.
cls.ImpalaTestMatrix.add_constraint(lambda v: False)
class TableFormat:
HDFS = 1
KUDU = 2
INTEGRATED_ICEBERG = 3
NON_INTEGRATED_ICEBERG = 4
# Convenience class to make calls to TestLastDdlTimeUpdate.run_test() shorter by
# storing common arguments as members and substituting table name and HDFS warehouse
# path to the query string.
@@ -76,13 +82,18 @@ class TestLastDdlTimeUpdate(ImpalaTestSuite):
"""
self.run_test(query, self.TimeState.UNCHANGED, self.TimeState.CHANGED)
def expect_ddl_time_change_on_rename(self, new_tbl_name):
def expect_ddl_time_change_on_rename(self, new_tbl_name, expect_change):
"""
Checks that after an ALTER TABLE ... RENAME query transient_lastDdlTime is higher on
the new table than it was on the old table.
"""
query = "alter table %(TBL)s rename to {}".format(self.db_name + "." + new_tbl_name)
self.run_test(query, self.TimeState.CHANGED, self.TimeState.UNCHANGED, new_tbl_name)
if expect_change:
self.run_test(
query, self.TimeState.CHANGED, self.TimeState.UNCHANGED, new_tbl_name)
else:
self.run_test(
query, self.TimeState.UNCHANGED, self.TimeState.UNCHANGED, new_tbl_name)
def expect_stat_time_set(self, query):
"""Running the query should not change transient_lastDdlTime while
@@ -160,25 +171,33 @@ class TestLastDdlTimeUpdate(ImpalaTestSuite):
statsTime = table.parameters.get(LAST_COMPUTE_STATS_TIME_KEY, "")
return (ddlTime, statsTime)
def _create_table(self, fq_tbl_name, is_kudu):
if is_kudu:
self.execute_query("create table %s (i int primary key) "
"partition by hash(i) partitions 3 stored as kudu" % fq_tbl_name)
else:
def _create_table(self, fq_tbl_name, format):
if format == self.TableFormat.HDFS:
self.execute_query("create external table %s (i int) "
"partitioned by (j int, s string)" % fq_tbl_name)
"partitioned by (j int, s string)" % fq_tbl_name)
elif format == self.TableFormat.KUDU:
self.execute_query("create table %s (i int primary key) partition by "
"hash(i) partitions 3 stored as kudu" % fq_tbl_name)
elif format == self.TableFormat.INTEGRATED_ICEBERG:
self.execute_query("create table %s (i int) partitioned by (j int, s string) "
"stored by iceberg" % fq_tbl_name)
elif format == self.TableFormat.NON_INTEGRATED_ICEBERG:
self.execute_query("create table %s (i int) partitioned by (j int, s string) "
"stored by iceberg tblproperties('iceberg.catalog'='hadoop.tables')"
% fq_tbl_name)
def _create_and_init_test_helper(self, unique_database, tbl_name, is_kudu):
def _create_and_init_test_helper(self, unique_database, tbl_name, table_format):
helper = TestLastDdlTimeUpdate.TestHelper(self, unique_database, tbl_name)
self._create_table(helper.fq_tbl_name, is_kudu)
self._create_table(helper.fq_tbl_name, table_format)
# compute statistics to fill table property impala.lastComputeStatsTime
self.execute_query("compute stats %s" % helper.fq_tbl_name)
return helper
def test_alter(self, vector, unique_database):
def test_hdfs_alter(self, vector, unique_database):
TBL_NAME = "alter_test_tbl"
h = self._create_and_init_test_helper(unique_database, TBL_NAME, False)
h = self._create_and_init_test_helper(
unique_database, TBL_NAME, self.TableFormat.HDFS)
# add/drop partitions
h.expect_no_time_change("alter table %(TBL)s add partition (j=1, s='2012')")
@@ -214,10 +233,10 @@ class TestLastDdlTimeUpdate(ImpalaTestSuite):
# compute sampled statistics
h.expect_stat_time_change("compute stats %(TBL)s tablesample system(20)")
def test_insert(self, vector, unique_database):
def test_hdfs_insert(self, vector, unique_database):
TBL_NAME = "insert_test_tbl"
h = self._create_and_init_test_helper(unique_database, TBL_NAME, False)
h = self._create_and_init_test_helper(
unique_database, TBL_NAME, self.TableFormat.HDFS)
# static partition insert
h.expect_no_time_change("insert into %(TBL)s partition(j=1, s='2012') select 10")
@@ -233,28 +252,69 @@ class TestLastDdlTimeUpdate(ImpalaTestSuite):
def test_kudu_alter_and_insert(self, vector, unique_database):
TBL_NAME = "kudu_test_tbl"
h = self._create_and_init_test_helper(unique_database, TBL_NAME, True)
h = self._create_and_init_test_helper(
unique_database, TBL_NAME, self.TableFormat.KUDU)
# insert
h.expect_no_time_change("insert into %s values (1)" % h.fq_tbl_name)
self.run_common_test_cases(h)
def test_rename(self, vector, unique_database):
# Test non-Kudu table
OLD_TBL_NAME = "rename_from_test_tbl"
NEW_TBL_NAME = "rename_to_test_tbl"
def test_iceberg_alter_and_insert(self, vector, unique_database):
TBL_NAMES = ("iceberg_test_tbl", "non_integrated_iceberg_test_tbl")
helpers = (self._create_and_init_test_helper(
unique_database, TBL_NAMES[0], self.TableFormat.INTEGRATED_ICEBERG),
self._create_and_init_test_helper(
unique_database, TBL_NAMES[1], self.TableFormat.NON_INTEGRATED_ICEBERG))
h = self._create_and_init_test_helper(unique_database, OLD_TBL_NAME, False)
h.expect_ddl_time_change_on_rename(NEW_TBL_NAME)
for h in helpers:
# insert
h.expect_no_time_change("insert into %(TBL)s select 10, 2, '2025'")
h.expect_no_time_change("insert into %(TBL)s select 20, 1, '2012'")
# add, alter and drop column
h.expect_ddl_time_change("alter table %(TBL)s add column a boolean")
h.expect_ddl_time_change("alter table %(TBL)s alter column a set comment 'bool'")
h.expect_ddl_time_change("alter table %(TBL)s drop column a")
# drop partition
h.expect_ddl_time_change("alter table %(TBL)s drop partition (j=1, s='2012')")
# set partition spec
h.expect_ddl_time_change("alter table %(TBL)s set partition spec (void(j))")
# expire snapshots
h.expect_ddl_time_change("alter table %(TBL)s execute expire_snapshots(now())")
self.run_common_test_cases(h)
def test_rename(self, vector, unique_database):
# Test HDFS table
OLD_HDFS_TBL_NAME = "hdfs_rename_from_test_tbl"
NEW_HDFS_TBL_NAME = "hdfs_rename_to_test_tbl"
h = self._create_and_init_test_helper(
unique_database, OLD_HDFS_TBL_NAME, self.TableFormat.HDFS)
h.expect_ddl_time_change_on_rename(NEW_HDFS_TBL_NAME, True)
# Test Kudu table
OLD_KUDU_TBL_NAME = "kudu_rename_from_test_tbl"
NEW_KUDU_TBL_NAME = "kudu_rename_to_test_tbl"
h = self._create_and_init_test_helper(unique_database, OLD_KUDU_TBL_NAME, True)
h.expect_ddl_time_change_on_rename(NEW_KUDU_TBL_NAME)
h = self._create_and_init_test_helper(
unique_database, OLD_KUDU_TBL_NAME, self.TableFormat.KUDU)
h.expect_ddl_time_change_on_rename(NEW_KUDU_TBL_NAME, True)
# Tests that should behave the same with HDFS and Kudu tables.
# The name of an Iceberg table is not tracked by Iceberg in the metadata files.
# Table name is a catalog-level abstraction, therefore rename is a catalog-level
# operation which does not change 'transient_lastDdlTime' table property.
# Iceberg tables that use 'hadoop.tables' as catalog cannot be renamed.
OLD_ICEBERG_TBL_NAME = "iceberg_rename_from_test_tbl"
NEW_ICEBERG_TBL_NAME = "iceberg_rename_to_test_tbl"
h = self._create_and_init_test_helper(
unique_database, OLD_ICEBERG_TBL_NAME, self.TableFormat.INTEGRATED_ICEBERG)
h.expect_ddl_time_change_on_rename(NEW_ICEBERG_TBL_NAME, False)
# Tests that should behave the same with HDFS, Kudu and Iceberg tables.
def run_common_test_cases(self, test_helper):
h = test_helper
# rename columns
@@ -262,6 +322,7 @@ class TestLastDdlTimeUpdate(ImpalaTestSuite):
h.expect_ddl_time_change("alter table %(TBL)s change column k i int")
# change table property
h.expect_ddl_time_change("alter table %(TBL)s set tblproperties ('a'='b')")
h.expect_ddl_time_change("alter table %(TBL)s unset tblproperties ('a')")
# changing table statistics manually
h.expect_ddl_time_change("alter table %(TBL)s set tblproperties ('numRows'='1')")