diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift index 6b94697ab..c0792b324 100644 --- a/common/thrift/CatalogService.thrift +++ b/common/thrift/CatalogService.thrift @@ -378,6 +378,7 @@ enum CatalogLookupStatus { OK, DB_NOT_FOUND, TABLE_NOT_FOUND, + TABLE_NOT_LOADED, FUNCTION_NOT_FOUND } diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index b3c714f4b..95335071d 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -2195,6 +2195,10 @@ public class CatalogServiceCatalog extends Catalog { } if (table == null) { return createGetPartialCatalogObjectError(CatalogLookupStatus.TABLE_NOT_FOUND); + } else if (!table.isLoaded()) { + // Table can still remain in an incomplete state if there was a concurrent + // invalidate request. + return createGetPartialCatalogObjectError(CatalogLookupStatus.TABLE_NOT_LOADED); } // TODO(todd): consider a read-write lock here. table.getLock().lock(); diff --git a/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java b/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java index ba3e5cfc1..7de3d89a9 100644 --- a/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java @@ -20,6 +20,7 @@ package org.apache.impala.catalog; import java.util.List; import java.util.Set; +import com.google.common.base.Preconditions; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.impala.common.ImpalaException; @@ -136,6 +137,7 @@ public class IncompleteTable extends Table { @Override public TGetPartialCatalogObjectResponse getPartialInfo( TGetPartialCatalogObjectRequest req) throws TableLoadingException { + Preconditions.checkNotNull(cause_); Throwables.propagateIfPossible(cause_, TableLoadingException.class); throw new TableLoadingException(cause_.getMessage()); } diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java index 62f1d3e75..e099b5349 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java @@ -344,6 +344,7 @@ public class CatalogdMetaProvider implements MetaProvider { case DB_NOT_FOUND: case FUNCTION_NOT_FOUND: case TABLE_NOT_FOUND: + case TABLE_NOT_LOADED: invalidateCacheForObject(req.object_desc); throw new InconsistentMetadataFetchException( String.format("Fetching %s failed. Could not find %s", diff --git a/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java b/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java index 07d33090e..63e0fda5e 100644 --- a/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java +++ b/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java @@ -34,7 +34,6 @@ import avro.shaded.com.google.common.collect.Lists; import com.google.common.base.Preconditions; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.impala.common.InternalException; -import org.apache.impala.common.RuntimeEnv; import org.apache.impala.service.BackendConfig; import org.apache.impala.testutil.CatalogServiceTestCatalog; import org.apache.impala.thrift.TCatalogInfoSelector; @@ -50,8 +49,6 @@ import org.apache.impala.thrift.TTableInfoSelector; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; import org.apache.thrift.TSerializer; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; import com.google.common.collect.ImmutableList; diff --git a/tests/custom_cluster/test_local_catalog.py b/tests/custom_cluster/test_local_catalog.py index 14a9a54f3..916443b2b 100644 --- a/tests/custom_cluster/test_local_catalog.py +++ b/tests/custom_cluster/test_local_catalog.py @@ -18,6 +18,7 @@ # Test behaviors specific to --use_local_catalog being enabled. import pytest +import Queue import random import threading import time @@ -228,6 +229,57 @@ class TestCompactCatalogUpdates(CustomClusterTestSuite): client1.close() client2.close() + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args="--use_local_catalog=true", + catalogd_args="--catalog_topic_mode=minimal") + def test_concurrent_invalidate_with_queries(self, unique_database): + """ + Tests that the queries are replanned when they clash with concurrent invalidates. + """ + # TODO: Merge this with the above test after fixing IMPALA-7717 + try: + impalad1 = self.cluster.impalads[0] + impalad2 = self.cluster.impalads[1] + client1 = impalad1.service.create_beeswax_client() + client2 = impalad2.service.create_beeswax_client() + + # Track the number of replans. + replans_seen = [0] + replans_seen_lock = threading.Lock() + + # Queue to propagate exceptions from failed queries, if any. + failed_queries = Queue.Queue() + + def stress_thread(client): + while replans_seen[0] == 0: + q = random.choice([ + 'invalidate metadata functional.alltypesnopart', + 'select count(*) from functional.alltypesnopart', + 'select count(*) from functional.alltypesnopart']) + try: + ret = self.execute_query_expect_success(client, q) + except Exception as e: + failed_queries.put((q, str(e))) + if RETRY_PROFILE_MSG in ret.runtime_profile: + with replans_seen_lock: + replans_seen[0] += 1 + + threads = [threading.Thread(target=stress_thread, args=(c,)) + for c in [client1, client2]] + for t in threads: + t.start() + for t in threads: + t.join(30) + assert failed_queries.empty(),\ + "Failed query count non zero: %s" % list(failed_queries.queue) + assert replans_seen[0] > 0, "Did not trigger any re-plans" + + finally: + client1.close() + client2.close() + + @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args="--use_local_catalog=true --local_catalog_max_fetch_retries=0",