IMPALA-14283: Invalidate the cache when served by a new catalogd

Before this patch, coordinator just invalidates the catalog cache when
witness the catalog service id changes in DDL/DML responses or
statestore catalog updates. This is enough in the legacy catalog mode
since these are the only ways that coordinator gets metadata from
catalogd. However, in local catalog mode, coordinator sends
getPartialCatalogObject requests to fetch metadata from catalogd. If the
request is now served by a new catalogd (e.g. due to HA failover),
coordinator should invalidate its catalog cache in case catalog version
overlaps on the same table and unintentionally reuse stale metadata.

To ensure performance, catalogServiceIdLock_ in CatalogdMetaProvider is
refactored to be a ReentrantReadWriteLock. Most of the usages on it just
need the read lock.

This patch also adds the catalog service id in the profile.

Tests:
 - Ran test_warmed_up_metadata_failover_catchup 50 times.
 - Ran FE tests: CatalogdMetaProviderTest and LocalCatalogTest.
 - Ran CORE tests

Change-Id: I751e43f5d594497a521313579defc5b179dc06ce
Reviewed-on: http://gerrit.cloudera.org:8080/23236
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Tested-by: Quanlong Huang <huangquanlong@gmail.com>
This commit is contained in:
stiga-huang
2025-08-02 10:03:57 +08:00
committed by Quanlong Huang
parent 447c016ae1
commit aec7380b75
6 changed files with 90 additions and 35 deletions

View File

@@ -622,7 +622,8 @@ enum CatalogLookupStatus {
// TODO: Fix partition lookup logic to not do it with IDs.
PARTITION_NOT_FOUND,
DATA_SOURCE_NOT_FOUND,
VERSION_MISMATCH
VERSION_MISMATCH,
CATALOG_SERVICE_CHANGED
}
// RPC response for GetPartialCatalogObject.
@@ -646,6 +647,9 @@ struct TGetPartialCatalogObjectResponse {
// Loaded time in catalogd corresponding to 'object_version_number'.
9: optional i64 object_loaded_time_ms
// The CatalogService service ID this result came from.
10: optional Types.TUniqueId catalog_service_id
}

View File

