IMPALA-14437: Fix regression in FileMetadataLoader.createFd()

IMPALA-14349 caused a regression due to a change in
FileMetadataLoader.createFd(). When the default FS is S3, files in S3
should not have any FileBlock. However, after IMPALA-14349, a CTAS query
that scans the functional.alltypes table in S3 hit the following
Preconditions check in HdfsScanNode.java:

  if (!fsHasBlocks) {
    Preconditions.checkState(fileDesc.getNumFileBlocks() == 0);

This is because FileMetadataLoader.createFd() skips checking whether the
originating FileSystem supports storage IDs (supportsStorageIds()). S3
data loading from an HDFS snapshot consistently failed due to this
regression.

This patch fixes the issue by restoring FileMetadataLoader.createFd() to
its state before IMPALA-14349. It also makes
FileMetadataLoader.createFd() calls more consistent by not allowing null
parameters, except for 'absPath', which is non-null only for Iceberg
data files. Additionally, it generalizes the numUnknownDiskIds parameter
from Reference<Long> to AtomicLong for parallel usage.

Testing:
Pass dataloading, FE_TEST, EE_TEST, and CLUSTER_TEST in S3.

Change-Id: Ie16c5d7b020a59b5937b52dfbf66175ac94f60cd
Reviewed-on: http://gerrit.cloudera.org:8080/23423
Reviewed-by: Zoltan Borok-Nagy <boroknagyz@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Riza Suminto
2025-09-13 22:04:13 -07:00
parent ab717b8fcf
commit 68ab52f2c7
5 changed files with 33 additions and 34 deletions

View File

@@ -33,6 +33,7 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
@@ -65,7 +66,6 @@ import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.common.PrintUtils;
import org.apache.impala.common.Reference;
import org.apache.impala.fb.FbFileBlock;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TColumn;
@@ -826,7 +826,7 @@ public interface FeIcebergTable extends FeFsTable {
FileStatus fileStatus, Table iceApiTable,
boolean requiresDataFilesInTableLocation,
ListMap<TNetworkAddress> hostIndex) throws IOException {
Reference<Long> numUnknownDiskIds = new Reference<>(0L);
AtomicLong numUnknownDiskIds = new AtomicLong(0);
String absPath = null;
Path tableLoc = new Path(iceApiTable.location());

View File

@@ -20,6 +20,7 @@ package org.apache.impala.catalog;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -29,7 +30,6 @@ import com.google.common.collect.Sets;
import com.google.flatbuffers.FlatBufferBuilder;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.impala.common.Reference;
import org.apache.impala.fb.FbFileBlock;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.util.ListMap;
@@ -57,7 +57,7 @@ public class FileBlock {
FlatBufferBuilder fbb,
BlockLocation loc,
ListMap<TNetworkAddress> hostIndex,
Reference<Long> numUnknownDiskIds) throws IOException {
AtomicLong numUnknownDiskIds) throws IOException {
Preconditions.checkNotNull(fbb);
Preconditions.checkNotNull(loc);
Preconditions.checkNotNull(hostIndex);
@@ -123,8 +123,7 @@ public class FileBlock {
*/
private static short[] createDiskIds(
BlockLocation location,
Reference<Long> numUnknownDiskIds) throws IOException {
long unknownDiskIdCount = 0;
AtomicLong numUnknownDiskIds) throws IOException {
String[] storageIds = location.getStorageIds();
String[] hosts = location.getHosts();
if (storageIds.length != hosts.length) {
@@ -139,13 +138,11 @@ public class FileBlock {
for (int i = 0; i < storageIds.length; ++i) {
if (Strings.isNullOrEmpty(storageIds[i])) {
diskIDs[i] = (short) -1;
++unknownDiskIdCount;
numUnknownDiskIds.incrementAndGet();
} else {
diskIDs[i] = DiskIdMapper.INSTANCE.getDiskId(hosts[i], storageIds[i]);
}
}
long count = numUnknownDiskIds.getRef() + unknownDiskIdCount;
numUnknownDiskIds.setRef(Long.valueOf(count));
return diskIDs;
}

View File

@@ -29,12 +29,14 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.impala.common.Reference;
import org.apache.impala.fb.FbCompression;
import org.apache.impala.fb.FbFileBlock;
import org.apache.impala.fb.FbFileDesc;
@@ -128,8 +130,8 @@ public class FileDescriptor implements Comparable<FileDescriptor> {
ListMap<TNetworkAddress> hostIndex,
boolean isEncrypted,
boolean isEc,
Reference<Long> numUnknownDiskIds,
String absPath) throws IOException {
AtomicLong numUnknownDiskIds,
@Nullable String absPath) throws IOException {
FlatBufferBuilder fbb = new FlatBufferBuilder(1);
int[] fbFileBlockOffsets;
if (blockLocations == null) {
@@ -160,7 +162,7 @@ public class FileDescriptor implements Comparable<FileDescriptor> {
* resides in a filesystem that doesn't support the BlockLocation API (e.g. S3).
*/
public static FileDescriptor createWithNoBlocks(
FileStatus fileStatus, String relPath, String absPath) {
FileStatus fileStatus, String relPath, @Nullable String absPath) {
FlatBufferBuilder fbb = new FlatBufferBuilder(1);
return new FileDescriptor(
createFbFileDesc(fbb, fileStatus, relPath, null, false, false, absPath));
@@ -179,7 +181,7 @@ public class FileDescriptor implements Comparable<FileDescriptor> {
int[] fbFileBlockOffsets,
boolean isEncrypted,
boolean isEc,
String absPath) {
@Nullable String absPath) {
int relPathOffset = fbb.createString(relPath == null ? StringUtils.EMPTY : relPath);
// A negative block vector offset is used when no block offsets are specified.
int blockVectorOffset = -1;

View File

@@ -32,7 +32,6 @@ import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.impala.catalog.FeFsTable.FileMetadataStats;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.Reference;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.util.AcidUtils;
import org.apache.impala.util.HudiUtil;
@@ -45,6 +44,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
@@ -206,7 +206,7 @@ public class FileMetadataLoader {
loadedFds_ = new ArrayList<>();
if (fileStatuses == null) return;
Reference<Long> numUnknownDiskIds = new Reference<>(0L);
AtomicLong numUnknownDiskIds = new AtomicLong(0);
if (writeIds_ != null) {
fileStatuses = AcidUtils.filterFilesForAcidState(fileStatuses, partPath,
@@ -243,7 +243,7 @@ public class FileMetadataLoader {
}
}
}
loadStats_.unknownDiskIds += numUnknownDiskIds.getRef();
loadStats_.unknownDiskIds += numUnknownDiskIds.get();
if (LOG.isTraceEnabled()) {
LOG.trace(loadStats_.debugString());
}
@@ -258,7 +258,7 @@ public class FileMetadataLoader {
* Return fd created by the given fileStatus or from the cache(oldFdsByPath_).
*/
protected FileDescriptor getFileDescriptor(FileSystem fs, boolean listWithLocations,
Reference<Long> numUnknownDiskIds, FileStatus fileStatus, Path partPath)
AtomicLong numUnknownDiskIds, FileStatus fileStatus, Path partPath)
throws IOException {
String relPath = FileSystemUtil.relativizePath(fileStatus.getPath(), partPath);
FileDescriptor fd = oldFdsByPath_.get(relPath);
@@ -305,15 +305,16 @@ public class FileMetadataLoader {
* Iceberg tables may not be in the table location.
*/
protected FileDescriptor createFd(FileSystem fs, FileStatus fileStatus,
String relPath, Reference<Long> numUnknownDiskIds, String absPath)
String relPath, AtomicLong numUnknownDiskIds, @Nullable String absPath)
throws IOException {
if (!FileSystemUtil.supportsStorageIds(fs)) {
return FileDescriptor.createWithNoBlocks(fileStatus, relPath, absPath);
}
BlockLocation[] locations;
if (fileStatus instanceof LocatedFileStatus) {
locations = ((LocatedFileStatus) fileStatus).getBlockLocations();
} else if (FileSystemUtil.supportsStorageIds(fs)) {
locations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
} else {
return FileDescriptor.createWithNoBlocks(fileStatus, relPath, absPath);
locations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
}
return FileDescriptor.create(fileStatus, relPath, locations, hostIndex_,
fileStatus.isEncrypted(), fileStatus.isErasureCoded(), numUnknownDiskIds,
@@ -321,7 +322,7 @@ public class FileMetadataLoader {
}
private FileDescriptor createFd(FileSystem fs, FileStatus fileStatus,
String relPath, Reference<Long> numUnknownDiskIds) throws IOException {
String relPath, AtomicLong numUnknownDiskIds) throws IOException {
return createFd(fs, fileStatus, relPath, numUnknownDiskIds, null);
}

View File

@@ -51,7 +51,6 @@ import org.apache.impala.catalog.FeIcebergTable.Utils;
import org.apache.impala.catalog.iceberg.GroupedContentFiles;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.PrintUtils;
import org.apache.impala.common.Reference;
import org.apache.impala.common.Pair;
import org.apache.impala.thrift.TIcebergPartition;
import org.apache.impala.thrift.TNetworkAddress;
@@ -133,6 +132,7 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader {
List<ContentFile<?>> filesSupportsStorageIds = Lists.newArrayList();
FileSystem fsForTable = FileSystemUtil.getFileSystemForPath(tablePath_);
FileSystem defaultFs = FileSystemUtil.getDefaultFileSystem();
AtomicLong numUnknownDiskIds = new AtomicLong();
for (ContentFile<?> contentFile : newContentFiles) {
FileSystem fsForPath = fsForTable;
// If requiresDataFilesInTableLocation_ is true, we assume that the file system
@@ -147,11 +147,11 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader {
if (FileSystemUtil.supportsStorageIds(fsForPath)) {
filesSupportsStorageIds.add(contentFile);
} else {
IcebergFileDescriptor fd = createNonLocatedFd(fsForPath, contentFile, tablePath_);
IcebergFileDescriptor fd =
createNonLocatedFd(fsForPath, contentFile, tablePath_, numUnknownDiskIds);
registerNewlyLoadedFd(fd);
}
}
AtomicLong numUnknownDiskIds = new AtomicLong();
List<IcebergFileDescriptor> newFds = parallelListing(filesSupportsStorageIds,
numUnknownDiskIds);
for (IcebergFileDescriptor fd : newFds) {
@@ -203,7 +203,8 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader {
}
private IcebergFileDescriptor createNonLocatedFd(FileSystem fs,
ContentFile<?> contentFile, Path partPath) throws CatalogException, IOException {
ContentFile<?> contentFile, Path partPath, AtomicLong numUnknownDiskIds)
throws CatalogException, IOException {
Path fileLoc = FileSystemUtil.createFullyQualifiedPath(
new Path(contentFile.path().toString()));
// For OSS service (e.g. S3A, COS, OSS, etc), we create FileStatus ourselves.
@@ -215,12 +216,12 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader {
int partitionId = addPartitionInfo(contentFile);
return IcebergFileDescriptor.cloneWithFileMetadata(
createFd(fs, stat, relPath, null, absPath),
createFd(fs, stat, relPath, numUnknownDiskIds, absPath),
IcebergUtil.createIcebergMetadata(iceTbl_, contentFile, partitionId));
}
private IcebergFileDescriptor createLocatedFd(ContentFile<?> contentFile,
FileStatus stat, Path partPath, Reference<Long> numUnknownDiskIds)
private IcebergFileDescriptor createLocatedFd(FileSystem fs, ContentFile<?> contentFile,
FileStatus stat, Path partPath, AtomicLong numUnknownDiskIds)
throws CatalogException, IOException {
Preconditions.checkState(stat instanceof LocatedFileStatus);
@@ -230,7 +231,7 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader {
int partitionId = addPartitionInfo(contentFile);
return IcebergFileDescriptor.cloneWithFileMetadata(
createFd(null, stat, relPath, numUnknownDiskIds, absPath),
createFd(fs, stat, relPath, numUnknownDiskIds, absPath),
IcebergUtil.createIcebergMetadata(iceTbl_, contentFile, partitionId));
}
@@ -332,7 +333,6 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader {
pathToFileStatus.put(status.getPath(), status);
}
List<IcebergFileDescriptor> ret = new ArrayList<>();
Reference<Long> localNumUnknownDiskIds = new Reference<>(0L);
for (ContentFile<?> contentFile : contentFiles) {
Path path = FileSystemUtil.createFullyQualifiedPath(
new Path(contentFile.path().toString()));
@@ -343,9 +343,8 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader {
contentFile.path().toString()));
continue;
}
ret.add(createLocatedFd(contentFile, stat, tablePath_, localNumUnknownDiskIds));
ret.add(createLocatedFd(fs, contentFile, stat, tablePath_, numUnknownDiskIds));
}
numUnknownDiskIds.addAndGet(localNumUnknownDiskIds.getRef());
return ret;
}