IMPALA-14307: Correctly update createEventId and DeleteEventLog in AlterTableRename

When EventProcessor is paused, e.g. due to a global INVALIDATE METADATA
operation, in alterTableOrViewRename() we don't fetch the event id of
the ALTER_TABLE event. This causes the createEventId of the new table
being -1 and the DeleteEventLog entry of the old table is missing. So
stale ALTER_TABLE RENAME events could incorrectly remove the new table
or add the old table.

The other case is in the fallback invalidation added in IMPALA-13989
that handles rename failure inside catalog (but succeeds in HMS). The
createEventId is also set as -1.

This patch fixes these by always setting a correct/meaningful
createEventId. When fetching the ALTER_TABLE event fails, we try to use
the event id before the HMS operation. It could be a little bit stale
but much better than -1.

Modified CatalogServiceCatalog#isEventProcessingActive() to just check
if event processing is enabled and renamed it to
isEventProcessingEnabled(). Note that this method is only used in DDLs
that check their self events. We should allow these checks even when
EventProcessor is not in the ACTIVE state. So when EventProcessor is
recovered, fields like createEventId in tables are still correct.

Removed the code of tracking in-flight events at the end of rename since
the new table is in unloaded state and only the createEventId is useful.
The catalog version used is also incorrect since it's not used in
CatalogServiceCatalog#renameTable() so it doesn't make sence to use it.
Removed the InProgressTableModification parameter of
alterTableOrViewRename() since it's not used anymore.

This patch also fixes a bug in getRenamedTableFromEvents() that it
always returns the first event id in the list. It should use the rename
event it finds.

Tests
 - Added e2e test and ran it 40 times.

Change-Id: Ie7c305e5aaafc8bbdb85830978182394619fad08
Reviewed-on: http://gerrit.cloudera.org:8080/23291
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
stiga-huang
2025-08-13 22:06:24 +08:00
committed by Impala Public Jenkins
parent 8053a68f39
commit a68f716458
5 changed files with 92 additions and 33 deletions

View File

