IMPALA-14349: Encode FileDescriptors on the fly when loading Iceberg tables

With this patch we create Iceberg file descriptors from
LocatedFileStatus objects during IcebergFileMetadataLoader's
parallelListing(). This has the following benefits:
 * We parallelize the creation of Iceberg file descriptor objects
 * We don't need to maintain a large hash map with all the
   LocatedFileStatus objects at once. Now we only keep a few
   LocatedFileStatus objects per partition in memory while we are
   converting them to Iceberg file descriptors, i.e., the GC is free to
   reclaim the LocatedFileStatus objects we no longer use (see the
   sketch below).
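
For illustration, here is a condensed sketch of the new flow (simplified
from the patch below; 'pool', 'newFiles', and 'loadedFds' stand in for
the loader's fields, and error handling and stats are omitted):

  // 1. Group the new content files by their partition directory.
  Map<Path, List<ContentFile<?>>> byPartition = newFiles.stream()
      .collect(Collectors.groupingBy(
          cf -> new Path(String.valueOf(cf.path())).getParent()));
  // 2. One task per partition: list the directory once, then convert each
  //    matching LocatedFileStatus into an IcebergFileDescriptor right away.
  List<Future<List<IcebergFileDescriptor>>> tasks =
      byPartition.entrySet().stream()
          .map(e -> pool.submit(
              () -> createFdsForPartition(e.getKey(), e.getValue())))
          .collect(Collectors.toList());
  // 3. Collect the descriptors. Once a task is done, its LocatedFileStatus
  //    objects are unreachable, so the GC can reclaim them.
  for (Future<List<IcebergFileDescriptor>> task : tasks) {
    loadedFds.addAll(task.get());
  }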

This patch retires the startup flag 'iceberg_reload_new_files_threshold'.
Since IMPALA-13254 we only list partitions that have new data files, and
we load them in parallel, so efficient incremental table loading is
already covered. At this point the startup flag only added unnecessary
code complexity.

Measurements

I created two tables (from tpcds.store_sales) to measure table loading
times for large tables:

Table #1:
  PARTITIONED BY SPEC(ss_item_sk, BUCKET(5, ss_sold_time_sk))
  partitions: 107818
  files: 754726

Table #2:
  PARTITIONED BY SPEC(ss_item_sk)
  partitions: 18000
  files: 504224

Time taken in IcebergFileMetadataLoader.load() during full table reload:
+----------+-------+------+---------+
|          | Base  | New  | Speedup |
+----------+-------+------+---------+
| Table #1 | 17.3s | 8.1s |    2.14 |
| Table #2 |  7.8s | 4.3s |     1.8 |
+----------+-------+------+---------+

I measured incremental table loading only for Table #2, as it is the
worst-case scenario for the new code: it has more files per partition,
the new code always uses file listings, and each new file was created in
a separate partition, so every new file costs a full directory listing.

Time taken in IcebergFileMetadataLoader.load() during incremental table
reload:
+------------+------+------+---------+
| #new files | Base | New  | Speedup |
+------------+------+------+---------+
|          1 | 1.4s | 1.6s |     0.9 |
|        100 | 1.5s | 1.9s |     0.8 |
|        200 | 1.5s | 1.5s |     1.0 |
+------------+------+------+---------+

We lose a few tenths of a second, but I think the simplified code
justifies it.

Testing:
 * some tests were updated because the startup flag
   'iceberg_reload_new_files_threshold' doesn't exist anymore

Change-Id: Ia1c2a7119d76db7ce7c43caec2ccb122a014851b
Reviewed-on: http://gerrit.cloudera.org:8080/23363
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author: Zoltan Borok-Nagy
Date: 2025-08-29 09:37:20 +02:00
Committer: Impala Public Jenkins
Commit: 711797e7fb
Parent: dbac6ab13a
9 changed files with 125 additions and 218 deletions


@@ -416,11 +416,6 @@ DEFINE_int64(update_catalogd_rpc_resend_interval_ms, 100, "(Advanced) Interval (
"with which the statestore resends the update catalogd RPC to a subscriber if the "
"statestore has failed to send the RPC to the subscriber.");
DEFINE_int32(iceberg_reload_new_files_threshold, 100, "(Advanced) If during a table "
"refresh the number of new files are greater than this, catalogd will use a "
"recursive file listing to load file metadata. If number of new files are less or "
"equal to this, catalogd will load the file metadata one by one.");
DEFINE_bool(iceberg_allow_datafiles_in_table_location_only, true, "If true, Impala "
"does not allow Iceberg data file locations outside of the table directory during "
"reads");
@@ -500,6 +495,7 @@ REMOVED_FLAG(enable_partitioned_aggregation);
REMOVED_FLAG(enable_partitioned_hash_join);
REMOVED_FLAG(enable_phj_probe_side_filtering);
REMOVED_FLAG(enable_rm);
REMOVED_FLAG(iceberg_reload_new_files_threshold);
REMOVED_FLAG(kerberos_reinit_interval);
REMOVED_FLAG(ldap_manual_config);
REMOVED_FLAG(llama_addresses);


@@ -109,7 +109,6 @@ DECLARE_bool(enable_reload_events);
DECLARE_string(geospatial_library);
DECLARE_string(file_metadata_reload_properties);
DECLARE_string(java_weigher);
DECLARE_int32(iceberg_reload_new_files_threshold);
DECLARE_bool(enable_skipping_older_events);
DECLARE_bool(enable_json_scanner);
DECLARE_bool(iceberg_allow_datafiles_in_table_location_only);
@@ -494,7 +493,6 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
cfg.__set_thrift_rpc_max_message_size(ThriftInternalRpcMaxMessageSize());
cfg.__set_scan_range_cost_factor(FLAGS_scan_range_cost_factor);
cfg.__set_use_jamm_weigher(FLAGS_java_weigher == "jamm");
cfg.__set_iceberg_reload_new_files_threshold(FLAGS_iceberg_reload_new_files_threshold);
cfg.__set_enable_skipping_older_events(FLAGS_enable_skipping_older_events);
cfg.__set_enable_json_scanner(FLAGS_enable_json_scanner);
cfg.__set_iceberg_allow_datafiles_in_table_location_only(


@@ -260,8 +260,6 @@ struct TBackendGflags {
114: required bool use_jamm_weigher
115: required i32 iceberg_reload_new_files_threshold
116: required bool enable_skipping_older_events;
117: required bool enable_json_scanner


@@ -307,14 +307,13 @@ public class FileMetadataLoader {
protected FileDescriptor createFd(FileSystem fs, FileStatus fileStatus,
String relPath, Reference<Long> numUnknownDiskIds, String absPath)
throws IOException {
if (!FileSystemUtil.supportsStorageIds(fs)) {
return FileDescriptor.createWithNoBlocks(fileStatus, relPath, absPath);
}
BlockLocation[] locations;
if (fileStatus instanceof LocatedFileStatus) {
locations = ((LocatedFileStatus) fileStatus).getBlockLocations();
} else {
} else if (FileSystemUtil.supportsStorageIds(fs)) {
locations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
} else {
return FileDescriptor.createWithNoBlocks(fileStatus, relPath, absPath);
}
return FileDescriptor.create(fileStatus, relPath, locations, hostIndex_,
fileStatus.isEncrypted(), fileStatus.isErasureCoded(), numUnknownDiskIds,


@@ -17,32 +17,30 @@
package org.apache.impala.catalog;
import static org.apache.impala.catalog.ParallelFileMetadataLoader.
MAX_HDFS_PARTITIONS_PARALLEL_LOAD;
import static org.apache.impala.catalog.ParallelFileMetadataLoader.TOTAL_THREADS;
import static org.apache.impala.catalog.ParallelFileMetadataLoader.createPool;
import static org.apache.impala.catalog.ParallelFileMetadataLoader.getPoolSize;
import com.google.common.annotations.VisibleForTesting;
import com.codahale.metrics.Clock;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.iceberg.ContentFile;
@@ -50,9 +48,9 @@ import org.apache.impala.catalog.FeFsTable.FileMetadataStats;
import org.apache.impala.catalog.FeIcebergTable.Utils;
import org.apache.impala.catalog.iceberg.GroupedContentFiles;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.PrintUtils;
import org.apache.impala.common.Reference;
import org.apache.impala.common.Pair;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.util.IcebergUtil;
import org.apache.impala.util.ListMap;
@@ -68,43 +66,20 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader {
private final static Logger LOG = LoggerFactory.getLogger(
IcebergFileMetadataLoader.class);
// Default value of 'newFilesThreshold_' if the given parameter or startup flag have
// invalid value.
private final int NEW_FILES_THRESHOLD_DEFAULT = 100;
private final org.apache.iceberg.Table iceTbl_;
// If there are more new files than 'newFilesThreshold_', we should fall back
// to regular file metadata loading.
private final int newFilesThreshold_;
private final Path tablePath_;
private final GroupedContentFiles icebergFiles_;
private final boolean requiresDataFilesInTableLocation_;
private boolean useParallelListing_;
public IcebergFileMetadataLoader(org.apache.iceberg.Table iceTbl,
Iterable<IcebergFileDescriptor> oldFds, ListMap<TNetworkAddress> hostIndex,
GroupedContentFiles icebergFiles, boolean requiresDataFilesInTableLocation) {
this(iceTbl, oldFds, hostIndex, icebergFiles, requiresDataFilesInTableLocation,
BackendConfig.INSTANCE.icebergReloadNewFilesThreshold());
}
public IcebergFileMetadataLoader(org.apache.iceberg.Table iceTbl,
Iterable<IcebergFileDescriptor> oldFds, ListMap<TNetworkAddress> hostIndex,
GroupedContentFiles icebergFiles, boolean requiresDataFilesInTableLocation,
int newFilesThresholdParam) {
super(iceTbl.location(), true, oldFds, hostIndex, null, null,
HdfsFileFormat.ICEBERG);
iceTbl_ = iceTbl;
tablePath_ = FileSystemUtil.createFullyQualifiedPath(new Path(iceTbl.location()));
icebergFiles_ = icebergFiles;
requiresDataFilesInTableLocation_ = requiresDataFilesInTableLocation;
if (newFilesThresholdParam >= 0) {
newFilesThreshold_ = newFilesThresholdParam;
} else {
newFilesThreshold_ = NEW_FILES_THRESHOLD_DEFAULT;
LOG.warn("Ignoring invalid new files threshold: {} " +
"using value: {}", newFilesThresholdParam, newFilesThreshold_);
}
}
@Override
@@ -127,18 +102,17 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader {
}
private void loadInternal() throws CatalogException, IOException {
Path partPath = FileSystemUtil.createFullyQualifiedPath(new Path(partDir_));
loadedFds_ = new ArrayList<>();
loadStats_ = new LoadStats(partDir_);
fileMetadataStats_ = new FileMetadataStats();
// Process the existing Fd ContentFile and return the newly added ContentFile
Iterable<ContentFile<?>> newContentFiles = loadContentFilesWithOldFds(partPath);
Iterable<ContentFile<?>> newContentFiles = loadContentFilesWithOldFds(tablePath_);
// Iterate through all the newContentFiles, determine if StorageIds are supported,
// and use different handling methods accordingly.
// This considers that different ContentFiles are on different FileSystems
List<Pair<FileSystem, ContentFile<?>>> filesSupportsStorageIds = Lists.newArrayList();
FileSystem fsForTable = FileSystemUtil.getFileSystemForPath(partPath);
List<ContentFile<?>> filesSupportsStorageIds = Lists.newArrayList();
FileSystem fsForTable = FileSystemUtil.getFileSystemForPath(tablePath_);
FileSystem defaultFs = FileSystemUtil.getDefaultFileSystem();
for (ContentFile<?> contentFile : newContentFiles) {
FileSystem fsForPath = fsForTable;
@@ -152,56 +126,28 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader {
// If the specific fs does not support StorageIds, then
// we create FileDescriptor directly
if (FileSystemUtil.supportsStorageIds(fsForPath)) {
filesSupportsStorageIds.add(Pair.create(fsForPath, contentFile));
filesSupportsStorageIds.add(contentFile);
} else {
IcebergFileDescriptor fd = createFd(fsForPath, contentFile, null, partPath, null);
loadedFds_.add(fd);
fileMetadataStats_.accumulate(fd);
++loadStats_.loadedFiles;
IcebergFileDescriptor fd = createNonLocatedFd(fsForPath, contentFile, tablePath_);
registerNewlyLoadedFd(fd);
}
}
// If the number of filesSupportsStorageIds are greater than newFilesThreshold,
// we will use a recursive file listing to load file metadata. If number of new
// files are less or equal to this, we will load the metadata of the newly added
// files one by one
useParallelListing_ = filesSupportsStorageIds.size() > newFilesThreshold_;
Reference<Long> numUnknownDiskIds = new Reference<>(0L);
Map<Path, FileStatus> nameToFileStatus = Collections.emptyMap();
if (useParallelListing_) {
nameToFileStatus = parallelListing(filesSupportsStorageIds);
AtomicLong numUnknownDiskIds = new AtomicLong();
List<IcebergFileDescriptor> newFds = parallelListing(filesSupportsStorageIds,
numUnknownDiskIds);
for (IcebergFileDescriptor fd : newFds) {
registerNewlyLoadedFd(fd);
}
for (Pair<FileSystem, ContentFile<?>> contentFileInfo : filesSupportsStorageIds) {
Path path = FileSystemUtil.createFullyQualifiedPath(
new Path(contentFileInfo.getSecond().path().toString()));
FileStatus stat = nameToFileStatus.get(path);
loadFdFromStorage(contentFileInfo, stat, partPath, numUnknownDiskIds);
}
loadStats_.unknownDiskIds += numUnknownDiskIds.getRef();
loadStats_.unknownDiskIds += numUnknownDiskIds.get();
if (LOG.isTraceEnabled()) {
LOG.trace(loadStats_.debugString());
}
}
private void loadFdFromStorage(Pair<FileSystem, ContentFile<?>> contentFileInfo,
FileStatus stat, Path partPath, Reference<Long> numUnknownDiskIds)
throws CatalogException {
try {
IcebergFileDescriptor fd = createFd(contentFileInfo.getFirst(),
contentFileInfo.getSecond(), stat, partPath, numUnknownDiskIds);
loadedFds_.add(fd);
++loadStats_.loadedFiles;
fileMetadataStats_.accumulate(fd);
} catch (IOException e) {
StringWriter w = new StringWriter();
e.printStackTrace(new PrintWriter(w));
LOG.warn(String.format("Failed to load Iceberg content file: '%s' Caused by: %s",
contentFileInfo.getSecond().path().toString(), w));
}
}
@VisibleForTesting
boolean useParallelListing() {
return useParallelListing_;
private void registerNewlyLoadedFd(IcebergFileDescriptor fd) {
loadedFds_.add(fd);
fileMetadataStats_.accumulate(fd);
++loadStats_.loadedFiles;
}
/**
@@ -227,20 +173,38 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader {
return newContentFiles;
}
private IcebergFileDescriptor createFd(FileSystem fs, ContentFile<?> contentFile,
private IcebergFileDescriptor createNonLocatedFd(FileSystem fs,
ContentFile<?> contentFile, Path partPath) throws CatalogException, IOException {
Path fileLoc = FileSystemUtil.createFullyQualifiedPath(
new Path(contentFile.path().toString()));
// For OSS service (e.g. S3A, COS, OSS, etc), we create FileStatus ourselves.
FileStatus stat = Utils.createFileStatus(contentFile, fileLoc);
Pair<String, String> absPathRelPath = getAbsPathRelPath(partPath, stat);
String absPath = absPathRelPath.first;
String relPath = absPathRelPath.second;
return IcebergFileDescriptor.cloneWithFileMetadata(
createFd(fs, stat, relPath, null, absPath),
IcebergUtil.createIcebergMetadata(iceTbl_, contentFile));
}
private IcebergFileDescriptor createLocatedFd(ContentFile<?> contentFile,
FileStatus stat, Path partPath, Reference<Long> numUnknownDiskIds)
throws CatalogException, IOException {
if (stat == null) {
Path fileLoc = FileSystemUtil.createFullyQualifiedPath(
new Path(contentFile.path().toString()));
if (FileSystemUtil.supportsStorageIds(fs)) {
stat = Utils.createLocatedFileStatus(fileLoc, fs);
} else {
// For OSS service (e.g. S3A, COS, OSS, etc), we create FileStatus ourselves.
stat = Utils.createFileStatus(contentFile, fileLoc);
}
}
Preconditions.checkState(stat instanceof LocatedFileStatus);
Pair<String, String> absPathRelPath = getAbsPathRelPath(partPath, stat);
String absPath = absPathRelPath.first;
String relPath = absPathRelPath.second;
return IcebergFileDescriptor.cloneWithFileMetadata(
createFd(null, stat, relPath, numUnknownDiskIds, absPath),
IcebergUtil.createIcebergMetadata(iceTbl_, contentFile));
}
Pair<String, String> getAbsPathRelPath(Path partPath, FileStatus stat)
throws TableLoadingException {
String absPath = null;
String relPath = FileSystemUtil.relativizePathNoThrow(stat.getPath(), partPath);
if (relPath == null) {
@@ -251,70 +215,100 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader {
absPath = stat.getPath().toString();
}
}
return IcebergFileDescriptor.cloneWithFileMetadata(
createFd(fs, stat, relPath, numUnknownDiskIds, absPath),
IcebergUtil.createIcebergMetadata(iceTbl_, contentFile));
return new Pair<>(absPath, relPath);
}
/**
* Using a thread pool to perform parallel List operations on the FileSystem, this takes
* into account the situation where multiple FileSystems exist within the ContentFiles.
*/
private Map<Path, FileStatus> parallelListing(
Iterable<Pair<FileSystem, ContentFile<?>>> contentFiles) throws IOException {
final Set<Path> partitionPaths = collectPartitionPaths(contentFiles);
if (partitionPaths.size() == 0) return Collections.emptyMap();
private List<IcebergFileDescriptor> parallelListing(
List<ContentFile<?>> contentFiles,
AtomicLong numUnknownDiskIds) throws IOException {
final Map<Path, List<ContentFile<?>>> partitionPaths =
collectPartitionPaths(contentFiles);
if (partitionPaths.isEmpty()) return Collections.emptyList();
List<IcebergFileDescriptor> ret = new ArrayList<>();
String logPrefix = "Parallel Iceberg file metadata listing";
// Use the file system type of the table's root path as
// the basis for determining the pool size.
int poolSize = getPoolSize(partitionPaths.size(),
FileSystemUtil.getFileSystemForPath(partDir_));
int poolSize = getPoolSize(partitionPaths.size());
ExecutorService pool = createPool(poolSize, logPrefix);
TOTAL_THREADS.addAndGet(poolSize);
Map<Path, FileStatus> nameToFileStatus = Maps.newConcurrentMap();
try (ThreadNameAnnotator tna = new ThreadNameAnnotator(logPrefix)) {
TOTAL_TASKS.addAndGet(partitionPaths.size());
List<Future<Void>> tasks =
partitionPaths.stream()
.map(path -> pool.submit(() -> {
List<Future<List<IcebergFileDescriptor>>> tasks =
partitionPaths.entrySet().stream()
.map(entry -> pool.submit(() -> {
try {
return listingTask(path, nameToFileStatus);
return createFdsForPartition(entry.getKey(), entry.getValue(),
numUnknownDiskIds);
} finally {
TOTAL_TASKS.decrementAndGet();
}
}))
.collect(Collectors.toList());
for (Future<Void> task : tasks) { task.get(); }
for (Future<List<IcebergFileDescriptor>> task : tasks) {
ret.addAll(task.get());
}
} catch (ExecutionException | InterruptedException e) {
throw new IOException(String.format("%s: failed to load paths.", logPrefix), e);
} finally {
TOTAL_THREADS.addAndGet(-poolSize);
pool.shutdown();
}
return nameToFileStatus;
return ret;
}
private Set<Path> collectPartitionPaths(
Iterable<Pair<FileSystem, ContentFile<?>>> contentFiles) {
return StreamSupport.stream(contentFiles.spliterator(), false)
.map(contentFile ->
new Path(String.valueOf(contentFile.getSecond().path())).getParent())
.collect(Collectors.toSet());
private Map<Path, List<ContentFile<?>>> collectPartitionPaths(
List<ContentFile<?>> contentFiles) {
final Clock clock = Clock.defaultClock();
long startTime = clock.getTick();
Map<Path, List<ContentFile<?>>> ret = contentFiles.stream()
.collect(Collectors.groupingBy(
cf -> new Path(String.valueOf(cf.path())).getParent(),
HashMap::new,
Collectors.toList()
));
long duration = clock.getTick() - startTime;
LOG.info("Collected {} Iceberg content files into {} partitions. Duration: {}",
contentFiles.size(), ret.size(), PrintUtils.printTimeNs(duration));
return ret;
}
private Void listingTask(Path partitionPath,
Map<Path, FileStatus> nameToFileStatus) throws IOException {
/**
* Returns thread pool size for listing files in parallel from storage systems that
* provide block location information.
*/
private static int getPoolSize(int numLoaders) {
return Math.min(numLoaders, MAX_HDFS_PARTITIONS_PARALLEL_LOAD);
}
private List<IcebergFileDescriptor> createFdsForPartition(Path partitionPath,
List<ContentFile<?>> contentFiles, AtomicLong numUnknownDiskIds)
throws IOException, CatalogException {
FileSystem fs = FileSystemUtil.getFileSystemForPath(partitionPath);
RemoteIterator<? extends FileStatus> remoteIterator =
FileSystemUtil.listFiles(fs, partitionPath, recursive_, debugAction_);
Map<Path, FileStatus> perThreadMapping = new HashMap<>();
Map<Path, FileStatus> pathToFileStatus = new HashMap<>();
while (remoteIterator.hasNext()) {
FileStatus status = remoteIterator.next();
perThreadMapping.put(status.getPath(), status);
pathToFileStatus.put(status.getPath(), status);
}
nameToFileStatus.putAll(perThreadMapping);
return null;
List<IcebergFileDescriptor> ret = new ArrayList<>();
Reference<Long> localNumUnknownDiskIds = new Reference<>(0L);
for (ContentFile<?> contentFile : contentFiles) {
Path path = FileSystemUtil.createFullyQualifiedPath(
new Path(contentFile.path().toString()));
FileStatus stat = pathToFileStatus.get(path);
if (stat == null) {
LOG.warn(String.format(
"Failed to load Iceberg content file: '%s', Not found on storage",
contentFile.path().toString()));
continue;
}
ret.add(createLocatedFd(contentFile, stat, tablePath_, localNumUnknownDiskIds));
}
numUnknownDiskIds.addAndGet(localNumUnknownDiskIds.getRef());
return ret;
}
IcebergFileDescriptor getOldFd(ContentFile<?> contentFile, Path partPath)


@@ -47,6 +47,7 @@ import org.apache.impala.analysis.IcebergPartitionSpec;
import org.apache.impala.analysis.IcebergPartitionTransform;
import org.apache.impala.catalog.iceberg.GroupedContentFiles;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.common.PrintUtils;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.CatalogLookupStatus;
import org.apache.impala.thrift.TAlterTableUpdateStatsParams;
@@ -513,7 +514,7 @@ public class IcebergTable extends Table implements FeIcebergTable {
((BaseTable)icebergApiTable_).operations().current().metadataFileLocation();
GroupedContentFiles icebergFiles = IcebergUtil.getIcebergFiles(this,
new ArrayList<>(), /*timeTravelSpec=*/null);
catalogTimeline.markEvent("Loaded Iceberg files");
catalogTimeline.markEvent("Loaded Iceberg content file list");
// We use IcebergFileMetadataLoader directly to load file metadata, so we don't
// want 'hdfsTable_' to do any file loading.
hdfsTable_.setSkipIcebergFileMetadataLoading(true);
@@ -527,6 +528,7 @@ public class IcebergTable extends Table implements FeIcebergTable {
getHostIndex(), Preconditions.checkNotNull(icebergFiles),
Utils.requiresDataFilesInTableLocation(this));
loader.load();
catalogTimeline.markEvent("Loaded Iceberg file descriptors");
fileStore_ = new IcebergContentFileStore(
icebergApiTable_, loader.getLoadedIcebergFds(), icebergFiles);
partitionStats_ = Utils.loadPartitionStats(this, icebergFiles);
@@ -539,6 +541,8 @@ public class IcebergTable extends Table implements FeIcebergTable {
} finally {
storageMetadataLoadTime_ = ctxStorageLdTime.stop();
}
LOG.info("Loaded file and block metadata for {}. Time taken: {}",
getFullName(), PrintUtils.printTimeNs(storageMetadataLoadTime_));
}
private boolean canSkipReload() {


@@ -62,7 +62,7 @@ public class ParallelFileMetadataLoader {
private final static Logger LOG = LoggerFactory.getLogger(
ParallelFileMetadataLoader.class);
private static final int MAX_HDFS_PARTITIONS_PARALLEL_LOAD =
public static final int MAX_HDFS_PARTITIONS_PARALLEL_LOAD =
BackendConfig.INSTANCE.maxHdfsPartsParallelLoad();
private static final int MAX_NON_HDFS_PARTITIONS_PARALLEL_LOAD =
BackendConfig.INSTANCE.maxNonHdfsPartsParallelLoad();


@@ -432,10 +432,6 @@ public class BackendConfig {
return backendCfg_.use_jamm_weigher;
}
public int icebergReloadNewFilesThreshold() {
return backendCfg_.iceberg_reload_new_files_threshold;
}
public boolean icebergAllowDatafileInTableLocationOnly() {
return backendCfg_.iceberg_allow_datafiles_in_table_location_only;
}


@@ -212,7 +212,6 @@ public class FileMetadataLoaderTest {
/* oldFds = */ fml1.getLoadedIcebergFds(),
/* requiresDataFilesInTableLocation = */ true);
fml1Refresh.load();
assertFalse(fml1Refresh.useParallelListing());
assertEquals(0, fml1Refresh.getStats().loadedFiles);
assertEquals(20, fml1Refresh.getStats().skippedFiles);
assertEquals(20, fml1Refresh.getLoadedFds().size());
@@ -235,7 +234,6 @@ public class FileMetadataLoaderTest {
/* oldFds = */ fml2.getLoadedIcebergFds(),
/* requiresDataFilesInTableLocation = */ true);
fml2Refresh.load();
assertFalse(fml2Refresh.useParallelListing());
assertEquals(0, fml2Refresh.getStats().loadedFiles);
assertEquals(20, fml2Refresh.getStats().skippedFiles);
assertEquals(20, fml2Refresh.getLoadedFds().size());
@@ -262,7 +260,6 @@ public class FileMetadataLoaderTest {
/* oldFds = */ fml1.getLoadedIcebergFds().subList(0, 10),
/* requiresDataFilesInTableLocation = */ true);
fml1Refresh.load();
assertFalse(fml1Refresh.useParallelListing());
assertEquals(10, fml1Refresh.getStats().loadedFiles);
assertEquals(10, fml1Refresh.getStats().skippedFiles);
assertEquals(20, fml1Refresh.getLoadedFds().size());
@@ -278,75 +275,11 @@ public class FileMetadataLoaderTest {
/* oldFds = */ fml2.getLoadedIcebergFds().subList(0, 10),
/* requiresDataFilesInTableLocation = */ true);
fml2Refresh.load();
assertFalse(fml2Refresh.useParallelListing());
assertEquals(10, fml2Refresh.getStats().loadedFiles);
assertEquals(10, fml2Refresh.getStats().skippedFiles);
assertEquals(20, fml2Refresh.getLoadedFds().size());
}
@Test
public void testIcebergNewFilesThreshold() throws IOException, CatalogException {
CatalogServiceCatalog catalog = CatalogServiceTestCatalog.create();
IcebergFileMetadataLoader fml1 = getLoaderForIcebergTable(catalog,
"functional_parquet", "iceberg_partitioned",
/* oldFds = */ Collections.emptyList(),
/* requiresDataFilesInTableLocation = */ true);
fml1.load();
IcebergFileMetadataLoader fml1ForceRefresh = getLoaderForIcebergTable(catalog,
"functional_parquet", "iceberg_partitioned",
/* oldFds = */ fml1.getLoadedIcebergFds().subList(0, 10),
/* requiresDataFilesInTableLocation = */ true, 10);
fml1ForceRefresh.setForceRefreshBlockLocations(true);
fml1ForceRefresh.load();
assertTrue(fml1ForceRefresh.useParallelListing());
IcebergFileMetadataLoader fml1Refresh = getLoaderForIcebergTable(catalog,
"functional_parquet", "iceberg_partitioned",
/* oldFds = */ fml1.getLoadedIcebergFds().subList(0, 10),
/* requiresDataFilesInTableLocation = */ true, 10);
fml1Refresh.setForceRefreshBlockLocations(false);
fml1Refresh.load();
assertFalse(fml1Refresh.useParallelListing());
IcebergFileMetadataLoader fml1Refresh10 = getLoaderForIcebergTable(catalog,
"functional_parquet", "iceberg_partitioned",
/* oldFds = */ fml1.getLoadedIcebergFds().subList(0, 10),
/* requiresDataFilesInTableLocation = */ true, 10);
fml1Refresh10.load();
assertFalse(fml1Refresh10.useParallelListing());
IcebergFileMetadataLoader fml1Refresh9 = getLoaderForIcebergTable(catalog,
"functional_parquet", "iceberg_partitioned",
/* oldFds = */ fml1.getLoadedIcebergFds().subList(0, 10),
/* requiresDataFilesInTableLocation = */ true, 9);
fml1Refresh9.load();
assertTrue(fml1Refresh9.useParallelListing());
IcebergFileMetadataLoader fml2 = getLoaderForIcebergTable(catalog,
"functional_parquet", "iceberg_non_partitioned",
/* oldFds = */ Collections.emptyList(),
/* requiresDataFilesInTableLocation = */ true);
fml2.load();
IcebergFileMetadataLoader fml2Refresh = getLoaderForIcebergTable(catalog,
"functional_parquet", "iceberg_non_partitioned",
/* oldFds = */ fml2.getLoadedIcebergFds().subList(0, 10),
/* requiresDataFilesInTableLocation = */ true);
IcebergFileMetadataLoader fml2Refresh10 = getLoaderForIcebergTable(catalog,
"functional_parquet", "iceberg_non_partitioned",
/* oldFds = */ fml2.getLoadedIcebergFds().subList(0, 10),
/* requiresDataFilesInTableLocation = */ true, 10);
fml2Refresh10.load();
assertFalse(fml2Refresh10.useParallelListing());
IcebergFileMetadataLoader fml2Refresh9 = getLoaderForIcebergTable(catalog,
"functional_parquet", "iceberg_non_partitioned",
/* oldFds = */ fml2.getLoadedIcebergFds().subList(0, 10),
/* requiresDataFilesInTableLocation = */ true, 9);
fml2Refresh9.load();
assertTrue(fml2Refresh9.useParallelListing());
}
@Test
public void testIcebergMultipleStorageLocations() throws IOException, CatalogException {
CatalogServiceCatalog catalog = CatalogServiceTestCatalog.create();
@@ -362,7 +295,6 @@ public class FileMetadataLoaderTest {
/* oldFds = */ fml1.getLoadedIcebergFds().subList(0, 1),
/* requiresDataFilesInTableLocation = */ false);
fml1Refresh1.load();
assertFalse(fml1Refresh1.useParallelListing());
assertEquals(5, fml1Refresh1.getStats().loadedFiles);
assertEquals(1, fml1Refresh1.getStats().skippedFiles);
assertEquals(6, fml1Refresh1.getLoadedFds().size());
@@ -372,7 +304,6 @@ public class FileMetadataLoaderTest {
/* oldFds = */ fml1.getLoadedIcebergFds().subList(0, 5),
/* requiresDataFilesInTableLocation = */ false);
fml1Refresh5.load();
assertFalse(fml1Refresh5.useParallelListing());
assertEquals(1, fml1Refresh5.getStats().loadedFiles);
assertEquals(5, fml1Refresh5.getStats().skippedFiles);
assertEquals(6, fml1Refresh5.getLoadedFds().size());
@@ -382,15 +313,6 @@ public class FileMetadataLoaderTest {
CatalogServiceCatalog catalog, String dbName, String tblName,
List<IcebergFileDescriptor> oldFds, boolean requiresDataFilesInTableLocation)
throws CatalogException {
return getLoaderForIcebergTable(catalog, dbName, tblName, oldFds,
requiresDataFilesInTableLocation, -1);
}
private IcebergFileMetadataLoader getLoaderForIcebergTable(
CatalogServiceCatalog catalog, String dbName, String tblName,
List<IcebergFileDescriptor> oldFds, boolean requiresDataFilesInTableLocation,
int newFilesThreshold)
throws CatalogException {
ListMap<TNetworkAddress> hostIndex = new ListMap<>();
FeIcebergTable iceT = (FeIcebergTable)catalog.getOrLoadTable(
dbName, tblName, "test", null);
@@ -398,7 +320,7 @@ public class FileMetadataLoaderTest {
GroupedContentFiles iceFiles = IcebergUtil.getIcebergFiles(iceT,
/*predicates=*/Collections.emptyList(), /*timeTravelSpec=*/null);
return new IcebergFileMetadataLoader(iceT.getIcebergApiTable(),
oldFds, hostIndex, iceFiles, requiresDataFilesInTableLocation, newFilesThreshold);
oldFds, hostIndex, iceFiles, requiresDataFilesInTableLocation);
}
private FileMetadataLoader getLoaderForAcidTable(