IMPALA-7669: Gracefully handle concurrent invalidate/partial fetch RPCs
The bug here was that any partial RPC on an IncompleteTable was throwing
an NPE. Ideally, we attempt to load the table (if we find that it is not
loaded) before making the partial info request, but a concurrent
invalidate could reset the table state and move it back to an
uninitialized state. This patch handles this case better by propagating
a meaningful error to the caller.

Testing:
-------
- Added a test that fails consistently with an NPE without this patch.

Change-Id: I8533f73f25ca42a20f146ddfd95d4213add9b705
Reviewed-on: http://gerrit.cloudera.org:8080/11638
Reviewed-by: Bharath Vissapragada <bharathv@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
committed by Impala Public Jenkins
parent 28aecd6db8
commit 2b2cf8d966
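Editor's note: the sequence of events that produced the NPE, sketched below with simplified names that stand in for the catalogd-side lookup and partial-fetch steps shown in the diff (this is a hedged illustration, not Impala's actual code):

```java
// Hedged sketch of the pre-patch race; fetchPartial() and getOrLoadTable()
// are illustrative stand-ins for the code paths changed in this commit.
TGetPartialCatalogObjectResponse fetchPartial(String db, String tbl,
    TGetPartialCatalogObjectRequest req) throws CatalogException {
  Table table = getOrLoadTable(db, tbl);  // table is loaded at this point
  // A concurrent INVALIDATE METADATA can run here, swapping the loaded
  // Table back to a fresh IncompleteTable whose cause_ is still null.
  return table.getPartialInfo(req);  // pre-patch: NPE on the null cause_
}
```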
@@ -378,6 +378,7 @@ enum CatalogLookupStatus {
   OK,
   DB_NOT_FOUND,
   TABLE_NOT_FOUND,
+  TABLE_NOT_LOADED,
   FUNCTION_NOT_FOUND
 }
@@ -2195,6 +2195,10 @@ public class CatalogServiceCatalog extends Catalog {
     }
     if (table == null) {
       return createGetPartialCatalogObjectError(CatalogLookupStatus.TABLE_NOT_FOUND);
+    } else if (!table.isLoaded()) {
+      // Table can still remain in an incomplete state if there was a concurrent
+      // invalidate request.
+      return createGetPartialCatalogObjectError(CatalogLookupStatus.TABLE_NOT_LOADED);
     }
     // TODO(todd): consider a read-write lock here.
     table.getLock().lock();
@@ -20,6 +20,7 @@ package org.apache.impala.catalog;
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 
 import org.apache.impala.common.ImpalaException;
@@ -136,6 +137,7 @@ public class IncompleteTable extends Table {
   @Override
   public TGetPartialCatalogObjectResponse getPartialInfo(
       TGetPartialCatalogObjectRequest req) throws TableLoadingException {
+    Preconditions.checkNotNull(cause_);
     Throwables.propagateIfPossible(cause_, TableLoadingException.class);
     throw new TableLoadingException(cause_.getMessage());
   }
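Editor's note: for readers unfamiliar with the Guava call above, `Throwables.propagateIfPossible(t, C)` rethrows `t` only if it is an instance of `C`, `RuntimeException`, or `Error`, and otherwise returns normally, which is why the wrapping `throw` follows it. A minimal standalone illustration (the `IOException` cause is an invented example):

```java
import java.io.IOException;

import com.google.common.base.Throwables;
import org.apache.impala.catalog.TableLoadingException;

class PropagateIfPossibleSketch {
  // Mirrors the error path of getPartialInfo() above: a cause that is
  // already a TableLoadingException is rethrown as-is; anything else
  // (e.g. an IOException) falls through and gets wrapped.
  static void rethrow(Throwable cause) throws TableLoadingException {
    Throwables.propagateIfPossible(cause, TableLoadingException.class);
    throw new TableLoadingException(cause.getMessage());
  }

  public static void main(String[] args) {
    try {
      rethrow(new IOException("metastore unreachable"));  // invented cause
    } catch (TableLoadingException e) {
      System.out.println(e.getMessage());  // prints: metastore unreachable
    }
  }
}
```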
@@ -344,6 +344,7 @@ public class CatalogdMetaProvider implements MetaProvider {
       case DB_NOT_FOUND:
       case FUNCTION_NOT_FOUND:
       case TABLE_NOT_FOUND:
+      case TABLE_NOT_LOADED:
         invalidateCacheForObject(req.object_desc);
         throw new InconsistentMetadataFetchException(
             String.format("Fetching %s failed. Could not find %s",
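Editor's note: the new `TABLE_NOT_LOADED` status lands in the same branch as the other lookup misses: the stale cache entry is evicted and `InconsistentMetadataFetchException` is thrown, which the frontend treats as a signal to re-plan the query (the `--local_catalog_max_fetch_retries` flag in the tests below bounds those retries). A hedged sketch of such a bounded retry loop; `planQuery()` and `maxRetries` are illustrative stand-ins, not Impala's actual frontend entry points:

```java
// Hedged sketch of a bounded re-plan loop around metadata fetches.
TExecRequest planWithRetries(String stmt, int maxRetries)
    throws InconsistentMetadataFetchException {
  for (int attempt = 0; ; ++attempt) {
    try {
      return planQuery(stmt);  // each attempt re-fetches catalog metadata
    } catch (InconsistentMetadataFetchException e) {
      // The object was invalidated mid-fetch; the stale cache entry was
      // already evicted, so a retry fetches fresh metadata.
      if (attempt >= maxRetries) throw e;
    }
  }
}
```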
@@ -34,7 +34,6 @@ import avro.shaded.com.google.common.collect.Lists;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.testutil.CatalogServiceTestCatalog;
 import org.apache.impala.thrift.TCatalogInfoSelector;
@@ -50,8 +49,6 @@ import org.apache.impala.thrift.TTableInfoSelector;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.google.common.collect.ImmutableList;
@@ -18,6 +18,7 @@
 # Test behaviors specific to --use_local_catalog being enabled.
 
 import pytest
+import Queue
 import random
 import threading
 import time
@@ -228,6 +229,57 @@ class TestCompactCatalogUpdates(CustomClusterTestSuite):
       client1.close()
       client2.close()
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--use_local_catalog=true",
+      catalogd_args="--catalog_topic_mode=minimal")
+  def test_concurrent_invalidate_with_queries(self, unique_database):
+    """
+    Tests that the queries are replanned when they clash with concurrent invalidates.
+    """
+    # TODO: Merge this with the above test after fixing IMPALA-7717
+    try:
+      impalad1 = self.cluster.impalads[0]
+      impalad2 = self.cluster.impalads[1]
+      client1 = impalad1.service.create_beeswax_client()
+      client2 = impalad2.service.create_beeswax_client()
+
+      # Track the number of replans.
+      replans_seen = [0]
+      replans_seen_lock = threading.Lock()
+
+      # Queue to propagate exceptions from failed queries, if any.
+      failed_queries = Queue.Queue()
+
+      def stress_thread(client):
+        while replans_seen[0] == 0:
+          q = random.choice([
+              'invalidate metadata functional.alltypesnopart',
+              'select count(*) from functional.alltypesnopart',
+              'select count(*) from functional.alltypesnopart'])
+          try:
+            ret = self.execute_query_expect_success(client, q)
+          except Exception as e:
+            failed_queries.put((q, str(e)))
+            continue
+          if RETRY_PROFILE_MSG in ret.runtime_profile:
+            with replans_seen_lock:
+              replans_seen[0] += 1
+
+      threads = [threading.Thread(target=stress_thread, args=(c,))
+                 for c in [client1, client2]]
+      for t in threads:
+        t.start()
+      for t in threads:
+        t.join(30)
+      assert failed_queries.empty(),\
+          "Failed query count non zero: %s" % list(failed_queries.queue)
+      assert replans_seen[0] > 0, "Did not trigger any re-plans"
+
+    finally:
+      client1.close()
+      client2.close()
+
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args="--use_local_catalog=true --local_catalog_max_fetch_retries=0",