@@ -601,10 +601,8 @@ public class CatalogServiceCatalog extends Catalog {
return metastoreEventProcessor_;
}
public boolean isEventProcessingActive() {
return metastoreEventProcessor_ instanceof MetastoreEventsProcessor
&& EventProcessorStatus.ACTIVE
.equals(((MetastoreEventsProcessor) metastoreEventProcessor_).getStatus());
public boolean isEventProcessingEnabled() {
return metastoreEventProcessor_ instanceof MetastoreEventsProcessor;
}
/**
@@ -1282,7 +1280,7 @@ public class CatalogServiceCatalog extends Catalog {
* @return true if given event information evaluates to a self-event, false otherwise
*/
public boolean evaluateSelfEvent(SelfEventContext ctx) throws CatalogException {
Preconditions.checkState(isEventProcessingActive(),
Preconditions.checkState(isEventProcessingEnabled(),
"Event processing should be enabled when calling this method");
boolean isInsertEvent = ctx.isInsertEventContext();
long versionNumber =
@@ -1416,7 +1414,7 @@ public class CatalogServiceCatalog extends Catalog {
*/
public boolean addVersionsForInflightEvents(
boolean isInsertEvent, Table tbl, long versionNumber) {
if (!isEventProcessingActive()) return false;
if (!isEventProcessingEnabled()) return false;
boolean added = tbl.addToVersionsForInflightEvents(isInsertEvent, versionNumber);
if (added) {
LOG.info("Added {} {} in table's {} in-flight events",
@@ -1435,7 +1433,7 @@ public class CatalogServiceCatalog extends Catalog {
* @return true if versionNumber is added to in-flight list. Otherwise, return false.
*/
public boolean addVersionsForInflightEvents(Db db, long versionNumber) {
if (!isEventProcessingActive()) return false;
if (!isEventProcessingEnabled()) return false;
boolean added = db.addToVersionsForInflightEvents(versionNumber);
if (added) {
LOG.info("Added catalog version {} in database's {} in-flight events",
@@ -3268,6 +3266,8 @@ public class CatalogServiceCatalog extends Catalog {
* removed from the catalog cache.
* Sets dbWasAdded to true if both a new database and table were added to the catalog
* cache.
* 'eventId' is used to update createEventId of the table which avoids the table being
* dropped in processing older events.
*/
public TCatalogObject invalidateTable(TTableName tableName,
Reference<Boolean> tblWasRemoved, Reference<Boolean> dbWasAdded,

View File

@@ -93,7 +93,7 @@ public class TableLoader {
msTbl = msClient.getHiveClient().getTable(db.getName(), tblName);
catalogTimeline.markEvent(FETCHED_HMS_TABLE);
}
if (eventId != -1 && catalog_.isEventProcessingActive()) {
if (eventId != -1 && catalog_.isEventProcessingEnabled()) {
// If the eventId is not -1 it means this table was likely created by Impala.
// However, since the load operation of the table can happen much later, it is
// possible that the table was recreated outside Impala and hence the eventId

View File

@@ -1821,9 +1821,13 @@ public class MetastoreEvents {
} else {
if (oldTblRemoved.getRef()) {
metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLES_REMOVED).inc();
infoLog("Removed table {}.{}", tableBefore_.getDbName(),
tableBefore_.getTableName());
}
if (newTblAdded.getRef()) {
metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLES_ADDED).inc();
infoLog("Added table {}.{}", tableAfter_.getDbName(),
tableAfter_.getTableName());
}
}
}

View File

@@ -1162,7 +1162,7 @@ public class CatalogOpExecutor {
* parameters. No-op if event processing is disabled.
*/
private void addCatalogServiceIdentifiersToTable() {
if (!catalog_.isEventProcessingActive()) return;
if (!catalog_.isEventProcessingEnabled()) return;
org.apache.hadoop.hive.metastore.api.Table msTbl = table_.getMetaStoreTable();
msTbl.putToParameters(MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(),
catalog_.getCatalogServiceId());
@@ -1236,7 +1236,7 @@ public class CatalogOpExecutor {
|| params.getAlter_type() == TAlterTableType.RENAME_TABLE) {
alterTableOrViewRename(tbl,
TableName.fromThrift(params.getRename_params().getNew_table_name()),
modification, wantMinimalResult, response, catalogTimeline, debugAction);
wantMinimalResult, response, catalogTimeline, debugAction);
modification.validateInProgressModificationComplete();
return;
}
@@ -1938,7 +1938,7 @@ public class CatalogOpExecutor {
private void addCatalogServiceIdentifiers(
org.apache.hadoop.hive.metastore.api.Table msTbl, String catalogServiceId,
long catalogVersion) {
if (!catalog_.isEventProcessingActive()) return;
if (!catalog_.isEventProcessingEnabled()) return;
msTbl.putToParameters(
MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(),
catalogServiceId);
@@ -2395,7 +2395,7 @@ public class CatalogOpExecutor {
private List<NotificationEvent> getNextMetastoreEventsForTableIfEnabled(
EventSequence catalogTimeline, long eventId, String dbName, String tblName,
String eventType) throws MetastoreNotificationException {
if (!catalog_.isEventProcessingActive()) return Collections.emptyList();
if (!catalog_.isEventProcessingEnabled()) return Collections.emptyList();
List<NotificationEvent> events = MetastoreEventsProcessor
.getNextMetastoreEventsInBatchesForTable(catalog_, eventId, dbName, tblName,
eventType);
@@ -2412,7 +2412,7 @@ public class CatalogOpExecutor {
private List<NotificationEvent> getNextMetastoreEventsForDbIfEnabled(
EventSequence catalogTimeline, long eventId, String dbName, String eventType)
throws MetastoreNotificationException {
if (!catalog_.isEventProcessingActive()) return Collections.emptyList();
if (!catalog_.isEventProcessingEnabled()) return Collections.emptyList();
List<NotificationEvent> events = MetastoreEventsProcessor
.getNextMetastoreEventsInBatchesForDb(catalog_, eventId, dbName, eventType);
catalogTimeline.markEvent(FETCHED_HMS_EVENT_BATCH);
@@ -2426,7 +2426,7 @@ public class CatalogOpExecutor {
private List<NotificationEvent> getNextMetastoreDropEventsForDbIfEnabled(
EventSequence catalogTimeline, long eventId, String dbName)
throws MetastoreNotificationException {
if (!catalog_.isEventProcessingActive()) return Collections.emptyList();
if (!catalog_.isEventProcessingEnabled()) return Collections.emptyList();
List<String> eventTypes = Lists.newArrayList(
DropDatabaseEvent.EVENT_TYPE, DropTableEvent.EVENT_TYPE);
NotificationFilter filter = e -> dbName.equalsIgnoreCase(e.getDbName())
@@ -2527,8 +2527,10 @@ public class CatalogOpExecutor {
.getMetastoreEventProcessor().getEventsFactory().get(notificationEvent, null);
Preconditions.checkState(event instanceof AlterTableEvent);
AlterTableEvent alterEvent = (AlterTableEvent) event;
// Skip other alter events in case there are concurrent modification from other
// engines.
if (!alterEvent.isRename()) continue;
return new Pair<>(events.get(0).getEventId(),
return new Pair<>(alterEvent.getEventId(),
new Pair<>(alterEvent.getBeforeTable(), alterEvent.getAfterTable()));
} catch (MetastoreNotificationException e) {
throw new CatalogException("Unable to create a metastore event", e);
@@ -5754,8 +5756,7 @@ public class CatalogOpExecutor {
* reloaded on the next access.
*/
private void alterTableOrViewRename(Table oldTbl, TableName newTableName,
InProgressTableModification modification, boolean wantMinimalResult,
TDdlExecResponse response, EventSequence catalogTimeline,
boolean wantMinimalResult, TDdlExecResponse response, EventSequence catalogTimeline,
@Nullable String debugAction) throws ImpalaException {
Preconditions.checkState(oldTbl.isWriteLockedByCurrentThread());
TableName tableName = oldTbl.getTableName();
@@ -5763,10 +5764,14 @@ public class CatalogOpExecutor {
oldTbl.getMetaStoreTable().deepCopy();
msTbl.setDbName(newTableName.getDb());
msTbl.setTableName(newTableName.getTbl());
// Gets the latest event id before we trigger the alter_table HMS RPC. We then use
// this id to find the triggered ALTER_TABLE event for self-event detection based on
// createEventId and EventDeleteLog.
long eventId = -1;
try (MetaStoreClient msClient = catalog_.getMetaStoreClient(catalogTimeline)) {
eventId = getCurrentEventId(msClient);
catalogTimeline.markEvent(FETCHED_LATEST_HMS_EVENT_ID + eventId);
if (catalog_.isEventProcessingEnabled()) {
try (MetaStoreClient msClient = catalog_.getMetaStoreClient(catalogTimeline)) {
eventId = getCurrentEventId(msClient, catalogTimeline);
}
}
// If oldTbl is a synchronized Kudu table, rename the underlying Kudu table.
boolean isSynchronizedKuduTable = (oldTbl instanceof KuduTable) &&
@@ -5817,11 +5822,19 @@ public class CatalogOpExecutor {
catalog_.renameTable(tableName.toThrift(), newTableName.toThrift());
Preconditions.checkNotNull(result);
if (renamedTable != null) {
org.apache.hadoop.hive.metastore.api.Table tblBefore = renamedTable.second.first;
addToDeleteEventLog(renamedTable.first, DeleteEventLog
.getTblKey(tblBefore.getDbName(), tblBefore.getTableName()));
eventId = renamedTable.first;
LOG.info("Got ALTER_TABLE RENAME event id {}.", eventId);
} else if (catalog_.isEventProcessingEnabled()) {
// Using the eventId at the beginning of the operation is better than nothing.
LOG.warn("ALTER_TABLE RENAME event not found. Using {} in createEventId of {} " +
"and DeleteEventLog for {}.",
eventId, newTableName, tableName);
}
if (catalog_.isEventProcessingEnabled()) {
addToDeleteEventLog(eventId, DeleteEventLog
.getTblKey(tableName.getDb(), tableName.getTbl()));
if (result.second != null) {
result.second.setCreateEventId(renamedTable.first);
result.second.setCreateEventId(eventId);
}
}
TCatalogObject oldTblDesc = null, newTblDesc = null;
@@ -5846,18 +5859,15 @@ public class CatalogOpExecutor {
// The rename succeeded in HMS but failed in the catalog cache. The cache is in an
// inconsistent state, so invalidate the new table to reload it.
newTblDesc = catalog_.invalidateTable(newTableName.toThrift(),
new Reference<>(), new Reference<>(), catalogTimeline);
new Reference<>(), new Reference<>(), catalogTimeline, eventId);
if (newTblDesc == null) {
throw new ImpalaRuntimeException(String.format(
"The new table/view %s was concurrently removed during rename.",
newTableName));
}
LOG.info("Invalidated {} to recover from catalog rename failure", newTableName);
} else {
Preconditions.checkNotNull(result.first);
// TODO: call addVersionsForInflightEvents using InProgressTableModification object
// that is passed into catalog_.renameTable()
catalog_.addVersionsForInflightEvents(
false, result.second, modification.newVersionNumber());
newTblDesc = wantMinimalResult ?
result.second.toInvalidationObject() : result.second.toTCatalogObject();
}
@@ -5875,7 +5885,7 @@ public class CatalogOpExecutor {
* collected.
*/
public void addToDeleteEventLog(long eventId, String objectKey) {
if (!catalog_.isEventProcessingActive()) {
if (!catalog_.isEventProcessingEnabled()) {
LOG.trace("Not adding event {}:{} since events processing is not active", eventId,
objectKey);
return;
@@ -6591,7 +6601,7 @@ public class CatalogOpExecutor {
*/
private void addCatalogServiceIdentifiers(
org.apache.hadoop.hive.metastore.api.Table msTbl, Partition partition) {
if (!catalog_.isEventProcessingActive()) return;
if (!catalog_.isEventProcessingEnabled()) return;
Preconditions.checkState(msTbl.isSetParameters());
Preconditions.checkNotNull(partition, "Partition is null");
Map<String, String> tblParams = msTbl.getParameters();
@@ -6625,7 +6635,7 @@ public class CatalogOpExecutor {
*/
private void addToInflightVersionsOfPartition(
Map<String, String> partitionParams, HdfsPartition.Builder partBuilder) {
if (!catalog_.isEventProcessingActive()) return;
if (!catalog_.isEventProcessingEnabled()) return;
Preconditions.checkState(partitionParams != null);
String version = partitionParams
.get(MetastoreEventPropertyKey.CATALOG_VERSION.getKey());
@@ -8203,7 +8213,7 @@ public class CatalogOpExecutor {
*/
private void addCatalogServiceIdentifiers(
Database msDb, String catalogServiceId, long newCatalogVersion) {
if (!catalog_.isEventProcessingActive()) return;
if (!catalog_.isEventProcessingEnabled()) return;
Preconditions.checkNotNull(msDb);
msDb.putToParameters(MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(),
catalogServiceId);

View File

@@ -25,6 +25,7 @@ import threading
from tests.common.test_dimensions import (
create_single_exec_option_dimension,
add_mandatory_exec_option)
from tests.common.impala_cluster import ImpalaCluster
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIfFS, SkipIfHive2, SkipIfCatalogV2
from tests.common.test_vector import HS2
@@ -744,3 +745,47 @@ class TestEventSyncWaiting(ImpalaTestSuite):
t.close()
self.execute_query("drop database if exists {} cascade".format(db))
self.execute_query("drop database if exists {}_2 cascade".format(db))
class TestSelfRenameEvent(ImpalaTestSuite):
@pytest.mark.execute_serially
def test_self_rename_events(self, unique_database):
"""Regression test for IMPALA-14307"""
try:
catalogd = ImpalaCluster.get_e2e_test_cluster().catalogd
self.execute_query("create table {}.tbl_a(i int)".format(unique_database))
# Wait until the CREATE_DATABASE and CREATE_TABLE events are skipped.
EventProcessorUtils.wait_for_event_processing(self)
self.execute_query(
"alter table {0}.tbl_a rename to {0}.tbl_b".format(unique_database))
self.execute_query(":event_processor('pause')")
with self.create_impala_client() as alter_client:
version_after_create = catalogd.service.get_catalog_version()
alter_client.set_configuration(
{"debug_action": "catalogd_table_rename_delay:SLEEP@6000"})
alter_handle = alter_client.execute_async(
"alter table {0}.tbl_b rename to {0}.tbl_a".format(unique_database))
alter_client.wait_for_admission_control(alter_handle, timeout_s=10)
# Wait for at most 10 second until catalogd increase the version for rename
# operation. This indicates the rename starts.
start_time = time.time()
while (time.time() - start_time < 10.0
and catalogd.service.get_catalog_version() <= version_after_create):
time.sleep(0.05)
# Sleep to let catalogd sends the alter_table HMS RPC
time.sleep(1)
# Invalidate tbl_b to remove it in catalog
self.execute_query("invalidate metadata {}.tbl_b".format(unique_database))
alter_client.wait_for_finished_timeout(alter_handle, timeout=10)
alter_client.close_query(alter_handle)
# Resume event processing. The first ALTER_TABLE RENAME events should be skipped.
events_skipped_before = EventProcessorUtils.get_int_metric('events-skipped', 0)
self.execute_query(":event_processor('start')")
EventProcessorUtils.wait_for_event_processing(self)
events_skipped_after = EventProcessorUtils.get_int_metric('events-skipped', 0)
assert events_skipped_after == events_skipped_before + 2
finally:
# Recover event processing to avoid impacting other tests
self.execute_query(":event_processor('start')")