IMPALA-13593: Enable event processor to consume ALTER_PARTITIONS events

from metastore

HIVE-27746 introduced the ALTER_PARTITIONS event type, an optimization
that collapses bulk ALTER_PARTITION events into a single event. The
components version is updated to pick up this change. Including this
in Impala significantly reduces the number of events consumed by the
event processor and helps the event processor catch up with events
quickly.

This patch enables the event processor to consume ALTER_PARTITIONS
events. The downside of this patch is that there is no before_partitions
object in the event message, so partitions can be refreshed even on
trivial changes to them. HIVE-29141 will address this concern.

Testing:
- Added an end-to-end test to verify consuming the ALTER_PARTITIONS
event. Bigger timeouts were also added in this test because flakiness
was observed while looping the test several times.

Change-Id: I009a87ef5e2c331272f9e2d7a6342cc860e64737
Reviewed-on: http://gerrit.cloudera.org:8080/22554
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
This commit is contained in:
Sai Hemanth Gantasala
2024-12-09 14:41:00 -08:00
committed by Csaba Ringhofer
parent 669d74f467
commit b67a9cecb3
9 changed files with 275 additions and 52 deletions

View File

@@ -241,12 +241,11 @@ DEFINE_string(default_skipped_hms_event_types,
"the latest event time in HMS.");
DEFINE_string(common_hms_event_types, "ADD_PARTITION,ALTER_PARTITION,DROP_PARTITION,"
"ADD_PARTITION,ALTER_PARTITION,DROP_PARTITION,CREATE_TABLE,ALTER_TABLE,DROP_TABLE,"
"ALTER_PARTITIONS,CREATE_TABLE,ALTER_TABLE,DROP_TABLE,RELOAD,COMMIT_COMPACTION_EVENT"
"CREATE_DATABASE,ALTER_DATABASE,DROP_DATABASE,INSERT,OPEN_TXN,COMMIT_TXN,ABORT_TXN,"
"ALLOC_WRITE_ID_EVENT,ACID_WRITE_EVENT,BATCH_ACID_WRITE_EVENT,"
"UPDATE_TBL_COL_STAT_EVENT,DELETE_TBL_COL_STAT_EVENT,UPDATE_PART_COL_STAT_EVENT,"
"UPDATE_PART_COL_STAT_EVENT_BATCH,DELETE_PART_COL_STAT_EVENT,COMMIT_COMPACTION_EVENT,"
"RELOAD",
"UPDATE_PART_COL_STAT_EVENT_BATCH,DELETE_PART_COL_STAT_EVENT,",
"Common HMS event types that will be used in eventTypeSkipList when fetching events "
"from HMS. The strings come from constants in "
"org.apache.hadoop.hive.metastore.messaging.MessageBuilder. When bumping Hive "

View File

@@ -169,6 +169,13 @@ rm -f hive-site-housekeeping-on/hive-site.xml
ln -s "${CONFIG_DIR}/hive-site_housekeeping_on.xml" \
hive-site-housekeeping-on/hive-site.xml
# Generate the 'events_config_change' hive-site variant (the variant that turns
# on HMS ALTER_PARTITIONS notifications) and expose it as
# hive-site-events-config/hive-site.xml through a symlink. The stale link is
# removed first so that 'ln -s' does not fail when the script is re-run.
export HIVE_VARIANT=events_config_change
$IMPALA_HOME/bin/generate_xml_config.py hive-site.xml.py hive-site_events_config.xml
mkdir -p hive-site-events-config
rm -f hive-site-events-config/hive-site.xml
ln -s "${CONFIG_DIR}/hive-site_events_config.xml" \
hive-site-events-config/hive-site.xml
export HIVE_VARIANT=ranger_auth
HIVE_RANGER_CONF_DIR=hive-site-ranger-auth
$IMPALA_HOME/bin/generate_xml_config.py hive-site.xml.py hive-site_ranger_auth.xml

View File

@@ -240,19 +240,19 @@ fi
: ${IMPALA_TOOLCHAIN_HOST:=native-toolchain.s3.amazonaws.com}
export IMPALA_TOOLCHAIN_HOST
export CDP_BUILD_NUMBER=58457853
export CDP_BUILD_NUMBER=66846208
export CDP_MAVEN_REPOSITORY=\
"https://${IMPALA_TOOLCHAIN_HOST}/build/cdp_components/${CDP_BUILD_NUMBER}/maven"
export CDP_AVRO_JAVA_VERSION=1.11.1.7.3.1.0-160
export CDP_HADOOP_VERSION=3.1.1.7.3.1.0-160
export CDP_HBASE_VERSION=2.4.17.7.3.1.0-160
export CDP_HIVE_VERSION=3.1.3000.7.3.1.0-160
export CDP_ICEBERG_VERSION=1.3.1.7.3.1.0-160
export CDP_KNOX_VERSION=2.0.0.7.3.1.0-160
export CDP_OZONE_VERSION=1.3.0.7.3.1.0-160
export CDP_PARQUET_VERSION=1.12.3.7.3.1.0-160
export CDP_RANGER_VERSION=2.4.0.7.3.1.0-160
export CDP_TEZ_VERSION=0.9.1.7.3.1.0-160
export CDP_AVRO_JAVA_VERSION=1.11.1.7.3.1.500-30
export CDP_HADOOP_VERSION=3.1.1.7.3.1.500-30
export CDP_HBASE_VERSION=2.4.17.7.3.1.500-30
export CDP_HIVE_VERSION=3.1.3000.7.3.1.500-30
export CDP_ICEBERG_VERSION=1.3.1.7.3.1.500-30
export CDP_KNOX_VERSION=2.0.0.7.3.1.500-30
export CDP_OZONE_VERSION=1.4.0.7.3.1.500-30
export CDP_PARQUET_VERSION=1.12.3.7.3.1.500-30
export CDP_RANGER_VERSION=2.4.0.7.3.1.500-30
export CDP_TEZ_VERSION=0.9.1.7.3.1.500-30
# Ref: https://infra.apache.org/release-download-pages.html#closer
: ${APACHE_MIRROR:="https://www.apache.org/dyn/closer.cgi"}

View File

@@ -448,6 +448,14 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
throw new UnsupportedOperationException("Reload event is not supported.");
}
/**
* CDP Hive-3 only function. This shim cannot parse ALTER_PARTITIONS events, so
* the call fails unconditionally.
*
* @param event Metastore notification event; not inspected by this shim.
* @return never returns normally.
* @throws UnsupportedOperationException always.
*/
public static AlterPartitionsInfo getFieldsFromAlterPartitionsEvent(
NotificationEvent event) throws MetastoreNotificationException {
throw new UnsupportedOperationException("AlterPartitions event is not supported.");
}
/**
* CDP Hive-3 only function.
*/

