diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift index 51379682f..9ce17da09 100644 --- a/common/thrift/CatalogService.thrift +++ b/common/thrift/CatalogService.thrift @@ -622,7 +622,8 @@ enum CatalogLookupStatus { // TODO: Fix partition lookup logic to not do it with IDs. PARTITION_NOT_FOUND, DATA_SOURCE_NOT_FOUND, - VERSION_MISMATCH + VERSION_MISMATCH, + CATALOG_SERVICE_CHANGED } // RPC response for GetPartialCatalogObject. @@ -646,6 +647,9 @@ struct TGetPartialCatalogObjectResponse { // Loaded time in catalogd corresponding to 'object_version_number'. 9: optional i64 object_loaded_time_ms + + // The CatalogService service ID this result came from. + 10: optional Types.TUniqueId catalog_service_id } diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index a5fce4204..5b3f26fa2 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -4213,7 +4213,9 @@ public class CatalogServiceCatalog extends Catalog { try (ThreadNameAnnotator tna = new ThreadNameAnnotator( "Get Partial Catalog Object - " + Catalog.toCatalogObjectKey(req.object_desc))) { - return doGetPartialCatalogObject(req, reason); + TGetPartialCatalogObjectResponse resp = doGetPartialCatalogObject(req, reason); + resp.setCatalog_service_id(JniCatalog.getServiceId()); + return resp; } finally { partialObjectFetchAccess_.release(); } diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java index 89779d4b0..6734726bf 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java @@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import com.codahale.metrics.Histogram; @@ -340,7 +341,8 @@ public class CatalogdMetaProvider implements MetaProvider { */ @GuardedBy("catalogServiceIdLock_") private TUniqueId catalogServiceId_ = Catalog.INITIAL_CATALOG_SERVICE_ID; - private final Object catalogServiceIdLock_ = new Object(); + private final ReentrantReadWriteLock catalogServiceIdLock_ = + new ReentrantReadWriteLock(true /*fair ordering*/); /** * Object that is used to synchronize on and signal when the catalog is ready for use. @@ -400,6 +402,15 @@ public class CatalogdMetaProvider implements MetaProvider { return "Catalogd (URI TODO)"; } + public TUniqueId getCatalogServiceId() { + catalogServiceIdLock_.readLock().lock(); + try { + return catalogServiceId_; + } finally { + catalogServiceIdLock_.readLock().unlock(); + } + } + public CacheStats getCacheStats() { return cache_.stats(); } @@ -469,6 +480,13 @@ public class CatalogdMetaProvider implements MetaProvider { if (resp.isSetStatus() && resp.status.status_code != TErrorCode.OK) { throw new TException(String.join("\n", resp.status.error_msgs)); } + if (resp.isSetCatalog_service_id() + && witnessCatalogServiceId(resp.getCatalog_service_id())) { + throw new InconsistentMetadataFetchException( + CatalogLookupStatus.CATALOG_SERVICE_CHANGED, + String.format("Catalog service ID changed to %s", + PrintId(resp.getCatalog_service_id()))); + } // If we get a "not found" response, then we assume that this was a case of an // inconsistent cache. For example, we might have cached the list of tables within @@ -1521,13 +1539,16 @@ public class CatalogdMetaProvider implements MetaProvider { // NOTE: the return value is ignored when this function is called by a DDL // operation. - synchronized (catalogServiceIdLock_) { + catalogServiceIdLock_.readLock().lock(); + try { // Set catalog_object_version_lower_bound to lastResetCatalogVersion_ + 1. All // catalog objects with catalog version <= lastResetCatalogVersion_ should have // been invalidated. See more comments above the definition of // lastResetCatalogVersion_. return new TUpdateCatalogCacheResponse(catalogServiceId_, lastResetCatalogVersion_.get() + 1, lastSeenCatalogVersion_.get()); + } finally { + catalogServiceIdLock_.readLock().unlock(); } } @@ -1606,40 +1627,57 @@ public class CatalogdMetaProvider implements MetaProvider { /** * Witness a service ID received from the catalog. We can see the service IDs * either from a DDL response (in which case the service ID is part of the RPC - * response object) or from a statestore topic update (in which case the service ID - * is part of the published CATALOG object). + * response object), or from a statestore topic update (in which case the service ID + * is part of the published CATALOG object), or from a GetPartialCatalogObject response. * - * If we notice the service ID changed, we need to invalidate our cache. + * If we notice the service ID changed (except the initial startup), we need to + * invalidate our cache. Returns true in this case so callers can replan the query. */ - private void witnessCatalogServiceId(TUniqueId serviceId) { - synchronized (catalogServiceIdLock_) { - if (!catalogServiceId_.equals(serviceId)) { - if (!catalogServiceId_.equals(Catalog.INITIAL_CATALOG_SERVICE_ID)) { - LOG.warn("Detected catalog service restart: service ID changed from " + - "{} to {}. Invalidating all cached metadata on this coordinator.", - PrintId(catalogServiceId_), PrintId(serviceId)); - } - catalogServiceId_ = serviceId; + private boolean witnessCatalogServiceId(TUniqueId serviceId) { + // Fast path to check if the catalog service id changes. + catalogServiceIdLock_.readLock().lock(); + try { + if (catalogServiceId_.equals(serviceId)) return false; + } finally { + catalogServiceIdLock_.readLock().unlock(); + } + catalogServiceIdLock_.writeLock().lock(); + try { + // Double-check the condition because another thread might have updated it + // while we were waiting for the write lock. + if (catalogServiceId_.equals(serviceId)) return false; + boolean needsReplan = true; + if (!catalogServiceId_.equals(Catalog.INITIAL_CATALOG_SERVICE_ID)) { + LOG.warn("Detected catalog service restart: service ID changed from " + + "{} to {}. Invalidating all cached metadata on this coordinator.", + PrintId(catalogServiceId_), PrintId(serviceId)); cache_.invalidateAll(); - // Clear cached items from the previous catalogd instance. Otherwise, we'll - // ignore new updates from the new catalogd instance since they have lower - // versions. - hdfsCachePools_.clear(); - // TODO(todd): we probably need to invalidate the auth policy too. - // we are probably better off detecting this at a higher level and - // reinstantiating the metaprovider entirely, similar to how ImpaladCatalog - // handles this. - - // TODO(todd): slight race here: a concurrent request from the old catalog - // could theoretically be just about to write something back into the cache - // after we do the above invalidate. Maybe we would be better off replacing - // the whole cache object, or doing a soft barrier here to wait for any - // concurrent cache accessors to cycle out. Another option is to associate - // the catalog service ID as part of all of the cache keys. - // - // This is quite unlikely to be an issue in practice, so deferring it to later - // clean-up. + } else { + // Callers don't need replan if this is the first reset. + needsReplan = false; } + catalogServiceId_ = serviceId; + // Clear cached items from the previous catalogd instance. Otherwise, we'll + // ignore new updates from the new catalogd instance since they have lower + // versions. + hdfsCachePools_.clear(); + // TODO(todd): we probably need to invalidate the auth policy too. + // we are probably better off detecting this at a higher level and + // reinstantiating the metaprovider entirely, similar to how ImpaladCatalog + // handles this. + + // TODO(todd): slight race here: a concurrent request from the old catalog + // could theoretically be just about to write something back into the cache + // after we do the above invalidate. Maybe we would be better off replacing + // the whole cache object, or doing a soft barrier here to wait for any + // concurrent cache accessors to cycle out. Another option is to associate + // the catalog service ID as part of all of the cache keys. + // + // This is quite unlikely to be an issue in practice, so deferring it to later + // clean-up. + return needsReplan; + } finally { + catalogServiceIdLock_.writeLock().unlock(); } } diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java index 260da8810..b689d0edc 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java @@ -81,9 +81,14 @@ public class LocalCatalog implements FeCatalog { private Map dbs_ = new HashMap<>(); private Map hdfsCachePools_ = null; private String nullPartitionKeyValue_; + // Catalog service id when MetaProvider is CatalogdMetaProvider + private TUniqueId catalogServiceId_ = Catalog.INITIAL_CATALOG_SERVICE_ID; public LocalCatalog(MetaProvider metaProvider) { metaProvider_ = Preconditions.checkNotNull(metaProvider); + if (metaProvider instanceof CatalogdMetaProvider) { + catalogServiceId_ = ((CatalogdMetaProvider) metaProvider).getCatalogServiceId(); + } } public String getProviderURI() { @@ -298,7 +303,7 @@ public class LocalCatalog implements FeCatalog { @Override public TUniqueId getCatalogServiceId() { - throw new UnsupportedOperationException("TODO"); + return catalogServiceId_; } @Override diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index 0aefa8ea2..fad6f556f 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -2928,6 +2928,11 @@ public class Frontend { new Date(t.getLastLoadedTimeMs()).toString())) .collect(Collectors.joining("\n"))); + // Add the catalog service id that shows where the metadata comes (usually from the + // current active catalogd). + FrontendProfile.getCurrent().addInfoString("Catalog Service ID", + PrintId(stmtTableCache.catalog.getCatalogServiceId())); + // Analyze and authorize stmt AnalysisContext analysisCtx = new AnalysisContext(queryCtx, authzFactory_, timeline); AnalysisResult analysisResult = analysisCtx.analyzeAndAuthorize(compilerFactory, diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py index eb1533ab2..ce1e0fab7 100644 --- a/tests/query_test/test_observability.py +++ b/tests/query_test/test_observability.py @@ -343,6 +343,7 @@ class TestObservability(ImpalaTestSuite): r'Frontend:', r'Referenced Tables:', r'Original Table Versions:', + r'Catalog Service ID:', r'CatalogFetch.ColumnStats.Hits', r'CatalogFetch.ColumnStats.Misses', r'CatalogFetch.ColumnStats.Requests',