diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index 8852762eb..f50fb7efa 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -59,6 +59,7 @@ DECLARE_int32(num_oss_io_threads);
 DECLARE_int32(num_remote_hdfs_file_oper_io_threads);
 DECLARE_int32(num_s3_file_oper_io_threads);
 DECLARE_int32(num_sfs_io_threads);
+DECLARE_int32(num_obs_io_threads);
 
 #ifndef NDEBUG
 DECLARE_int32(stress_disk_read_delay_ms);
@@ -1723,7 +1724,8 @@ TEST_F(DiskIoMgrTest, VerifyNumThreadsParameter) {
       + FLAGS_num_remote_hdfs_file_oper_io_threads + FLAGS_num_s3_file_oper_io_threads
       + FLAGS_num_gcs_io_threads + FLAGS_num_cos_io_threads
-      + FLAGS_num_sfs_io_threads;
+      + FLAGS_num_sfs_io_threads
+      + FLAGS_num_obs_io_threads;
 
   // Verify num_io_threads_per_rotational_disk and num_io_threads_per_solid_state_disk.
   // Since we do not have control over which disk is used, we check for either type
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 41a2e64d4..43b744f69 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -144,6 +144,9 @@ DEFINE_int32(num_ozone_io_threads, 16, "Number of Ozone I/O threads");
 // The maximum number of SFS I/O threads.
 DEFINE_int32(num_sfs_io_threads, 16, "Number of SFS I/O threads");
 
+// The maximum number of OBS I/O threads.
+DEFINE_int32(num_obs_io_threads, 16, "Number of OBS I/O threads");
+
 // The number of cached file handles defines how much memory can be used per backend for
 // caching frequently used file handles. Measurements indicate that a single file handle
 // uses about 6kB of memory. 20k file handles will thus reserve ~120MB of memory.
@@ -575,6 +578,9 @@ Status DiskIoMgr::Init() {
     } else if (i == RemoteSFSDiskId()) {
       num_threads_per_disk = FLAGS_num_sfs_io_threads;
       device_name = "SFS remote";
+    } else if (i == RemoteOBSDiskId()) {
+      num_threads_per_disk = FLAGS_num_obs_io_threads;
+      device_name = "OBS remote";
     } else if (DiskInfo::is_rotational(i)) {
       num_threads_per_disk = num_io_threads_per_rotational_disk_;
       // During tests, i may not point to an existing disk.
@@ -917,6 +923,7 @@ int DiskIoMgr::AssignQueue(
     if (IsCosPath(file, check_default_fs)) return RemoteCosDiskId();
     if (IsOzonePath(file, check_default_fs)) return RemoteOzoneDiskId();
     if (IsSFSPath(file, check_default_fs)) return RemoteSFSDiskId();
+    if (IsOBSPath(file, check_default_fs)) return RemoteOBSDiskId();
   }
   // Assign to a local disk queue.
   DCHECK(!IsS3APath(file, check_default_fs)); // S3 is always remote.
@@ -926,6 +933,7 @@
   DCHECK(!IsGcsPath(file, check_default_fs)); // GCS is always remote.
   DCHECK(!IsCosPath(file, check_default_fs)); // COS is always remote.
   DCHECK(!IsSFSPath(file, check_default_fs)); // SFS is always remote.
+  DCHECK(!IsOBSPath(file, check_default_fs)); // OBS is always remote.
   if (disk_id == -1) {
     // disk id is unknown, assign it an arbitrary one.
     disk_id = next_disk_id_.Add(1);
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index b58690889..7cf7662e3 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -342,6 +342,9 @@ class DiskIoMgr : public CacheLineAligned {
   /// The disk ID (and therefore disk_queues_ index) used for SFS accesses.
   int RemoteSFSDiskId() const { return num_local_disks() + REMOTE_SFS_DISK_OFFSET; }
 
+  /// The disk ID (and therefore disk_queues_ index) used for OBS accesses.
+  int RemoteOBSDiskId() const { return num_local_disks() + REMOTE_OBS_DISK_OFFSET; }
+
   /// Dumps the disk IoMgr queues (for readers and disks)
   std::string DebugString();
 
@@ -399,6 +402,7 @@ class DiskIoMgr : public CacheLineAligned {
     REMOTE_S3_DISK_FILE_OPER_OFFSET,
     REMOTE_SFS_DISK_OFFSET,
     REMOTE_OSS_DISK_OFFSET,
+    REMOTE_OBS_DISK_OFFSET,
     REMOTE_NUM_DISKS
   };
diff --git a/be/src/util/hdfs-util.cc b/be/src/util/hdfs-util.cc
index 46ab67af1..90f71470f 100644
--- a/be/src/util/hdfs-util.cc
+++ b/be/src/util/hdfs-util.cc
@@ -41,6 +41,7 @@ const char* FILESYS_PREFIX_OFS = "ofs://";
 const char* FILESYS_PREFIX_SFS = "sfs+";
 const char* FILESYS_PREFIX_OSS = "oss://";
 const char* FILESYS_PREFIX_JINDOFS = "jfs://";
+const char* FILESYS_PREFIX_OBS = "obs://";
 
 string GetHdfsErrorMsg(const string& prefix, const string& file) {
   string error_msg = GetStrErrMsg();
@@ -138,6 +139,10 @@ bool IsSFSPath(const char* path, bool check_default_fs) {
   return IsSpecificPath(path, FILESYS_PREFIX_SFS, check_default_fs);
 }
 
+bool IsOBSPath(const char* path, bool check_default_fs) {
+  return IsSpecificPath(path, FILESYS_PREFIX_OBS, check_default_fs);
+}
+
 // Returns the length of the filesystem name in 'path' which is the length of the
 // 'scheme://authority'. Returns 0 if the path is unqualified.
 static int GetFilesystemNameLength(const char* path) {
diff --git a/be/src/util/hdfs-util.h b/be/src/util/hdfs-util.h
index cb9629a50..b5d3cf607 100644
--- a/be/src/util/hdfs-util.h
+++ b/be/src/util/hdfs-util.h
@@ -84,6 +84,9 @@ bool IsOzonePath(const char* path, bool check_default_fs = true);
 /// Returns true iff the path refers to a location on an SFS filesystem.
 bool IsSFSPath(const char* path, bool check_default_fs = true);
 
+/// Returns true iff the path refers to a location on an OBS filesystem.
+bool IsOBSPath(const char* path, bool check_default_fs = true);
+
 /// Returns true iff 'pathA' and 'pathB' are on the same filesystem and bucket.
 /// Most filesystems embed bucket in the authority, but Ozone's ofs protocol allows
 /// addressing volume/bucket via the path and does not allow renames across them.
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index c954b15d2..1ac293676 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -250,6 +250,7 @@ export IMPALA_RELOAD4j_VERSION=1.2.22
 export IMPALA_SLF4J_VERSION=2.0.3
 export IMPALA_SPRINGFRAMEWORK_VERSION=5.3.20
 export IMPALA_XMLSEC_VERSION=2.2.3
+export IMPALA_OBS_VERSION=3.1.1-hw-42
 
 # When Impala is building docker images on Redhat-based distributions,
 # it is useful to be able to customize the base image. Some users will
@@ -699,6 +700,14 @@ elif [ "${TARGET_FILESYSTEM}" = "oss" ]; then
   fi
   DEFAULT_FS="oss://${OSS_BUCKET}"
   export DEFAULT_FS
+elif [ "${TARGET_FILESYSTEM}" = "obs" ]; then
+  # Basic error checking
+  OBS_ACCESS_KEY="${OBS_ACCESS_KEY:?OBS_ACCESS_KEY cannot be an empty string for OBS}"
+  OBS_SECRET_KEY="${OBS_SECRET_KEY:?OBS_SECRET_KEY cannot be an empty string for OBS}"
+  OBS_ENDPOINT="${OBS_ENDPOINT:?OBS_ENDPOINT cannot be an empty string for OBS}"
+  OBS_BUCKET="${OBS_BUCKET:?OBS_BUCKET cannot be an empty string for OBS}"
+  DEFAULT_FS="obs://${OBS_BUCKET}"
+  export OBS_ACCESS_KEY OBS_SECRET_KEY OBS_ENDPOINT DEFAULT_FS ENABLE_OBS_FILESYSTEM=true
 elif [ "${TARGET_FILESYSTEM}" = "isilon" ]; then
   if [ "${ISILON_NAMENODE}" = "" ]; then
     echo "In order to access the Isilon filesystem, ISILON_NAMENODE"
@@ -972,6 +981,7 @@ echo "IMPALA_RANGER_VERSION = $IMPALA_RANGER_VERSION"
 echo "IMPALA_ICEBERG_VERSION = $IMPALA_ICEBERG_VERSION"
 echo "IMPALA_COS_VERSION = $IMPALA_COS_VERSION"
 echo "IMPALA_OSS_VERSION = $IMPALA_OSS_VERSION"
+echo "IMPALA_OBS_VERSION = $IMPALA_OBS_VERSION"
 
 # Kerberos things. If the cluster exists and is kerberized, source
 # the required environment. This is required for any hadoop tool to
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index e50fb85ba..bd88f4a98 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -84,6 +84,7 @@ public class FileSystemUtil {
   public static final String SCHEME_COS = "cosn";
   public static final String SCHEME_OSS = "oss";
   public static final String SCHEME_SFS = "sfs";
+  public static final String SCHEME_OBS = "obs";
 
   public static final String NO_ERASURE_CODE_LABEL = "NONE";
 
@@ -115,6 +116,7 @@ public class FileSystemUtil {
       .add(SCHEME_GCS)
       .add(SCHEME_COS)
       .add(SCHEME_OSS)
+      .add(SCHEME_OBS)
       .build();
 
   /**
@@ -131,6 +133,7 @@ public class FileSystemUtil {
       .add(SCHEME_GCS)
       .add(SCHEME_COS)
      .add(SCHEME_OSS)
+      .add(SCHEME_OBS)
       .build();
 
   /**
@@ -148,6 +151,7 @@ public class FileSystemUtil {
       .add(SCHEME_GCS)
       .add(SCHEME_COS)
       .add(SCHEME_OSS)
+      .add(SCHEME_OBS)
       .build();
 
   /**
@@ -565,6 +569,13 @@ public class FileSystemUtil {
     return hasScheme(fs, SCHEME_OSS);
   }
 
+  /**
+   * Returns true iff the filesystem is an OBSFileSystem.
+   */
+  public static boolean isOBSFileSystem(FileSystem fs) {
+    return hasScheme(fs, SCHEME_OBS);
+  }
+
   /**
    * Returns true iff the filesystem is AdlFileSystem.
    */
@@ -676,7 +687,8 @@ public class FileSystemUtil {
     GCS,
     COS,
     OSS,
-    SFS;
+    SFS,
+    OBS;
 
     private static final Map<String, FsType> SCHEME_TO_FS_MAPPING =
         ImmutableMap.<String, FsType>builder()
@@ -692,6 +704,7 @@
         .put(SCHEME_GCS, GCS)
         .put(SCHEME_COS, COS)
         .put(SCHEME_OSS, OSS)
+        .put(SCHEME_OBS, OBS)
         .build();
 
   /**
diff --git a/java/executor-deps/pom.xml b/java/executor-deps/pom.xml
index 428b78145..0847462e1 100644
--- a/java/executor-deps/pom.xml
+++ b/java/executor-deps/pom.xml
@@ -205,4 +205,41 @@ under the License.
   </dependencies>
 
+  <profiles>
+    <profile>
+      <id>obs-filesystem</id>
+      <activation>
+        <property>
+          <name>env.ENABLE_OBS_FILESYSTEM</name>
+          <value>true</value>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-huaweicloud</artifactId>
+          <version>${obs.version}</version>
+        </dependency>
+      </dependencies>
+      <repositories>
+        <repository>
+          <id>huaweicloud.repo</id>
+          <url>https://repo.huaweicloud.com/repository/maven/huaweicloudsdk</url>
+          <name>HuaweiCloud SDK Repository</name>
+          <snapshots>
+            <enabled>false</enabled>
+          </snapshots>
+        </repository>
+      </repositories>
+    </profile>
+  </profiles>
 </project>
diff --git a/java/pom.xml b/java/pom.xml
index c84571551..76364f06d 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -52,6 +52,7 @@ under the License.
     <knox.version>${env.IMPALA_KNOX_VERSION}</knox.version>
     <cos.version>${env.IMPALA_COS_VERSION}</cos.version>
     <oss.version>${env.IMPALA_OSS_VERSION}</oss.version>
+    <obs.version>${env.IMPALA_OBS_VERSION}</obs.version>
     <thrift.version>${env.IMPALA_THRIFT_POM_VERSION}</thrift.version>
     <impala.extdatasrc.api.version>${project.version}</impala.extdatasrc.api.version>
     <impala.query.event.hook.api.version>${project.version}</impala.query.event.hook.api.version>
diff --git a/testdata/bin/create-load-data.sh b/testdata/bin/create-load-data.sh
index 8b5f67859..3c8f6fc51 100755
--- a/testdata/bin/create-load-data.sh
+++ b/testdata/bin/create-load-data.sh
@@ -119,8 +119,8 @@ fi
 TIMEOUT_PID=$!
 
 SCHEMA_MISMATCH_ERROR="A schema change has been detected in the metadata, "
-SCHEMA_MISMATCH_ERROR+="but it cannot be loaded on Isilon, s3, gcs, cos, oss or local "
-SCHEMA_MISMATCH_ERROR+="filesystem, and the filesystem is ${TARGET_FILESYSTEM}".
+SCHEMA_MISMATCH_ERROR+="but it cannot be loaded on Isilon, s3, gcs, cos, oss, obs or "
+SCHEMA_MISMATCH_ERROR+="local filesystem, and the filesystem is ${TARGET_FILESYSTEM}".
 
 if [[ $SKIP_METADATA_LOAD -eq 0 && "$SNAPSHOT_FILE" = "" ]]; then
   run-step "Generating HBase data" create-hbase.log \
@@ -135,7 +135,8 @@ elif [ $SKIP_SNAPSHOT_LOAD -eq 0 ]; then
   if ! ${IMPALA_HOME}/testdata/bin/check-schema-diff.sh; then
     if [[ "${TARGET_FILESYSTEM}" == "isilon" || "${TARGET_FILESYSTEM}" == "s3" || \
           "${TARGET_FILESYSTEM}" == "local" || "${TARGET_FILESYSTEM}" == "gs" || \
-          "${TARGET_FILESYSTEM}" == "cosn" || "${TARGET_FILESYSTEM}" == "oss" ]] ; then
+          "${TARGET_FILESYSTEM}" == "cosn" || "${TARGET_FILESYSTEM}" == "oss" || \
+          "${TARGET_FILESYSTEM}" == "obs" ]] ; then
       echo "ERROR in $0 at line $LINENO: A schema change has been detected in the"
       echo "metadata, but it cannot be loaded on isilon, s3, gcs, cos, oss or local"
       echo "and the target file system is ${TARGET_FILESYSTEM}. Exiting."
diff --git a/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py b/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py
index bcb17edfa..0ebc97372 100644
--- a/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py
+++ b/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py
@@ -138,6 +138,15 @@ if target_filesystem == 's3':
     'fs.s3a.s3guard.ddb.region': '${S3GUARD_DYNAMODB_REGION}',
   })
 
+if target_filesystem == 'obs':
+  CONFIG.update({
+    'fs.obs.impl': 'org.apache.hadoop.fs.obs.OBSFileSystem',
+    'fs.AbstractFileSystem.obs.impl': 'org.apache.hadoop.fs.obs.OBS',
+    'fs.obs.access.key': '${OBS_ACCESS_KEY}',
+    'fs.obs.secret.key': '${OBS_SECRET_KEY}',
+    'fs.obs.endpoint': '${OBS_ENDPOINT}',
+  })
+
 if target_filesystem == 'ozone':
   CONFIG.update({'fs.ofs.impl': 'org.apache.hadoop.fs.ozone.RootedOzoneFileSystem'})
 
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index b88b629b2..0a605a0d7 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -70,6 +70,7 @@ from tests.util.filesystem_utils import (
     IS_GCS,
     IS_COS,
     IS_OSS,
+    IS_OBS,
     IS_HDFS,
     S3_BUCKET_NAME,
     S3GUARD_ENABLED,
@@ -275,6 +276,9 @@ class ImpalaTestSuite(BaseTestSuite):
     elif IS_OSS:
       # OSS is implemented via HDFS command line client
       cls.filesystem_client = HadoopFsCommandLineClient("OSS")
+    elif IS_OBS:
+      # OBS is implemented via HDFS command line client
+      cls.filesystem_client = HadoopFsCommandLineClient("OBS")
     elif IS_OZONE:
       cls.filesystem_client = HadoopFsCommandLineClient("Ozone")
 
@@ -1067,7 +1071,7 @@ class ImpalaTestSuite(BaseTestSuite):
     # If 'skip_hbase' is specified or the filesystem is isilon, s3, GCS(gs), COS(cosn) or
     # local, we don't need the hbase dimension.
     if pytest.config.option.skip_hbase or TARGET_FILESYSTEM.lower() \
-        in ['s3', 'isilon', 'local', 'abfs', 'adls', 'gs', 'cosn', 'ozone']:
+        in ['s3', 'isilon', 'local', 'abfs', 'adls', 'gs', 'cosn', 'ozone', 'obs']:
       for tf_dimension in tf_dimensions:
         if tf_dimension.value.file_format == "hbase":
           tf_dimensions.remove(tf_dimension)
diff --git a/tests/common/skip.py b/tests/common/skip.py
index 863ed0a26..b908ea9e8 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -34,6 +34,7 @@ from tests.util.filesystem_utils import (
     IS_GCS,
     IS_COS,
     IS_OSS,
+    IS_OBS,
     IS_EC,
     IS_HDFS,
     IS_ISILON,
@@ -62,9 +63,9 @@ class SkipIfFS:
       reason="Empty directories are not supported on S3")
   file_or_folder_name_ends_with_period = pytest.mark.skipif(IS_ABFS,
       reason="ABFS does not support file / directories that end with a period")
-  stress_insert_timeouts = pytest.mark.skipif(IS_COS or IS_GCS or IS_OSS,
+  stress_insert_timeouts = pytest.mark.skipif(IS_COS or IS_GCS or IS_OSS or IS_OBS,
       reason="IMPALA-10563, IMPALA-10773")
-  shutdown_idle_fails = pytest.mark.skipif(IS_COS or IS_GCS or IS_OSS,
+  shutdown_idle_fails = pytest.mark.skipif(IS_COS or IS_GCS or IS_OSS or IS_OBS,
       reason="IMPALA-10562")
   late_filters = pytest.mark.skipif(IS_ISILON, reason="IMPALA-6998")
   read_past_eof = pytest.mark.skipif(IS_S3 or IS_GCS or (IS_OZONE and IS_EC),
@@ -81,9 +82,9 @@ class SkipIfFS:
       reason="Tests rely on HDFS qualified paths, IMPALA-1872")
   no_partial_listing = pytest.mark.skipif(not IS_HDFS,
       reason="Tests rely on HDFS partial listing.")
-  variable_listing_times = pytest.mark.skipif(IS_S3 or IS_GCS or IS_COS or IS_OSS,
-      reason="Flakiness due to unpredictable listing times on S3.")
-  eventually_consistent = pytest.mark.skipif(IS_ADLS or IS_COS or IS_OSS,
+  variable_listing_times = pytest.mark.skipif(IS_S3 or IS_GCS or IS_COS or IS_OSS
+      or IS_OBS, reason="Flakiness due to unpredictable listing times on S3.")
+  eventually_consistent = pytest.mark.skipif(IS_ADLS or IS_COS or IS_OSS or IS_OBS,
       reason="The client is slow to realize changes to file metadata")
 
 class SkipIfKudu:
diff --git a/tests/custom_cluster/test_metastore_service.py b/tests/custom_cluster/test_metastore_service.py
index 05c7a0895..62a621a73 100644
--- a/tests/custom_cluster/test_metastore_service.py
+++ b/tests/custom_cluster/test_metastore_service.py
@@ -29,8 +29,7 @@ from hive_metastore.ttypes import SerDeInfo
 from tests.util.event_processor_utils import EventProcessorUtils
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.util.filesystem_utils import (IS_S3, IS_ADLS, IS_GCS, IS_COS, IS_OSS)
-
+from tests.util.filesystem_utils import IS_HDFS, IS_OZONE
 
 class TestMetastoreService(CustomClusterTestSuite):
   """
@@ -1210,9 +1209,9 @@ class TestMetastoreService(CustomClusterTestSuite):
     test_part_names = list(part_names)
     if expect_files:
       assert get_parts_by_names_result.dictionary is not None
-      # obj_dict will only be populated when the table is on HDFS
+      # obj_dict will only be populated when the table is on HDFS or OZONE
       # where block locations are available.
-      if not IS_S3 and not IS_GCS and not IS_COS and not IS_ADLS and not IS_OSS:
+      if IS_HDFS or IS_OZONE:
         assert len(get_parts_by_names_result.dictionary.values) > 0
     else:
       assert get_parts_by_names_result.dictionary is None
@@ -1238,9 +1237,9 @@ class TestMetastoreService(CustomClusterTestSuite):
     assert filemetadata is not None
     assert filemetadata.data is not None
     assert obj_dict is not None
-    # obj_dict will only be populated when the table is on HDFS
+    # obj_dict will only be populated when the table is on HDFS or OZONE
     # where block locations are available.
-    if not IS_S3 and not IS_GCS and not IS_COS and not IS_ADLS and not IS_OSS:
+    if IS_HDFS or IS_OZONE:
       assert len(obj_dict.values) > 0
 
   def __assert_no_filemd(self, filemetadata, obj_dict):
diff --git a/tests/util/filesystem_utils.py b/tests/util/filesystem_utils.py
index 7444db7e8..fe13a807e 100644
--- a/tests/util/filesystem_utils.py
+++ b/tests/util/filesystem_utils.py
@@ -34,6 +34,7 @@ IS_ABFS = FILESYSTEM == "abfs"
 IS_GCS = FILESYSTEM == "gs"
 IS_COS = FILESYSTEM == "cosn"
 IS_OSS = FILESYSTEM == "oss"
+IS_OBS = FILESYSTEM == "obs"
 IS_OZONE = FILESYSTEM == "ozone"
 IS_EC = os.getenv("ERASURE_CODING") == "true"
 IS_ENCRYPTED = os.getenv("USE_OZONE_ENCRYPTION") == "true"
@@ -61,7 +62,8 @@ ADLS_CLIENT_SECRET = os.getenv("azure_client_secret")
 
 # A map of FILESYSTEM values to their corresponding Scan Node types
 fs_to_name = {'s3': 'S3', 'hdfs': 'HDFS', 'local': 'LOCAL', 'adls': 'ADLS',
-              'abfs': 'ADLS', 'gs': 'GCS', 'cosn': 'COS', 'ozone': 'OZONE', 'oss': 'OSS'}
+              'abfs': 'ADLS', 'gs': 'GCS', 'cosn': 'COS', 'ozone': 'OZONE',
+              'oss': 'OSS', 'obs': 'OBS'}
 
 
 def get_fs_name(fs):