IMPALA-14447: Parallelize table loading in getMissingTables()

StmtMetadataLoader.getMissingTables() load missing tables in serial
manner. In local catalog mode, large number of serial table loading can
incur significant round trip latency to CatalogD. This patch parallelize
the table loading by using executor service to lookup and gather all
non-null FeTables from given TableName set.

Modify LocalCatalog.loadDbs() and LocalDb.loadTableNames() slightly to
make it thread-safe. Change FrontendProfile.Scope to support nested
scope referencing the same FrontendProfile instance.

Added new flag max_stmt_metadata_loader_threads to control the maximum
number of threads to use for loading table metadata during query
compilation. It is deafult to 8 threads per query compilation.

If there is only one table to load, max_stmt_metadata_loader_threads set
to 1, or RejectedExecutionException raised, fallback to load table
serially.

Testing:
Run and pass few tests such as test_catalogd_ha.py,
test_concurrent_ddls.py, and test_observability.py.
Add FE tests CatalogdMetaProviderTest.testProfileParallelLoad.
Manually run following query and observe parallel loading by setting
TRACE level log in CatalogdMetaProvider.java.

use functional;
select count(*) from alltypesnopart
union select count(*) from alltypessmall
union select count(*) from alltypestiny
union select count(*) from alltypesagg;

Change-Id: I97a5165844ae846b28338d62e93a20121488d79f
Reviewed-on: http://gerrit.cloudera.org:8080/23436
Reviewed-by: Quanlong Huang <huangquanlong@gmail.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Riza Suminto
2025-09-17 14:05:20 -07:00
committed by Impala Public Jenkins
parent cde4bc016c
commit 1008decc07
9 changed files with 249 additions and 51 deletions

View File

@@ -302,6 +302,10 @@ DEFINE_int32(iceberg_catalog_num_threads, 16,
"Maximum number of threads to use for Iceberg catalog operations. These threads are "
"shared among concurrent Iceberg catalog operation (ie., ExpireSnapshot).");
DEFINE_int32(max_stmt_metadata_loader_threads, 8,
"Maximum number of threads to use for loading table metadata during query "
"compilation.");
// These coefficients have not been determined empirically. The write coefficient
// matches the coefficient for a broadcast sender in DataStreamSink. The read
// coefficient matches the coefficient for an exchange receiver in ExchandeNode.
@@ -358,6 +362,7 @@ DEFINE_validator(query_cpu_count_divisor, &ValidatePositiveDouble);
DEFINE_validator(min_processing_per_thread, &ValidatePositiveInt64);
DEFINE_validator(query_cpu_root_factor, &ValidatePositiveDouble);
DEFINE_validator(iceberg_catalog_num_threads, &ValidatePositiveInt32);
DEFINE_validator(max_stmt_metadata_loader_threads, &ValidatePositiveInt32);
DEFINE_validator(tuple_cache_cost_coefficient_write_bytes, &ValidateNonnegativeDouble);
DEFINE_validator(tuple_cache_cost_coefficient_write_rows, &ValidateNonnegativeDouble);
DEFINE_validator(tuple_cache_cost_coefficient_read_bytes, &ValidateNonnegativeDouble);
@@ -590,6 +595,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
cfg.__set_tuple_cache_cost_coefficient_read_rows(
FLAGS_tuple_cache_cost_coefficient_read_rows);
cfg.__set_min_jdbc_scan_cardinality(FLAGS_min_jdbc_scan_cardinality);
cfg.__set_max_stmt_metadata_loader_threads(FLAGS_max_stmt_metadata_loader_threads);
return Status::OK();
}

View File

@@ -359,4 +359,6 @@ struct TBackendGflags {
164: required double tuple_cache_cost_coefficient_read_rows
165: required i32 min_jdbc_scan_cardinality
166: required i32 max_stmt_metadata_loader_threads
}

View File

