IMPALA-7669: Gracefully handle concurrent invalidate/partial fetch RPCs

The bug here was that any partial RPC on an IncompleteTable was throwing
an NPE.

Normally we attempt to load the table (if it is not already loaded) before
serving the partial info request, but a concurrent invalidate can reset the
table back to an uninitialized state in between.

This patch handles that case by propagating a meaningful error
(TABLE_NOT_LOADED) to the caller instead of hitting the NPE.
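
To make the race concrete, here is a paraphrased sketch of the check added in
CatalogServiceCatalog (see the diffs below for the exact code); getTable() here
stands in for the surrounding lookup logic:

  // Sketch only: the table was loaded (or a load was requested) earlier in the
  // RPC, but a concurrent INVALIDATE METADATA can swap in a fresh
  // IncompleteTable whose 'cause_' is null.
  Table table = getTable(dbName, tblName);
  if (table == null) {
    return createGetPartialCatalogObjectError(CatalogLookupStatus.TABLE_NOT_FOUND);
  } else if (!table.isLoaded()) {
    // Before this patch we fell through to IncompleteTable.getPartialInfo(),
    // which dereferenced the null 'cause_' and threw an NPE.
    return createGetPartialCatalogObjectError(CatalogLookupStatus.TABLE_NOT_LOADED);
  }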

Testing:
-------
- Added a test that fails consistently with an NPE without this patch.

Change-Id: I8533f73f25ca42a20f146ddfd95d4213add9b705
Reviewed-on: http://gerrit.cloudera.org:8080/11638
Reviewed-by: Bharath Vissapragada <bharathv@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>

commit 2b2cf8d966 (parent 28aecd6db8)
Author: Bharath Vissapragada
Date:   2018-10-09 16:40:59 -07:00
Committed by: Impala Public Jenkins
6 changed files with 60 additions and 3 deletions

View File

@@ -378,6 +378,7 @@ enum CatalogLookupStatus {
   OK,
   DB_NOT_FOUND,
   TABLE_NOT_FOUND,
+  TABLE_NOT_LOADED,
   FUNCTION_NOT_FOUND
 }

View File

@@ -2195,6 +2195,10 @@ public class CatalogServiceCatalog extends Catalog {
     }
     if (table == null) {
       return createGetPartialCatalogObjectError(CatalogLookupStatus.TABLE_NOT_FOUND);
+    } else if (!table.isLoaded()) {
+      // Table can still remain in an incomplete state if there was a concurrent
+      // invalidate request.
+      return createGetPartialCatalogObjectError(CatalogLookupStatus.TABLE_NOT_LOADED);
     }
     // TODO(todd): consider a read-write lock here.
     table.getLock().lock();

View File

@@ -20,6 +20,7 @@ package org.apache.impala.catalog;
 import java.util.List;
 import java.util.Set;
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.impala.common.ImpalaException;
@@ -136,6 +137,7 @@ public class IncompleteTable extends Table {
   @Override
   public TGetPartialCatalogObjectResponse getPartialInfo(
       TGetPartialCatalogObjectRequest req) throws TableLoadingException {
+    Preconditions.checkNotNull(cause_);
     Throwables.propagateIfPossible(cause_, TableLoadingException.class);
     throw new TableLoadingException(cause_.getMessage());
   }

View File

@@ -344,6 +344,7 @@ public class CatalogdMetaProvider implements MetaProvider {
       case DB_NOT_FOUND:
       case FUNCTION_NOT_FOUND:
       case TABLE_NOT_FOUND:
+      case TABLE_NOT_LOADED:
         invalidateCacheForObject(req.object_desc);
         throw new InconsistentMetadataFetchException(
             String.format("Fetching %s failed. Could not find %s",

View File

@@ -34,7 +34,6 @@ import avro.shaded.com.google.common.collect.Lists;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.impala.common.InternalException;
-import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.testutil.CatalogServiceTestCatalog;
 import org.apache.impala.thrift.TCatalogInfoSelector;
@@ -50,8 +49,6 @@ import org.apache.impala.thrift.TTableInfoSelector;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import com.google.common.collect.ImmutableList;

View File

@@ -18,6 +18,7 @@
 # Test behaviors specific to --use_local_catalog being enabled.
 import pytest
+import Queue
 import random
 import threading
 import time
@@ -228,6 +229,57 @@ class TestCompactCatalogUpdates(CustomClusterTestSuite):
       client1.close()
       client2.close()
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--use_local_catalog=true",
+      catalogd_args="--catalog_topic_mode=minimal")
+  def test_concurrent_invalidate_with_queries(self, unique_database):
+    """
+    Tests that queries are replanned when they clash with concurrent invalidates.
+    """
+    # TODO: Merge this with the above test after fixing IMPALA-7717
+    try:
+      impalad1 = self.cluster.impalads[0]
+      impalad2 = self.cluster.impalads[1]
+      client1 = impalad1.service.create_beeswax_client()
+      client2 = impalad2.service.create_beeswax_client()
+      # Track the number of replans.
+      replans_seen = [0]
+      replans_seen_lock = threading.Lock()
+      # Queue to propagate exceptions from failed queries, if any.
+      failed_queries = Queue.Queue()
+
+      def stress_thread(client):
+        while replans_seen[0] == 0:
+          q = random.choice([
+              'invalidate metadata functional.alltypesnopart',
+              'select count(*) from functional.alltypesnopart',
+              'select count(*) from functional.alltypesnopart'])
+          try:
+            ret = self.execute_query_expect_success(client, q)
+          except Exception as e:
+            failed_queries.put((q, str(e)))
+            continue
+          if RETRY_PROFILE_MSG in ret.runtime_profile:
+            with replans_seen_lock:
+              replans_seen[0] += 1
+
+      threads = [threading.Thread(target=stress_thread, args=(c,))
+                 for c in [client1, client2]]
+      for t in threads:
+        t.start()
+      for t in threads:
+        t.join(30)
+      assert failed_queries.empty(), \
+          "Failed query count non zero: %s" % list(failed_queries.queue)
+      assert replans_seen[0] > 0, "Did not trigger any re-plans"
+    finally:
+      client1.close()
+      client2.close()
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args="--use_local_catalog=true --local_catalog_max_fetch_retries=0",