From 438461db9e82be1d91f4320cce5ab08bc895a305 Mon Sep 17 00:00:00 2001
From: Zoltan Borok-Nagy
Date: Tue, 15 Jul 2025 14:44:16 +0200
Subject: [PATCH] IMPALA-14138: Manually disable block location loading via Hadoop config

For storage systems that support block location information (HDFS,
Ozone) we always retrieve it, on the assumption that the scheduler can
use it to do local reads. But it is also common that Impala is not
co-located with the storage system, even in on-prem deployments, e.g.
when Impala runs in containers. And even if the containers are
co-located with the storage nodes, we don't try to figure out which
container runs on which machine. In such cases we should not reach out
to the storage system to collect block location information, because it
can be very expensive for large tables and we don't benefit from it at
all.

Since there is currently no easy way to tell whether Impala is
co-located with the storage system, this patch adds configuration
options to disable block location retrieval during table loading.

It can be disabled globally via the Hadoop Configuration:

 'impala.preload-block-locations-for-scheduling': 'false'

We can also restrict the switch to filesystem schemes, e.g.:

 'impala.preload-block-locations-for-scheduling.scheme.hdfs': 'false'

When multiple storage systems are configured with the same scheme, we
can still control block location loading based on the authority, e.g.:

 'impala.preload-block-locations-for-scheduling.authority.mycluster': 'false'

The latter only disables block location loading for URIs like
'hdfs://mycluster/warehouse/tablespace/...'

If block location loading is disabled by any of the switches, it cannot
be re-enabled by another, i.e. the most restrictive setting prevails.
E.g.:

 disable scheme 'hdfs', enable authority 'mycluster'
   ==> hdfs://mycluster/ is still disabled
 disable globally, enable scheme 'hdfs', enable authority 'mycluster'
   ==> hdfs://mycluster/ is still disabled, as everything else is

Testing:
 * added unit tests for FileSystemUtil
 * added unit tests for the file metadata loaders
 * custom cluster tests with custom Hadoop configuration

Change-Id: I1c7a6a91f657c99792db885991b7677d2c240867
Reviewed-on: http://gerrit.cloudera.org:8080/23175
Reviewed-by: Impala Public Jenkins
Tested-by: Impala Public Jenkins
---
 bin/create-test-configuration.sh                 |  8 ++
 .../apache/impala/common/FileSystemUtil.java     | 55 +++++++++++++-
 .../catalog/FileMetadataLoaderTest.java          | 51 ++++++++++++-
 .../impala/common/FileSystemUtilTest.java        | 76 +++++++++++++++++++
 .../common/etc/hadoop/conf/core-site.xml.py      |  7 ++
 .../queries/QueryTest/no-block-locations.test    | 50 ++++++++++++
 tests/common/custom_cluster_test_suite.py        | 20 +++--
 .../test_disabled_block_locations.py             | 36 +++++++++
 8 files changed, 294 insertions(+), 9 deletions(-)
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/no-block-locations.test
 create mode 100644 tests/custom_cluster/test_disabled_block_locations.py

diff --git a/bin/create-test-configuration.sh b/bin/create-test-configuration.sh
index 323da9669..f8c376e92 100755
--- a/bin/create-test-configuration.sh
+++ b/bin/create-test-configuration.sh
@@ -135,6 +135,14 @@ rm -f authz-provider.ini
 # Set IMPALA_JAVA_TOOL_OPTIONS to allow passing it to Tez containers.
 . $IMPALA_HOME/bin/set-impala-java-tool-options.sh
 
+CORE_SITE_VARIANT=disable_block_locations $IMPALA_HOME/bin/generate_xml_config.py \
+  $IMPALA_HOME/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py \
+  core-site_disabled_block_locations.xml
+mkdir -p core-site-disabled-block-locations
+rm -f core-site-disabled-block-locations/core-site.xml
+ln -s "${CONFIG_DIR}/core-site_disabled_block_locations.xml" \
+  core-site-disabled-block-locations/core-site.xml
+
 $IMPALA_HOME/bin/generate_xml_config.py hive-site.xml.py hive-site.xml
 export HIVE_VARIANT=changed_external_dir
 $IMPALA_HOME/bin/generate_xml_config.py hive-site.xml.py hive-site_ext.xml
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index 439b26ae8..a0074ddb9 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -57,6 +57,7 @@ import java.io.InputStreamReader;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -73,7 +74,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
  */
 public class FileSystemUtil {
-  private static final Configuration CONF = new Configuration();
+  private static Configuration CONF;
 
   private static final Logger LOG = LoggerFactory.getLogger(FileSystemUtil.class);
 
   public static final String SCHEME_ABFS = "abfs";
@@ -161,6 +162,52 @@ public class FileSystemUtil {
       .add(SCHEME_OBS)
       .build();
 
+  private static Set<String> BLOCK_LOCATIONS_FOR_FS_SCHEMES;
+  private static Set<String> NO_BLOCK_LOCATIONS_FOR_AUTHORITIES;
+
+  static {
+    setConfiguration(new Configuration());
+  }
+
+  @VisibleForTesting
+  public static void setConfiguration(Configuration conf) {
+    CONF = conf;
+    initConfigurationDependentStaticFields();
+  }
+
+  private static void initConfigurationDependentStaticFields() {
+    resetConfigurationDependentStaticFields();
+
+    final String PRELOAD_BLOCK_LOCATIONS_CONFIGURATION_PREFIX =
+        "impala.preload-block-locations-for-scheduling";
+    final String AUTHORITY = ".authority.";
+    final String SCHEME = ".scheme.";
+
+    Map<String, String> preloadBlockLocations = CONF.getPropsWithPrefix(
+        PRELOAD_BLOCK_LOCATIONS_CONFIGURATION_PREFIX);
+
+    for (Map.Entry<String, String> noBlocksEntry : preloadBlockLocations.entrySet()) {
+      // By default we load block locations, so no need to do anything when
+      // the config is 'true'.
+      if (Boolean.parseBoolean(noBlocksEntry.getValue())) continue;
+
+      // Key is already the string after 'impala.preload-block-locations-for-scheduling'.
+      String key = noBlocksEntry.getKey();
+      if (key.isEmpty()) {
+        BLOCK_LOCATIONS_FOR_FS_SCHEMES.clear();
+      } else if (key.startsWith(SCHEME)) {
+        BLOCK_LOCATIONS_FOR_FS_SCHEMES.remove(key.substring(SCHEME.length()));
+      } else if (key.startsWith(AUTHORITY)) {
+        NO_BLOCK_LOCATIONS_FOR_AUTHORITIES.add(key.substring(AUTHORITY.length()));
+      }
+    }
+  }
+
+  private static void resetConfigurationDependentStaticFields() {
+    BLOCK_LOCATIONS_FOR_FS_SCHEMES = new HashSet<>(SCHEME_SUPPORT_STORAGE_IDS);
+    NO_BLOCK_LOCATIONS_FOR_AUTHORITIES = new HashSet<>();
+  }
+
   /**
    * Performs a non-recursive delete of all visible (non-hidden) files in a given
    * directory. Returns the number of files deleted as part of this operation.
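The loop above only ever tightens the policy: a 'false' entry removes a scheme from
BLOCK_LOCATIONS_FOR_FS_SCHEMES or records an authority in
NO_BLOCK_LOCATIONS_FOR_AUTHORITIES, while 'true' entries are skipped. This is what
makes the most restrictive setting prevail. A minimal sketch of how the switches
compose (it reuses the commit-message example; the 'mycluster' URI is illustrative):

    // Sketch only -- mirrors the commit-message example, not code from this patch.
    Configuration conf = new Configuration();
    conf.set("impala.preload-block-locations-for-scheduling.scheme.hdfs", "false");
    conf.set("impala.preload-block-locations-for-scheduling.authority.mycluster", "true");
    FileSystemUtil.setConfiguration(conf);
    // 'hdfs' is removed from BLOCK_LOCATIONS_FOR_FS_SCHEMES, and the 'true' authority
    // entry is skipped by the loop, so supportsStorageIds() returns false for any
    // FileSystem rooted at hdfs://mycluster/ -- the permissive setting cannot
    // re-enable block location loading.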
@@ -527,10 +574,12 @@ public class FileSystemUtil {
   }
 
   /**
-   * Returns true if the filesystem supports storage UUIDs in BlockLocation calls.
+   * Returns true if the filesystem supports storage UUIDs in BlockLocation calls, and
+   * preloading block locations is not disabled in the Hadoop configuration.
    */
   public static boolean supportsStorageIds(FileSystem fs) {
-    return SCHEME_SUPPORT_STORAGE_IDS.contains(fs.getScheme());
+    return BLOCK_LOCATIONS_FOR_FS_SCHEMES.contains(fs.getScheme()) &&
+        !NO_BLOCK_LOCATIONS_FOR_AUTHORITIES.contains(fs.getUri().getAuthority());
   }
 
   /**
diff --git a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
index bff749c34..75188e1e5 100644
--- a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
@@ -30,8 +30,8 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.impala.catalog.iceberg.GroupedContentFiles;
+import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.testutil.CatalogServiceTestCatalog;
@@ -83,6 +83,31 @@ public class FileMetadataLoaderTest {
     assertEquals(1, refreshFml.getStats().loadedFiles);
   }
 
+  @Test
+  public void testRecursiveLoadingWithoutBlockLocations()
+      throws IOException, CatalogException {
+    try {
+      Configuration customConf = new Configuration();
+      customConf.set(
+          "impala.preload-block-locations-for-scheduling.authority.localhost:20500",
+          "false");
+      FileSystemUtil.setConfiguration(customConf);
+      //TODO(IMPALA-9042): Remove "throws CatalogException"
+      ListMap<TNetworkAddress> hostIndex = new ListMap<>();
+      String tablePath = "hdfs://localhost:20500/test-warehouse/alltypes/";
+      FileMetadataLoader fml = new FileMetadataLoader(tablePath, /* recursive=*/true,
+          /* oldFds = */Collections.emptyList(), hostIndex, null, null);
+      fml.load();
+      List<FileDescriptor> fileDescs = fml.getLoadedFds();
+      for (FileDescriptor fd : fileDescs) {
+        assertEquals(0, fd.getNumFileBlocks());
+      }
+    } finally {
+      // Reset default configuration.
+      FileSystemUtil.setConfiguration(new Configuration());
+    }
+  }
+
   @Test
   public void testHudiParquetLoading() throws IOException, CatalogException {
     //TODO(IMPALA-9042): Remove "throws CatalogException"
@@ -149,6 +174,30 @@ public class FileMetadataLoaderTest {
         relPaths.get(0));
   }
 
+  @Test
+  public void testIcebergLoadingWithoutBlockLocations()
+      throws IOException, CatalogException {
+    try {
+      Configuration customConf = new Configuration();
+      customConf.set("impala.preload-block-locations-for-scheduling.scheme.hdfs",
+          "false");
+      FileSystemUtil.setConfiguration(customConf);
+      CatalogServiceCatalog catalog = CatalogServiceTestCatalog.create();
+      IcebergFileMetadataLoader fml = getLoaderForIcebergTable(catalog,
+          "functional_parquet", "iceberg_partitioned",
+          /* oldFds = */ Collections.emptyList(),
+          /* requiresDataFilesInTableLocation = */ true);
+      fml.load();
+      List<IcebergFileDescriptor> fileDescs = fml.getLoadedIcebergFds();
+      for (IcebergFileDescriptor fd : fileDescs) {
+        assertEquals(0, fd.getNumFileBlocks());
+      }
+    } finally {
+      // Reset default configuration.
+      FileSystemUtil.setConfiguration(new Configuration());
+    }
+  }
+
   @Test
   public void testIcebergRefresh() throws IOException, CatalogException {
     CatalogServiceCatalog catalog = CatalogServiceTestCatalog.create();
diff --git a/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java b/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java
index 57a1099d5..d80ff682a 100644
--- a/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java
+++ b/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.impala.common;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.impala.common.Pair;
 import static org.apache.impala.common.FileSystemUtil.HIVE_TEMP_FILE_PREFIX;
 import static org.apache.impala.common.FileSystemUtil.SPARK_TEMP_FILE_PREFIX;
@@ -130,6 +131,81 @@ public class FileSystemUtilTest {
     // testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ALLUXIO), true);
   }
 
+  @Test
+  public void testSupportStorageIdsDisabledCompletely() throws IOException {
+    try {
+      Configuration customConf = new Configuration();
+      customConf.set("impala.preload-block-locations-for-scheduling", "false");
+      FileSystemUtil.setConfiguration(customConf);
+      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ABFS), false);
+      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ABFSS), false);
+      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ADL), false);
+      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_FILE), false);
+      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_S3A), false);
+      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_HDFS), false);
+      // The following tests are disabled because the underlying systems are not
+      // included in the Impala minicluster.
+      // TODO: enable the following tests if we add them to the Impala minicluster.
+      // testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_O3FS), false);
+      // testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_OFS), false);
+      // testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ALLUXIO), false);
+    } finally {
+      // Reset default configuration.
+      FileSystemUtil.setConfiguration(new Configuration());
+    }
+  }
+
+  @Test
+  public void testSupportStorageIdsDisabledViaScheme() throws IOException {
+    try {
+      Configuration customConf = new Configuration();
+      customConf.set("impala.preload-block-locations-for-scheduling.scheme.hdfs",
+          "false");
+      FileSystemUtil.setConfiguration(customConf);
+      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ABFS), false);
+      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ABFSS), false);
+      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ADL), false);
+      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_FILE), false);
+      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_S3A), false);
+      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_HDFS), false);
+      // The following tests are disabled because the underlying systems are not
+      // included in the Impala minicluster.
+      // TODO: enable the following tests if we add them to the Impala minicluster.
+      // testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_O3FS), false);
+      // testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_OFS), false);
+      // testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ALLUXIO), false);
+    } finally {
+      // Reset default configuration.
+      FileSystemUtil.setConfiguration(new Configuration());
+    }
+  }
+
+  @Test
+  public void testSupportStorageIdsDisabledViaAuthority() throws IOException {
+    try {
+      Configuration customConf = new Configuration();
+      customConf.set(
+          "impala.preload-block-locations-for-scheduling.authority.localhost:20500",
+          "false");
+      FileSystemUtil.setConfiguration(customConf);
+      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ABFS), false);
+      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ABFSS), false);
+      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ADL), false);
+      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_FILE), false);
+      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_S3A), false);
+      testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_HDFS), false);
+      // The following tests are disabled because the underlying systems are not
+      // included in the Impala minicluster.
+      // TODO: enable the following tests if we add them to the Impala minicluster.
+      // testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_O3FS), false);
+      // testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_OFS), false);
+      // testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ALLUXIO), false);
+    } finally {
+      // Reset default configuration.
+      FileSystemUtil.setConfiguration(new Configuration());
+    }
+  }
+
   @Test
   public void testWriteableByImpala() throws IOException {
     testIsWritableByImpala(mockLocation(FileSystemUtil.SCHEME_ALLUXIO), false);
diff --git a/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py b/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py
index 03c4463bf..53f31ed46 100644
--- a/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py
+++ b/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py
@@ -24,6 +24,8 @@ import sys
 kerberize = os.environ.get('IMPALA_KERBERIZE') == 'true'
 target_filesystem = os.environ.get('TARGET_FILESYSTEM')
 
+VARIANT = os.environ.get('CORE_SITE_VARIANT')
+
 jceks_keystore = ("localjceks://file" + os.path.join(os.environ['IMPALA_HOME'],
     'testdata/jceks/test.jceks'))
 
@@ -134,6 +136,11 @@ CONFIG = {
   'iceberg.io.manifest.cache.max-content-length': '8388608',
 }
 
+if VARIANT == 'disable_block_locations':
+  CONFIG.update({
+    'impala.preload-block-locations-for-scheduling': 'false'
+  })
+
 if target_filesystem == 's3':
   CONFIG.update({'fs.s3a.connection.maximum': 1500})
   s3guard_enabled = os.environ.get("S3GUARD_ENABLED") == 'true'
diff --git a/testdata/workloads/functional-query/queries/QueryTest/no-block-locations.test b/testdata/workloads/functional-query/queries/QueryTest/no-block-locations.test
new file mode 100644
index 000000000..e63848251
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/no-block-locations.test
@@ -0,0 +1,50 @@
+====
+---- QUERY
+select * from alltypes where id < 5;
+---- RESULTS
+0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
+1,false,1,1,1,10,1.100000023841858,10.1,'01/01/09','1',2009-01-01 00:01:00,2009,1
+2,true,2,2,2,20,2.200000047683716,20.2,'01/01/09','2',2009-01-01 00:02:00.100000000,2009,1
+3,false,3,3,3,30,3.299999952316284,30.3,'01/01/09','3',2009-01-01 00:03:00.300000000,2009,1
+4,true,4,4,4,40,4.400000095367432,40.4,'01/01/09','4',2009-01-01 00:04:00.600000000,2009,1
+---- TYPES
+INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT
+====
+---- QUERY
+select count(*) from alltypes;
+---- RESULTS
+7300
+---- TYPES
+BIGINT
+====
+---- QUERY
+select count(*) from alltypes where id % 3 = 0;
+---- RESULTS
+2434
+---- TYPES
+BIGINT
+====
+---- QUERY
+# 'lineitem_sixblocks' contains a single data file with six HDFS blocks. Without
+# block information we schedule the whole data file to a single SCAN operator.
+select count(*) from functional_parquet.lineitem_sixblocks where l_orderkey % 2 = 0;
+---- RESULTS
+19929
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+# The following should be in the ExecSummary
+row_regex: 00:SCAN [A-Z0-9]+ +1 +1 +.*
+====
+---- QUERY
+# 'iceberg_lineitem_sixblocks' contains a single data file with six HDFS blocks. Without
+# block information we schedule the whole data file to a single SCAN operator.
+select count(*) from functional_parquet.iceberg_lineitem_sixblocks where l_orderkey % 2 = 0;
+---- RESULTS
+9805
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+# The following should be in the ExecSummary
+row_regex: 00:SCAN [A-Z0-9]+ +1 +1 +.*
+====
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index a54e9616d..502a53eef 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -55,6 +55,7 @@ KUDU_ARGS = 'kudu_args'
 # Additional args passed to the start-impala-cluster script.
 START_ARGS = 'start_args'
 JVM_ARGS = 'jvm_args'
+CUSTOM_CORE_SITE_DIR = 'custom_core_site_dir'
 HIVE_CONF_DIR = 'hive_conf_dir'
 CLUSTER_SIZE = "cluster_size"
 # Default query options passed to the impala daemon command line. Handled separately from
@@ -157,7 +158,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
                 impalad_timeout_s=None, expect_cores=None, reset_ranger=False,
                 tmp_dir_placeholders=[], expect_startup_fail=False,
                 disable_log_buffering=False, log_symlinks=False,
-                workload_mgmt=False, force_restart=False):
+                workload_mgmt=False, force_restart=False, custom_core_site_dir=None):
     """Records arguments to be passed to a cluster by adding them to the decorated
     method's func_dict"""
     args = dict()
@@ -171,6 +172,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       args[START_ARGS] = start_args
     if jvm_args is not None:
       args[JVM_ARGS] = jvm_args
+    if custom_core_site_dir is not None:
+      args[CUSTOM_CORE_SITE_DIR] = custom_core_site_dir
     if hive_conf_dir is not None:
       args[HIVE_CONF_DIR] = hive_conf_dir
     if kudu_args is not None:
@@ -299,13 +302,20 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     if START_ARGS in args:
       cluster_args.extend(args[START_ARGS].split())
 
+    custom_class_path_val = ""
+    if CUSTOM_CORE_SITE_DIR in args:
+      custom_class_path_val = args[CUSTOM_CORE_SITE_DIR]
+
     if HIVE_CONF_DIR in args:
       cls._start_hive_service(args[HIVE_CONF_DIR])
-      # Should let Impala adopt the same hive-site.xml. The only way is to add it in the
-      # beginning of the CLASSPATH. Because there's already a hive-site.xml in the
-      # default CLASSPATH (see bin/set-classpath.sh).
+      custom_class_path_val += ":" + args[HIVE_CONF_DIR]
+
+    if custom_class_path_val:
+      # Should let Impala adopt the custom Hadoop configuration. The only way is to add
+      # it at the beginning of the CLASSPATH, because there are already Hadoop site XML
+      # files in the default CLASSPATH (see bin/set-classpath.sh).
       cluster_args.append(
-          '--env_vars=CUSTOM_CLASSPATH=%s ' % args[HIVE_CONF_DIR])
+          '--env_vars=CUSTOM_CLASSPATH=%s ' % custom_class_path_val)
 
     if KUDU_ARGS in args:
       cls._restart_kudu_service(args[KUDU_ARGS])
diff --git a/tests/custom_cluster/test_disabled_block_locations.py b/tests/custom_cluster/test_disabled_block_locations.py
new file mode 100644
index 000000000..504faac06
--- /dev/null
+++ b/tests/custom_cluster/test_disabled_block_locations.py
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import, division, print_function
+import pytest
+from os import getenv
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+CORE_SITE_CONFIG_DIR = getenv('IMPALA_HOME') + '/fe/src/test/resources/' +\
+    'core-site-disabled-block-locations'
+
+
+class TestDisabledBlockLocations(CustomClusterTestSuite):
+  @classmethod
+  def setup_class(cls):
+    super(TestDisabledBlockLocations, cls).setup_class()
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(custom_core_site_dir=CORE_SITE_CONFIG_DIR)
+  def test_no_block_locations(self, vector):
+    self.run_test_case('QueryTest/no-block-locations', vector)
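For reference, the authority strings used by the tests and configs above
('localhost:20500' in the unit tests, 'mycluster' in the commit message) are exactly
what java.net.URI reports for the filesystem URI. A small sketch of the mapping,
assuming the minicluster NameNode listens on localhost:20500:

    // Sketch only: how a table URI relates to the '.scheme.' and '.authority.' keys.
    java.net.URI uri = java.net.URI.create("hdfs://localhost:20500/test-warehouse/alltypes");
    // uri.getScheme()    -> "hdfs"             (checked against the '.scheme.hdfs' key)
    // uri.getAuthority() -> "localhost:20500"  (checked against the
    //     '.authority.localhost:20500' key via fs.getUri().getAuthority())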