View File

@@ -79,6 +79,7 @@ import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
import org.apache.hadoop.hive.metastore.api.WriteNotificationLogBatchRequest;
import org.apache.hadoop.hive.metastore.api.WriteNotificationLogRequest;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.messaging.AlterPartitionsMessage;
import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
import org.apache.hadoop.hive.metastore.messaging.CommitCompactionMessage;
@@ -606,6 +607,39 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
return updatedFields;
}
/**
 * Extracts the table, the altered partitions, and the isTruncateOp flag from an
 * ALTER_PARTITIONS notification event and returns them in an AlterPartitionsInfo
 * object.
 *
 * @param event Metastore notification event; its message must not be null.
 * @return an AlterPartitionsInfo holding the table object, the partition objects
 *     carried by the event message, and the truncate flag.
 * @throws MetastoreNotificationException if the table or partition objects are
 *     missing or cannot be read from the event message.
 */
public static AlterPartitionsInfo getFieldsFromAlterPartitionsEvent(
    NotificationEvent event) throws MetastoreNotificationException {
  Preconditions.checkNotNull(event.getMessage());
  AlterPartitionsMessage alterPartitionsMessage =
      MetastoreEventsProcessor.getMessageDeserializer()
          .getAlterPartitionsMessage(event.getMessage());
  try {
    // Validate the partition collection itself. Checking the iterator it hands
    // out can never fail: a null collection already throws on iterator().
    Iterator<Partition> partitionsIterator = Preconditions.checkNotNull(
        alterPartitionsMessage.getPartitionObjs(),
        "Partition objects are missing in the ALTER_PARTITIONS event message")
        .iterator();
    List<org.apache.hadoop.hive.metastore.api.Partition> partitionsAfter =
        new ArrayList<>();
    while (partitionsIterator.hasNext()) {
      partitionsAfter.add(partitionsIterator.next());
    }
    org.apache.hadoop.hive.metastore.api.Table msTbl = Preconditions.checkNotNull(
        alterPartitionsMessage.getTableObj(),
        "Table object is missing in the ALTER_PARTITIONS event message");
    boolean isTruncateOp = alterPartitionsMessage.getIsTruncateOp();
    return new AlterPartitionsInfo(msTbl, partitionsAfter, isTruncateOp);
  } catch (Exception e) {
    // Any failure while reading the message (including the null checks above) is
    // reported as a notification error, with the original exception as cause.
    throw new MetastoreNotificationException(e);
  }
}
/**
* This method extracts the partition name field from the
* notification event and returns it in the form of string.

View File

@@ -842,4 +842,43 @@ public class Hive3MetastoreShimBase {
public static boolean validateColumnName(String name) {
return MetaStoreUtils.validateColumnName(name);
}
/**
* Immutable holder for the fields extracted from an ALTER_PARTITIONS
* notification event: the table object, the partition objects carried by the
* event, and the truncate flag.
*/
public static class AlterPartitionsInfo {
private final org.apache.hadoop.hive.metastore.api.Table msTable;
private final List<org.apache.hadoop.hive.metastore.api.Partition> partitions;
private final boolean isTruncate;
public AlterPartitionsInfo(org.apache.hadoop.hive.metastore.api.Table msTable,
List<org.apache.hadoop.hive.metastore.api.Partition> partitions,
boolean isTruncate) {
this.msTable = msTable;
this.partitions = partitions;
this.isTruncate = isTruncate;
}
/**
* Returns the Thrift representation of the table.
*/
public org.apache.hadoop.hive.metastore.api.Table getMsTable() {
return msTable;
}
/**
* Returns the list of Thrift partition objects carried by the ALTER_PARTITIONS
* event, exactly as it was passed to the constructor (no copy is made and no
* null check is applied).
*/
public List<org.apache.hadoop.hive.metastore.api.Partition> getPartitions() {
return partitions;
}
/**
* Returns the truncate flag passed to the constructor; true presumably means
* the partitions were altered by a truncate operation (confirm against the
* producer of the event message).
*/
public boolean isTruncate() {
return isTruncate;
}
}
}

