IMPALA-14138: Manually disable block location loading via Hadoop config

For storage systems that expose block location information (HDFS,
Ozone) we always retrieve it, on the assumption that the scheduler can
use it for local reads. In practice, however, Impala is often not
co-located with the storage system, even in on-prem deployments, e.g.
when Impala runs in containers; and even when the two are co-located,
we don't try to figure out which container runs on which machine.

In such cases we should not reach out to the storage system for block
locations at all: collecting them can be very expensive for large
tables, and we get no benefit from it. Since there is currently no easy
way to tell whether Impala is co-located with the storage system, this
patch adds configuration options to disable block location retrieval
during table loading.
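
Conceptually, the file metadata loaders gate the expensive
block-location RPCs on a single check, FileSystemUtil.supportsStorageIds(),
which this patch extends. The sketch below is illustrative only (it is
not the actual loader code; 'tablePath', 'conf' and 'status' stand in
for the loader's state):

  // Illustrative sketch, not Impala's actual loader code.
  FileSystem fs = tablePath.getFileSystem(conf);
  if (FileSystemUtil.supportsStorageIds(fs)) {
    // One RPC per file to the storage system to fetch block locations.
    BlockLocation[] locs = fs.getFileBlockLocations(status, 0, status.getLen());
    // ... record per-block hosts/disks so the scheduler can plan local reads ...
  } else {
    // Skip the RPCs entirely; the file is treated as one synthetic block and
    // scheduling is not locality-aware for it.
  }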

Block location loading can be disabled globally via the Hadoop
Configuration:

'impala.preload-block-locations-for-scheduling': 'false'

We can also restrict the setting to individual filesystem schemes, e.g.:

'impala.preload-block-locations-for-scheduling.scheme.hdfs': 'false'

When multiple storage systems share the same scheme, block location
loading can still be controlled per URI authority, e.g.:

'impala.preload-block-locations-for-scheduling.authority.mycluster': 'false'

The latter only disables block location loading for URIs like
'hdfs://mycluster/warehouse/tablespace/...'

If block location loading is disabled by any of these switches, it cannot
be re-enabled by another one, i.e. the most restrictive setting prevails.
E.g.:
  disable scheme 'hdfs', enable authority 'mycluster'
     ==> hdfs://mycluster/ is still disabled

  disable globally, enable scheme 'hdfs', enable authority 'mycluster'
     ==> hdfs://mycluster/ is still disabled, as everything else is.
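
As a self-contained illustration of these rules (a hedged sketch:
'mycluster' is a placeholder HDFS nameservice, and
FileSystemUtil.setConfiguration() is the test hook added by this patch):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.impala.common.FileSystemUtil;

  public class PreloadSwitchDemo {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      conf.set("impala.preload-block-locations-for-scheduling.scheme.hdfs", "false");
      conf.set("impala.preload-block-locations-for-scheduling.authority.mycluster",
          "true");
      FileSystemUtil.setConfiguration(conf);
      // Assumes 'mycluster' is a resolvable, configured HDFS nameservice.
      FileSystem fs = new Path("hdfs://mycluster/warehouse").getFileSystem(conf);
      // The scheme-level 'false' prevails; the 'true' authority entry is a no-op.
      System.out.println(FileSystemUtil.supportsStorageIds(fs));  // prints false
    }
  }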

Testing:
 * added unit tests for FileSystemUtil
 * added unit tests for the file metadata loaders
 * custom cluster tests with custom Hadoop configuration

Change-Id: I1c7a6a91f657c99792db885991b7677d2c240867
Reviewed-on: http://gerrit.cloudera.org:8080/23175
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author: Zoltan Borok-Nagy
Date: 2025-07-15 14:44:16 +02:00
Committer: Impala Public Jenkins
Commit: 438461db9e (parent: 1ae97e7173)
8 changed files with 294 additions and 9 deletions

File: bin/create-test-configuration.sh

@@ -135,6 +135,14 @@ rm -f authz-provider.ini
# Set IMPALA_JAVA_TOOL_OPTIONS to allow passing it to Tez containers.
. $IMPALA_HOME/bin/set-impala-java-tool-options.sh

CORE_SITE_VARIANT=disable_block_locations $IMPALA_HOME/bin/generate_xml_config.py \
  $IMPALA_HOME/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py \
  core-site_disabled_block_locations.xml
mkdir -p core-site-disabled-block-locations
rm -f core-site-disabled-block-locations/core-site.xml
ln -s "${CONFIG_DIR}/core-site_disabled_block_locations.xml" \
  core-site-disabled-block-locations/core-site.xml

$IMPALA_HOME/bin/generate_xml_config.py hive-site.xml.py hive-site.xml

export HIVE_VARIANT=changed_external_dir
$IMPALA_HOME/bin/generate_xml_config.py hive-site.xml.py hive-site_ext.xml

File: fe/src/main/java/org/apache/impala/common/FileSystemUtil.java

@@ -57,6 +57,7 @@ import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -73,7 +74,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
 */
public class FileSystemUtil {
-  private static final Configuration CONF = new Configuration();
+  private static Configuration CONF;

  private static final Logger LOG = LoggerFactory.getLogger(FileSystemUtil.class);

  public static final String SCHEME_ABFS = "abfs";
@@ -161,6 +162,52 @@ public class FileSystemUtil {
      .add(SCHEME_OBS)
      .build();

  private static Set<String> BLOCK_LOCATIONS_FOR_FS_SCHEMES;
  private static Set<String> NO_BLOCK_LOCATIONS_FOR_AUTHORITIES;

  static {
    setConfiguration(new Configuration());
  }

  @VisibleForTesting
  public static void setConfiguration(Configuration conf) {
    CONF = conf;
    initConfigurationDependentStaticFields();
  }

  private static void initConfigurationDependentStaticFields() {
    resetConfigurationDependentStaticFields();
    final String PRELOAD_BLOCK_LOCATIONS_CONFIGURATION_PREFIX =
        "impala.preload-block-locations-for-scheduling";
    final String AUTHORITY = ".authority.";
    final String SCHEME = ".scheme.";
    Map<String, String> preloadBlockLocations = CONF.getPropsWithPrefix(
        PRELOAD_BLOCK_LOCATIONS_CONFIGURATION_PREFIX);
    for (Map.Entry<String, String> noBlocksEntry : preloadBlockLocations.entrySet()) {
      // By default we load block locations, so there is nothing to do when the
      // config value is 'true'.
      if (Boolean.parseBoolean(noBlocksEntry.getValue())) continue;
      // The key is the part after 'impala.preload-block-locations-for-scheduling'.
      String key = noBlocksEntry.getKey();
      if (key.isEmpty()) {
        BLOCK_LOCATIONS_FOR_FS_SCHEMES.clear();
      } else if (key.startsWith(SCHEME)) {
        BLOCK_LOCATIONS_FOR_FS_SCHEMES.remove(key.substring(SCHEME.length()));
      } else if (key.startsWith(AUTHORITY)) {
        NO_BLOCK_LOCATIONS_FOR_AUTHORITIES.add(key.substring(AUTHORITY.length()));
      }
    }
  }

  private static void resetConfigurationDependentStaticFields() {
    BLOCK_LOCATIONS_FOR_FS_SCHEMES = new HashSet<>(SCHEME_SUPPORT_STORAGE_IDS);
    NO_BLOCK_LOCATIONS_FOR_AUTHORITIES = new HashSet<>();
  }

  /**
   * Performs a non-recursive delete of all visible (non-hidden) files in a given
   * directory. Returns the number of files deleted as part of this operation.
@@ -527,10 +574,12 @@ public class FileSystemUtil {
  }

  /**
-  * Returns true if the filesystem supports storage UUIDs in BlockLocation calls.
+  * Returns true if the filesystem supports storage UUIDs in BlockLocation calls, and
+  * preloading block locations is not disabled in the Hadoop configuration.
   */
  public static boolean supportsStorageIds(FileSystem fs) {
-   return SCHEME_SUPPORT_STORAGE_IDS.contains(fs.getScheme());
+   return BLOCK_LOCATIONS_FOR_FS_SCHEMES.contains(fs.getScheme()) &&
+       !NO_BLOCK_LOCATIONS_FOR_AUTHORITIES.contains(fs.getUri().getAuthority());
  }
/**

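For reference, Hadoop's Configuration.getPropsWithPrefix() returns the
matching keys with the prefix stripped, which is why the parsing above
treats the empty string as the global switch and matches '.scheme.' /
'.authority.' for the scoped ones. A minimal standalone sketch (the
property values here are made-up examples):

  import java.util.Map;
  import org.apache.hadoop.conf.Configuration;

  public class PrefixDemo {
    public static void main(String[] args) {
      Configuration conf = new Configuration(false);
      conf.set("impala.preload-block-locations-for-scheduling", "false");
      conf.set("impala.preload-block-locations-for-scheduling.scheme.hdfs", "true");
      Map<String, String> props =
          conf.getPropsWithPrefix("impala.preload-block-locations-for-scheduling");
      // Keys come back with the prefix stripped:
      //   ""             -> "false"   (the global switch)
      //   ".scheme.hdfs" -> "true"
      System.out.println(props);
    }
  }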
File: fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java

@@ -30,8 +30,8 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.impala.catalog.iceberg.GroupedContentFiles;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.testutil.CatalogServiceTestCatalog;
@@ -83,6 +83,31 @@ public class FileMetadataLoaderTest {
    assertEquals(1, refreshFml.getStats().loadedFiles);
  }

  @Test
  public void testRecursiveLoadingWithoutBlockLocations()
      throws IOException, CatalogException {
    try {
      Configuration customConf = new Configuration();
      customConf.set(
          "impala.preload-block-locations-for-scheduling.authority.localhost:20500",
          "false");
      FileSystemUtil.setConfiguration(customConf);
      //TODO(IMPALA-9042): Remove "throws CatalogException"
      ListMap<TNetworkAddress> hostIndex = new ListMap<>();
      String tablePath = "hdfs://localhost:20500/test-warehouse/alltypes/";
      FileMetadataLoader fml = new FileMetadataLoader(tablePath, /* recursive=*/true,
          /* oldFds = */Collections.emptyList(), hostIndex, null, null);
      fml.load();
      List<FileDescriptor> fileDescs = fml.getLoadedFds();
      for (FileDescriptor fd : fileDescs) {
        assertEquals(0, fd.getNumFileBlocks());
      }
    } finally {
      // Reset default configuration.
      FileSystemUtil.setConfiguration(new Configuration());
    }
  }

  @Test
  public void testHudiParquetLoading() throws IOException, CatalogException {
    //TODO(IMPALA-9042): Remove "throws CatalogException"
@@ -149,6 +174,30 @@ public class FileMetadataLoaderTest {
        relPaths.get(0));
  }

  @Test
  public void testIcebergLoadingWithoutBlockLocations()
      throws IOException, CatalogException {
    try {
      Configuration customConf = new Configuration();
      customConf.set("impala.preload-block-locations-for-scheduling.scheme.hdfs",
          "false");
      FileSystemUtil.setConfiguration(customConf);
      CatalogServiceCatalog catalog = CatalogServiceTestCatalog.create();
      IcebergFileMetadataLoader fml = getLoaderForIcebergTable(catalog,
          "functional_parquet", "iceberg_partitioned",
          /* oldFds = */ Collections.emptyList(),
          /* requiresDataFilesInTableLocation = */ true);
      fml.load();
      List<IcebergFileDescriptor> fileDescs = fml.getLoadedIcebergFds();
      for (IcebergFileDescriptor fd : fileDescs) {
        assertEquals(0, fd.getNumFileBlocks());
      }
    } finally {
      // Reset default configuration.
      FileSystemUtil.setConfiguration(new Configuration());
    }
  }

  @Test
  public void testIcebergRefresh() throws IOException, CatalogException {
    CatalogServiceCatalog catalog = CatalogServiceTestCatalog.create();

File: fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java

@@ -17,6 +17,7 @@
package org.apache.impala.common;
import org.apache.hadoop.conf.Configuration;
import org.apache.impala.common.Pair;
import static org.apache.impala.common.FileSystemUtil.HIVE_TEMP_FILE_PREFIX;
import static org.apache.impala.common.FileSystemUtil.SPARK_TEMP_FILE_PREFIX;
@@ -130,6 +131,81 @@ public class FileSystemUtilTest {
    // testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ALLUXIO), true);
  }

  @Test
  public void testSupportStorageIdsDisabledCompletely() throws IOException {
    try {
      Configuration customConf = new Configuration();
      customConf.set("impala.preload-block-locations-for-scheduling", "false");
      FileSystemUtil.setConfiguration(customConf);
      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ABFS), false);
      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ABFSS), false);
      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ADL), false);
      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_FILE), false);
      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_S3A), false);
      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_HDFS), false);
      // The following checks are disabled because the underlying systems are not
      // included in the Impala minicluster.
      // TODO: enable them if we add these systems to the minicluster.
      // testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_O3FS), false);
      // testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_OFS), false);
      // testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ALLUXIO), false);
    } finally {
      // Reset default configuration.
      FileSystemUtil.setConfiguration(new Configuration());
    }
  }

  @Test
  public void testSupportStorageIdsDisabledViaScheme() throws IOException {
    try {
      Configuration customConf = new Configuration();
      customConf.set("impala.preload-block-locations-for-scheduling.scheme.hdfs",
          "false");
      FileSystemUtil.setConfiguration(customConf);
      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ABFS), false);
      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ABFSS), false);
      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ADL), false);
      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_FILE), false);
      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_S3A), false);
      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_HDFS), false);
      // The following checks are disabled because the underlying systems are not
      // included in the Impala minicluster.
      // TODO: enable them if we add these systems to the minicluster.
      // testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_O3FS), false);
      // testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_OFS), false);
      // testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ALLUXIO), false);
    } finally {
      // Reset default configuration.
      FileSystemUtil.setConfiguration(new Configuration());
    }
  }

  @Test
  public void testSupportStorageIdsDisabledViaAuthority() throws IOException {
    try {
      Configuration customConf = new Configuration();
      customConf.set(
          "impala.preload-block-locations-for-scheduling.authority.localhost:20500",
          "false");
      FileSystemUtil.setConfiguration(customConf);
      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ABFS), false);
      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ABFSS), false);
      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ADL), false);
      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_FILE), false);
      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_S3A), false);
      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_HDFS), false);
      // The following checks are disabled because the underlying systems are not
      // included in the Impala minicluster.
      // TODO: enable them if we add these systems to the minicluster.
      // testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_O3FS), false);
      // testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_OFS), false);
      // testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ALLUXIO), false);
    } finally {
      // Reset default configuration.
      FileSystemUtil.setConfiguration(new Configuration());
    }
  }

  @Test
  public void testWriteableByImpala() throws IOException {
    testIsWritableByImpala(mockLocation(FileSystemUtil.SCHEME_ALLUXIO), false);

File: testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py

@@ -24,6 +24,8 @@ import sys
kerberize = os.environ.get('IMPALA_KERBERIZE') == 'true'
target_filesystem = os.environ.get('TARGET_FILESYSTEM')
VARIANT = os.environ.get('CORE_SITE_VARIANT')
jceks_keystore = ("localjceks://file" +
    os.path.join(os.environ['IMPALA_HOME'], 'testdata/jceks/test.jceks'))

@@ -134,6 +136,11 @@ CONFIG = {
  'iceberg.io.manifest.cache.max-content-length': '8388608',
}

if VARIANT == 'disable_block_locations':
  CONFIG.update({
    'impala.preload-block-locations-for-scheduling': 'false'
  })

if target_filesystem == 's3':
  CONFIG.update({'fs.s3a.connection.maximum': 1500})
  s3guard_enabled = os.environ.get("S3GUARD_ENABLED") == 'true'

File: testdata/workloads/functional-query/queries/QueryTest/no-block-locations.test

@@ -0,0 +1,50 @@
====
---- QUERY
select * from alltypes where id < 5;
---- RESULTS
0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
1,false,1,1,1,10,1.100000023841858,10.1,'01/01/09','1',2009-01-01 00:01:00,2009,1
2,true,2,2,2,20,2.200000047683716,20.2,'01/01/09','2',2009-01-01 00:02:00.100000000,2009,1
3,false,3,3,3,30,3.299999952316284,30.3,'01/01/09','3',2009-01-01 00:03:00.300000000,2009,1
4,true,4,4,4,40,4.400000095367432,40.4,'01/01/09','4',2009-01-01 00:04:00.600000000,2009,1
---- TYPES
INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT
====
---- QUERY
select count(*) from alltypes;
---- RESULTS
7300
---- TYPES
BIGINT
====
---- QUERY
select count(*) from alltypes where id % 3 = 0;
---- RESULTS
2434
---- TYPES
BIGINT
====
---- QUERY
# 'lineitem_sixblocks' contains a single data file with six HDFS blocks. Without
# block location information the whole file is scheduled as a single scan range,
# i.e. it is read by a single SCAN fragment instance on a single host.
select count(*) from functional_parquet.lineitem_sixblocks where l_orderkey % 2 = 0;
---- RESULTS
19929
---- TYPES
BIGINT
---- RUNTIME_PROFILE
# The following should be in the ExecSummary
row_regex: 00:SCAN [A-Z0-9]+ +1 +1 +.*
====
---- QUERY
# 'iceberg_lineitem_sixblocks' contains a single data file with six HDFS blocks.
# Without block location information the whole file is scheduled as a single scan
# range, i.e. it is read by a single SCAN fragment instance on a single host.
select count(*) from functional_parquet.iceberg_lineitem_sixblocks where l_orderkey % 2 = 0;
---- RESULTS
9805
---- TYPES
BIGINT
---- RUNTIME_PROFILE
# The following should be in the ExecSummary
row_regex: 00:SCAN [A-Z0-9]+ +1 +1 +.*
====

File: tests/common/custom_cluster_test_suite.py

@@ -55,6 +55,7 @@ KUDU_ARGS = 'kudu_args'
# Additional args passed to the start-impala-cluster script.
START_ARGS = 'start_args'
JVM_ARGS = 'jvm_args'
CUSTOM_CORE_SITE_DIR = 'custom_core_site_dir'
HIVE_CONF_DIR = 'hive_conf_dir'
CLUSTER_SIZE = "cluster_size"
# Default query options passed to the impala daemon command line. Handled separately from
@@ -157,7 +158,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
      impalad_timeout_s=None, expect_cores=None, reset_ranger=False,
      tmp_dir_placeholders=[],
      expect_startup_fail=False, disable_log_buffering=False, log_symlinks=False,
-     workload_mgmt=False, force_restart=False):
+     workload_mgmt=False, force_restart=False, custom_core_site_dir=None):
    """Records arguments to be passed to a cluster by adding them to the decorated
    method's func_dict"""
    args = dict()
@@ -171,6 +172,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
      args[START_ARGS] = start_args
    if jvm_args is not None:
      args[JVM_ARGS] = jvm_args
+   if custom_core_site_dir is not None:
+     args[CUSTOM_CORE_SITE_DIR] = custom_core_site_dir
    if hive_conf_dir is not None:
      args[HIVE_CONF_DIR] = hive_conf_dir
    if kudu_args is not None:
@@ -299,13 +302,20 @@ class CustomClusterTestSuite(ImpalaTestSuite):
    if START_ARGS in args:
      cluster_args.extend(args[START_ARGS].split())

+   custom_class_path_val = ""
+   if CUSTOM_CORE_SITE_DIR in args:
+     custom_class_path_val = args[CUSTOM_CORE_SITE_DIR]
    if HIVE_CONF_DIR in args:
      cls._start_hive_service(args[HIVE_CONF_DIR])
-     # Should let Impala adopt the same hive-site.xml. The only way is to add it in the
-     # beginning of the CLASSPATH. Because there's already a hive-site.xml in the
-     # default CLASSPATH (see bin/set-classpath.sh).
+     custom_class_path_val += ":" + args[HIVE_CONF_DIR]
+   if custom_class_path_val:
+     # Should let Impala adopt the custom Hadoop configuration. The only way is to add
+     # it at the beginning of the CLASSPATH, because there are already Hadoop site XML
+     # files in the default CLASSPATH (see bin/set-classpath.sh).
      cluster_args.append(
-       '--env_vars=CUSTOM_CLASSPATH=%s ' % args[HIVE_CONF_DIR])
+       '--env_vars=CUSTOM_CLASSPATH=%s ' % custom_class_path_val)
    if KUDU_ARGS in args:
      cls._restart_kudu_service(args[KUDU_ARGS])

File: tests/custom_cluster/ (new test file containing TestDisabledBlockLocations)

@@ -0,0 +1,36 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function

import pytest
from os import getenv

from tests.common.custom_cluster_test_suite import CustomClusterTestSuite

CORE_SITE_CONFIG_DIR = getenv('IMPALA_HOME') + '/fe/src/test/resources/' +\
    'core-site-disabled-block-locations'


class TestDisabledBlockLocations(CustomClusterTestSuite):
  @classmethod
  def setup_class(cls):
    super(TestDisabledBlockLocations, cls).setup_class()

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(custom_core_site_dir=CORE_SITE_CONFIG_DIR)
  def test_no_block_locations(self, vector):
    self.run_test_case('QueryTest/no-block-locations', vector)