@@ -1568,6 +1568,10 @@ public class Analyzer {
// Add paths rooted at a table with an unqualified and fully-qualified table name.
List<TableName> candidateTbls = Path.getCandidateTables(rawPath, getDefaultDb());
if (LOG.isTraceEnabled()) {
LOG.trace("Candidate tables to lookup in catalog/cache for {} {}: {}", pathType,
ToSqlUtils.getPathSql(rawPath), candidateTbls);
}
for (int tblNameIdx = 0; tblNameIdx < candidateTbls.size(); ++tblNameIdx) {
TableName tblName = candidateTbls.get(tblNameIdx);
FeTable tbl = null;

View File

@@ -19,12 +19,23 @@ package org.apache.impala.analysis;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.impala.authorization.TableMask;
import org.apache.impala.authorization.User;
@@ -36,11 +47,15 @@ import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.FeView;
import org.apache.impala.catalog.MaterializedViewHdfsTable;
import org.apache.impala.catalog.Table;
import org.apache.impala.catalog.local.InconsistentMetadataFetchException;
import org.apache.impala.catalog.local.LocalCatalogException;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.InternalException;
import org.apache.impala.common.Pair;
import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.service.Frontend;
import org.apache.impala.service.FrontendProfile;
import org.apache.impala.thrift.TUniqueId;
import org.apache.impala.util.AcidUtils;
import org.apache.impala.util.EventSequence;
@@ -50,6 +65,8 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Loads all table and view metadata relevant for a single SQL statement and returns the
@@ -70,8 +87,9 @@ public class StmtMetadataLoader {
private final TUniqueId queryId_;
// Results of the loading process. See StmtTableCache.
private final Set<String> dbs_ = new HashSet<>();
private final Map<TableName, FeTable> loadedOrFailedTbls_ = new HashMap<>();
// Use thread-safe collection for parallel stream in getMissingTables().
private final Set<String> dbs_ = Collections.synchronizedSet(new HashSet<>());
private final Map<TableName, FeTable> loadedOrFailedTbls_ = new ConcurrentHashMap<>();
// Metrics for the metadata load.
// Number of prioritizedLoad() RPCs issued to the catalogd.
@@ -310,6 +328,91 @@ public class StmtMetadataLoader {
needsAnyTableMasksInQuery_);
}
/**
* Loads the table 'tblName' from 'catalog' and returns it. If the table or its
* database does not exist, returns null. If the table is already in
* 'loadedOrFailedTbls_', returns null. Everything called from here needs to be
* reentrant.
*/
private @Nullable Pair<TableName, FeTable> loadFeTable(
FeCatalog catalog, TableName tblName) {
if (loadedOrFailedTbls_.containsKey(tblName)) return null;
FeDb db = catalog.getDb(tblName.getDb());
if (db == null) return null;
dbs_.add(tblName.getDb());
FeTable tbl = db.getTable(tblName.getTbl());
if (tbl == null) return null;
return Pair.create(tblName, tbl);
}
/**
* Load 'tbls' serially.
*/
private List<Pair<TableName, FeTable>> serialTableLoad(
final FeCatalog catalog, Set<TableName> tbls) {
LOG.trace("Serial load {}", tbls);
return tbls.stream()
.map(tblName -> loadFeTable(catalog, tblName))
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
/**
* Load 'tbls' using executor service to speed up table loadings from CatalogD in case
* of local catalog mode.
*/
private List<Pair<TableName, FeTable>> parallelTableLoad(final FeCatalog catalog,
Set<TableName> tbls, int maxThreads) throws InternalException {
LOG.trace("Parallel load {} with {} max threads", tbls, maxThreads);
FrontendProfile profile = FrontendProfile.getCurrentOrNull();
String queryIdStr =
queryId_ == null ? "<unknown_query_id>" : TUniqueIdUtil.PrintId(queryId_);
List<Pair<TableName, FeTable>> tables = new ArrayList<>();
ExecutorService executorService = null;
try {
executorService = Executors.newFixedThreadPool(maxThreads,
new ThreadFactoryBuilder()
.setNameFormat("MissingTableLoaderThread-" + queryIdStr + "-%d")
.build());
// Transform tbls to a list of tasks.
List<Callable<Pair<TableName, FeTable>>> tasks =
tbls.stream()
.map(tblName -> (Callable<Pair<TableName, FeTable>>) () -> {
try (FrontendProfile.Scope s =
FrontendProfile.newScopeWithExistingProfile(profile)) {
return loadFeTable(catalog, tblName);
}
})
.collect(Collectors.toList());
// Invoke all tasks and wait for them to finish.
List<Future<Pair<TableName, FeTable>>> futures = executorService.invokeAll(tasks);
for (Future<Pair<TableName, FeTable>> future : futures) {
Pair<TableName, FeTable> nameTblPair = future.get();
if (nameTblPair != null) tables.add(nameTblPair);
}
} catch (ExecutionException ex) {
// Unwrap and rethrow the cause if it is one of the declared exceptions.
// TODO: Any other exceptions to propagate?
Throwables.propagateIfPossible(ex.getCause(),
InconsistentMetadataFetchException.class, LocalCatalogException.class);
Throwables.propagateIfPossible(ex.getCause(), InternalException.class);
// Otherwise, wrap as InternalException and rethrow.
throw new InternalException(
"Caught exception during parallel table loading of " + tbls, ex.getCause());
} catch (InterruptedException ex) {
// Wrap as InternalException and rethrow.
throw new InternalException(
"ExecutorService interrupted during parallel table loading of " + tbls, ex);
} catch (RejectedExecutionException ex) {
// Parallel load rejected, fall back to serial load.
tables = serialTableLoad(catalog, tbls);
} finally {
if (executorService != null) executorService.shutdown();
}
return tables;
}
/**
* Determines whether the 'tbls' are loaded in the given catalog or not. Adds the names
* of referenced databases that exist to 'dbs_', and loaded tables to
@@ -319,17 +422,24 @@ public class StmtMetadataLoader {
* Path.getCandidateTables(). Non-existent tables are ignored and not returned or
* added to 'loadedOrFailedTbls_'.
*/
private Set<TableName> getMissingTables(FeCatalog catalog, Set<TableName> tbls,
Map<TableName, FeTable> missingTblsSnapshot) {
private Set<TableName> getMissingTables(final FeCatalog catalog, Set<TableName> tbls,
Map<TableName, FeTable> missingTblsSnapshot) throws InternalException {
Set<TableName> missingTbls = new HashSet<>();
if (tbls.isEmpty()) return missingTbls;
Set<TableName> viewTbls = new HashSet<>();
for (TableName tblName: tbls) {
if (loadedOrFailedTbls_.containsKey(tblName)) continue;
FeDb db = catalog.getDb(tblName.getDb());
if (db == null) continue;
dbs_.add(tblName.getDb());
FeTable tbl = db.getTable(tblName.getTbl());
if (tbl == null) continue;
int maxThreads =
Math.min(tbls.size(), BackendConfig.INSTANCE.getMaxStmtMetadataLoaderThreads());
List<Pair<TableName, FeTable>> tables = null;
if (maxThreads > 1 && BackendConfig.INSTANCE.getBackendCfg().use_local_catalog) {
tables = parallelTableLoad(catalog, tbls, maxThreads);
} else {
tables = serialTableLoad(catalog, tbls);
}
for (Pair<TableName, FeTable> nameTblPair : tables) {
TableName tblName = nameTblPair.first;
FeTable tbl = nameTblPair.second;
if (!tbl.isLoaded()
|| (tbl instanceof FeIncompleteTable
&& ((FeIncompleteTable) tbl).isLoadFailedByRecoverableError())) {

View File

@@ -57,6 +57,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
/**
@@ -78,7 +79,8 @@ public class LocalCatalog implements FeCatalog {
private final static Logger LOG = LoggerFactory.getLogger(LocalCatalog.class);
private final MetaProvider metaProvider_;
private Map<String, FeDb> dbs_ = new HashMap<>();
// Mapping of Db name to FeDb. Not cleared once populated.
private ImmutableMap<String, FeDb> dbs_ = null;
private Map<String, HdfsCachePool> hdfsCachePools_ = null;
private String nullPartitionKeyValue_;
// Catalog service id when MetaProvider is CatalogdMetaProvider
@@ -101,27 +103,33 @@ public class LocalCatalog implements FeCatalog {
return Catalog.filterCatalogObjectsByPattern(dbs_.values(), matcher);
}
/**
* Populate dbs_ if it is empty. This method is synchronized to avoid
* multiple threads trying to populate dbs_ at the same time.
*/
private void loadDbs() {
if (!dbs_.isEmpty()) return;
Map<String, FeDb> dbs = new HashMap<>();
List<String> names;
try {
names = metaProvider_.loadDbList();
} catch (TException e) {
throw new LocalCatalogException("Unable to load database names", e);
}
for (String dbName : names) {
dbName = dbName.toLowerCase();
if (dbs_.containsKey(dbName)) {
dbs.put(dbName, dbs_.get(dbName));
} else {
dbs.put(dbName, new LocalDb(this, dbName));
}
}
// Do nothing if dbs_ is already populated.
if (dbs_ != null) return;
Db bdb = BuiltinsDb.getInstance();
dbs.put(bdb.getName(), bdb);
dbs_ = dbs;
synchronized (this) {
// Check again to avoid redundant work.
if (dbs_ != null) return;
Map<String, FeDb> dbs = new HashMap<>();
List<String> names;
try {
names = metaProvider_.loadDbList();
} catch (TException e) {
throw new LocalCatalogException("Unable to load database names", e);
}
for (String dbName : names) {
String lowerDbName = dbName.toLowerCase();
dbs.putIfAbsent(lowerDbName, new LocalDb(this, lowerDbName));
}
Db bdb = BuiltinsDb.getInstance();
dbs.putIfAbsent(bdb.getName(), bdb);
dbs_ = ImmutableMap.copyOf(dbs);
}
}
@Override

View File

@@ -19,11 +19,11 @@ package org.apache.impala.catalog.local;
import java.util.ArrayList;
import java.util.Collections;
import java.util.stream.Collectors;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.PrincipalType;
@@ -38,7 +38,6 @@ import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.Function;
import org.apache.impala.catalog.Function.CompareMode;
import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.common.Pair;
import org.apache.impala.thrift.TBriefTableMeta;
@@ -56,7 +55,6 @@ import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Database instance loaded from {@link LocalCatalog}.
*
@@ -74,7 +72,7 @@ public class LocalDb implements FeDb {
* Map from lower-cased table name to table object. Values will be
* null for tables which have not yet been loaded.
*/
private Map<String, FeTable> tables_;
private ConcurrentHashMap<String, FeTable> tables_;
/**
* Map of function name to list of signatures for that function name.
@@ -198,17 +196,21 @@ public class LocalDb implements FeDb {
*/
private void loadTableNames() {
if (tables_ != null) return;
Map<String, FeTable> newMap = new HashMap<>();
try {
MetaProvider metaProvider = catalog_.getMetaProvider();
for (TBriefTableMeta meta : metaProvider.loadTableList(name_)) {
newMap.put(meta.getName(), new LocalIncompleteTable(this, meta));
synchronized (this) {
if (tables_ != null) return;
ConcurrentHashMap<String, FeTable> newMap = new ConcurrentHashMap<>();
try {
MetaProvider metaProvider = catalog_.getMetaProvider();
for (TBriefTableMeta meta : metaProvider.loadTableList(name_)) {
newMap.put(meta.getName(), new LocalIncompleteTable(this, meta));
}
} catch (TException e) {
throw new LocalCatalogException(
String.format("Could not load table names for database '%s' from HMS", name_),
e);
}
} catch (TException e) {
throw new LocalCatalogException(String.format(
"Could not load table names for database '%s' from HMS", name_), e);
tables_ = newMap;
}
tables_ = newMap;
}
@Override

View File

@@ -620,4 +620,8 @@ public class BackendConfig {
public double getTupleCacheCostCoefficientReadRows() {
return backendCfg_.tuple_cache_cost_coefficient_read_rows;
}
public int getMaxStmtMetadataLoaderThreads() {
return backendCfg_.max_stmt_metadata_loader_threads;
}
}

View File

@@ -92,13 +92,24 @@ public class FrontendProfile {
/**
* Create a new profile, setting it as the current thread-local profile for the
* length of the current scope. This is meant to be used in a try-with-resources
* statement. Supports at most one scope per thread. No nested scopes are currently
* allowed.
* statement. Nested scopes are only supported if they use the same FrontendProfile
* instance.
*/
public static Scope createNewWithScope() {
return new Scope(new FrontendProfile());
}
/**
* Create a new scope with an existing profile. This is meant to be used in a
* try-with-resources statement. Nested scopes are only supported if they use the same
* FrontendProfile instance.
* @param profile existing profile from main thread to use in the new scope.
* @return the new scope.
*/
public static Scope newScopeWithExistingProfile(FrontendProfile profile) {
return new Scope(profile);
}
/**
* Get the profile attached to the current thread, throw IllegalStateException if there
* is none.
@@ -255,7 +266,11 @@ public class FrontendProfile {
private Scope(FrontendProfile profile) {
oldThreadLocalValue_ = THREAD_LOCAL.get();
// TODO: remove when allowing nested scopes.
Preconditions.checkState(oldThreadLocalValue_ == null);
if (oldThreadLocalValue_ != null) {
Preconditions.checkState(oldThreadLocalValue_ == profile,
"Nested FrontendProfile scopes only supported if they use the same "
+ "FrontendProfile instance");
}
THREAD_LOCAL.set(profile);
}

View File

@@ -34,8 +34,8 @@ import java.util.stream.Collectors;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.impala.analysis.TableName;
import org.apache.impala.catalog.FileDescriptor;
import org.apache.impala.catalog.HdfsPartition;
import org.apache.impala.catalog.local.CatalogdMetaProvider.SizeOfWeigher;
import org.apache.impala.catalog.local.MetaProvider.PartitionMetadata;
import org.apache.impala.catalog.local.MetaProvider.PartitionRef;
@@ -61,7 +61,6 @@ import org.apache.impala.thrift.TTable;
import org.apache.impala.util.ListMap;
import org.apache.impala.util.TByteBuffer;
import org.apache.thrift.TConfiguration;
import org.apache.thrift.transport.TTransportException;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
@@ -389,6 +388,54 @@ public class CatalogdMetaProviderTest {
assertTrue(counters.containsKey("CatalogFetch.StorageLoad.Time"));
}
@Test
public void testProfileParallelLoad() throws Exception {
FrontendProfile profile;
// All of these table has not been loaded yet.
List<TableName> tables = new ArrayList<>();
tables.add(new TableName("functional", "alltypesnopart"));
tables.add(new TableName("functional", "alltypessmall"));
tables.add(new TableName("functional", "alltypestiny"));
tables.add(new TableName("functional", "alltypesagg"));
try (FrontendProfile.Scope scope = FrontendProfile.createNewWithScope()) {
profile = FrontendProfile.getCurrent();
assertNotNull(profile);
tables.parallelStream().forEach(t -> {
try (FrontendProfile.Scope s =
FrontendProfile.newScopeWithExistingProfile(profile)) {
// Load the table. This will create a Table miss.
Pair<Table, TableMetaRef> pair = provider_.loadTable(t.getDb(), t.getTbl());
// Load all partition ids. This will create a PartitionLists miss.
List<PartitionRef> allRefs = provider_.loadPartitionList(pair.second);
// Load all partitions. This will create one partition miss per partition.
loadPartitions(pair.second, allRefs);
} catch (Exception e) { throw new RuntimeException(e); }
});
}
TRuntimeProfileNode prof = profile.emitAsThrift();
Map<String, TCounter> counters = Maps.uniqueIndex(prof.counters, TCounter::getName);
assertEquals(prof.counters.toString(), 16, counters.size());
assertEquals(0, counters.get("CatalogFetch.Tables.Hits").getValue());
assertEquals(4, counters.get("CatalogFetch.Tables.Misses").getValue());
assertEquals(4, counters.get("CatalogFetch.Tables.Requests").getValue());
assertTrue(counters.containsKey("CatalogFetch.Tables.Time"));
assertEquals(0, counters.get("CatalogFetch.PartitionLists.Hits").getValue());
assertEquals(4, counters.get("CatalogFetch.PartitionLists.Misses").getValue());
assertEquals(4, counters.get("CatalogFetch.PartitionLists.Requests").getValue());
assertTrue(counters.containsKey("CatalogFetch.PartitionLists.Time"));
assertEquals(0, counters.get("CatalogFetch.Partitions.Hits").getValue());
assertEquals(20, counters.get("CatalogFetch.Partitions.Misses").getValue());
assertEquals(20, counters.get("CatalogFetch.Partitions.Requests").getValue());
assertTrue(counters.containsKey("CatalogFetch.Partitions.Time"));
assertTrue(counters.containsKey("CatalogFetch.RPCs.Bytes"));
assertTrue(counters.containsKey("CatalogFetch.RPCs.Time"));
// 3 RPCs per table: one for fetching table, one for partition list,
// and the other one for fetching partitions.
assertEquals(12, counters.get("CatalogFetch.RPCs.Requests").getValue());
// Should contains StorageLoad.Time since we have loaded partitions from catalogd.
assertTrue(counters.containsKey("CatalogFetch.StorageLoad.Time"));
}
@Test
public void testPiggybackSuccess() throws Exception {
// TODO: investigate the cause of flakiness (IMPALA-8794)