diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc index 4f0c0e94a..aab80c870 100644 --- a/be/src/catalog/catalog-server.cc +++ b/be/src/catalog/catalog-server.cc @@ -318,6 +318,12 @@ DEFINE_bool(truncate_external_tables_with_hms, true, "Always use HMS to truncate "external tables. When false, HMS api is only used for tables being replicated. Using" "HMS has the effect of deleting files recursively and triggering an HMS event."); +DEFINE_bool(disable_hms_sync_by_default, false, "Catalogd flag that globally skips " + "HiveMetastore (HMS) event processing by default. If 'true', events are skipped for" + "all objects (with the exception to database level events) unless " + "'impala.disableHmsSync' is explicitly set to 'false' on a database or table." + "This simplifies rolling out event processing job-by-job."); + DECLARE_string(state_store_host); DECLARE_int32(state_store_port); DECLARE_string(state_store_2_host); diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc index afdc92c1a..f806e6f49 100644 --- a/be/src/util/backend-gflag-util.cc +++ b/be/src/util/backend-gflag-util.cc @@ -148,6 +148,7 @@ DECLARE_int32(catalog_reset_max_threads); DECLARE_string(warmup_tables_config_file); DECLARE_bool(keeps_warmup_tables_loaded); DECLARE_bool(truncate_external_tables_with_hms); +DECLARE_bool(disable_hms_sync_by_default); // HS2 SAML2.0 configuration // Defined here because TAG_FLAG caused issues in global-flags.cc @@ -596,6 +597,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) { FLAGS_tuple_cache_cost_coefficient_read_rows); cfg.__set_min_jdbc_scan_cardinality(FLAGS_min_jdbc_scan_cardinality); cfg.__set_max_stmt_metadata_loader_threads(FLAGS_max_stmt_metadata_loader_threads); + cfg.__set_disable_hms_sync_by_default(FLAGS_disable_hms_sync_by_default); return Status::OK(); } diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift index d8bc737f0..d43ec8b7e 100644 --- a/common/thrift/BackendGflags.thrift +++ b/common/thrift/BackendGflags.thrift @@ -361,4 +361,6 @@ struct TBackendGflags { 165: required i32 min_jdbc_scan_cardinality 166: required i32 max_stmt_metadata_loader_threads + + 167: required bool disable_hms_sync_by_default } diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java index b0a446f13..9931e3aa2 100644 --- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java +++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java @@ -1218,7 +1218,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase { writeEventInfoList.get(i).getTableObj(), Table.class); if (event.getCatalogOpExecutor().getCatalog().isHmsEventSyncDisabled(tbl)) { LOG.debug("Not adding write ids to table {}.{} for event {} " + - "since table/db level flag {} is set to true", + "since table/db level flag {} or global level flag is set to true", tbl.getDbName(), tbl.getTableName(), event.getEventId(), MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey()); continue; diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index b797c4ede..46c434615 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -4831,9 +4831,9 @@ public class CatalogServiceCatalog extends Catalog { return; } if (isHmsEventSyncDisabled(tbl.getMetaStoreTable())) { - LOG.debug("Not adding write ids to table {}.{} for event {} " + - "since table/db level flag {} is set to true", dbName, tblName, eventId, - MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey()); + LOG.debug("Not adding write ids to table {}.{} for event {} since table/db level" + + " flag {} or disable_hms_sync_by_default is set to true", dbName, + tblName, eventId, MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey()); return; } if (eventId > 0 && eventId <= tbl.getCreateEventId()) { @@ -4902,7 +4902,10 @@ public class CatalogServiceCatalog extends Catalog { } String dbFlagVal = getDbProperty(tbl.getDbName(), MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey()); - return Boolean.parseBoolean(dbFlagVal); + if (dbFlagVal != null) { + return Boolean.parseBoolean(dbFlagVal); + } + return BackendConfig.INSTANCE.isDisableHmsSyncByDefault(); } /** diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java index 938a7dd32..bb9a447ce 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java @@ -1255,9 +1255,17 @@ public class MetastoreEvents { + "database {}", MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey(), dbFlagVal, dbName_); + // flag value of null also returns false + return Boolean.valueOf(dbFlagVal); } - // flag value of null also returns false - return Boolean.valueOf(dbFlagVal); + boolean globalDisableHmsSync = BackendConfig.INSTANCE.isDisableHmsSyncByDefault(); + if (globalDisableHmsSync) { + debugLog("Table level for table {} or Db level for db {}, flag {} is not set. " + + "Global flag disable_hms_sync_by_default is set to {}", + msTbl_.getTableName(), dbName_, MetastoreEventPropertyKey + .DISABLE_EVENT_HMS_SYNC.getKey(), globalDisableHmsSync); + } + return globalDisableHmsSync; } /** diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java index b2239168c..efd4b6383 100644 --- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java +++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java @@ -624,4 +624,12 @@ public class BackendConfig { public int getMaxStmtMetadataLoaderThreads() { return backendCfg_.max_stmt_metadata_loader_threads; } + + public boolean isDisableHmsSyncByDefault() { + return backendCfg_.disable_hms_sync_by_default; + } + + public void setDisableHmsSyncByDefault(boolean disableHmsSyncByDefault) { + backendCfg_.disable_hms_sync_by_default = disableHmsSyncByDefault; + } } diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py index 5020e63f6..3483bd323 100644 --- a/tests/custom_cluster/test_events_custom_configs.py +++ b/tests/custom_cluster/test_events_custom_configs.py @@ -1820,6 +1820,73 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase): # Case-IV: Truncate table from Hive is currently generating single alter_partition # events. HIVE-28668 will address it. + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + catalogd_args="--hms_event_polling_interval_s=1 " + "--disable_hms_sync_by_default=true") + def test_disable_hms_sync_globally(self, unique_database): + """Verify IMPALA-14131: hms events are synced/skipped based on global flag + --disable_hms_sync_by_default and the db/table property 'impala.disableHmsSync'""" + tbl1 = unique_database + ".test_disable_hms_sync_1" + tbl2 = unique_database + ".test_disable_hms_sync_2" + EventProcessorUtils.wait_for_event_processing(self) + + # Case 1: verify global config + events_skipped_before = EventProcessorUtils.get_int_metric('events-skipped', 0) + self.run_stmt_in_hive( + """create table {} (id int) partitioned by (year int); + create table {} (id int);""".format(tbl1, tbl2)) + EventProcessorUtils.wait_for_event_processing(self) + events_skipped_after = EventProcessorUtils.get_int_metric('events-skipped', 0) + assert events_skipped_after > events_skipped_before + table_names = self.client.execute("show tables in {}".format(unique_database))\ + .get_data() + assert not table_names + + def _check_insert_events(tbl, expected_val, skip_events=0, part=''): + EventProcessorUtils.wait_for_event_processing(self) + events_skipped_before = EventProcessorUtils.get_int_metric('events-skipped', 0) + # modify data externally + self.run_stmt_in_hive( + """insert into {tb1} {partition} values(1),(2);""" + .format(tb1=tbl, partition=part)) + EventProcessorUtils.wait_for_event_processing(self) + events_skipped_after = EventProcessorUtils.get_int_metric('events-skipped', 0) + assert events_skipped_after == events_skipped_before + skip_events, \ + "Expected {} events to be skipped, but {} events were skipped.".format( + skip_events, events_skipped_after - events_skipped_before) + data = self.client.execute("select * from {}".format(tbl)) + assert len(data.data) == expected_val, \ + "Expected {} rows in table {}, but found {}.".format(expected_val, tbl, + len(data.data)) + + # Case 2: Enable hms sync at database level but disabled globally + def validate_hms_sync(unique_database, tbl, partition=''): + # load tables in cache + self.client.execute("invalidate metadata {}".format(tbl)) + self.client.execute("describe {}".format(tbl)) + self.run_stmt_in_hive( + """ALTER DATABASE {} SET DBPROPERTIES ('impala.disableHmsSync'='false')""" + .format(unique_database)) + _check_insert_events(tbl, 2, 0, partition) + + validate_hms_sync(unique_database, tbl1, partition='partition(year=2024)') + validate_hms_sync(unique_database, tbl2) + + # Case 3: disable hms sync at database level and enable it at table level + self.run_stmt_in_hive( + """ALTER DATABASE {} SET DBPROPERTIES ('impala.disableHmsSync'='true')""" + .format(unique_database)) + self.client.execute( + """alter table {} SET TBLPROPERTIES ('impala.disableHmsSync'='false')""" + .format(tbl1)) + self.client.execute( + """alter table {} SET TBLPROPERTIES ('impala.disableHmsSync'='false')""" + .format(tbl2)) + EventProcessorUtils.wait_for_event_processing(self) + _check_insert_events(tbl1, 4, skip_events=1, part='partition(year=2024)') + _check_insert_events(tbl2, 4, skip_events=0) + @SkipIfFS.hive class TestEventProcessingWithImpala(TestEventProcessingCustomConfigsBase):