diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 195605e6e..b5acac0ad 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -1182,8 +1182,11 @@ public class CatalogOpExecutor {
 
   /**
    * Execute the ALTER TABLE command according to the TAlterTableParams and refresh the
-   * table metadata, except for RENAME, ADD PARTITION and DROP PARTITION. This call is
-   * thread-safe, i.e. concurrent operations on the same table are serialized.
+   * table metadata, except for:
+   * - RENAME for Iceberg tables
+   * - RENAME, ADD PARTITION and DROP PARTITION for HDFS tables.
+   * This call is thread-safe, i.e. concurrent operations on the same table are
+   * serialized.
    */
   private void alterTable(TAlterTableParams params, @Nullable String debugAction,
       boolean wantMinimalResult, TDdlExecResponse response, EventSequence catalogTimeline)
@@ -1299,14 +1302,23 @@ public class CatalogOpExecutor {
       case DROP_PARTITION:
         TAlterTableDropPartitionParams dropPartParams =
             params.getDrop_partition_params();
-        // Drop the partition from the corresponding table. If "purge" option is
-        // specified partition data is purged by skipping Trash, if configured.
-        alterTableDropPartition(tbl, dropPartParams.getPartition_set(),
-            dropPartParams.isIf_exists(), dropPartParams.isPurge(),
-            numUpdatedPartitions, catalogTimeline, modification);
-        responseSummaryMsg =
-            "Dropped " + numUpdatedPartitions.getRef() + " partition(s).";
-        reloadMetadata = false;
+        if (tbl instanceof IcebergTable) {
+          // The partitions of Iceberg tables are not listed in HMS. DROP PARTITION is a
+          // metadata-only change for Iceberg tables.
+          updateHmsAfterIcebergOnlyModification(
+              (IcebergTable) tbl, catalogTimeline, modification);
+        } else {
+          // DROP PARTITION for HDFS tables is different, because partitions are listed
+          // in HMS.
+          // Drop the partition from the corresponding table. If "purge" option is
+          // specified partition data is purged by skipping Trash, if configured.
+          alterTableDropPartition(tbl, dropPartParams.getPartition_set(),
+              dropPartParams.isIf_exists(), dropPartParams.isPurge(),
+              numUpdatedPartitions, catalogTimeline, modification);
+          responseSummaryMsg =
+              "Dropped " + numUpdatedPartitions.getRef() + " partition(s).";
+          reloadMetadata = false;
+        }
         break;
       case RENAME_TABLE:
       case RENAME_VIEW:
@@ -1420,6 +1432,13 @@ public class CatalogOpExecutor {
             tbl, params.getSet_owner_params(), response, catalogTimeline, modification);
         responseSummaryMsg = "Updated table/view.";
         break;
+      // ALTER a non-managed Iceberg table.
+      case EXECUTE:
+      case SET_PARTITION_SPEC:
+        Preconditions.checkState(tbl instanceof IcebergTable);
+        updateHmsAfterIcebergOnlyModification(
+            (IcebergTable) tbl, catalogTimeline, modification);
+        break;
       default:
         throw new UnsupportedOperationException(
             "Unknown ALTER TABLE operation type: " + params.getAlter_type());
@@ -1582,7 +1601,6 @@ public class CatalogOpExecutor {
       case EXECUTE:
         Preconditions.checkState(params.isSetSet_execute_params());
         // All the EXECUTE functions operate only on Iceberg data.
-        needsToUpdateHms = false;
         TAlterTableExecuteParams setExecuteParams = params.getSet_execute_params();
         if (setExecuteParams.isSetExecute_rollback_params()) {
           String rollbackSummary = IcebergCatalogOpExecutor.alterTableExecuteRollback(
@@ -1601,7 +1619,6 @@ public class CatalogOpExecutor {
         break;
       case SET_PARTITION_SPEC:
         // Partition spec is not stored in HMS.
-        needsToUpdateHms = false;
         TAlterTableSetPartitionSpecParams setPartSpecParams =
             params.getSet_partition_spec_params();
         IcebergCatalogOpExecutor.alterTableSetPartitionSpec(tbl,
@@ -1617,8 +1634,7 @@ public class CatalogOpExecutor {
         addSummary(response, "Updated table.");
         break;
       case DROP_PARTITION:
-        // Metadata change only
-        needsToUpdateHms = false;
+        // Partitions are not stored in HMS, so this is a metadata-only change.
         long droppedPartitions = IcebergCatalogOpExecutor.alterTableDropPartition(
             iceTxn, params.getDrop_partition_params());
         addSummary(
@@ -1633,6 +1649,14 @@ public class CatalogOpExecutor {
             params.getAlter_type());
       }
       catalogTimeline.markEvent("Iceberg operations are prepared for commit");
+      // Modify the "transient_lastDdlTime" table property through Iceberg.
+      // Non-managed Iceberg tables also need to update this property in HMS separately
+      // in 'alterTable()', regardless of whether the modification itself needs to update
+      // HMS or affects only Iceberg metadata stored on the file system.
+      Map<String, String> property = new HashMap<>();
+      property.put(Table.TBL_PROP_LAST_DDL_TIME,
+          Long.toString(System.currentTimeMillis() / 1000));
+      IcebergCatalogOpExecutor.setTblProperties(iceTxn, property);
       if (!needsToUpdateHms) {
         // registerInflightEvent() before committing transaction.
         modification.registerInflightEvent();
@@ -1653,8 +1677,7 @@ public class CatalogOpExecutor {
     }
 
     if (!needsToUpdateHms) {
-      // We don't need to update HMS because either it is already done by Iceberg's
-      // HiveCatalog, or we modified the Iceberg data which is not stored in HMS.
+      // We don't need to update HMS because it is already done by Iceberg's HiveCatalog.
       loadTableMetadata(tbl, modification.newVersionNumber(), true, true,
           "ALTER Iceberg TABLE " + params.getAlter_type().name(), debugAction,
           catalogTimeline);
@@ -1681,8 +1704,8 @@ public class CatalogOpExecutor {
 
   /**
    * Iceberg format from V2 supports row-level modifications. We set write modes to
-   * "merge-on-read" which is the write mode Impala will eventually
-   * support (IMPALA-11664). Unless the user specified otherwise in the table properties.
+   * "merge-on-read", which is the write mode Impala supports, unless the user
+   * specified otherwise in the table properties.
    */
   private void addMergeOnReadPropertiesIfNeeded(IcebergTable tbl,
       Map<String, String> properties) {
@@ -1714,6 +1737,23 @@ public class CatalogOpExecutor {
     return true;
   }
 
+  /**
+   * Updates HMS about an ALTER TABLE issued on a non-managed Iceberg table. If the
+   * modification only affects Iceberg metadata, there would be no HMS update at all,
+   * but the 'transient_lastDdlTime' table property still needs to be updated in HMS.
+   */
+  private void updateHmsAfterIcebergOnlyModification(IcebergTable tbl,
+      EventSequence catalogTimeline, InProgressTableModification modification)
+      throws ImpalaRuntimeException {
+    // Managed Iceberg tables in HMS are updated by Iceberg's HiveCatalog as part of
+    // committing the Iceberg transaction. HMS should not be updated directly in
+    // parallel with that because of potential data loss.
+    Preconditions.checkState(!IcebergUtil.isHiveCatalog(tbl.getMetaStoreTable()));
+    Preconditions.checkState(tbl.isWriteLockedByCurrentThread());
+    org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable().deepCopy();
+    applyAlterAndInProgressTableModification(msTbl, catalogTimeline, modification);
+  }
+
   /**
    * Loads the metadata of a table 'tbl' and assigns a new catalog version.
    * 'reloadFileMetadata' and 'reloadTableSchema'
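
Aside, not part of the patch: the sketch below illustrates what the direct HMS update performed by updateHmsAfterIcebergOnlyModification() amounts to for a table whose Iceberg catalog (e.g. 'hadoop.tables') never writes to HMS when the Iceberg transaction commits. It uses only standard Hive Metastore client calls; the database and table names are hypothetical.

// Standalone sketch (not Impala code); assumes a reachable HMS and a hypothetical
// table test_db.non_integrated_iceberg_tbl.
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Table;

public class LastDdlTimeSketch {
  public static void main(String[] args) throws Exception {
    HiveMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());
    // Work on a copy of the HMS table object, mirroring the deepCopy() above.
    Table msTbl = client.getTable("test_db", "non_integrated_iceberg_tbl").deepCopy();
    // transient_lastDdlTime is stored as seconds since the epoch.
    msTbl.getParameters().put("transient_lastDdlTime",
        Long.toString(System.currentTimeMillis() / 1000));
    client.alter_table("test_db", "non_integrated_iceberg_tbl", msTbl);
    client.close();
  }
}
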
diff --git a/tests/metadata/test_last_ddl_time_update.py b/tests/metadata/test_last_ddl_time_update.py
index 6b202933f..a5e48c7fd 100644
--- a/tests/metadata/test_last_ddl_time_update.py
+++ b/tests/metadata/test_last_ddl_time_update.py
@@ -39,6 +39,12 @@ class TestLastDdlTimeUpdate(ImpalaTestSuite):
     # to regress here.
     cls.ImpalaTestMatrix.add_constraint(lambda v: False)
 
+  class TableFormat:
+    HDFS = 1
+    KUDU = 2
+    INTEGRATED_ICEBERG = 3
+    NON_INTEGRATED_ICEBERG = 4
+
   # Convenience class to make calls to TestLastDdlTimeUpdate.run_test() shorter by
   # storing common arguments as members and substituting table name and HDFS warehouse
   # path to the query string.
@@ -76,13 +82,18 @@ class TestLastDdlTimeUpdate(ImpalaTestSuite):
     """
     self.run_test(query, self.TimeState.UNCHANGED, self.TimeState.CHANGED)
 
-  def expect_ddl_time_change_on_rename(self, new_tbl_name):
+  def expect_ddl_time_change_on_rename(self, new_tbl_name, expect_change):
     """
     Checks that after an ALTER TABLE ... RENAME query transient_lastDdlTime is
     higher on the new table than it was on the old table.
     """
     query = "alter table %(TBL)s rename to {}".format(self.db_name + "." + new_tbl_name)
-    self.run_test(query, self.TimeState.CHANGED, self.TimeState.UNCHANGED, new_tbl_name)
+    if expect_change:
+      self.run_test(
+        query, self.TimeState.CHANGED, self.TimeState.UNCHANGED, new_tbl_name)
+    else:
+      self.run_test(
+        query, self.TimeState.UNCHANGED, self.TimeState.UNCHANGED, new_tbl_name)
 
   def expect_stat_time_set(self, query):
     """Running the query should not change transient_lastDdlTime while
@@ -160,25 +171,33 @@ class TestLastDdlTimeUpdate(ImpalaTestSuite):
     statsTime = table.parameters.get(LAST_COMPUTE_STATS_TIME_KEY, "")
     return (ddlTime, statsTime)
 
-  def _create_table(self, fq_tbl_name, is_kudu):
-    if is_kudu:
-      self.execute_query("create table %s (i int primary key) "
-          "partition by hash(i) partitions 3 stored as kudu" % fq_tbl_name)
-    else:
+  def _create_table(self, fq_tbl_name, format):
+    if format == self.TableFormat.HDFS:
       self.execute_query("create external table %s (i int) "
-          "partitioned by (j int, s string)" % fq_tbl_name)
+                         "partitioned by (j int, s string)" % fq_tbl_name)
+    elif format == self.TableFormat.KUDU:
+      self.execute_query("create table %s (i int primary key) partition by "
+                         "hash(i) partitions 3 stored as kudu" % fq_tbl_name)
+    elif format == self.TableFormat.INTEGRATED_ICEBERG:
+      self.execute_query("create table %s (i int) partitioned by (j int, s string) "
+                         "stored by iceberg" % fq_tbl_name)
+    elif format == self.TableFormat.NON_INTEGRATED_ICEBERG:
+      self.execute_query("create table %s (i int) partitioned by (j int, s string) "
+                         "stored by iceberg tblproperties('iceberg.catalog'='hadoop.tables')"
+                         % fq_tbl_name)
 
-  def _create_and_init_test_helper(self, unique_database, tbl_name, is_kudu):
+  def _create_and_init_test_helper(self, unique_database, tbl_name, table_format):
     helper = TestLastDdlTimeUpdate.TestHelper(self, unique_database, tbl_name)
-    self._create_table(helper.fq_tbl_name, is_kudu)
+    self._create_table(helper.fq_tbl_name, table_format)
     # compute statistics to fill table property impala.lastComputeStatsTime
     self.execute_query("compute stats %s" % helper.fq_tbl_name)
     return helper
 
-  def test_alter(self, vector, unique_database):
+  def test_hdfs_alter(self, vector, unique_database):
     TBL_NAME = "alter_test_tbl"
-    h = self._create_and_init_test_helper(unique_database, TBL_NAME, False)
+    h = self._create_and_init_test_helper(
+      unique_database, TBL_NAME, self.TableFormat.HDFS)
 
     # add/drop partitions
     h.expect_no_time_change("alter table %(TBL)s add partition (j=1, s='2012')")
@@ -214,10 +233,10 @@ class TestLastDdlTimeUpdate(ImpalaTestSuite):
 
     # compute sampled statistics
     h.expect_stat_time_change("compute stats %(TBL)s tablesample system(20)")
-
-  def test_insert(self, vector, unique_database):
+  def test_hdfs_insert(self, vector, unique_database):
     TBL_NAME = "insert_test_tbl"
-    h = self._create_and_init_test_helper(unique_database, TBL_NAME, False)
+    h = self._create_and_init_test_helper(
+      unique_database, TBL_NAME, self.TableFormat.HDFS)
 
     # static partition insert
     h.expect_no_time_change("insert into %(TBL)s partition(j=1, s='2012') select 10")
@@ -233,28 +252,69 @@ class TestLastDdlTimeUpdate(ImpalaTestSuite):
 
   def test_kudu_alter_and_insert(self, vector, unique_database):
     TBL_NAME = "kudu_test_tbl"
-    h = self._create_and_init_test_helper(unique_database, TBL_NAME, True)
+    h = self._create_and_init_test_helper(
+      unique_database, TBL_NAME, self.TableFormat.KUDU)
 
     # insert
     h.expect_no_time_change("insert into %s values (1)" % h.fq_tbl_name)
 
     self.run_common_test_cases(h)
 
-  def test_rename(self, vector, unique_database):
-    # Test non-Kudu table
-    OLD_TBL_NAME = "rename_from_test_tbl"
-    NEW_TBL_NAME = "rename_to_test_tbl"
+  def test_iceberg_alter_and_insert(self, vector, unique_database):
+    TBL_NAMES = ("iceberg_test_tbl", "non_integrated_iceberg_test_tbl")
+    helpers = (self._create_and_init_test_helper(
+        unique_database, TBL_NAMES[0], self.TableFormat.INTEGRATED_ICEBERG),
+      self._create_and_init_test_helper(
+        unique_database, TBL_NAMES[1], self.TableFormat.NON_INTEGRATED_ICEBERG))
 
-    h = self._create_and_init_test_helper(unique_database, OLD_TBL_NAME, False)
-    h.expect_ddl_time_change_on_rename(NEW_TBL_NAME)
+    for h in helpers:
+      # insert
+      h.expect_no_time_change("insert into %(TBL)s select 10, 2, '2025'")
+      h.expect_no_time_change("insert into %(TBL)s select 20, 1, '2012'")
+
+      # add, alter and drop column
+      h.expect_ddl_time_change("alter table %(TBL)s add column a boolean")
+      h.expect_ddl_time_change("alter table %(TBL)s alter column a set comment 'bool'")
+      h.expect_ddl_time_change("alter table %(TBL)s drop column a")
+
+      # drop partition
+      h.expect_ddl_time_change("alter table %(TBL)s drop partition (j=1, s='2012')")
+
+      # set partition spec
+      h.expect_ddl_time_change("alter table %(TBL)s set partition spec (void(j))")
+
+      # expire snapshots
+      h.expect_ddl_time_change("alter table %(TBL)s execute expire_snapshots(now())")
+
+      self.run_common_test_cases(h)
+
+  def test_rename(self, vector, unique_database):
+    # Test HDFS table
+    OLD_HDFS_TBL_NAME = "hdfs_rename_from_test_tbl"
+    NEW_HDFS_TBL_NAME = "hdfs_rename_to_test_tbl"
+
+    h = self._create_and_init_test_helper(
+      unique_database, OLD_HDFS_TBL_NAME, self.TableFormat.HDFS)
+    h.expect_ddl_time_change_on_rename(NEW_HDFS_TBL_NAME, True)
 
     # Test Kudu table
     OLD_KUDU_TBL_NAME = "kudu_rename_from_test_tbl"
     NEW_KUDU_TBL_NAME = "kudu_rename_to_test_tbl"
 
-    h = self._create_and_init_test_helper(unique_database, OLD_KUDU_TBL_NAME, True)
-    h.expect_ddl_time_change_on_rename(NEW_KUDU_TBL_NAME)
+    h = self._create_and_init_test_helper(
+      unique_database, OLD_KUDU_TBL_NAME, self.TableFormat.KUDU)
+    h.expect_ddl_time_change_on_rename(NEW_KUDU_TBL_NAME, True)
 
-  # Tests that should behave the same with HDFS and Kudu tables.
+    # The name of an Iceberg table is not tracked by Iceberg in the metadata files.
+    # The table name is a catalog-level abstraction, therefore rename is a catalog-level
+    # operation which does not change the 'transient_lastDdlTime' table property.
+    # Iceberg tables that use 'hadoop.tables' as their catalog cannot be renamed.
+    OLD_ICEBERG_TBL_NAME = "iceberg_rename_from_test_tbl"
+    NEW_ICEBERG_TBL_NAME = "iceberg_rename_to_test_tbl"
+    h = self._create_and_init_test_helper(
+      unique_database, OLD_ICEBERG_TBL_NAME, self.TableFormat.INTEGRATED_ICEBERG)
+    h.expect_ddl_time_change_on_rename(NEW_ICEBERG_TBL_NAME, False)
+
+  # Tests that should behave the same with HDFS, Kudu and Iceberg tables.
   def run_common_test_cases(self, test_helper):
     h = test_helper
     # rename columns
@@ -262,6 +322,7 @@ class TestLastDdlTimeUpdate(ImpalaTestSuite):
     h.expect_ddl_time_change("alter table %(TBL)s change column k i int")
     # change table property
     h.expect_ddl_time_change("alter table %(TBL)s set tblproperties ('a'='b')")
+    h.expect_ddl_time_change("alter table %(TBL)s unset tblproperties ('a')")
    # changing table statistics manually
     h.expect_ddl_time_change("alter table %(TBL)s set tblproperties ('numRows'='1')")
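
Aside, not part of the patch: every helper above reduces to snapshotting the table's HMS parameters before and after a statement and comparing 'transient_lastDdlTime'. A minimal standalone Python sketch of that comparison; check_ddl_time and the literal timestamps are hypothetical, not part of the test suite.

# Illustrative sketch only; not part of test_last_ddl_time_update.py.
LAST_DDL_TIME_KEY = "transient_lastDdlTime"


def check_ddl_time(before_params, after_params, expect_change):
  """Compares 'transient_lastDdlTime' between two HMS table-parameter snapshots."""
  before = int(before_params.get(LAST_DDL_TIME_KEY, "0"))
  after = int(after_params.get(LAST_DDL_TIME_KEY, "0"))
  if expect_change:
    assert after > before, "expected transient_lastDdlTime to increase"
  else:
    assert after == before, "expected transient_lastDdlTime to stay unchanged"


# An Iceberg-only change such as EXECUTE expire_snapshots should bump the property:
check_ddl_time({LAST_DDL_TIME_KEY: "1700000000"}, {LAST_DDL_TIME_KEY: "1700000100"},
               expect_change=True)
# ...while renaming an integrated Iceberg table should leave it unchanged:
check_ddl_time({LAST_DDL_TIME_KEY: "1700000000"}, {LAST_DDL_TIME_KEY: "1700000000"},
               expect_change=False)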