IMPALA-14330: set a valid createEventId in global INVALIDATE METADATA

In a global INVALIDATE METADATA (catalog reset), catalogd creates
IncompleteTable objects for all the known table names. However, their
createEventId is left uninitialized and remains -1, so the tables could
be dropped unintentionally by stale DropTable or AlterTableRename
events.

Ideally, when catalogd creates an IncompleteTable during reset(), it
should fetch the latest event on that table and use its event id as the
createEventId. However, fetching such event ids for all tables cannot
finish in a reasonable time and would add significant load on HMS.

As a compromise, this patch takes the current event id when the reset()
operation starts and sets it on all IncompleteTable objects created in
that reset operation. This is enough to handle self CreateTable /
DropTable / AlterTableRename events, since self-events generated before
that id will be skipped. Self-events generated after that id are
triggered by concurrent DDLs, which wait until the corresponding table
list is updated in reset(). Those DDLs also update createEventId so
stale DropTable / AlterTableRename events are skipped.
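
The idea as a minimal stand-alone Java sketch (ResetSketch, its cache
and shouldSkipStaleEvent() are illustrations only, not the actual
Impala classes):

  import java.util.HashMap;
  import java.util.Map;

  class ResetSketch {
    static final class CachedTable {
      final String name;
      final long createEventId;  // the HMS event id when reset() started
      CachedTable(String name, long createEventId) {
        this.name = name;
        this.createEventId = createEventId;
      }
    }

    private final Map<String, CachedTable> cache = new HashMap<>();

    // Global INVALIDATE METADATA: every IncompleteTable created by this reset
    // gets the same createEventId, taken once when the operation started.
    void reset(Iterable<String> knownTableNames, long currentEventId) {
      for (String name : knownTableNames) {
        cache.put(name, new CachedTable(name, currentEventId));
      }
    }

    // Replaying a DropTable/AlterTableRename event: an event generated before
    // the reset started is no newer than createEventId and is skipped, so it
    // cannot drop the freshly created IncompleteTable.
    boolean shouldSkipStaleEvent(String tableName, long eventId) {
      CachedTable t = cache.get(tableName);
      return t != null && eventId <= t.createEventId;
    }
  }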

Concurrent CreateTable DDLs could still set a stale createEventId if
their HMS operation finishes before reset() and their catalog operation
finishes after reset() has created the table. To address this, a check
is added in setCreateEventId() to skip stale event ids.
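
A simplified, stand-alone sketch of that guard (CreateEventIdGuard only
mirrors the shape of Table.setCreateEventId() in this patch; it is not
the real Table class):

  class CreateEventIdGuard {
    private long createEventId = -1;

    void setCreateEventId(long eventId) {
      if (eventId < createEventId) {
        // A concurrent CreateTable DDL whose HMS call finished before reset()
        // may apply its (older) event id only after reset() already installed
        // the newer one; ignore it instead of moving createEventId backwards.
        System.out.println("Ignored stale createEventId: " + eventId
            + ". Current id: " + createEventId);
        return;
      }
      createEventId = eventId;
    }

    long getCreateEventId() { return createEventId; }
  }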

The current event id of reset() is also used in DeleteEventLog to track
tables removed by this operation.

Refactored IncompleteTable.createUninitializedTable() to require
passing a createEventId as a parameter.

To ease debugging, this patch adds logs when a table is added/removed
in HMS event processing, when the catalog version of a table changes,
and when processing of a rename event starts.

This patch also refactors CatalogOpExecutor.alterTableOrViewRename() by
extracting some code into helper methods. A race issue is identified and
fixed: DeleteEventLog must be updated before renameTable() updates the
catalog cache, so the removed old table won't be added back by
concurrent processing of a stale CREATE_TABLE event.
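
The update order is the essence of that fix; a stand-alone sketch with
simplified data structures (RenameSketch, its fields and
onCreateTableEvent() are hypothetical, only the ordering mirrors the
patch):

  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.Map;
  import java.util.Set;

  class RenameSketch {
    private final Map<String, String> catalogCache = new HashMap<>();  // name -> comment
    private final Set<String> deleteEventLog = new HashSet<>();        // removed names

    synchronized void rename(String oldName, String newName) {
      // 1. Record the removal of the old name FIRST, so that a concurrently
      //    replayed stale CREATE_TABLE event for the old name finds it in the
      //    delete log and does not re-add the table this rename just removed.
      //    (The real DeleteEventLog also records the ALTER_TABLE event id.)
      deleteEventLog.add(oldName);
      // 2. Only then update the catalog cache.
      String comment = catalogCache.remove(oldName);
      catalogCache.put(newName, comment);
    }

    // How a replayed CREATE_TABLE event consults the log in this sketch:
    synchronized void onCreateTableEvent(String tableName, String comment) {
      if (deleteEventLog.contains(tableName)) return;  // stale event, skip it
      catalogCache.putIfAbsent(tableName, comment);
    }
  }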

_run_ddls_with_invalidation in test_concurrent_ddls.py could still fail
with a timeout when running with sync_ddl=true. The reason is that when
a DDL hits IMPALA-9135 and hangs, it needs catalogd to send new catalog
updates in order to reach the max waiting attempts (see
waitForSyncDdlVersion()). However, if all other concurrent threads have
already finished, there won't be any new catalog updates, so the DDL
waits forever and the test eventually times out. To work around this,
this patch adds another concurrent thread that keeps creating new tables
until the test finishes.

Tests:
 - Ran the following tests in test_concurrent_ddls.py for 10 rounds.
   Each round takes 11 minutes.
   - test_ddls_with_invalidate_metadata
   - test_ddls_with_invalidate_metadata_sync_ddl
   - test_mixed_catalog_ddls_with_invalidate_metadata
   - test_mixed_catalog_ddls_with_invalidate_metadata_sync_ddl
   - test_local_catalog_ddls_with_invalidate_metadata
   - test_local_catalog_ddls_with_invalidate_metadata_sync_ddl
   - test_local_catalog_ddls_with_invalidate_metadata_unlock_gap

Change-Id: I6506821dedf7701cdfa58d14cae5760ee178c4ec
Reviewed-on: http://gerrit.cloudera.org:8080/23346
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author: stiga-huang
Date: 2025-08-25 18:29:07 +08:00
Committed-by: Impala Public Jenkins
Parent: 28cff4022d
Commit: 6f3deabb9d
12 changed files with 157 additions and 62 deletions

View File

@@ -77,6 +77,7 @@ import org.apache.impala.authorization.AuthorizationPolicy;
import org.apache.impala.catalog.CatalogResetManager.PrefetchedDatabaseObjects;
import org.apache.impala.catalog.FeFsTable.Utils;
import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import org.apache.impala.catalog.events.DeleteEventLog;
import org.apache.impala.catalog.events.ExternalEventsProcessor;
import org.apache.impala.catalog.events.MetastoreEvents.EventFactoryForSyncToLatestEvent;
import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory;
@@ -2291,9 +2292,12 @@ public class CatalogServiceCatalog extends Catalog {
* Returns the invalidated 'Db' object along with list of tables to be loaded by
* the TableLoadingMgr. Returns null if the method encounters an exception during
* invalidation.
* 'currentEventId' is set as createEventId of all tables under this db. Also used to
* track removed tables in DeleteEventLog.
*/
private Pair<Db, List<TTableName>> invalidateDb(String dbName, Db existingDb,
PrefetchedDatabaseObjects prefetchedObjects, EventSequence catalogTimeline) {
PrefetchedDatabaseObjects prefetchedObjects, EventSequence catalogTimeline,
long currentEventId) {
try {
Db newDb = new Db(dbName, prefetchedObjects.getMsDb());
// existingDb is usually null when the Catalog loads for the first time.
@@ -2323,7 +2327,7 @@ public class CatalogServiceCatalog extends Catalog {
LOG.trace("Get {}", tblMeta);
Table incompleteTbl = IncompleteTable.createUninitializedTable(newDb, tableName,
MetastoreShim.mapToInternalTableType(tblMeta.getTableType()),
tblMeta.getComments());
tblMeta.getComments(), currentEventId);
incompleteTbl.setCatalogVersion(incrementAndGetCatalogVersion());
newDb.addTable(incompleteTbl);
++numTables;
@@ -2362,6 +2366,11 @@ public class CatalogServiceCatalog extends Catalog {
existingDb, removedTableName);
removedTable.setCatalogVersion(incrementAndGetCatalogVersion());
deleteLog_.addRemovedObject(removedTable.toTCatalogObject());
if (isEventProcessingEnabled()) {
metastoreEventProcessor_.getDeleteEventLog().addRemovedObject(
currentEventId, DeleteEventLog.getTblKey(
existingDb.getName(), removedTableName));
}
}
}
return Pair.create(newDb, tblsToBackgroundLoad);
@@ -2458,7 +2467,8 @@ public class CatalogServiceCatalog extends Catalog {
allHmsDbs = msClient.getHiveClient().getAllDatabases();
catalogTimeline.markEvent("Got database list");
}
rebuildDbCache(allHmsDbs, unlockedTimer, catalogTimeline, isSyncDdl);
rebuildDbCache(allHmsDbs, unlockedTimer, catalogTimeline, isSyncDdl,
currentEventId);
scheduleWarmupTables();
catalogTimeline.markEvent("Updated catalog cache");
} catch (Exception e) {
@@ -2494,7 +2504,7 @@ public class CatalogServiceCatalog extends Catalog {
* Entering and exiting this method should hold versionLock_.writeLock().
*/
private void rebuildDbCache(List<String> allHmsDbs, Stopwatch unlockedTimer,
EventSequence catalogTimeline, boolean isSyncDdl) {
EventSequence catalogTimeline, boolean isSyncDdl, long currentEventId) {
Preconditions.checkState(versionLock_.writeLock().isHeldByCurrentThread());
// Not all Java UDFs are persisted to the metastore. The ones which aren't
@@ -2539,7 +2549,8 @@ public class CatalogServiceCatalog extends Catalog {
// null.
PrefetchedDatabaseObjects hmsObjects = resettingDbPair.second.get();
Db oldDb = dbCache_.get(dbName);
invalidatedDb = invalidateDb(dbName, oldDb, hmsObjects, catalogTimeline);
invalidatedDb = invalidateDb(dbName, oldDb, hmsObjects, catalogTimeline,
currentEventId);
} catch (Exception e) {
LOG.warn("Error fetching HMS objects for database " + dbName, e);
}
@@ -2711,9 +2722,8 @@ public class CatalogServiceCatalog extends Catalog {
deleteLog_.addRemovedObject(existingTbl.toMinimalTCatalogObject());
}
Table incompleteTable = IncompleteTable.createUninitializedTable(
db, tblName, tblType, tblComment);
db, tblName, tblType, tblComment, createEventId);
incompleteTable.setCatalogVersion(incrementAndGetCatalogVersion());
incompleteTable.setCreateEventId(createEventId);
db.addTable(incompleteTable);
return db.getTable(tblName);
} finally {
@@ -3045,9 +3055,10 @@ public class CatalogServiceCatalog extends Catalog {
* 2. null, T_new: Invalid configuration
* 3. T_old, null: Old table was removed but new table was not added.
* 4. T_old, T_new: Old table was removed and new table was added.
* Updates 'createEventId' of the new table using 'alterEventId'.
*/
public Pair<Table, Table> renameTable(
TTableName oldTableName, TTableName newTableName) {
TTableName oldTableName, TTableName newTableName, long alterEventId) {
// Remove the old table name from the cache and add the new table.
Db db = getDb(oldTableName.getDb_name());
if (db == null) return Pair.create(null, null);
@@ -3059,10 +3070,16 @@ public class CatalogServiceCatalog extends Catalog {
Table oldTable =
removeTable(oldTableName.getDb_name(), oldTableName.getTable_name());
if (oldTable == null) return Pair.create(null, null);
if (alterEventId < oldTable.getCreateEventId()) {
// This is usually due to alterEventId = -1, e.g. failed to fetch and check
// HMS events in the callers. Fallback to use the original createEventId.
alterEventId = oldTable.getCreateEventId();
LOG.warn("Reusing original createEventId {} for table {}.{}", alterEventId,
newTableName.getDb_name(), newTableName.getTable_name());
}
return Pair.create(oldTable,
addIncompleteTable(newTableName.getDb_name(), newTableName.getTable_name(),
oldTable.getTableType(), oldTable.getTableComment(),
oldTable.getCreateEventId()));
oldTable.getTableType(), oldTable.getTableComment(), alterEventId));
} finally {
versionLock_.writeLock().unlock();
}
@@ -3377,9 +3394,9 @@ public class CatalogServiceCatalog extends Catalog {
Table existingTbl = db.getTable(tblName);
if (existingTbl == null) return null;
incompleteTable = IncompleteTable.createUninitializedTable(db, tblName,
existingTbl.getTableType(), existingTbl.getTableComment());
existingTbl.getTableType(), existingTbl.getTableComment(),
existingTbl.getCreateEventId());
incompleteTable.setCatalogVersion(incrementAndGetCatalogVersion());
incompleteTable.setCreateEventId(existingTbl.getCreateEventId());
db.addTable(incompleteTable);
} finally {
versionLock_.writeLock().unlock();

View File

@@ -530,6 +530,13 @@ public class Db extends CatalogObjectImpl implements FeDb {
}
}
@Override
public void setCatalogVersion(long newVersion) {
LOG.info("Setting the catalog version of Db@{} {} to {}",
Integer.toHexString(hashCode()), getName(), newVersion);
super.setCatalogVersion(newVersion);
}
@Override
protected void setTCatalogObject(TCatalogObject catalogObject) {
catalogObject.setDb(toThrift());

View File

@@ -3089,7 +3089,12 @@ public class HdfsTable extends Table implements FeFsTable {
pendingVersionNumber_, version);
versionToBeSet = pendingVersionNumber_;
}
LOG.trace("Setting the hdfs table {} version {}", getFullName(), versionToBeSet);
// Log the catalog version with table name and object id to ease debugging
// race issues. Object id (ClassName@HexHashCode) is used to identify if we have
// replaced the table object.
LOG.info("Setting the catalog version of {}@{} {} to {}",
getClass().getSimpleName(), Integer.toHexString(hashCode()),
getFullName(), versionToBeSet);
super.setCatalogVersion(versionToBeSet);
}
}

View File

@@ -190,8 +190,12 @@ public class IncompleteTable extends Table implements FeIncompleteTable {
}
public static IncompleteTable createUninitializedTable(Db db, String name,
TImpalaTableType tableType, String tableComment) {
return new IncompleteTable(db, name, tableType, tableComment, null);
TImpalaTableType tableType, String tableComment, long createEventId) {
IncompleteTable tbl = new IncompleteTable(db, name, tableType, tableComment, null);
// Use suppressLogging=true to avoid excessive logs for all tables during catalogd
// startup.
tbl.setCreateEventId(createEventId, true);
return tbl;
}
public static IncompleteTable createFailedMetadataLoadTable(Db db, String name,

View File

@@ -240,15 +240,21 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
// last synced id in full table reload
public long getCreateEventId() { return createEventId_; }
public void setCreateEventId(long eventId) {
// TODO: Add a preconditions check for eventId < lastSycnedEventId
public void setCreateEventId(long eventId, boolean suppressLogging) {
if (eventId < createEventId_) {
if (!suppressLogging) {
LOG.warn("Ignored stale createEventId: {}. Current id: {}",
eventId, createEventId_);
}
return;
}
createEventId_ = eventId;
LOG.debug("createEventId_ for table: {} set to: {}", getFullName(), createEventId_);
// TODO: Should we reset lastSyncedEvent Id if it is less than event Id?
// If we don't reset it - we may start syncing table from an event id which
// is less than create event id
if (!suppressLogging) {
LOG.debug("createEventId_ for table: {} set to: {}", getFullName(), createEventId_);
}
// Don't need to sync table from events older than create event id.
if (lastSyncedEventId_ < eventId) {
setLastSyncedEventId(eventId);
setLastSyncedEventId(eventId, suppressLogging);
}
}
@@ -257,9 +263,15 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
}
public void setLastSyncedEventId(long eventId) {
setLastSyncedEventId(eventId, false);
}
public void setLastSyncedEventId(long eventId, boolean suppressLogging) {
// TODO: Add a preconditions check for eventId >= createEventId_
LOG.debug("lastSyncedEventId_ for table: {} set from {} to {}", getFullName(),
lastSyncedEventId_, eventId);
if (!suppressLogging) {
LOG.debug("lastSyncedEventId_ for table: {} set from {} to {}", getFullName(),
lastSyncedEventId_, eventId);
}
lastSyncedEventId_ = eventId;
}
@@ -581,9 +593,13 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
// keep the legacy behavior as showing the table type as TABLE.
tblType = TImpalaTableType.TABLE;
}
// Use -1 as 'createEventId' since this is either in coordinator where no HMS events
// are being processed or imported from a COPY TESTCASE statement which shouldn't be
// used in production (it's used to examine metadata in dev env).
newTable =
IncompleteTable.createUninitializedTable(parentDb, thriftTable.getTbl_name(),
tblType, MetadataOp.getTableComment(thriftTable.getMetastore_table()));
tblType, MetadataOp.getTableComment(thriftTable.getMetastore_table()),
/*createEventId*/-1);
}
newTable.storedInImpaladCatalogCache_ = loadedInImpalad;
try {

View File

@@ -134,7 +134,7 @@ public class TableLoader {
"Unrecognized table type for table: " + fullTblName);
}
table.updateHMSLoadTableSchemaTime(hmsLoadTime);
table.setCreateEventId(eventId);
table.setCreateEventId(eventId, /*suppressLogging*/false);
long latestEventId = -1;
if (syncToLatestEventId) {
// acquire write lock on table since MetastoreEventProcessor.syncToLatestEventId

View File

@@ -1855,6 +1855,9 @@ public class MetastoreEvents {
private void processRename() throws CatalogException {
if (!isRename_) return;
infoLog("Processing rename from {}.{} to {}.{}",
tableBefore_.getDbName(), tableBefore_.getTableName(),
tableAfter_.getDbName(), tableAfter_.getTableName());
Reference<Boolean> oldTblRemoved = new Reference<>();
Reference<Boolean> newTblAdded = new Reference<>();
catalogOpExecutor_

View File

@@ -1352,19 +1352,18 @@ public class CatalogMetastoreServiceHandler extends MetastoreServiceHandler {
oldMsTable.getTableName());
TTableName newTTable = new TTableName(newMsTable.getDbName(),
newMsTable.getTableName());
// Update DeleteEventLog before we modify the catalog cache to avoid the table
// being added concurrently (by other events) during renameTable().
catalogOpExecutor_.addToDeleteEventLog(alterEvent.getEventId(),
DeleteEventLog.getTblKey(oldTTable.getDb_name(), oldTTable.getTable_name()));
// Rename the table in the Catalog and get the resulting catalog object.
// ALTER TABLE/VIEW RENAME is implemented as an ADD + DROP.
Pair<org.apache.impala.catalog.Table, org.apache.impala.catalog.Table> result =
catalog_.renameTable(oldTTable, newTTable);
catalog_.renameTable(oldTTable, newTTable, alterEvent.getEventId());
if (result == null || result.first == null || result.second == null) {
throw new CatalogException("failed to rename table " + oldTTable + " to " +
newTTable + " for " + apiName);
}
// first set the last synced event id to the alter table's event id
result.second.setLastSyncedEventId(alterEvent.getEventId());
result.second.setCreateEventId(alterEvent.getEventId());
catalogOpExecutor_.addToDeleteEventLog(alterEvent.getEventId(),
DeleteEventLog.getTblKey(oldTTable.getDb_name(), oldTTable.getTable_name()));
} finally {
catalog_.getLock().writeLock().unlock();
}

View File

@@ -3263,7 +3263,7 @@ public abstract class MetastoreServiceHandler extends AbstractThriftHiveMetastor
" to new table " + newDbName + "." + newTableName;
LOG.debug("Renaming " + tableInfo);
Pair<org.apache.impala.catalog.Table, org.apache.impala.catalog.Table> result =
catalog_.renameTable(oldTable, newTable);
catalog_.renameTable(oldTable, newTable, -1);
if (result == null || result.first == null || result.second == null) {
LOG.debug("Couldn't rename " + tableInfo);
} else {

View File

@@ -842,6 +842,8 @@ public class CatalogOpExecutor {
tblAddedLater.setRef(true);
return false;
}
LOG.debug("EventId: {} Removing table {}.{} since its create event id is {}",
eventId, dbName, tblName, tblToBeRemoved.getCreateEventId());
Table removedTbl = db.removeTable(tblToBeRemoved.getName());
removedTbl.setCatalogVersion(catalog_.incrementAndGetCatalogVersion());
catalog_.getDeleteLog().addRemovedObject(removedTbl.toMinimalTCatalogObject());
@@ -896,14 +898,15 @@ public class CatalogOpExecutor {
"EventId: {} Table was not added since it was removed later", eventId);
return false;
}
Table incompleteTable = IncompleteTable.createUninitializedTable(db, tblName,
MetastoreShim.mapToInternalTableType(msTbl.getTableType()),
MetadataOp.getTableComment(msTbl));
incompleteTable.setCatalogVersion(catalog_.incrementAndGetCatalogVersion());
// set the createEventId of the table to eventId since we are adding table
// due to the given eventId.
incompleteTable.setCreateEventId(eventId);
Table incompleteTable = IncompleteTable.createUninitializedTable(db, tblName,
MetastoreShim.mapToInternalTableType(msTbl.getTableType()),
MetadataOp.getTableComment(msTbl), eventId);
incompleteTable.setCatalogVersion(catalog_.incrementAndGetCatalogVersion());
db.addTable(incompleteTable);
LOG.debug("EventId: {} Added table {}. Catalog version: {}",
eventId, incompleteTable.getFullName(), incompleteTable.getCatalogVersion());
return true;
} finally {
getMetastoreDdlLock().unlock();
@@ -5809,18 +5812,36 @@ public class CatalogOpExecutor {
String.format(HMS_RPC_ERROR_FORMAT_STR, "alter_table"), e);
}
}
List<NotificationEvent> events = null;
// the alter table event is generated on the renamed table
events = getNextMetastoreEventsForTableIfEnabled(catalogTimeline, eventId,
msTbl.getDbName(), msTbl.getTableName(), AlterTableEvent.EVENT_TYPE);
Pair<Long, Pair<org.apache.hadoop.hive.metastore.api.Table,
org.apache.hadoop.hive.metastore.api.Table>> renamedTable =
getRenamedTableFromEvents(events);
eventId = trackAlterTableRenameEvent(tableName, newTableName, eventId,
catalogTimeline);
// Rename the table in the Catalog and get the resulting catalog object.
// ALTER TABLE/VIEW RENAME is implemented as an ADD + DROP.
Pair<Table, Table> result =
catalog_.renameTable(tableName.toThrift(), newTableName.toThrift());
catalog_.renameTable(tableName.toThrift(), newTableName.toThrift(), eventId);
Preconditions.checkNotNull(result);
Pair<TCatalogObject, TCatalogObject> objs = handleCatalogRenameResult(
oldTbl, newTableName, wantMinimalResult, result, eventId, catalogTimeline);
response.result.addToRemoved_catalog_objects(objs.first);
response.result.addToUpdated_catalog_objects(objs.second);
response.result.setVersion(objs.second.getCatalog_version());
addSummary(response, "Renaming was successful.");
}
/**
* Finds the ALTER_TABLE HMS event triggered by this rename operation and updates
* DeleteEventLog for it. Returns the event id.
*/
private long trackAlterTableRenameEvent(TableName oldTableName, TableName newTableName,
long startEventId, EventSequence catalogTimeline)
throws MetastoreNotificationException, CatalogException {
// the alter table event is generated on the renamed table
List<NotificationEvent> events = getNextMetastoreEventsForTableIfEnabled(
catalogTimeline, startEventId, newTableName.getDb(), newTableName.getTbl(),
AlterTableEvent.EVENT_TYPE);
Pair<Long, Pair<org.apache.hadoop.hive.metastore.api.Table,
org.apache.hadoop.hive.metastore.api.Table>> renamedTable =
getRenamedTableFromEvents(events);
long eventId = startEventId;
if (renamedTable != null) {
eventId = renamedTable.first;
LOG.info("Got ALTER_TABLE RENAME event id {}.", eventId);
@@ -5828,16 +5849,27 @@ public class CatalogOpExecutor {
// Using the eventId at the beginning of the operation is better than nothing.
LOG.warn("ALTER_TABLE RENAME event not found. Using {} in createEventId of {} " +
"and DeleteEventLog for {}.",
eventId, newTableName, tableName);
eventId, newTableName, oldTableName);
}
// Update DeleteEventLog before we modify the catalog cache to avoid the table being
// added concurrently (by other events) during renameTable().
if (catalog_.isEventProcessingEnabled()) {
addToDeleteEventLog(eventId, DeleteEventLog
.getTblKey(tableName.getDb(), tableName.getTbl()));
if (result.second != null) {
result.second.setCreateEventId(eventId);
}
.getTblKey(oldTableName.getDb(), oldTableName.getTbl()));
}
TCatalogObject oldTblDesc = null, newTblDesc = null;
return eventId;
}
/**
* Handles the rename results in catalog, including error handling for failures in
* bringing up the new table by implicitly invalidating the table.
* Returns TCatalogObject of the old and new tables.
*/
private Pair<TCatalogObject, TCatalogObject> handleCatalogRenameResult(
Table oldTbl, TableName newTableName, boolean wantMinimalResult,
Pair<Table, Table> result, long alterTableEventId, EventSequence catalogTimeline)
throws ImpalaRuntimeException {
TCatalogObject oldTblDesc, newTblDesc;
if (result.first == null) {
// The old table object has been removed by a concurrent operation, e.g. INVALIDATE
// METADATA <table>. Fetch the latest delete from deleteLog.
@@ -5848,8 +5880,8 @@ public class CatalogOpExecutor {
oldTblDesc.setCatalog_version(version);
} else {
LOG.warn("Deletion update on the old table {} not found. Impalad might still "
+ "have its metadata until the deletion update arrives from statestore.",
tableName);
+ "have its metadata until the deletion update arrives from statestore.",
oldTbl.getFullName());
}
} else {
oldTblDesc = wantMinimalResult ?
@@ -5859,7 +5891,7 @@ public class CatalogOpExecutor {
// The rename succeeded in HMS but failed in the catalog cache. The cache is in an
// inconsistent state, so invalidate the new table to reload it.
newTblDesc = catalog_.invalidateTable(newTableName.toThrift(),
new Reference<>(), new Reference<>(), catalogTimeline, eventId);
new Reference<>(), new Reference<>(), catalogTimeline, alterTableEventId);
if (newTblDesc == null) {
throw new ImpalaRuntimeException(String.format(
"The new table/view %s was concurrently removed during rename.",
@@ -5871,10 +5903,7 @@ public class CatalogOpExecutor {
newTblDesc = wantMinimalResult ?
result.second.toInvalidationObject() : result.second.toTCatalogObject();
}
response.result.addToRemoved_catalog_objects(oldTblDesc);
response.result.addToUpdated_catalog_objects(newTblDesc);
response.result.setVersion(newTblDesc.getCatalog_version());
addSummary(response, "Renaming was successful.");
return new Pair<>(oldTblDesc, newTblDesc);
}
/**
@@ -5886,7 +5915,7 @@ public class CatalogOpExecutor {
*/
public void addToDeleteEventLog(long eventId, String objectKey) {
if (!catalog_.isEventProcessingEnabled()) {
LOG.trace("Not adding event {}:{} since events processing is not active", eventId,
LOG.trace("Not adding event {}:{} since events processing is not enabled", eventId,
objectKey);
return;
}

View File

@@ -537,8 +537,8 @@ public class CatalogHmsSyncToLatestEventIdTest extends AbstractCatalogMetastoreT
createTableInHms(TEST_DB_NAME, tblName, true);
IncompleteTable tbl =
IncompleteTable.createUninitializedTable(catalog_.getDb(TEST_DB_NAME),
tblName, MetadataOp.getImpalaTableType(tableType_), null);
tbl.setCreateEventId(getLatestEventIdFromHMS());
tblName, MetadataOp.getImpalaTableType(tableType_), null,
getLatestEventIdFromHMS());
catalog_.addTable(catalog_.getDb(TEST_DB_NAME), tbl);
long prevLastSyncedEventId =
catalog_.getTable(TEST_DB_NAME, tblName).getLastSyncedEventId();

View File

@@ -174,12 +174,27 @@ class TestConcurrentDdls(CustomClusterTestSuite):
worker = [None] * (NUM_ITERS + 1)
for i in range(1, NUM_ITERS + 1):
worker[i] = pool.apply_async(run_ddls, (i,))
# INSERT with sync_ddl=true could hit IMPALA-9135 and hang infinitely if there are
# no more catalog updates, e.g. all other threads have finished. This leads to
# timeout in this test. As a workaround, run a thread to keep creating new tables
# to trigger new catalog updates.
stop = False
if sync_ddl:
def create_tbls():
i = 0
while not stop:
tls.client.execute("create table {}.tmp_tbl{} (i int)".format(db, i))
time.sleep(10)
i += 1
pool.apply_async(create_tbls)
for i in range(1, NUM_ITERS + 1):
try:
worker[i].get(timeout=100)
except TimeoutError:
stop = True
dump_server_stacktraces()
assert False, "Timeout in thread run_ddls(%d)" % i
stop = True
@classmethod
def is_transient_error(cls, err):