diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc index 63b054109..2208bca77 100644 --- a/be/src/catalog/catalog-server.cc +++ b/be/src/catalog/catalog-server.cc @@ -241,12 +241,11 @@ DEFINE_string(default_skipped_hms_event_types, "the latest event time in HMS."); DEFINE_string(common_hms_event_types, "ADD_PARTITION,ALTER_PARTITION,DROP_PARTITION," - "ADD_PARTITION,ALTER_PARTITION,DROP_PARTITION,CREATE_TABLE,ALTER_TABLE,DROP_TABLE," + "ALTER_PARTITIONS,CREATE_TABLE,ALTER_TABLE,DROP_TABLE,RELOAD,COMMIT_COMPACTION_EVENT" "CREATE_DATABASE,ALTER_DATABASE,DROP_DATABASE,INSERT,OPEN_TXN,COMMIT_TXN,ABORT_TXN," "ALLOC_WRITE_ID_EVENT,ACID_WRITE_EVENT,BATCH_ACID_WRITE_EVENT," "UPDATE_TBL_COL_STAT_EVENT,DELETE_TBL_COL_STAT_EVENT,UPDATE_PART_COL_STAT_EVENT," - "UPDATE_PART_COL_STAT_EVENT_BATCH,DELETE_PART_COL_STAT_EVENT,COMMIT_COMPACTION_EVENT," - "RELOAD", + "UPDATE_PART_COL_STAT_EVENT_BATCH,DELETE_PART_COL_STAT_EVENT,", "Common HMS event types that will be used in eventTypeSkipList when fetching events " "from HMS. The strings come from constants in " "org.apache.hadoop.hive.metastore.messaging.MessageBuilder. When bumping Hive " diff --git a/bin/create-test-configuration.sh b/bin/create-test-configuration.sh index f8c376e92..6eb51fde7 100755 --- a/bin/create-test-configuration.sh +++ b/bin/create-test-configuration.sh @@ -169,6 +169,13 @@ rm -f hive-site-housekeeping-on/hive-site.xml ln -s "${CONFIG_DIR}/hive-site_housekeeping_on.xml" \ hive-site-housekeeping-on/hive-site.xml +export HIVE_VARIANT=events_config_change +$IMPALA_HOME/bin/generate_xml_config.py hive-site.xml.py hive-site_events_config.xml +mkdir -p hive-site-events-config +rm -f hive-site-events-config/hive-site.xml +ln -s "${CONFIG_DIR}/hive-site_events_config.xml" \ + hive-site-events-config/hive-site.xml + export HIVE_VARIANT=ranger_auth HIVE_RANGER_CONF_DIR=hive-site-ranger-auth $IMPALA_HOME/bin/generate_xml_config.py hive-site.xml.py hive-site_ranger_auth.xml diff --git a/bin/impala-config.sh b/bin/impala-config.sh index 88aab1813..8a4ec56e6 100755 --- a/bin/impala-config.sh +++ b/bin/impala-config.sh @@ -240,19 +240,19 @@ fi : ${IMPALA_TOOLCHAIN_HOST:=native-toolchain.s3.amazonaws.com} export IMPALA_TOOLCHAIN_HOST -export CDP_BUILD_NUMBER=58457853 +export CDP_BUILD_NUMBER=66846208 export CDP_MAVEN_REPOSITORY=\ "https://${IMPALA_TOOLCHAIN_HOST}/build/cdp_components/${CDP_BUILD_NUMBER}/maven" -export CDP_AVRO_JAVA_VERSION=1.11.1.7.3.1.0-160 -export CDP_HADOOP_VERSION=3.1.1.7.3.1.0-160 -export CDP_HBASE_VERSION=2.4.17.7.3.1.0-160 -export CDP_HIVE_VERSION=3.1.3000.7.3.1.0-160 -export CDP_ICEBERG_VERSION=1.3.1.7.3.1.0-160 -export CDP_KNOX_VERSION=2.0.0.7.3.1.0-160 -export CDP_OZONE_VERSION=1.3.0.7.3.1.0-160 -export CDP_PARQUET_VERSION=1.12.3.7.3.1.0-160 -export CDP_RANGER_VERSION=2.4.0.7.3.1.0-160 -export CDP_TEZ_VERSION=0.9.1.7.3.1.0-160 +export CDP_AVRO_JAVA_VERSION=1.11.1.7.3.1.500-30 +export CDP_HADOOP_VERSION=3.1.1.7.3.1.500-30 +export CDP_HBASE_VERSION=2.4.17.7.3.1.500-30 +export CDP_HIVE_VERSION=3.1.3000.7.3.1.500-30 +export CDP_ICEBERG_VERSION=1.3.1.7.3.1.500-30 +export CDP_KNOX_VERSION=2.0.0.7.3.1.500-30 +export CDP_OZONE_VERSION=1.4.0.7.3.1.500-30 +export CDP_PARQUET_VERSION=1.12.3.7.3.1.500-30 +export CDP_RANGER_VERSION=2.4.0.7.3.1.500-30 +export CDP_TEZ_VERSION=0.9.1.7.3.1.500-30 # Ref: https://infra.apache.org/release-download-pages.html#closer : ${APACHE_MIRROR:="https://www.apache.org/dyn/closer.cgi"} diff --git a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java index ac0a0657c..24fe4933b 100644 --- a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java +++ b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java @@ -448,6 +448,14 @@ public class MetastoreShim extends Hive3MetastoreShimBase { throw new UnsupportedOperationException("Reload event is not supported."); } + /** + * CDP Hive-3 only function. + */ + public static AlterPartitionsInfo getFieldsFromAlterPartitionsEvent( + NotificationEvent event) throws MetastoreNotificationException { + throw new UnsupportedOperationException("AlterPartitions event is not supported."); + } + /** * CDP Hive-3 only function. */ diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java index e80669919..10f6c3b6c 100644 --- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java +++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java @@ -79,6 +79,7 @@ import org.apache.hadoop.hive.metastore.api.WriteEventInfo; import org.apache.hadoop.hive.metastore.api.WriteNotificationLogBatchRequest; import org.apache.hadoop.hive.metastore.api.WriteNotificationLogRequest; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.messaging.AlterPartitionsMessage; import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage; import org.apache.hadoop.hive.metastore.messaging.CommitCompactionMessage; @@ -606,6 +607,39 @@ public class MetastoreShim extends Hive3MetastoreShimBase { return updatedFields; } + /** + * This method extracts the table, partitions, and isTruncateOp fields from the + * notification event and returns them in a AlterPartitionsInfo class object. + * + * @param event Metastore notification event, + * @return a AlterPartitionsInfo class object required for the reload event. + */ + public static AlterPartitionsInfo getFieldsFromAlterPartitionsEvent( + NotificationEvent event) throws MetastoreNotificationException{ + Preconditions.checkNotNull(event.getMessage()); + AlterPartitionsMessage alterPartitionsMessage = + MetastoreEventsProcessor.getMessageDeserializer() + .getAlterPartitionsMessage(event.getMessage()); + AlterPartitionsInfo alterPartitionsInfo = null; + try { + Iterator partitionsIterator = Preconditions.checkNotNull( + alterPartitionsMessage.getPartitionObjs().iterator()); + List partitionsAfter = + new ArrayList<>(); + while (partitionsIterator.hasNext()) { + partitionsAfter.add(partitionsIterator.next()); + } + org.apache.hadoop.hive.metastore.api.Table msTbl = Preconditions.checkNotNull( + alterPartitionsMessage.getTableObj()); + boolean isTruncateOp = alterPartitionsMessage.getIsTruncateOp(); + alterPartitionsInfo = new AlterPartitionsInfo(msTbl, partitionsAfter, + isTruncateOp); + } catch (Exception e) { + throw new MetastoreNotificationException(e); + } + return alterPartitionsInfo; + } + /** * This method extracts the partition name field from the * notification event and returns it in the form of string. diff --git a/fe/src/main/java/org/apache/impala/catalog/Hive3MetastoreShimBase.java b/fe/src/main/java/org/apache/impala/catalog/Hive3MetastoreShimBase.java index 11a22c52b..447636ea1 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Hive3MetastoreShimBase.java +++ b/fe/src/main/java/org/apache/impala/catalog/Hive3MetastoreShimBase.java @@ -842,4 +842,43 @@ public class Hive3MetastoreShimBase { public static boolean validateColumnName(String name) { return MetaStoreUtils.validateColumnName(name); } + + /** + * Constructs a new AlterPartitionsInfo object. + */ + public static class AlterPartitionsInfo { + private final org.apache.hadoop.hive.metastore.api.Table msTable; + private final List partitions; + private final boolean isTruncate; + + public AlterPartitionsInfo(org.apache.hadoop.hive.metastore.api.Table msTable, + List partitions, + boolean isTruncate) { + this.msTable = msTable; + this.partitions = partitions; + this.isTruncate = isTruncate; + } + + /** + * Returns the Thrift representation of the table. + */ + public org.apache.hadoop.hive.metastore.api.Table getMsTable() { + return msTable; + } + + /** + * Returns the list of Thrift partition objects affected by the reload. + * Can be null or empty if not applicable. + */ + public List getPartitions() { + return partitions; + } + + /** + * Returns true if the reload operation was due to a TRUNCATE TABLE. + */ + public boolean isTruncate() { + return isTruncate; + } + } } diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java index 2405e91f6..1055b212c 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.Map; @@ -128,6 +129,7 @@ public class MetastoreEvents { DROP_DATABASE("DROP_DATABASE"), ALTER_DATABASE("ALTER_DATABASE"), ADD_PARTITION("ADD_PARTITION"), + ALTER_BATCH_PARTITIONS("ALTER_BATCH_PARTITIONS"), ALTER_PARTITION("ALTER_PARTITION"), ALTER_PARTITIONS("ALTER_PARTITIONS"), DROP_PARTITION("DROP_PARTITION"), @@ -227,6 +229,8 @@ public class MetastoreEvents { return new DropPartitionEvent(catalogOpExecutor_, metrics, event); case ALTER_PARTITION: return new AlterPartitionEvent(catalogOpExecutor_, metrics, event); + case ALTER_PARTITIONS: + return new AlterPartitionsEvent(catalogOpExecutor_, metrics, event); case RELOAD: return new ReloadEvent(catalogOpExecutor_, metrics, event); case INSERT: @@ -1367,6 +1371,52 @@ public class MetastoreEvents { return false; } + protected void processAlterPartitionEvent(boolean isTruncateOp, + List partitionsAfter) + throws CatalogException, MetastoreNotificationException { + // Reload the whole table if it's a transactional table or materialized view. + // Materialized views are treated as a special case because it's possible to + // receive partition event on MVs, but they are regular views in Impala. That + // cause problems on the reloading partition logic which expects it to be a + // HdfsTable. + if (AcidUtils.isTransactionalTable(msTbl_.getParameters()) + || MetaStoreUtils.isMaterializedViewTable(msTbl_)) { + reloadTransactionalTable(partitionsAfter); + } else { + try { + // load file metadata only if storage descriptor of partitionAfter_ differs + // from sd of HdfsPartition. If the alter_partition event type is of truncate + // then force load the file metadata. + FileMetadataLoadOpts fileMetadataLoadOpts = + isTruncateOp ? FileMetadataLoadOpts.FORCE_LOAD : + FileMetadataLoadOpts.LOAD_IF_SD_CHANGED; + reloadPartitions(partitionsAfter, fileMetadataLoadOpts, getEventDesc(), + false); + } catch (CatalogException e) { + throw new MetastoreNotificationException( + debugString("Refresh partitions on table {} failed. Event " + + "processing cannot continue. Issue an invalidate command to reset " + + "the event processor state.", getFullyQualifiedTblName(), e)); + } + } + } + + protected void reloadTransactionalTable( + List partitionsAfter) + throws CatalogException { + boolean incrementalRefresh = + BackendConfig.INSTANCE.getHMSEventIncrementalRefreshTransactionalTable(); + if (incrementalRefresh) { + reloadPartitionsFromEvent(partitionsAfter, getEventDesc() + + " FOR TRANSACTIONAL TABLE"); + } else { + boolean notSkipped = reloadTableFromCatalog(true); + if (!notSkipped) { + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); + } + } + } + @Override protected void process() throws MetastoreNotificationException, CatalogException { Timer.Context context = null; @@ -2657,7 +2707,7 @@ public class MetastoreEvents { @Override protected MetastoreEventType getBatchEventType() { - return MetastoreEventType.ALTER_PARTITIONS; + return MetastoreEventType.ALTER_BATCH_PARTITIONS; } @Override @@ -2721,32 +2771,7 @@ public class MetastoreEvents { getEventId()); return; } - // Reload the whole table if it's a transactional table or materialized view. - // Materialized views are treated as a special case because it's possible to - // receive partition event on MVs, but they are regular views in Impala. That - // cause problems on the reloading partition logic which expects it to be a - // HdfsTable. - if (AcidUtils.isTransactionalTable(msTbl_.getParameters()) - || MetaStoreUtils.isMaterializedViewTable(msTbl_)) { - reloadTransactionalTable(); - } else { - // Refresh the partition that was altered. - Preconditions.checkNotNull(partitionAfter_); - try { - // load file metadata only if storage descriptor of partitionAfter_ differs - // from sd of HdfsPartition. If the alter_partition event type is of truncate - // then force load the file metadata. - FileMetadataLoadOpts fileMetadataLoadOpts = - isTruncateOp_ ? FileMetadataLoadOpts.FORCE_LOAD : - FileMetadataLoadOpts.LOAD_IF_SD_CHANGED; - reloadPartitions(Arrays.asList(partitionAfter_), fileMetadataLoadOpts, - getEventDesc(), false); - } catch (CatalogException e) { - throw new MetastoreNotificationNeedsInvalidateException( - debugString("Refresh partition on table {} partition {} failed.", - getFullyQualifiedTblName(), partName_), e); - } - } + processAlterPartitionEvent(isTruncateOp_, Arrays.asList(partitionAfter_)); } @Override @@ -2775,20 +2800,63 @@ public class MetastoreEvents { Arrays.asList(getTPartitionSpecFromHmsPartition(msTbl_, partitionAfter_)), partitionAfter_.getParameters()); } + } - private void reloadTransactionalTable() throws CatalogException { - boolean incrementalRefresh = - BackendConfig.INSTANCE.getHMSEventIncrementalRefreshTransactionalTable(); - if (incrementalRefresh) { - reloadPartitionsFromEvent(Collections.singletonList(partitionAfter_), - getEventDesc() + " FOR TRANSACTIONAL TABLE"); - } else { - boolean notSkipped = reloadTableFromCatalog(true); - if (!notSkipped) { - metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); - } + public static class AlterPartitionsEvent extends MetastoreTableEvent { + public static final String EVENT_TYPE = "ALTER_PARTITIONS"; + // the list of partition objects of alter operation, as parsed from the + // NotificationEvent + private final List partitionsAfter_; + private final boolean isTruncateOp_; + + /** + * Prevent instantiation from outside should use MetastoreEventFactory instead + */ + private AlterPartitionsEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics, + NotificationEvent event) throws MetastoreNotificationException { + super(catalogOpExecutor, metrics, event); + Preconditions.checkState(getEventType().equals( + MetastoreEventType.ALTER_PARTITIONS)); + try { + MetastoreShim.AlterPartitionsInfo alterPartitionsInfo = + MetastoreShim.getFieldsFromAlterPartitionsEvent(event); + msTbl_ = alterPartitionsInfo.getMsTable(); + partitionsAfter_ = alterPartitionsInfo.getPartitions(); + isTruncateOp_ = alterPartitionsInfo.isTruncate(); + } catch (Exception e) { + throw new MetastoreNotificationException( + debugString("Unable to parse the alter partition message"), e); } } + + @Override + public void processTableEvent() throws MetastoreNotificationException, + CatalogException { + if (partitionsAfter_.isEmpty()) { + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); + warnLog("Not processing the alter partitions event {} as no partitions are " + + "received in the event.", getEventId()); + return; + } + if (isSelfEvent()) { + infoLog("Not processing the event as it is a self-event"); + return; + } + + if (isOlderEvent(partitionsAfter_.get(0))) { + infoLog("Not processing the alter partition event {} as it is an older event", + getEventId()); + return; + } + processAlterPartitionEvent(isTruncateOp_, partitionsAfter_); + } + + @Override + public SelfEventContext getSelfEventContext() { + return new SelfEventContext(dbName_, tblName_, + Arrays.asList(getTPartitionSpecFromHmsPartition(msTbl_, + partitionsAfter_.get(0))), partitionsAfter_.get(0).getParameters()); + } } /** diff --git a/fe/src/test/resources/hive-site.xml.py b/fe/src/test/resources/hive-site.xml.py index d3a3609b4..d2bf9a9d9 100644 --- a/fe/src/test/resources/hive-site.xml.py +++ b/fe/src/test/resources/hive-site.xml.py @@ -95,6 +95,11 @@ elif VARIANT == 'housekeeping_on': CONFIG.update({ 'hive.metastore.housekeeping.threads.on': 'true', }) +elif VARIANT == 'events_config_change': + # HMS config change needed for HIVE-27746 to emit ALTER_PARTITIONS event + CONFIG.update({ + 'hive.metastore.alterPartitions.notification.v2.enabled': 'true', + }) # HBase-related configs. # Impala processes need to connect to zookeeper on INTERNAL_LISTEN_HOST for HBase. diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py index 5d3b11b26..7a0f7031d 100644 --- a/tests/custom_cluster/test_events_custom_configs.py +++ b/tests/custom_cluster/test_events_custom_configs.py @@ -39,6 +39,8 @@ from tests.util.iceberg_util import IcebergCatalogs HIVE_SITE_HOUSEKEEPING_ON =\ getenv('IMPALA_HOME') + '/fe/src/test/resources/hive-site-housekeeping-on' +HIVE_SITE_ALTER_PARTITIONS_EVENT =\ + getenv('IMPALA_HOME') + '/fe/src/test/resources/hive-site-events-config' TRUNCATE_TBL_STMT = 'truncate table' # The statestore heartbeat and topic update frequency (ms). Set low for testing. STATESTORE_RPC_FREQUENCY_MS = 100 @@ -1694,6 +1696,67 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase): self.client.execute("""DROP DATABASE {} CASCADE""".format(unique_database)) self.client.execute("""CREATE DATABASE {}""".format(unique_database)) + @SkipIf.is_test_jdk + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args="--use_local_catalog=true", + catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=1 " + "--debug_actions=catalogd_event_processing_delay:SLEEP@1000", + hive_conf_dir=HIVE_SITE_ALTER_PARTITIONS_EVENT) + def test_alter_partitions_event_from_metastore(self, unique_database): + tbl = unique_database + ".test_alter_partitions" + self.client.execute("create table {} (id int) partitioned by (year int)" + .format(tbl)) + + def _verify_alter_partitions_event(events): + event_found = False + for event in events: + if event.eventType == "ALTER_PARTITIONS": + event_found = True + else: + logging.debug("Found " + str(event)) + return event_found + + # Verify that test always generates single ALTER_PARTITIONS event + self.client.execute( + "insert into {} partition(year) values (0,2024), (1,2023), (2,2022)" + .format(tbl)) + EventProcessorUtils.wait_for_event_processing(self, 10) + + # Case-I: compute stats from hive + parts_refreshed_before = EventProcessorUtils.get_int_metric("partitions-refreshed") + batch_events_before = EventProcessorUtils.get_int_metric("batch-events-created") + self.run_stmt_in_hive("analyze table {} compute statistics".format(tbl)) + EventProcessorUtils.wait_for_event_processing(self, 10) + batch_events_after = EventProcessorUtils.get_int_metric("batch-events-created") + parts_refreshed_after = EventProcessorUtils.get_int_metric("partitions-refreshed") + assert batch_events_after == batch_events_before # verify there are no new batches + assert parts_refreshed_after == parts_refreshed_before + 3 + + # Case-II: compute stats from impala + last_event_id = EventProcessorUtils.get_current_notification_id(self.hive_client) + self.client.execute("compute stats {}".format(tbl)) + events_skipped_before = EventProcessorUtils.get_int_metric('events-skipped', 0) + EventProcessorUtils.wait_for_event_processing(self, 10) + events = EventProcessorUtils.get_next_notification(self.hive_client, last_event_id) + # There will be COMMIT_TXN, ALLOC_WRITE_ID_EVENT, ALTER_PARTITIONS in any order + events_skipped_after = EventProcessorUtils.get_int_metric('events-skipped', 0) + assert _verify_alter_partitions_event(events) + assert events_skipped_after > events_skipped_before + + # Case-III: truncate table from Impala + last_event_id = EventProcessorUtils.get_current_notification_id(self.hive_client) + self.client.execute("truncate table {}".format(tbl)) + events_skipped_before = EventProcessorUtils.get_int_metric('events-skipped', 0) + EventProcessorUtils.wait_for_event_processing(self, 10) + events = EventProcessorUtils.get_next_notification(self.hive_client, last_event_id) + events_skipped_after = EventProcessorUtils.get_int_metric('events-skipped', 0) + assert _verify_alter_partitions_event(events) + assert events_skipped_after > events_skipped_before + + # Case-IV: Truncate table from Hive is currently generating single alter_partition + # events. HIVE-28668 will address it. + @SkipIfFS.hive class TestEventProcessingWithImpala(TestEventProcessingCustomConfigsBase):