IMPALA-14416: JniFrontend.getDbs() should handle InconsistentMetadataFetchException

JniFrontend.getDbs() returns the thrift representation of all the dbs.
This might trigger multiple getPartialCatalogObject requests to catalogd
and could fail in InconsistentMetadataFetchException, e.g. if a db is
removed after coordinator fetching the db name list and before
coordinator fetching the msDb of that db.

This patch fixes the issue by retrying the above steps when hitting
InconsistentMetadataFetchException, similar to what other methods in
Frontend do. Adds getThriftDbs() in Frontend to directly return the
thrift db list so JniFrontend can use it directly and the retry can be
added inside Frontend.java.

TestAuthorization.test_local_catalog_show_dbs_with_transient_db is an
existing test to verify a similar problem. Running this test with
authorization disabled can reproduce the current bug. So this patch
extracts the test code into
TestLocalCatalogRetries._run_show_dbs_with_transient_db() and share it
in both authz enabled and disabled tests.

Tests
 - Ran TestLocalCatalogRetries.test_show_dbs_retry 60 times. Without the
   fix, it fails in about a dozen times.

Change-Id: Ib337f88a2ac0f35142417f6cee51d30497f12845
Reviewed-on: http://gerrit.cloudera.org:8080/23402
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
stiga-huang
2025-09-09 20:42:37 +08:00
committed by Impala Public Jenkins
parent f4c0c396ff
commit a41c5cbfdd
5 changed files with 66 additions and 33 deletions

View File

@@ -1149,8 +1149,7 @@ public class Frontend {
* exception. Inconsistent metadata comes up due to interleaving catalog object updates
* with retrieving those objects. Instead of bubbling up the issue to the user, retrying
* can get the user's operation to run on a consistent snapshot and to succeed.
* Retries are *not* needed for accessing top-level objects such as databases, since
* they do not have a parent, so cannot be inconsistent.
* Max number of retries is configured by local_catalog_max_fetch_retries.
* TODO: this class is typically used in a loop at the call-site. replace with lambdas
* in Java 8 to simplify the looping boilerplate.
*/
@@ -1481,7 +1480,8 @@ public class Frontend {
/**
* Returns all databases in catalog cache that match the pattern of 'matcher' and are
* accessible to 'user'.
* accessible to 'user'. Callers should handle InconsistentMetadataFetchException when
* using these dbs.
*/
public List<? extends FeDb> getDbs(PatternMatcher matcher, User user)
throws UserCancelledException, InternalException {
@@ -1506,6 +1506,32 @@ public class Frontend {
return dbs;
}
/**
* Returns thrift representation of all databases in catalog cache that match the
* pattern of 'matcher' and are accessible to 'user'. Retries on
* InconsistentMetadataFetchException are handled in this method (see comments of
* RetryTracker)
*/
public List<TDatabase> getThriftDbs(PatternMatcher matcher, User user)
throws UserCancelledException, InternalException {
Frontend.RetryTracker retries = new Frontend.RetryTracker(
String.format("Fetching db list for user %s", user.getName()));
while (true) {
try {
List<? extends FeDb> dbs = getDbs(matcher, user);
List<TDatabase> tDbs = Lists.newArrayListWithCapacity(dbs.size());
// LocalDb.toThrift() might trigger getPartialCatalogObject request to catalogd
// which could fail if the db is dropped concurrently. In this case,
// InconsistentMetadataFetchException will be thrown and we will retry from
// getting the db list.
for (FeDb db : dbs) tDbs.add(db.toThrift());
return tDbs;
} catch (InconsistentMetadataFetchException e) {
retries.handleRetryOrThrow(e);
}
}
}
/**
* Handles DESCRIBE HISTORY queries.
*/

View File

@@ -387,13 +387,9 @@ public class JniFrontend {
TSessionState session = params.isSetSession() ? params.getSession() : null;
User user = getUser(session);
List<? extends FeDb> dbs = frontend_.getDbs(
PatternMatcher.createHivePatternMatcher(params.pattern), user);
TGetDbsResult result = new TGetDbsResult();
List<TDatabase> tDbs = Lists.newArrayListWithCapacity(dbs.size());
for (FeDb db: dbs) tDbs.add(db.toThrift());
result.setDbs(tDbs);
result.setDbs(frontend_.getThriftDbs(
PatternMatcher.createHivePatternMatcher(params.pattern), user));
try {
TSerializer serializer = new TSerializer(protocolFactory_);
return serializer.serialize(result);

View File

@@ -20,8 +20,6 @@
from __future__ import absolute_import, division, print_function
from getpass import getuser
import random
import threading
import time
import pytest
@@ -29,6 +27,7 @@ from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.file_utils import assert_file_in_dir_contains
from tests.common.test_result_verifier import error_msg_equal
from tests.common.test_vector import HS2
from tests.custom_cluster.test_local_catalog import TestLocalCatalogRetries
PRIVILEGES = ['all', 'alter', 'drop', 'insert', 'refresh', 'select']
ADMIN = "admin"
@@ -192,25 +191,5 @@ class TestAuthorization(CustomClusterTestSuite):
# Use admin user to have create+drop privileges.
unique_database = unique_name + "_db"
admin_client = self.create_impala_client(user=ADMIN)
stop = False
def create_drop_db():
while not stop:
admin_client.execute("create database " + unique_database)
# Sleep some time so coordinator can get the updates of it.
time.sleep(0.1)
if stop:
break
admin_client.execute("drop database " + unique_database)
t = threading.Thread(target=create_drop_db)
t.start()
try:
for i in range(100):
self.execute_query("show databases")
# Sleep some time so the db can be dropped.
time.sleep(0.2)
finally:
stop = True
t.join()
admin_client.execute("drop database if exists " + unique_database)
TestLocalCatalogRetries._run_show_dbs_with_transient_db(
unique_database, admin_client, self.client)

View File

View File

@@ -428,6 +428,38 @@ class TestLocalCatalogRetries(CustomClusterTestSuite):
client1.close()
client2.close()
@CustomClusterTestSuite.with_args(
impalad_args="--use_local_catalog=true",
catalogd_args="--catalog_topic_mode=minimal")
def test_show_dbs_retry(self, unique_name):
self._run_show_dbs_with_transient_db(
unique_name + "_db", self.create_impala_client(), self.client)
@classmethod
def _run_show_dbs_with_transient_db(cls, unique_database, admin_client, user_client):
stop = False
def create_drop_db():
while not stop:
admin_client.execute("create database " + unique_database)
# Sleep some time so coordinator can get the updates of it.
time.sleep(0.1)
if stop:
break
admin_client.execute("drop database " + unique_database)
t = threading.Thread(target=create_drop_db)
t.start()
try:
for i in range(100):
user_client.execute("show databases")
# Sleep some time so the db can be dropped.
time.sleep(0.2)
finally:
stop = True
t.join()
admin_client.execute("drop database if exists " + unique_database)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args="--use_local_catalog=true --inject_latency_after_catalog_fetch_ms=50",