@@ -4213,7 +4213,9 @@ public class CatalogServiceCatalog extends Catalog {
try (ThreadNameAnnotator tna = new ThreadNameAnnotator(
"Get Partial Catalog Object - " +
Catalog.toCatalogObjectKey(req.object_desc))) {
return doGetPartialCatalogObject(req, reason);
TGetPartialCatalogObjectResponse resp = doGetPartialCatalogObject(req, reason);
resp.setCatalog_service_id(JniCatalog.getServiceId());
return resp;
} finally {
partialObjectFetchAccess_.release();
}

View File

@@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import com.codahale.metrics.Histogram;
@@ -340,7 +341,8 @@ public class CatalogdMetaProvider implements MetaProvider {
*/
@GuardedBy("catalogServiceIdLock_")
private TUniqueId catalogServiceId_ = Catalog.INITIAL_CATALOG_SERVICE_ID;
private final Object catalogServiceIdLock_ = new Object();
private final ReentrantReadWriteLock catalogServiceIdLock_ =
new ReentrantReadWriteLock(true /*fair ordering*/);
/**
* Object that is used to synchronize on and signal when the catalog is ready for use.
@@ -400,6 +402,15 @@ public class CatalogdMetaProvider implements MetaProvider {
return "Catalogd (URI TODO)";
}
public TUniqueId getCatalogServiceId() {
catalogServiceIdLock_.readLock().lock();
try {
return catalogServiceId_;
} finally {
catalogServiceIdLock_.readLock().unlock();
}
}
public CacheStats getCacheStats() {
return cache_.stats();
}
@@ -469,6 +480,13 @@ public class CatalogdMetaProvider implements MetaProvider {
if (resp.isSetStatus() && resp.status.status_code != TErrorCode.OK) {
throw new TException(String.join("\n", resp.status.error_msgs));
}
if (resp.isSetCatalog_service_id()
&& witnessCatalogServiceId(resp.getCatalog_service_id())) {
throw new InconsistentMetadataFetchException(
CatalogLookupStatus.CATALOG_SERVICE_CHANGED,
String.format("Catalog service ID changed to %s",
PrintId(resp.getCatalog_service_id())));
}
// If we get a "not found" response, then we assume that this was a case of an
// inconsistent cache. For example, we might have cached the list of tables within
@@ -1521,13 +1539,16 @@ public class CatalogdMetaProvider implements MetaProvider {
// NOTE: the return value is ignored when this function is called by a DDL
// operation.
synchronized (catalogServiceIdLock_) {
catalogServiceIdLock_.readLock().lock();
try {
// Set catalog_object_version_lower_bound to lastResetCatalogVersion_ + 1. All
// catalog objects with catalog version <= lastResetCatalogVersion_ should have
// been invalidated. See more comments above the definition of
// lastResetCatalogVersion_.
return new TUpdateCatalogCacheResponse(catalogServiceId_,
lastResetCatalogVersion_.get() + 1, lastSeenCatalogVersion_.get());
} finally {
catalogServiceIdLock_.readLock().unlock();
}
}
@@ -1606,40 +1627,57 @@ public class CatalogdMetaProvider implements MetaProvider {
/**
* Witness a service ID received from the catalog. We can see the service IDs
* either from a DDL response (in which case the service ID is part of the RPC
* response object) or from a statestore topic update (in which case the service ID
* is part of the published CATALOG object).
* response object), or from a statestore topic update (in which case the service ID
* is part of the published CATALOG object), or from a GetPartialCatalogObject response.
*
* If we notice the service ID changed, we need to invalidate our cache.
* If we notice the service ID changed (except the initial startup), we need to
* invalidate our cache. Returns true in this case so callers can replan the query.
*/
private void witnessCatalogServiceId(TUniqueId serviceId) {
synchronized (catalogServiceIdLock_) {
if (!catalogServiceId_.equals(serviceId)) {
if (!catalogServiceId_.equals(Catalog.INITIAL_CATALOG_SERVICE_ID)) {
LOG.warn("Detected catalog service restart: service ID changed from " +
"{} to {}. Invalidating all cached metadata on this coordinator.",
PrintId(catalogServiceId_), PrintId(serviceId));
}
catalogServiceId_ = serviceId;
private boolean witnessCatalogServiceId(TUniqueId serviceId) {
// Fast path to check if the catalog service id changes.
catalogServiceIdLock_.readLock().lock();
try {
if (catalogServiceId_.equals(serviceId)) return false;
} finally {
catalogServiceIdLock_.readLock().unlock();
}
catalogServiceIdLock_.writeLock().lock();
try {
// Double-check the condition because another thread might have updated it
// while we were waiting for the write lock.
if (catalogServiceId_.equals(serviceId)) return false;
boolean needsReplan = true;
if (!catalogServiceId_.equals(Catalog.INITIAL_CATALOG_SERVICE_ID)) {
LOG.warn("Detected catalog service restart: service ID changed from " +
"{} to {}. Invalidating all cached metadata on this coordinator.",
PrintId(catalogServiceId_), PrintId(serviceId));
cache_.invalidateAll();
// Clear cached items from the previous catalogd instance. Otherwise, we'll
// ignore new updates from the new catalogd instance since they have lower
// versions.
hdfsCachePools_.clear();
// TODO(todd): we probably need to invalidate the auth policy too.
// we are probably better off detecting this at a higher level and
// reinstantiating the metaprovider entirely, similar to how ImpaladCatalog
// handles this.
// TODO(todd): slight race here: a concurrent request from the old catalog
// could theoretically be just about to write something back into the cache
// after we do the above invalidate. Maybe we would be better off replacing
// the whole cache object, or doing a soft barrier here to wait for any
// concurrent cache accessors to cycle out. Another option is to associate
// the catalog service ID as part of all of the cache keys.
//
// This is quite unlikely to be an issue in practice, so deferring it to later
// clean-up.
} else {
// Callers don't need replan if this is the first reset.
needsReplan = false;
}
catalogServiceId_ = serviceId;
// Clear cached items from the previous catalogd instance. Otherwise, we'll
// ignore new updates from the new catalogd instance since they have lower
// versions.
hdfsCachePools_.clear();
// TODO(todd): we probably need to invalidate the auth policy too.
// we are probably better off detecting this at a higher level and
// reinstantiating the metaprovider entirely, similar to how ImpaladCatalog
// handles this.
// TODO(todd): slight race here: a concurrent request from the old catalog
// could theoretically be just about to write something back into the cache
// after we do the above invalidate. Maybe we would be better off replacing
// the whole cache object, or doing a soft barrier here to wait for any
// concurrent cache accessors to cycle out. Another option is to associate
// the catalog service ID as part of all of the cache keys.
//
// This is quite unlikely to be an issue in practice, so deferring it to later
// clean-up.
return needsReplan;
} finally {
catalogServiceIdLock_.writeLock().unlock();
}
}

View File

@@ -81,9 +81,14 @@ public class LocalCatalog implements FeCatalog {
private Map<String, FeDb> dbs_ = new HashMap<>();
private Map<String, HdfsCachePool> hdfsCachePools_ = null;
private String nullPartitionKeyValue_;
// Catalog service id when MetaProvider is CatalogdMetaProvider
private TUniqueId catalogServiceId_ = Catalog.INITIAL_CATALOG_SERVICE_ID;
public LocalCatalog(MetaProvider metaProvider) {
metaProvider_ = Preconditions.checkNotNull(metaProvider);
if (metaProvider instanceof CatalogdMetaProvider) {
catalogServiceId_ = ((CatalogdMetaProvider) metaProvider).getCatalogServiceId();
}
}
public String getProviderURI() {
@@ -298,7 +303,7 @@ public class LocalCatalog implements FeCatalog {
@Override
public TUniqueId getCatalogServiceId() {
throw new UnsupportedOperationException("TODO");
return catalogServiceId_;
}
@Override

View File

@@ -2928,6 +2928,11 @@ public class Frontend {
new Date(t.getLastLoadedTimeMs()).toString()))
.collect(Collectors.joining("\n")));
// Add the catalog service id that shows where the metadata comes (usually from the
// current active catalogd).
FrontendProfile.getCurrent().addInfoString("Catalog Service ID",
PrintId(stmtTableCache.catalog.getCatalogServiceId()));
// Analyze and authorize stmt
AnalysisContext analysisCtx = new AnalysisContext(queryCtx, authzFactory_, timeline);
AnalysisResult analysisResult = analysisCtx.analyzeAndAuthorize(compilerFactory,

View File

@@ -343,6 +343,7 @@ class TestObservability(ImpalaTestSuite):
r'Frontend:',
r'Referenced Tables:',
r'Original Table Versions:',
r'Catalog Service ID:',
r'CatalogFetch.ColumnStats.Hits',
r'CatalogFetch.ColumnStats.Misses',
r'CatalogFetch.ColumnStats.Requests',