View File

@@ -28,6 +28,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Map;
@@ -128,6 +129,7 @@ public class MetastoreEvents {
DROP_DATABASE("DROP_DATABASE"),
ALTER_DATABASE("ALTER_DATABASE"),
ADD_PARTITION("ADD_PARTITION"),
ALTER_BATCH_PARTITIONS("ALTER_BATCH_PARTITIONS"),
ALTER_PARTITION("ALTER_PARTITION"),
ALTER_PARTITIONS("ALTER_PARTITIONS"),
DROP_PARTITION("DROP_PARTITION"),
@@ -227,6 +229,8 @@ public class MetastoreEvents {
return new DropPartitionEvent(catalogOpExecutor_, metrics, event);
case ALTER_PARTITION:
return new AlterPartitionEvent(catalogOpExecutor_, metrics, event);
case ALTER_PARTITIONS:
return new AlterPartitionsEvent(catalogOpExecutor_, metrics, event);
case RELOAD:
return new ReloadEvent(catalogOpExecutor_, metrics, event);
case INSERT:
@@ -1367,6 +1371,52 @@ public class MetastoreEvents {
return false;
}
/**
 * Applies the effect of an alter-partition event to the given partitions. Used
 * for both single ALTER_PARTITION events and ALTER_PARTITIONS events.
 *
 * @param isTruncateOp whether the alter was caused by a truncate; if true the
 *     file metadata of the partitions is force-loaded.
 * @param partitionsAfter the partition objects after the alter operation.
 * @throws CatalogException if reloading a transactional table or materialized
 *     view fails.
 * @throws MetastoreNotificationException if refreshing the partitions fails; the
 *     underlying CatalogException is attached as the cause.
 */
protected void processAlterPartitionEvent(boolean isTruncateOp,
    List<org.apache.hadoop.hive.metastore.api.Partition> partitionsAfter)
    throws CatalogException, MetastoreNotificationException {
  // Reload the whole table if it's a transactional table or materialized view.
  // Materialized views are treated as a special case because it's possible to
  // receive partition event on MVs, but they are regular views in Impala. That
  // cause problems on the reloading partition logic which expects it to be a
  // HdfsTable.
  if (AcidUtils.isTransactionalTable(msTbl_.getParameters())
      || MetaStoreUtils.isMaterializedViewTable(msTbl_)) {
    reloadTransactionalTable(partitionsAfter);
  } else {
    try {
      // Load file metadata only if the storage descriptor of a partition in
      // partitionsAfter differs from the sd of the cached HdfsPartition. If the
      // alter event is of truncate type then force load the file metadata.
      FileMetadataLoadOpts fileMetadataLoadOpts =
          isTruncateOp ? FileMetadataLoadOpts.FORCE_LOAD :
              FileMetadataLoadOpts.LOAD_IF_SD_CHANGED;
      reloadPartitions(partitionsAfter, fileMetadataLoadOpts, getEventDesc(),
          false);
    } catch (CatalogException e) {
      // The exception must be passed to the exception constructor, not to
      // debugString(): as a format argument it has no placeholder in the message
      // and the cause (with its stack trace) would be lost.
      // NOTE(review): the message asks for an invalidate; confirm whether the
      // more specific MetastoreNotificationNeedsInvalidateException is intended.
      throw new MetastoreNotificationException(
          debugString("Refresh partitions on table {} failed. Event " +
              "processing cannot continue. Issue an invalidate command to reset " +
              "the event processor state.", getFullyQualifiedTblName()), e);
    }
  }
}
/**
 * Refreshes a transactional table (or materialized view) after an alter
 * partition event. Depending on the backend configuration, either only the
 * given partitions are reloaded from the event, or the whole table is reloaded
 * from the catalog; a skipped table reload is counted in the events-skipped
 * metric.
 *
 * @param partitionsAfter the partition objects after the alter operation.
 * @throws CatalogException if the reload fails.
 */
protected void reloadTransactionalTable(
    List<org.apache.hadoop.hive.metastore.api.Partition> partitionsAfter)
    throws CatalogException {
  if (BackendConfig.INSTANCE.getHMSEventIncrementalRefreshTransactionalTable()) {
    // Incremental refresh: touch only the partitions named by the event.
    String reason = getEventDesc() + " FOR TRANSACTIONAL TABLE";
    reloadPartitionsFromEvent(partitionsAfter, reason);
    return;
  }
  // Full refresh: reload the table and record the event as skipped when the
  // reload did not happen.
  boolean tableReloaded = reloadTableFromCatalog(true);
  if (tableReloaded) {
    return;
  }
  metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
}
@Override
protected void process() throws MetastoreNotificationException, CatalogException {
Timer.Context context = null;
@@ -2657,7 +2707,7 @@ public class MetastoreEvents {
@Override
protected MetastoreEventType getBatchEventType() {
return MetastoreEventType.ALTER_PARTITIONS;
return MetastoreEventType.ALTER_BATCH_PARTITIONS;
}
@Override
@@ -2721,32 +2771,7 @@ public class MetastoreEvents {
getEventId());
return;
}
// Reload the whole table if it's a transactional table or materialized view.
// Materialized views are treated as a special case because it's possible to
// receive partition event on MVs, but they are regular views in Impala. That
// cause problems on the reloading partition logic which expects it to be a
// HdfsTable.
if (AcidUtils.isTransactionalTable(msTbl_.getParameters())
|| MetaStoreUtils.isMaterializedViewTable(msTbl_)) {
reloadTransactionalTable();
} else {
// Refresh the partition that was altered.
Preconditions.checkNotNull(partitionAfter_);
try {
// load file metadata only if storage descriptor of partitionAfter_ differs
// from sd of HdfsPartition. If the alter_partition event type is of truncate
// then force load the file metadata.
FileMetadataLoadOpts fileMetadataLoadOpts =
isTruncateOp_ ? FileMetadataLoadOpts.FORCE_LOAD :
FileMetadataLoadOpts.LOAD_IF_SD_CHANGED;
reloadPartitions(Arrays.asList(partitionAfter_), fileMetadataLoadOpts,
getEventDesc(), false);
} catch (CatalogException e) {
throw new MetastoreNotificationNeedsInvalidateException(
debugString("Refresh partition on table {} partition {} failed.",
getFullyQualifiedTblName(), partName_), e);
}
}
processAlterPartitionEvent(isTruncateOp_, Arrays.asList(partitionAfter_));
}
@Override
@@ -2775,20 +2800,63 @@ public class MetastoreEvents {
Arrays.asList(getTPartitionSpecFromHmsPartition(msTbl_, partitionAfter_)),
partitionAfter_.getParameters());
}
}
private void reloadTransactionalTable() throws CatalogException {
boolean incrementalRefresh =
BackendConfig.INSTANCE.getHMSEventIncrementalRefreshTransactionalTable();
if (incrementalRefresh) {
reloadPartitionsFromEvent(Collections.singletonList(partitionAfter_),
getEventDesc() + " FOR TRANSACTIONAL TABLE");
} else {
boolean notSkipped = reloadTableFromCatalog(true);
if (!notSkipped) {
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
}
/**
 * MetastoreEvent for the ALTER_PARTITIONS event type, which carries several
 * altered partitions of one table in a single notification event.
 */
public static class AlterPartitionsEvent extends MetastoreTableEvent {
  public static final String EVENT_TYPE = "ALTER_PARTITIONS";

  // Partition objects after the alter operation, as parsed from the
  // NotificationEvent.
  private final List<org.apache.hadoop.hive.metastore.api.Partition> partitionsAfter_;
  // Truncate flag, as parsed from the NotificationEvent.
  private final boolean isTruncateOp_;

  /**
   * Private on purpose: instances should be created through
   * MetastoreEventFactory.
   *
   * @throws MetastoreNotificationException if the event message cannot be parsed.
   */
  private AlterPartitionsEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
      NotificationEvent event) throws MetastoreNotificationException {
    super(catalogOpExecutor, metrics, event);
    Preconditions.checkState(
        getEventType().equals(MetastoreEventType.ALTER_PARTITIONS));
    try {
      MetastoreShim.AlterPartitionsInfo parsedInfo =
          MetastoreShim.getFieldsFromAlterPartitionsEvent(event);
      msTbl_ = parsedInfo.getMsTable();
      partitionsAfter_ = parsedInfo.getPartitions();
      isTruncateOp_ = parsedInfo.isTruncate();
    } catch (Exception e) {
      throw new MetastoreNotificationException(
          debugString("Unable to parse the alter partition message"), e);
    }
  }

  /**
   * Processes the event unless it carries no partitions, is a self-event, or is
   * an older event. Only the first partition of the event is consulted for the
   * older-event check.
   */
  @Override
  public void processTableEvent() throws MetastoreNotificationException,
      CatalogException {
    // An event without partitions has nothing to refresh; count it as skipped.
    if (partitionsAfter_.isEmpty()) {
      metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
      warnLog("Not processing the alter partitions event {} as no partitions are " +
          "received in the event.", getEventId());
      return;
    }

    if (isSelfEvent()) {
      infoLog("Not processing the event as it is a self-event");
      return;
    }

    org.apache.hadoop.hive.metastore.api.Partition firstPartition =
        partitionsAfter_.get(0);
    if (isOlderEvent(firstPartition)) {
      infoLog("Not processing the alter partition event {} as it is an older event",
          getEventId());
      return;
    }

    processAlterPartitionEvent(isTruncateOp_, partitionsAfter_);
  }

  /**
   * Builds the self-event context from the first partition of the event: its
   * partition spec and its parameters. Requires a non-empty partition list.
   */
  @Override
  public SelfEventContext getSelfEventContext() {
    org.apache.hadoop.hive.metastore.api.Partition firstPartition =
        partitionsAfter_.get(0);
    return new SelfEventContext(dbName_, tblName_,
        Arrays.asList(getTPartitionSpecFromHmsPartition(msTbl_, firstPartition)),
        firstPartition.getParameters());
  }
}
/**

View File

@@ -95,6 +95,11 @@ elif VARIANT == 'housekeeping_on':
CONFIG.update({
'hive.metastore.housekeeping.threads.on': 'true',
})
elif VARIANT == 'events_config_change':
# HMS config change needed for HIVE-27746 to emit ALTER_PARTITIONS event
CONFIG.update({
'hive.metastore.alterPartitions.notification.v2.enabled': 'true',
})
# HBase-related configs.
# Impala processes need to connect to zookeeper on INTERNAL_LISTEN_HOST for HBase.

View File

@@ -39,6 +39,8 @@ from tests.util.iceberg_util import IcebergCatalogs
HIVE_SITE_HOUSEKEEPING_ON =\
getenv('IMPALA_HOME') + '/fe/src/test/resources/hive-site-housekeeping-on'
HIVE_SITE_ALTER_PARTITIONS_EVENT =\
getenv('IMPALA_HOME') + '/fe/src/test/resources/hive-site-events-config'
TRUNCATE_TBL_STMT = 'truncate table'
# The statestore heartbeat and topic update frequency (ms). Set low for testing.
STATESTORE_RPC_FREQUENCY_MS = 100
@@ -1694,6 +1696,67 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
self.client.execute("""DROP DATABASE {} CASCADE""".format(unique_database))
self.client.execute("""CREATE DATABASE {}""".format(unique_database))
@SkipIf.is_test_jdk
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
  impalad_args="--use_local_catalog=true",
  catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=1 "
                "--debug_actions=catalogd_event_processing_delay:SLEEP@1000",
  hive_conf_dir=HIVE_SITE_ALTER_PARTITIONS_EVENT)
def test_alter_partitions_event_from_metastore(self, unique_database):
  """End-to-end test for consuming ALTER_PARTITIONS events from HMS.

  Covers compute stats from Hive (partitions are refreshed without creating
  batch events) and compute stats / truncate from Impala (an ALTER_PARTITIONS
  event is generated and the events-skipped metric increases).
  """
  tbl = unique_database + ".test_alter_partitions"
  self.client.execute("create table {} (id int) partitioned by (year int)"
    .format(tbl))

  def _verify_alter_partitions_event(events):
    """Returns True if any event is ALTER_PARTITIONS; logs all other events."""
    event_found = False
    for event in events:
      if event.eventType == "ALTER_PARTITIONS":
        event_found = True
      else:
        logging.debug("Found " + str(event))
    return event_found

  # Verify that test always generates single ALTER_PARTITIONS event
  self.client.execute(
    "insert into {} partition(year) values (0,2024), (1,2023), (2,2022)"
    .format(tbl))
  EventProcessorUtils.wait_for_event_processing(self, 10)

  # Case-I: compute stats from hive
  parts_refreshed_before = EventProcessorUtils.get_int_metric("partitions-refreshed")
  batch_events_before = EventProcessorUtils.get_int_metric("batch-events-created")
  self.run_stmt_in_hive("analyze table {} compute statistics".format(tbl))
  EventProcessorUtils.wait_for_event_processing(self, 10)
  batch_events_after = EventProcessorUtils.get_int_metric("batch-events-created")
  parts_refreshed_after = EventProcessorUtils.get_int_metric("partitions-refreshed")
  assert batch_events_after == batch_events_before  # verify there are no new batches
  assert parts_refreshed_after == parts_refreshed_before + 3

  # Case-II: compute stats from impala
  # The baseline of the metric is captured before the statement runs, so the
  # comparison below does not race with the event processor picking up the event.
  last_event_id = EventProcessorUtils.get_current_notification_id(self.hive_client)
  events_skipped_before = EventProcessorUtils.get_int_metric('events-skipped', 0)
  self.client.execute("compute stats {}".format(tbl))
  EventProcessorUtils.wait_for_event_processing(self, 10)
  events = EventProcessorUtils.get_next_notification(self.hive_client, last_event_id)
  # There will be COMMIT_TXN, ALLOC_WRITE_ID_EVENT, ALTER_PARTITIONS in any order
  events_skipped_after = EventProcessorUtils.get_int_metric('events-skipped', 0)
  assert _verify_alter_partitions_event(events)
  assert events_skipped_after > events_skipped_before

  # Case-III: truncate table from Impala
  # As above, read the baseline before issuing the statement.
  last_event_id = EventProcessorUtils.get_current_notification_id(self.hive_client)
  events_skipped_before = EventProcessorUtils.get_int_metric('events-skipped', 0)
  self.client.execute("truncate table {}".format(tbl))
  EventProcessorUtils.wait_for_event_processing(self, 10)
  events = EventProcessorUtils.get_next_notification(self.hive_client, last_event_id)
  events_skipped_after = EventProcessorUtils.get_int_metric('events-skipped', 0)
  assert _verify_alter_partitions_event(events)
  assert events_skipped_after > events_skipped_before

  # Case-IV: Truncate table from Hive is currently generating single alter_partition
  # events. HIVE-28668 will address it.
@SkipIfFS.hive
class TestEventProcessingWithImpala(TestEventProcessingCustomConfigsBase):