mirror of https://github.com/apache/impala.git
IMPALA-7681. Add Azure Blob File System (ADLS Gen2) support.
HADOOP-15407 adds a new FileSystem implementation called "ABFS" for the ADLS Gen2
service. It's in the hadoop-azure module as a replacement for WASB. Filesystem
semantics should be the same, so skipped tests and other behavior changes have
simply mirrored what is done for ADLS Gen1 by default. Tests skipped on ADLS Gen1
due to eventual consistency of the Python client can be run against ADLS Gen2.

Change-Id: I5120b071760e7655e78902dce8483f8f54de445d
Reviewed-on: http://gerrit.cloudera.org:8080/11630
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
committed by Impala Public Jenkins
parent 0340a153ce
commit 7a022cf36a
@@ -378,6 +378,7 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state,
  }

  if (IsS3APath(output_partition->current_file_name.c_str()) ||
      IsABFSPath(output_partition->current_file_name.c_str()) ||
      IsADLSPath(output_partition->current_file_name.c_str())) {
    // On S3A, the file cannot be stat'ed until after it's closed, and even so, the block
    // size reported will be just the filesystem default. Similarly, the block size

@@ -51,6 +51,7 @@ DECLARE_int64(min_buffer_size);
DECLARE_int32(num_remote_hdfs_io_threads);
DECLARE_int32(num_s3_io_threads);
DECLARE_int32(num_adls_io_threads);
DECLARE_int32(num_abfs_io_threads);
#ifndef NDEBUG
DECLARE_int32(stress_disk_read_delay_ms);
#endif

@@ -1565,7 +1566,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
TEST_F(DiskIoMgrTest, VerifyNumThreadsParameter) {
  InitRootReservation(LARGE_RESERVATION_LIMIT);
  const int num_io_threads_for_remote_disks = FLAGS_num_remote_hdfs_io_threads
-     + FLAGS_num_s3_io_threads + FLAGS_num_adls_io_threads;
+     + FLAGS_num_s3_io_threads + FLAGS_num_adls_io_threads + FLAGS_num_abfs_io_threads;

  // Verify num_io_threads_per_rotational_disk and num_io_threads_per_solid_state_disk.
  // Since we do not have control over which disk is used, we check for either type

@@ -83,6 +83,8 @@ DEFINE_int32(num_remote_hdfs_io_threads, 8, "Number of remote HDFS I/O threads")
// open to S3 and use of multiple CPU cores since S3 reads are relatively compute
// expensive (SSL and JNI buffer overheads).
DEFINE_int32(num_s3_io_threads, 16, "Number of S3 I/O threads");
// The maximum number of ABFS I/O threads. TODO: choose the default empirically.
DEFINE_int32(num_abfs_io_threads, 16, "Number of ABFS I/O threads");
// The maximum number of ADLS I/O threads. This number is a good default to have for
// clusters that may vary widely in size, due to an undocumented concurrency limit
// enforced by ADLS for a cluster, which spans between 500-700. For smaller clusters

@@ -233,6 +235,9 @@ Status DiskIoMgr::Init() {
    } else if (i == RemoteS3DiskId()) {
      num_threads_per_disk = FLAGS_num_s3_io_threads;
      device_name = "S3 remote";
    } else if (i == RemoteAbfsDiskId()) {
      num_threads_per_disk = FLAGS_num_abfs_io_threads;
      device_name = "ABFS remote";
    } else if (i == RemoteAdlsDiskId()) {
      num_threads_per_disk = FLAGS_num_adls_io_threads;
      device_name = "ADLS remote";

@@ -457,10 +462,12 @@ int DiskIoMgr::AssignQueue(const char* file, int disk_id, bool expected_local) {
      return RemoteDfsDiskId();
    }
    if (IsS3APath(file)) return RemoteS3DiskId();
    if (IsABFSPath(file)) return RemoteAbfsDiskId();
    if (IsADLSPath(file)) return RemoteAdlsDiskId();
  }
  // Assign to a local disk queue.
  DCHECK(!IsS3APath(file)); // S3 is always remote.
  DCHECK(!IsABFSPath(file)); // ABFS is always remote.
  DCHECK(!IsADLSPath(file)); // ADLS is always remote.
  if (disk_id == -1) {
    // disk id is unknown, assign it an arbitrary one.

@@ -286,6 +286,9 @@ class DiskIoMgr : public CacheLineAligned {
  /// The disk ID (and therefore disk_queues_ index) used for S3 accesses.
  int RemoteS3DiskId() const { return num_local_disks() + REMOTE_S3_DISK_OFFSET; }

  /// The disk ID (and therefore disk_queues_ index) used for ABFS accesses.
  int RemoteAbfsDiskId() const { return num_local_disks() + REMOTE_ABFS_DISK_OFFSET; }

  /// The disk ID (and therefore disk_queues_ index) used for ADLS accesses.
  int RemoteAdlsDiskId() const { return num_local_disks() + REMOTE_ADLS_DISK_OFFSET; }

@@ -335,6 +338,7 @@ class DiskIoMgr : public CacheLineAligned {
    REMOTE_DFS_DISK_OFFSET = 0,
    REMOTE_S3_DISK_OFFSET,
    REMOTE_ADLS_DISK_OFFSET,
    REMOTE_ABFS_DISK_OFFSET,
    REMOTE_NUM_DISKS
  };

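
The hunks above give each remote filesystem its own I/O queue: the queue index is num_local_disks() plus a per-filesystem offset, and AssignQueue() picks the queue from the path's scheme. The following is a minimal Python sketch of that mapping, for illustration only; the constant order and the assign_queue() helper simply mirror the C++ shown here and are not Impala code.

    # Illustrative sketch of the queue assignment above; not Impala code.
    REMOTE_DFS_DISK_OFFSET, REMOTE_S3_DISK_OFFSET, REMOTE_ADLS_DISK_OFFSET, \
        REMOTE_ABFS_DISK_OFFSET, REMOTE_NUM_DISKS = range(5)

    def assign_queue(path, num_local_disks):
      """Maps a remote file path to a disk queue index, mirroring DiskIoMgr::AssignQueue()."""
      if path.startswith("s3a://"):
        return num_local_disks + REMOTE_S3_DISK_OFFSET
      if path.startswith(("abfs://", "abfss://")):
        return num_local_disks + REMOTE_ABFS_DISK_OFFSET
      if path.startswith("adl://"):
        return num_local_disks + REMOTE_ADLS_DISK_OFFSET
      return num_local_disks + REMOTE_DFS_DISK_OFFSET  # other remote DFS paths

    # Example: an ABFS path on a host with 4 local disks lands on queue 4 + 3 = 7.
    assert assign_queue("abfss://container@acct.dfs.core.windows.net/t", 4) == 7
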
@@ -29,11 +29,13 @@ using namespace impala;
using namespace impala::io;

// TODO: Run perf tests and empirically settle on the most optimal default value for the
-// read buffer size. Currently setting it as 128k for the same reason as for S3, i.e.
+// read buffer sizes. Currently setting them as 128k for the same reason as for S3, i.e.
// due to JNI array allocation and memcpy overhead, 128k was emperically found to have the
// least overhead.
DEFINE_int64(adls_read_chunk_size, 128 * 1024, "The maximum read chunk size to use when "
    "reading from ADLS.");
DEFINE_int64(abfs_read_chunk_size, 128 * 1024, "The maximum read chunk size to use when "
    "reading from ABFS.");

// Implementation of the ScanRange functionality. Each ScanRange contains a queue
// of ready buffers. For each ScanRange, there is only a single producer and

@@ -554,6 +556,10 @@ int64_t ScanRange::MaxReadChunkSize() const {
    DCHECK(IsADLSPath(file()));
    return FLAGS_adls_read_chunk_size;
  }
  if (disk_id_ == io_mgr_->RemoteAbfsDiskId()) {
    DCHECK(IsABFSPath(file()));
    return FLAGS_abfs_read_chunk_size;
  }
  // The length argument of hdfsRead() is an int. Ensure we don't overflow it.
  return numeric_limits<int>::max();
}

@@ -86,6 +86,14 @@ bool IsS3APath(const char* path) {
  return strncmp(path, "s3a://", 6) == 0;
}

bool IsABFSPath(const char* path) {
  if (strstr(path, ":/") == NULL) {
    return ExecEnv::GetInstance()->default_fs().compare(0, 7, "abfs://") == 0 ||
        ExecEnv::GetInstance()->default_fs().compare(0, 8, "abfss://") == 0;
  }
  return strncmp(path, "abfs://", 7) == 0 || strncmp(path, "abfss://", 8) == 0;
}

bool IsADLSPath(const char* path) {
  if (strstr(path, ":/") == NULL) {
    return ExecEnv::GetInstance()->default_fs().compare(0, 6, "adl://") == 0;

@@ -50,6 +50,9 @@ bool IsHdfsPath(const char* path);
/// Returns true iff the path refers to a location on an S3A filesystem.
bool IsS3APath(const char* path);

/// Returns true iff the path refers to a location on an ABFS filesystem.
bool IsABFSPath(const char* path);

/// Returns true iff the path refers to a location on an ADL filesystem.
bool IsADLSPath(const char* path);

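
IsABFSPath() accepts both the abfs:// and abfss:// schemes and, for scheme-less paths, falls back to the scheme of the configured default filesystem. A hedged Python sketch of the same decision follows; the default_fs value is an assumed example, not taken from the commit.

    # Sketch of the path classification above; default_fs is an assumed example value.
    def is_abfs_path(path, default_fs="abfss://container@account.dfs.core.windows.net"):
      if ":/" not in path:
        # Scheme-less path: classify it by the default filesystem instead.
        return default_fs.startswith(("abfs://", "abfss://"))
      return path.startswith(("abfs://", "abfss://"))

    assert is_abfs_path("abfss://container@account.dfs.core.windows.net/tmp/f.parq")
    assert is_abfs_path("/test-warehouse/t/f.parq")  # scheme-less, default fs is ABFS
    assert not is_abfs_path("s3a://bucket/key")
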
@@ -263,6 +263,8 @@ export azure_tenant_id="${azure_tenant_id-DummyAdlsTenantId}"
export azure_client_id="${azure_client_id-DummyAdlsClientId}"
export azure_client_secret="${azure_client_secret-DummyAdlsClientSecret}"
export azure_data_lake_store_name="${azure_data_lake_store_name-}"
export azure_storage_account_name="${azure_storage_account_name-}"
export azure_storage_container_name="${azure_storage_container_name-}"
export HDFS_REPLICATION="${HDFS_REPLICATION-3}"
export ISILON_NAMENODE="${ISILON_NAMENODE-}"
export DEFAULT_FS="${DEFAULT_FS-hdfs://localhost:20500}"

@@ -352,6 +354,28 @@ elif [ "${TARGET_FILESYSTEM}" = "adls" ]; then
  fi
  DEFAULT_FS="adl://${azure_data_lake_store_name}.azuredatalakestore.net"
  export DEFAULT_FS
elif [ "${TARGET_FILESYSTEM}" = "abfs" ]; then
  # ABFS is also known as ADLS Gen2, and they can share credentials
  # Basic error checking
  if [[ "${azure_client_id}" = "DummyAdlsClientId" ||\
        "${azure_tenant_id}" = "DummyAdlsTenantId" ||\
        "${azure_client_secret}" = "DummyAdlsClientSecret" ]]; then
    echo "All 3 of the following need to be assigned valid values and belong
      to the owner of the Azure storage account in order to access the
      filesystem: azure_client_id, azure_tenant_id, azure_client_secret."
    return 1
  fi
  if [[ "${azure_storage_account_name}" = "" ]]; then
    echo "azure_storage_account_name cannot be an empty string for ABFS"
    return 1
  fi
  if [[ "${azure_storage_container_name}" = "" ]]; then
    echo "azure_storage_container_name cannot be an empty string for ABFS"
    return 1
  fi
  domain="${azure_storage_account_name}.dfs.core.windows.net"
  DEFAULT_FS="abfss://${azure_storage_container_name}@${domain}"
  export DEFAULT_FS
elif [ "${TARGET_FILESYSTEM}" = "isilon" ]; then
  if [ "${ISILON_NAMENODE}" = "" ]; then
    echo "In order to access the Isilon filesystem, ISILON_NAMENODE"

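
For TARGET_FILESYSTEM=abfs the script derives the default filesystem URI from the storage account and container names. A small Python sketch of the same composition, using placeholder account and container values rather than real ones:

    import os

    # Placeholder values; in the real setup these come from the environment.
    os.environ.setdefault("azure_storage_account_name", "myaccount")
    os.environ.setdefault("azure_storage_container_name", "mycontainer")

    domain = "%s.dfs.core.windows.net" % os.environ["azure_storage_account_name"]
    default_fs = "abfss://%s@%s" % (os.environ["azure_storage_container_name"], domain)
    # e.g. abfss://mycontainer@myaccount.dfs.core.windows.net
    print(default_fs)
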
@@ -86,6 +86,12 @@ under the License.
      <version>${hadoop.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-azure</artifactId>
      <version>${hadoop.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-azure-datalake</artifactId>

@@ -24,6 +24,8 @@ import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.adl.AdlFileSystem;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;

@@ -143,9 +145,11 @@ public class LoadDataStmt extends StatementBase {
      Path source = sourceDataPath_.getPath();
      FileSystem fs = source.getFileSystem(FileSystemUtil.getConfiguration());
      if (!(fs instanceof DistributedFileSystem) && !(fs instanceof S3AFileSystem) &&
          !(fs instanceof AzureBlobFileSystem) &&
          !(fs instanceof SecureAzureBlobFileSystem) &&
          !(fs instanceof AdlFileSystem)) {
        throw new AnalysisException(String.format("INPATH location '%s' " +
-           "must point to an HDFS, S3A or ADL filesystem.", sourceDataPath_));
+           "must point to an HDFS, S3A, ADL or ABFS filesystem.", sourceDataPath_));
      }
      if (!fs.exists(source)) {
        throw new AnalysisException(String.format(

@@ -157,7 +161,8 @@ public class LoadDataStmt extends StatementBase {
      // its parent directory (in order to delete the file as part of the move operation).
      FsPermissionChecker checker = FsPermissionChecker.getInstance();
      // TODO: Disable permission checking for S3A as well (HADOOP-13892)
-     boolean shouldCheckPerms = !(fs instanceof AdlFileSystem);
+     boolean shouldCheckPerms = !(fs instanceof AdlFileSystem ||
+         fs instanceof AzureBlobFileSystem || fs instanceof SecureAzureBlobFileSystem);

      if (fs.isDirectory(source)) {
        if (FileSystemUtil.getTotalNumVisibleFiles(source) == 0) {

@@ -918,6 +918,7 @@ public class HdfsTable extends Table implements FeFsTable {
    // behavior. So ADLS ACLs are unsupported until the connector is able to map
    // permissions to hadoop users/groups (HADOOP-14437).
    if (FileSystemUtil.isADLFileSystem(fs)) return TAccessLevel.READ_WRITE;
    if (FileSystemUtil.isABFSFileSystem(fs)) return TAccessLevel.READ_WRITE;

    while (location != null) {
      try {

@@ -33,6 +33,8 @@ import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem;
import org.apache.hadoop.fs.adl.AdlFileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.client.HdfsAdmin;

@@ -306,6 +308,7 @@ public class FileSystemUtil {
    if (isDistributedFileSystem(fs)) return true;
    // Blacklist FileSystems that are known to not to include storage UUIDs.
    return !(fs instanceof S3AFileSystem || fs instanceof LocalFileSystem ||
        fs instanceof AzureBlobFileSystem || fs instanceof SecureAzureBlobFileSystem ||
        fs instanceof AdlFileSystem);
  }

@@ -337,6 +340,26 @@ public class FileSystemUtil {
    return isADLFileSystem(path.getFileSystem(CONF));
  }

  /**
   * Returns true iff the filesystem is AzureBlobFileSystem or
   * SecureAzureBlobFileSystem. This function is unique in that there are 2
   * distinct classes it checks for, but the ony functional difference is the
   * use of wire encryption. Some features like OAuth authentication do require
   * wire encryption but that does not matter in usages of this function.
   */
  public static boolean isABFSFileSystem(FileSystem fs) {
    return fs instanceof AzureBlobFileSystem
        || fs instanceof SecureAzureBlobFileSystem;
  }

  /**
   * Returns true iff the path is on AzureBlobFileSystem or
   * SecureAzureBlobFileSystem.
   */
  public static boolean isABFSFileSystem(Path path) throws IOException {
    return isABFSFileSystem(path.getFileSystem(CONF));
  }

  /**
   * Returns true iff the filesystem is an instance of LocalFileSystem.
   */

@@ -458,6 +481,7 @@ public class FileSystemUtil {
    return (FileSystemUtil.isDistributedFileSystem(path) ||
        FileSystemUtil.isLocalFileSystem(path) ||
        FileSystemUtil.isS3AFileSystem(path) ||
        FileSystemUtil.isABFSFileSystem(path) ||
        FileSystemUtil.isADLFileSystem(path));
  }

@@ -32,6 +32,8 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem;
import org.apache.hadoop.fs.adl.AdlFileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;

@@ -799,6 +801,8 @@ public class JniFrontend {
    FileSystem fs = FileSystem.get(CONF);
    if (!(fs instanceof DistributedFileSystem ||
        fs instanceof S3AFileSystem ||
        fs instanceof AzureBlobFileSystem ||
        fs instanceof SecureAzureBlobFileSystem ||
        fs instanceof AdlFileSystem)) {
      return "Currently configured default filesystem: " +
          fs.getClass().getSimpleName() + ". " +

@@ -3204,7 +3204,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
      AnalysisError(String.format("load data inpath '%s' %s into table " +
          "tpch.lineitem", "file:///test-warehouse/test.out", overwrite),
          "INPATH location 'file:/test-warehouse/test.out' must point to an " +
-         "HDFS, S3A or ADL filesystem.");
+         "HDFS, S3A, ADL or ABFS filesystem.");

      // File type / table type mismatch.
      AnalyzesOk(String.format("load data inpath '%s' %s into table " +

@@ -110,6 +110,31 @@ DEFAULT</value>
    <value>https://login.windows.net/${azure_tenant_id}/oauth2/token</value>
  </property>

  <property>
    <name>fs.azure.account.auth.type</name>
    <value>OAuth</value>
  </property>

  <property>
    <name>fs.azure.account.oauth.provider.type</name>
    <value>org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider</value>
  </property>

  <property>
    <name>fs.azure.account.oauth2.client.id</name>
    <value>${azure_client_id}</value>
  </property>

  <property>
    <name>fs.azure.account.oauth2.client.secret</name>
    <value>${azure_client_secret}</value>
  </property>

  <property>
    <name>fs.azure.account.oauth2.client.endpoint</name>
    <value>https://login.microsoftonline.com/${azure_tenant_id}/oauth2/token</value>
  </property>

  <!-- This property can be used in tests to ascertain that this core-site.xml from
       the classpath has been loaded. (Ex: TestRequestPoolService) -->
  <property>

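
These new properties wire ABFS up for OAuth client-credential authentication. For reference, the same settings expressed as a plain Python dictionary; this is only a sketch, and the tenant, client id, and secret values are placeholders that the real core-site.xml substitutes from the environment.

    # Sketch only: the ABFS OAuth settings from core-site.xml as key/value pairs.
    azure_tenant_id = "<azure_tenant_id>"  # placeholder
    abfs_oauth_conf = {
      "fs.azure.account.auth.type": "OAuth",
      "fs.azure.account.oauth.provider.type":
          "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
      "fs.azure.account.oauth2.client.id": "<azure_client_id>",
      "fs.azure.account.oauth2.client.secret": "<azure_client_secret>",
      "fs.azure.account.oauth2.client.endpoint":
          "https://login.microsoftonline.com/%s/oauth2/token" % azure_tenant_id,
    }
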
@@ -56,6 +56,7 @@ from tests.performance.query_exec_functions import execute_using_jdbc
from tests.performance.query_executor import JdbcQueryExecConfig
from tests.util.filesystem_utils import (
    IS_S3,
    IS_ABFS,
    IS_ADLS,
    S3_BUCKET_NAME,
    ADLS_STORE_NAME,

@@ -67,6 +68,7 @@ from tests.util.hdfs_util import (
    get_hdfs_client_from_conf,
    NAMENODE)
from tests.util.s3_util import S3Client
from tests.util.abfs_util import ABFSClient
from tests.util.test_file_parser import (
    QueryTestSectionReader,
    parse_query_test_file,

@@ -162,6 +164,8 @@ class ImpalaTestSuite(BaseTestSuite):
    cls.filesystem_client = cls.hdfs_client
    if IS_S3:
      cls.filesystem_client = S3Client(S3_BUCKET_NAME)
    elif IS_ABFS:
      cls.filesystem_client = ABFSClient()
    elif IS_ADLS:
      cls.filesystem_client = ADLSClient(ADLS_STORE_NAME)

@@ -789,7 +793,7 @@ class ImpalaTestSuite(BaseTestSuite):
    # If 'skip_hbase' is specified or the filesystem is isilon, s3 or local, we don't
    # need the hbase dimension.
    if pytest.config.option.skip_hbase or TARGET_FILESYSTEM.lower() \
-       in ['s3', 'isilon', 'local', 'adls']:
+       in ['s3', 'isilon', 'local', 'abfs', 'adls']:
      for tf_dimension in tf_dimensions:
        if tf_dimension.value.file_format == "hbase":
          tf_dimensions.remove(tf_dimension)

@@ -26,6 +26,7 @@ from functools import partial

from tests.common.environ import IMPALAD_BUILD
from tests.util.filesystem_utils import (
    IS_ABFS,
    IS_ADLS,
    IS_EC,
    IS_HDFS,

@@ -55,6 +56,27 @@ class SkipIfS3:
  qualified_path = pytest.mark.skipif(IS_S3,
      reason="Tests rely on HDFS qualified paths, IMPALA-1872")


class SkipIfABFS:

  # These ones are skipped due to product limitations.
  caching = pytest.mark.skipif(IS_ABFS, reason="SET CACHED not implemented for ABFS")
  hive = pytest.mark.skipif(IS_ABFS, reason="Hive doesn't work with ABFS")
  hdfs_block_size = pytest.mark.skipif(IS_ABFS, reason="ABFS uses it's own block size")
  hdfs_acls = pytest.mark.skipif(IS_ABFS, reason="HDFS acls are not supported on ABFS")
  jira = partial(pytest.mark.skipif, IS_ABFS)
  hdfs_encryption = pytest.mark.skipif(IS_ABFS,
      reason="HDFS encryption is not supported with ABFS")
  trash = pytest.mark.skipif(IS_ABFS,
      reason="Drop/purge not working as expected on ABFS, IMPALA-7726")

  # These ones need test infra work to re-enable.
  udfs = pytest.mark.skipif(IS_ABFS, reason="udas/udfs not copied to ABFS")
  datasrc = pytest.mark.skipif(IS_ABFS, reason="data sources not copied to ABFS")
  hbase = pytest.mark.skipif(IS_ABFS, reason="HBase not started with ABFS")
  qualified_path = pytest.mark.skipif(IS_ABFS,
      reason="Tests rely on HDFS qualified paths, IMPALA-1872")

class SkipIfADLS:

  # These ones are skipped due to product limitations.

@@ -36,6 +36,7 @@ from tests.common.impala_test_suite import ImpalaTestSuite
|
||||
from tests.common.resource_pool_config import ResourcePoolConfig
|
||||
from tests.common.skip import (
|
||||
SkipIfS3,
|
||||
SkipIfABFS,
|
||||
SkipIfADLS,
|
||||
SkipIfEC,
|
||||
SkipIfNotHdfsMinicluster)
|
||||
@@ -439,6 +440,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
|
||||
".* is greater than pool max mem resources 10.00 MB", str(ex))
|
||||
|
||||
@SkipIfS3.hdfs_block_size
|
||||
@SkipIfABFS.hdfs_block_size
|
||||
@SkipIfADLS.hdfs_block_size
|
||||
@SkipIfEC.fix_later
|
||||
@pytest.mark.execute_serially
|
||||
|
||||
@@ -22,6 +22,7 @@ from tests.common.skip import SkipIfLocal, SkipIfEC
|
||||
from tests.util.filesystem_utils import (
|
||||
IS_ISILON,
|
||||
IS_S3,
|
||||
IS_ABFS,
|
||||
IS_ADLS)
|
||||
from time import sleep
|
||||
|
||||
@@ -132,7 +133,8 @@ class TestHdfsFdCaching(CustomClusterTestSuite):
|
||||
|
||||
# Caching only applies to local HDFS files. If this is local HDFS, then verify
|
||||
# that caching works. Otherwise, verify that file handles are not cached.
|
||||
if IS_S3 or IS_ADLS or IS_ISILON or pytest.config.option.testing_remote_cluster:
|
||||
if IS_S3 or IS_ABFS or IS_ADLS or IS_ISILON or \
|
||||
pytest.config.option.testing_remote_cluster:
|
||||
caching_expected = False
|
||||
else:
|
||||
caching_expected = True
|
||||
@@ -148,7 +150,8 @@ class TestHdfsFdCaching(CustomClusterTestSuite):
|
||||
handle_timeout = 5
|
||||
|
||||
# Only test eviction on platforms where caching is enabled.
|
||||
if IS_S3 or IS_ADLS or IS_ISILON or pytest.config.option.testing_remote_cluster:
|
||||
if IS_S3 or IS_ABFS or IS_ADLS or IS_ISILON or \
|
||||
pytest.config.option.testing_remote_cluster:
|
||||
return
|
||||
caching_expected = True
|
||||
self.run_fd_caching_test(vector, caching_expected, cache_capacity, handle_timeout)
|
||||
|
||||
@@ -18,13 +18,14 @@
|
||||
import pytest
|
||||
|
||||
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
|
||||
from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfLocal
|
||||
from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfLocal
|
||||
from tests.util.filesystem_utils import IS_ISILON, WAREHOUSE
|
||||
from tests.util.hdfs_util import HdfsConfig, get_hdfs_client, get_hdfs_client_from_conf
|
||||
|
||||
TEST_TBL = "insert_inherit_permission"
|
||||
|
||||
@SkipIfS3.hdfs_acls
|
||||
@SkipIfABFS.hdfs_acls
|
||||
@SkipIfADLS.hdfs_acls
|
||||
class TestInsertBehaviourCustomCluster(CustomClusterTestSuite):
|
||||
|
||||
|
||||
@@ -22,12 +22,14 @@ from tests.common.environ import specific_build_type_timeout
|
||||
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
|
||||
from tests.common.skip import (
|
||||
SkipIfS3,
|
||||
SkipIfABFS,
|
||||
SkipIfADLS,
|
||||
SkipIfIsilon,
|
||||
SkipIfLocal)
|
||||
from tests.util.hive_utils import HiveDbWrapper
|
||||
|
||||
@SkipIfS3.hive
|
||||
@SkipIfABFS.hive
|
||||
@SkipIfADLS.hive
|
||||
@SkipIfIsilon.hive
|
||||
@SkipIfLocal.hive
|
||||
|
||||
@@ -24,7 +24,7 @@ import string
|
||||
import subprocess
|
||||
|
||||
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
|
||||
from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon
|
||||
from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon
|
||||
|
||||
class TestParquetMaxPageHeader(CustomClusterTestSuite):
|
||||
'''This tests large page headers in parquet files. Parquet page header size can
|
||||
@@ -92,6 +92,7 @@ class TestParquetMaxPageHeader(CustomClusterTestSuite):
|
||||
put.wait()
|
||||
|
||||
@SkipIfS3.hive
|
||||
@SkipIfABFS.hive
|
||||
@SkipIfADLS.hive
|
||||
@SkipIfIsilon.hive
|
||||
@pytest.mark.execute_serially
|
||||
|
||||
@@ -24,7 +24,7 @@ import subprocess
|
||||
|
||||
from tempfile import mkdtemp
|
||||
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
|
||||
from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal
|
||||
from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
|
||||
from tests.common.test_dimensions import create_uncompressed_text_dimension
|
||||
from tests.util.filesystem_utils import get_fs_path
|
||||
|
||||
@@ -162,6 +162,7 @@ class TestUdfPersistence(CustomClusterTestSuite):
|
||||
|
||||
@SkipIfIsilon.hive
|
||||
@SkipIfS3.hive
|
||||
@SkipIfABFS.hive
|
||||
@SkipIfADLS.hive
|
||||
@SkipIfLocal.hive
|
||||
@pytest.mark.execute_serially
|
||||
@@ -183,6 +184,7 @@ class TestUdfPersistence(CustomClusterTestSuite):
|
||||
|
||||
@SkipIfIsilon.hive
|
||||
@SkipIfS3.hive
|
||||
@SkipIfABFS.hive
|
||||
@SkipIfADLS.hive
|
||||
@SkipIfLocal.hive
|
||||
@pytest.mark.execute_serially
|
||||
@@ -246,6 +248,8 @@ class TestUdfPersistence(CustomClusterTestSuite):
|
||||
|
||||
@SkipIfIsilon.hive
|
||||
@SkipIfS3.hive
|
||||
@SkipIfABFS.hive
|
||||
@SkipIfADLS.hive
|
||||
@SkipIfLocal.hive
|
||||
@pytest.mark.execute_serially
|
||||
@CustomClusterTestSuite.with_args(
|
||||
@@ -307,6 +311,8 @@ class TestUdfPersistence(CustomClusterTestSuite):
|
||||
|
||||
@SkipIfIsilon.hive
|
||||
@SkipIfS3.hive
|
||||
@SkipIfABFS.hive
|
||||
@SkipIfADLS.hive
|
||||
@SkipIfLocal.hive
|
||||
@pytest.mark.execute_serially
|
||||
@CustomClusterTestSuite.with_args(
|
||||
|
||||
@@ -25,7 +25,7 @@ import subprocess
|
||||
|
||||
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
|
||||
from tests.common.impala_test_suite import ImpalaTestSuite
|
||||
from tests.common.skip import SkipIf, SkipIfS3, SkipIfADLS, SkipIfLocal
|
||||
from tests.common.skip import SkipIf, SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfLocal
|
||||
from tests.common.test_dimensions import create_exec_option_dimension
|
||||
|
||||
class TestDataErrors(ImpalaTestSuite):
|
||||
@@ -106,6 +106,7 @@ class TestHdfsUnknownErrors(ImpalaTestSuite):
|
||||
assert "Safe mode is OFF" in output
|
||||
|
||||
@SkipIfS3.qualified_path
|
||||
@SkipIfABFS.qualified_path
|
||||
@SkipIfADLS.qualified_path
|
||||
class TestHdfsScanNodeErrors(TestDataErrors):
|
||||
@classmethod
|
||||
@@ -124,6 +125,7 @@ class TestHdfsScanNodeErrors(TestDataErrors):
|
||||
self.run_test_case('DataErrorsTest/hdfs-scan-node-errors', vector)
|
||||
|
||||
@SkipIfS3.qualified_path
|
||||
@SkipIfABFS.qualified_path
|
||||
@SkipIfADLS.qualified_path
|
||||
@SkipIfLocal.qualified_path
|
||||
class TestHdfsSeqScanNodeErrors(TestHdfsScanNodeErrors):
|
||||
@@ -139,6 +141,7 @@ class TestHdfsSeqScanNodeErrors(TestHdfsScanNodeErrors):
|
||||
|
||||
|
||||
@SkipIfS3.qualified_path
|
||||
@SkipIfABFS.qualified_path
|
||||
@SkipIfADLS.qualified_path
|
||||
class TestHdfsRcFileScanNodeErrors(TestHdfsScanNodeErrors):
|
||||
@classmethod
|
||||
|
||||
@@ -26,7 +26,8 @@ from time import sleep
|
||||
|
||||
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
|
||||
from tests.common.impala_test_suite import ImpalaTestSuite, LOG
|
||||
from tests.common.skip import SkipIf, SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal
|
||||
from tests.common.skip import SkipIf, SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, \
|
||||
SkipIfLocal
|
||||
from tests.common.test_dimensions import create_exec_option_dimension
|
||||
from tests.common.test_vector import ImpalaTestDimension
|
||||
|
||||
@@ -53,6 +54,7 @@ QUERIES = [
|
||||
|
||||
@SkipIf.skip_hbase # -skip_hbase argument specified
|
||||
@SkipIfS3.hbase # S3: missing coverage: failures
|
||||
@SkipIfABFS.hbase
|
||||
@SkipIfADLS.hbase
|
||||
@SkipIfIsilon.hbase # ISILON: missing coverage: failures.
|
||||
@SkipIfLocal.hbase
|
||||
|
||||
@@ -20,7 +20,7 @@ from subprocess import check_call
|
||||
|
||||
from tests.common.impala_cluster import ImpalaCluster
|
||||
from tests.common.impala_test_suite import ImpalaTestSuite
|
||||
from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal
|
||||
from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
|
||||
from tests.common.test_dimensions import (
|
||||
create_exec_option_dimension,
|
||||
create_single_exec_option_dimension,
|
||||
@@ -71,6 +71,7 @@ class TestComputeStats(ImpalaTestSuite):
|
||||
self.cleanup_db("parquet")
|
||||
|
||||
@SkipIfS3.hive
|
||||
@SkipIfABFS.hive
|
||||
@SkipIfADLS.hive
|
||||
@SkipIfIsilon.hive
|
||||
@SkipIfLocal.hive
|
||||
|
||||
@@ -25,7 +25,7 @@ from test_ddl_base import TestDdlBase
|
||||
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
|
||||
from tests.common.impala_test_suite import LOG
|
||||
from tests.common.parametrize import UniqueDatabase
|
||||
from tests.common.skip import SkipIf, SkipIfADLS, SkipIfLocal
|
||||
from tests.common.skip import SkipIf, SkipIfABFS, SkipIfADLS, SkipIfLocal
|
||||
from tests.common.test_dimensions import create_single_exec_option_dimension
|
||||
from tests.util.filesystem_utils import WAREHOUSE, IS_HDFS, IS_S3, IS_ADLS
|
||||
from tests.common.impala_cluster import ImpalaCluster
|
||||
@@ -33,6 +33,7 @@ from tests.common.impala_cluster import ImpalaCluster
|
||||
# Validates DDL statements (create, drop)
|
||||
class TestDdlStatements(TestDdlBase):
|
||||
@SkipIfLocal.hdfs_client
|
||||
@SkipIfABFS.trash
|
||||
def test_drop_table_with_purge(self, unique_database):
|
||||
"""This test checks if the table data is permamently deleted in
|
||||
DROP TABLE <tbl> PURGE queries"""
|
||||
@@ -425,6 +426,7 @@ class TestDdlStatements(TestDdlBase):
|
||||
use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector))
|
||||
|
||||
@SkipIfLocal.hdfs_client
|
||||
@SkipIfABFS.trash
|
||||
def test_drop_partition_with_purge(self, vector, unique_database):
|
||||
"""Verfies whether alter <tbl> drop partition purge actually skips trash"""
|
||||
self.client.execute(
|
||||
|
||||
@@ -19,7 +19,7 @@ import getpass
|
||||
import pytest
|
||||
|
||||
from tests.common.impala_test_suite import ImpalaTestSuite
|
||||
from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal
|
||||
from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
|
||||
from tests.common.test_dimensions import (
|
||||
create_single_exec_option_dimension,
|
||||
create_uncompressed_text_dimension)
|
||||
@@ -34,6 +34,7 @@ TMP_DIR = '/%s' % (PYWEBHDFS_TMP_DIR)
|
||||
|
||||
|
||||
@SkipIfS3.hdfs_encryption
|
||||
@SkipIfABFS.hdfs_encryption
|
||||
@SkipIfADLS.hdfs_encryption
|
||||
@SkipIfIsilon.hdfs_encryption
|
||||
@SkipIfLocal.hdfs_encryption
|
||||
|
||||
@@ -16,7 +16,7 @@
|
||||
# under the License.
|
||||
|
||||
from tests.common.impala_test_suite import ImpalaTestSuite
|
||||
from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfLocal
|
||||
from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfLocal
|
||||
from tests.common.test_dimensions import (
|
||||
create_single_exec_option_dimension,
|
||||
create_uncompressed_text_dimension)
|
||||
@@ -28,6 +28,7 @@ TBL_LOC = '%s/%s' % (WAREHOUSE, TEST_TBL)
|
||||
|
||||
|
||||
@SkipIfS3.hdfs_acls
|
||||
@SkipIfABFS.hdfs_acls
|
||||
@SkipIfADLS.hdfs_acls
|
||||
@SkipIfLocal.hdfs_client
|
||||
class TestHdfsPermissions(ImpalaTestSuite):
|
||||
|
||||
@@ -30,13 +30,14 @@ import string
|
||||
from subprocess import call
|
||||
|
||||
from tests.common.impala_test_suite import ImpalaTestSuite
|
||||
from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal
|
||||
from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
|
||||
from tests.common.test_dimensions import (
|
||||
create_single_exec_option_dimension,
|
||||
create_uncompressed_text_dimension)
|
||||
from tests.util.hive_utils import HiveDbWrapper, HiveTableWrapper
|
||||
|
||||
@SkipIfS3.hive
|
||||
@SkipIfABFS.hive
|
||||
@SkipIfADLS.hive
|
||||
@SkipIfIsilon.hive
|
||||
@SkipIfLocal.hive
|
||||
@@ -84,6 +85,7 @@ class TestHmsIntegrationSanity(ImpalaTestSuite):
|
||||
assert 'test_tbl' in self.client.execute("show tables in hms_sanity_db").data
|
||||
|
||||
@SkipIfS3.hive
|
||||
@SkipIfABFS.hive
|
||||
@SkipIfADLS.hive
|
||||
@SkipIfIsilon.hive
|
||||
@SkipIfLocal.hive
|
||||
|
||||
@@ -22,7 +22,7 @@ import re
|
||||
|
||||
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
|
||||
from tests.common.impala_test_suite import ImpalaTestSuite
|
||||
from tests.common.skip import SkipIfIsilon, SkipIfS3, SkipIfADLS, SkipIfLocal
|
||||
from tests.common.skip import SkipIfIsilon, SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfLocal
|
||||
from tests.common.test_dimensions import ALL_NODES_ONLY
|
||||
from tests.common.test_dimensions import create_exec_option_dimension
|
||||
from tests.common.test_dimensions import create_uncompressed_text_dimension
|
||||
@@ -75,6 +75,7 @@ class TestMetadataQueryStatements(ImpalaTestSuite):
|
||||
# data doesn't reside in hdfs.
|
||||
@SkipIfIsilon.hive
|
||||
@SkipIfS3.hive
|
||||
@SkipIfABFS.hive
|
||||
@SkipIfADLS.hive
|
||||
@SkipIfLocal.hive
|
||||
def test_describe_formatted(self, vector, unique_database):
|
||||
@@ -149,6 +150,7 @@ class TestMetadataQueryStatements(ImpalaTestSuite):
|
||||
self.client.execute(self.CREATE_DATA_SRC_STMT % (name,))
|
||||
|
||||
@SkipIfS3.hive
|
||||
@SkipIfABFS.hive
|
||||
@SkipIfADLS.hive
|
||||
@SkipIfIsilon.hive
|
||||
@SkipIfLocal.hive
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
|
||||
import pytest
|
||||
from tests.common.impala_test_suite import ImpalaTestSuite
|
||||
from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal
|
||||
from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
|
||||
from tests.common.test_dimensions import (create_single_exec_option_dimension,
|
||||
create_uncompressed_text_dimension)
|
||||
from tests.util.filesystem_utils import get_fs_path, WAREHOUSE, FILESYSTEM_PREFIX
|
||||
@@ -89,6 +89,7 @@ class TestPartitionMetadata(ImpalaTestSuite):
|
||||
assert data.split('\t') == ['6', '9']
|
||||
|
||||
@SkipIfS3.hive
|
||||
@SkipIfABFS.hive
|
||||
@SkipIfADLS.hive
|
||||
@SkipIfIsilon.hive
|
||||
@SkipIfLocal.hive
|
||||
|
||||
@@ -17,11 +17,12 @@ from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
|
||||
from tests.common.impala_test_suite import ImpalaTestSuite
|
||||
from tests.common.test_dimensions import create_single_exec_option_dimension
|
||||
from tests.common.test_dimensions import create_uncompressed_text_dimension
|
||||
from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal
|
||||
from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
|
||||
from tests.util.filesystem_utils import get_fs_path
|
||||
|
||||
|
||||
@SkipIfS3.hive
|
||||
@SkipIfABFS.hive
|
||||
@SkipIfADLS.hive
|
||||
@SkipIfIsilon.hive
|
||||
@SkipIfLocal.hive
|
||||
|
||||
@@ -23,7 +23,7 @@ from subprocess import call
|
||||
|
||||
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
|
||||
from tests.common.impala_test_suite import ImpalaTestSuite
|
||||
from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal
|
||||
from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
|
||||
from tests.common.test_dimensions import create_uncompressed_text_dimension
|
||||
from tests.util.test_file_parser import QueryTestSectionReader
|
||||
|
||||
@@ -47,6 +47,7 @@ from tests.util.test_file_parser import QueryTestSectionReader
|
||||
# Missing Coverage: Views created by Hive and Impala being visible and queryble by each
|
||||
# other on non hdfs storage.
|
||||
@SkipIfS3.hive
|
||||
@SkipIfABFS.hive
|
||||
@SkipIfADLS.hive
|
||||
@SkipIfIsilon.hive
|
||||
@SkipIfLocal.hive
|
||||
|
||||
@@ -23,7 +23,7 @@ from os.path import join
|
||||
from subprocess import call
|
||||
|
||||
from tests.common.impala_test_suite import ImpalaTestSuite
|
||||
from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal
|
||||
from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
|
||||
from tests.common.test_dimensions import create_single_exec_option_dimension
|
||||
from tests.common.test_vector import ImpalaTestDimension
|
||||
from tests.util.filesystem_utils import get_fs_path
|
||||
@@ -40,6 +40,7 @@ compression_formats = [
|
||||
# Missing Coverage: Compressed data written by Hive is queriable by Impala on a non-hdfs
|
||||
# filesystem.
|
||||
@SkipIfS3.hive
|
||||
@SkipIfABFS.hive
|
||||
@SkipIfADLS.hive
|
||||
@SkipIfIsilon.hive
|
||||
@SkipIfLocal.hive
|
||||
|
||||
@@ -25,13 +25,15 @@ from subprocess import check_call
|
||||
from tests.common.environ import specific_build_type_timeout
|
||||
from tests.common.impala_cluster import ImpalaCluster
|
||||
from tests.common.impala_test_suite import ImpalaTestSuite, LOG
|
||||
from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal, SkipIfEC
|
||||
from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, \
|
||||
SkipIfLocal, SkipIfEC
|
||||
from tests.common.test_dimensions import create_single_exec_option_dimension
|
||||
from tests.util.filesystem_utils import get_fs_path
|
||||
from tests.util.shell_util import exec_process
|
||||
|
||||
# End to end test that hdfs caching is working.
|
||||
@SkipIfS3.caching # S3: missing coverage: verify SET CACHED gives error
|
||||
@SkipIfABFS.caching
|
||||
@SkipIfADLS.caching
|
||||
@SkipIfIsilon.caching
|
||||
@SkipIfLocal.caching
|
||||
@@ -109,6 +111,7 @@ class TestHdfsCaching(ImpalaTestSuite):
|
||||
# run as a part of exhaustive tests which require the workload to be 'functional-query'.
|
||||
# TODO: Move this to TestHdfsCaching once we make exhaustive tests run for other workloads
|
||||
@SkipIfS3.caching
|
||||
@SkipIfABFS.caching
|
||||
@SkipIfADLS.caching
|
||||
@SkipIfIsilon.caching
|
||||
@SkipIfLocal.caching
|
||||
@@ -118,6 +121,7 @@ class TestHdfsCachingFallbackPath(ImpalaTestSuite):
|
||||
return 'functional-query'
|
||||
|
||||
@SkipIfS3.hdfs_encryption
|
||||
@SkipIfABFS.hdfs_encryption
|
||||
@SkipIfADLS.hdfs_encryption
|
||||
@SkipIfIsilon.hdfs_encryption
|
||||
@SkipIfLocal.hdfs_encryption
|
||||
@@ -169,6 +173,7 @@ class TestHdfsCachingFallbackPath(ImpalaTestSuite):
|
||||
|
||||
|
||||
@SkipIfS3.caching
|
||||
@SkipIfABFS.caching
|
||||
@SkipIfADLS.caching
|
||||
@SkipIfIsilon.caching
|
||||
@SkipIfLocal.caching
|
||||
|
||||
@@ -22,7 +22,7 @@ import pytest
|
||||
from testdata.common import widetable
|
||||
from tests.common.impala_cluster import ImpalaCluster
|
||||
from tests.common.impala_test_suite import ImpalaTestSuite
|
||||
from tests.common.skip import SkipIfEC, SkipIfLocal, SkipIfNotHdfsMinicluster
|
||||
from tests.common.skip import SkipIfABFS, SkipIfEC, SkipIfLocal, SkipIfNotHdfsMinicluster
|
||||
from tests.common.test_dimensions import (
|
||||
create_exec_option_dimension,
|
||||
create_uncompressed_text_dimension)
|
||||
@@ -112,6 +112,8 @@ class TestInsertQueries(ImpalaTestSuite):
|
||||
@pytest.mark.execute_serially
|
||||
# Erasure coding doesn't respect memory limit
|
||||
@SkipIfEC.fix_later
|
||||
# ABFS partition names cannot end in periods
|
||||
@SkipIfABFS.jira(reason="HADOOP-15860")
|
||||
def test_insert(self, vector):
|
||||
if (vector.get_value('table_format').file_format == 'parquet'):
|
||||
vector.get_value('exec_option')['COMPRESSION_CODEC'] = \
|
||||
|
||||
@@ -23,7 +23,7 @@ import re
|
||||
|
||||
from tests.common.impala_test_suite import ImpalaTestSuite
|
||||
from tests.common.parametrize import UniqueDatabase
|
||||
from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal
|
||||
from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
|
||||
from tests.util.filesystem_utils import WAREHOUSE, get_fs_path, IS_S3
|
||||
|
||||
@SkipIfLocal.hdfs_client
|
||||
@@ -132,6 +132,7 @@ class TestInsertBehaviour(ImpalaTestSuite):
|
||||
assert len(self.filesystem_client.ls(part_dir)) == 1
|
||||
|
||||
@SkipIfS3.hdfs_acls
|
||||
@SkipIfABFS.hdfs_acls
|
||||
@SkipIfADLS.hdfs_acls
|
||||
@SkipIfIsilon.hdfs_acls
|
||||
@pytest.mark.xfail(run=False, reason="Fails intermittently on test clusters")
|
||||
@@ -193,6 +194,7 @@ class TestInsertBehaviour(ImpalaTestSuite):
|
||||
check_has_acls("p1=1/p2=2/p3=30", "default:group:new_leaf_group:-w-")
|
||||
|
||||
@SkipIfS3.hdfs_acls
|
||||
@SkipIfABFS.hdfs_acls
|
||||
@SkipIfADLS.hdfs_acls
|
||||
@SkipIfIsilon.hdfs_acls
|
||||
def test_insert_file_permissions(self, unique_database):
|
||||
@@ -244,6 +246,7 @@ class TestInsertBehaviour(ImpalaTestSuite):
|
||||
self.execute_query_expect_success(self.client, insert_query)
|
||||
|
||||
@SkipIfS3.hdfs_acls
|
||||
@SkipIfABFS.hdfs_acls
|
||||
@SkipIfADLS.hdfs_acls
|
||||
@SkipIfIsilon.hdfs_acls
|
||||
def test_mixed_partition_permissions(self, unique_database):
|
||||
@@ -324,6 +327,7 @@ class TestInsertBehaviour(ImpalaTestSuite):
|
||||
load_data(self.execute_query_expect_success, "added_part")
|
||||
|
||||
@SkipIfS3.hdfs_acls
|
||||
@SkipIfABFS.hdfs_acls
|
||||
@SkipIfADLS.hdfs_acls
|
||||
@SkipIfIsilon.hdfs_acls
|
||||
def test_readonly_table_dir(self, unique_database):
|
||||
@@ -355,6 +359,7 @@ class TestInsertBehaviour(ImpalaTestSuite):
|
||||
assert re.search(r'Impala does not have WRITE access.*' + table_path, str(err))
|
||||
|
||||
@SkipIfS3.hdfs_acls
|
||||
@SkipIfABFS.hdfs_acls
|
||||
@SkipIfADLS.hdfs_acls
|
||||
@SkipIfIsilon.hdfs_acls
|
||||
def test_insert_acl_permissions(self, unique_database):
|
||||
@@ -433,6 +438,7 @@ class TestInsertBehaviour(ImpalaTestSuite):
|
||||
self.execute_query_expect_success(self.client, insert_query)
|
||||
|
||||
@SkipIfS3.hdfs_acls
|
||||
@SkipIfABFS.hdfs_acls
|
||||
@SkipIfADLS.hdfs_acls
|
||||
@SkipIfIsilon.hdfs_acls
|
||||
def test_load_permissions(self, unique_database):
|
||||
@@ -557,6 +563,7 @@ class TestInsertBehaviour(ImpalaTestSuite):
|
||||
self.execute_query_expect_failure(self.client, insert_query)
|
||||
|
||||
@SkipIfS3.hdfs_acls
|
||||
@SkipIfABFS.hdfs_acls
|
||||
@SkipIfADLS.hdfs_acls
|
||||
@SkipIfIsilon.hdfs_acls
|
||||
def test_multiple_group_acls(self, unique_database):
|
||||
|
||||
@@ -29,7 +29,8 @@ from parquet.ttypes import ColumnOrder, SortingColumn, TypeDefinedOrder
|
||||
from tests.common.environ import impalad_basedir
|
||||
from tests.common.impala_test_suite import ImpalaTestSuite
|
||||
from tests.common.parametrize import UniqueDatabase
|
||||
from tests.common.skip import SkipIfEC, SkipIfIsilon, SkipIfLocal, SkipIfS3, SkipIfADLS
|
||||
from tests.common.skip import SkipIfEC, SkipIfIsilon, SkipIfLocal, SkipIfS3, SkipIfABFS, \
|
||||
SkipIfADLS
|
||||
from tests.common.test_dimensions import create_exec_option_dimension
|
||||
from tests.common.test_vector import ImpalaTestDimension
|
||||
from tests.util.filesystem_utils import get_fs_path
|
||||
@@ -360,6 +361,7 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite):
|
||||
@SkipIfIsilon.hive
|
||||
@SkipIfLocal.hive
|
||||
@SkipIfS3.hive
|
||||
@SkipIfABFS.hive
|
||||
@SkipIfADLS.hive
|
||||
# TODO: Should we move this to test_parquet_stats.py?
|
||||
class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
|
||||
|
||||
@@ -26,6 +26,7 @@ from tests.common.skip import (
|
||||
SkipIfIsilon,
|
||||
SkipIfLocal,
|
||||
SkipIfS3,
|
||||
SkipIfABFS,
|
||||
SkipIfADLS)
|
||||
from tests.common.test_vector import ImpalaTestDimension
|
||||
|
||||
@@ -62,6 +63,7 @@ class TestJoinQueries(ImpalaTestSuite):
|
||||
self.run_test_case('QueryTest/single-node-joins-with-limits-exhaustive', new_vector)
|
||||
|
||||
@SkipIfS3.hbase
|
||||
@SkipIfABFS.hbase
|
||||
@SkipIfADLS.hbase
|
||||
@SkipIfIsilon.hbase
|
||||
@SkipIf.skip_hbase
|
||||
|
||||
@@ -26,6 +26,7 @@ from tests.common.impala_test_suite import ImpalaTestSuite
|
||||
from tests.common.skip import (
|
||||
SkipIfIsilon,
|
||||
SkipIfS3,
|
||||
SkipIfABFS,
|
||||
SkipIfADLS,
|
||||
SkipIfEC,
|
||||
SkipIfLocal,
|
||||
@@ -95,6 +96,7 @@ class TestNestedTypes(ImpalaTestSuite):
|
||||
|
||||
@SkipIfIsilon.hive
|
||||
@SkipIfS3.hive
|
||||
@SkipIfABFS.hive
|
||||
@SkipIfADLS.hive
|
||||
@SkipIfLocal.hive
|
||||
def test_upper_case_field_name(self, unique_database):
|
||||
@@ -568,6 +570,7 @@ class TestMaxNestingDepth(ImpalaTestSuite):
|
||||
|
||||
@SkipIfIsilon.hive
|
||||
@SkipIfS3.hive
|
||||
@SkipIfABFS.hive
|
||||
@SkipIfADLS.hive
|
||||
@SkipIfLocal.hive
|
||||
def test_load_hive_table(self, vector, unique_database):
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
|
||||
from tests.common.impala_cluster import ImpalaCluster
|
||||
from tests.common.impala_test_suite import ImpalaTestSuite
|
||||
from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal
|
||||
from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
|
||||
from tests.util.filesystem_utils import IS_EC
|
||||
import logging
|
||||
import pytest
|
||||
@@ -63,6 +63,7 @@ class TestObservability(ImpalaTestSuite):
|
||||
@SkipIfS3.hbase
|
||||
@SkipIfLocal.hbase
|
||||
@SkipIfIsilon.hbase
|
||||
@SkipIfABFS.hbase
|
||||
@SkipIfADLS.hbase
|
||||
def test_scan_summary(self):
|
||||
"""IMPALA-4499: Checks that the exec summary for scans show the table name."""
|
||||
|
||||
@@ -20,7 +20,7 @@ import pytest
|
||||
|
||||
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
|
||||
from tests.common.impala_test_suite import ImpalaTestSuite
|
||||
from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal
|
||||
from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
|
||||
from tests.common.test_dimensions import create_single_exec_option_dimension
|
||||
|
||||
# Tests to validate HDFS partitioning.
|
||||
@@ -47,6 +47,7 @@ class TestPartitioning(ImpalaTestSuite):
|
||||
# Missing Coverage: Impala deals with boolean partitions created by Hive on a non-hdfs
|
||||
# filesystem.
|
||||
@SkipIfS3.hive
|
||||
@SkipIfABFS.hive
|
||||
@SkipIfADLS.hive
|
||||
@SkipIfIsilon.hive
|
||||
@SkipIfLocal.hive
|
||||
|
||||
@@ -16,7 +16,8 @@
|
||||
# under the License.
|
||||
|
||||
from tests.common.impala_test_suite import ImpalaTestSuite
|
||||
from tests.common.skip import SkipIfEC, SkipIfKudu, SkipIfLocal, SkipIfS3
|
||||
from tests.common.skip import SkipIfEC, SkipIfKudu, SkipIfLocal, SkipIfS3, SkipIfABFS, \
|
||||
SkipIfADLS
|
||||
from tests.common.test_dimensions import create_parquet_dimension
|
||||
|
||||
|
||||
@@ -45,6 +46,8 @@ class TestResourceLimits(ImpalaTestSuite):
|
||||
self.run_test_case('QueryTest/query-resource-limits', vector)
|
||||
|
||||
@SkipIfS3.hbase
|
||||
@SkipIfADLS.hbase
|
||||
@SkipIfABFS.hbase
|
||||
@SkipIfLocal.multiple_impalad
|
||||
def test_resource_limits_hbase(self, vector):
|
||||
self.run_test_case('QueryTest/query-resource-limits-hbase', vector)
|
||||
|
||||
@@ -35,6 +35,7 @@ from tests.common.impala_test_suite import ImpalaTestSuite, LOG
|
||||
from tests.common.skip import (
|
||||
SkipIf,
|
||||
SkipIfS3,
|
||||
SkipIfABFS,
|
||||
SkipIfADLS,
|
||||
SkipIfEC,
|
||||
SkipIfIsilon,
|
||||
@@ -351,6 +352,7 @@ class TestParquet(ImpalaTestSuite):
|
||||
assert len(result.data) == 1
|
||||
assert "4294967294" in result.data
|
||||
|
||||
@SkipIfABFS.hive
|
||||
@SkipIfADLS.hive
|
||||
@SkipIfIsilon.hive
|
||||
@SkipIfLocal.hive
|
||||
@@ -473,6 +475,7 @@ class TestParquet(ImpalaTestSuite):
|
||||
vector, unique_database)
|
||||
|
||||
@SkipIfS3.hdfs_block_size
|
||||
@SkipIfABFS.hdfs_block_size
|
||||
@SkipIfADLS.hdfs_block_size
|
||||
@SkipIfIsilon.hdfs_block_size
|
||||
@SkipIfLocal.multiple_impalad
|
||||
@@ -530,6 +533,7 @@ class TestParquet(ImpalaTestSuite):
|
||||
assert total == num_scanners_with_no_reads
|
||||
|
||||
@SkipIfS3.hdfs_block_size
|
||||
@SkipIfABFS.hdfs_block_size
|
||||
@SkipIfADLS.hdfs_block_size
|
||||
@SkipIfIsilon.hdfs_block_size
|
||||
@SkipIfLocal.multiple_impalad
|
||||
@@ -545,6 +549,7 @@ class TestParquet(ImpalaTestSuite):
|
||||
self._multiple_blocks_helper(table_name, 40000, ranges_per_node=2)
|
||||
|
||||
@SkipIfS3.hdfs_block_size
|
||||
@SkipIfABFS.hdfs_block_size
|
||||
@SkipIfADLS.hdfs_block_size
|
||||
@SkipIfIsilon.hdfs_block_size
|
||||
@SkipIfLocal.multiple_impalad
|
||||
@@ -906,6 +911,7 @@ class TestTextScanRangeLengths(ImpalaTestSuite):
|
||||
|
||||
# Missing Coverage: No coverage for truncated files errors or scans.
|
||||
@SkipIfS3.hive
|
||||
@SkipIfABFS.hive
|
||||
@SkipIfADLS.hive
|
||||
@SkipIfIsilon.hive
|
||||
@SkipIfLocal.hive
|
||||
@@ -984,6 +990,7 @@ class TestOrc(ImpalaTestSuite):
|
||||
lambda v: v.get_value('table_format').file_format == 'orc')
|
||||
|
||||
@SkipIfS3.hdfs_block_size
|
||||
@SkipIfABFS.hdfs_block_size
|
||||
@SkipIfADLS.hdfs_block_size
|
||||
@SkipIfEC.fix_later
|
||||
@SkipIfIsilon.hdfs_block_size
|
||||
|
||||
@@ -18,7 +18,7 @@
|
||||
import pytest
|
||||
|
||||
from tests.common.impala_test_suite import ImpalaTestSuite
|
||||
from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal
|
||||
from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
|
||||
|
||||
# Number of tables to create per thread
|
||||
NUM_TBLS_PER_THREAD = 10
|
||||
@@ -48,6 +48,7 @@ class TestDdlStress(ImpalaTestSuite):
|
||||
v.get_value('table_format').compression_codec == 'none'))
|
||||
|
||||
@SkipIfS3.caching
|
||||
@SkipIfABFS.caching
|
||||
@SkipIfADLS.caching
|
||||
@SkipIfIsilon.caching
|
||||
@SkipIfLocal.caching
|
||||
|
||||
tests/util/abfs_util.py (new file, 113 lines)
@@ -0,0 +1,113 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# ABFS access utilities
#
# This file uses the Hadoop CLI to provide simple functions to the Impala test
# suite to whatever the default filesystem is

import re
import subprocess
import tempfile

from tests.util.filesystem_base import BaseFilesystem


class ABFSClient(BaseFilesystem):

  def _hadoop_fs_shell(self, command):
    hadoop_command = ['hadoop', 'fs'] + command
    process = subprocess.Popen(hadoop_command,
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = process.communicate()
    status = process.returncode
    return (status, stdout, stderr)

  def create_file(self, path, file_data, overwrite=True):
    fixed_path = self._normalize_path(path)
    if not overwrite and self.exists(fixed_path): return False
    f = tempfile.NamedTemporaryFile(delete=False)
    tmp_path = f.name
    f.write(file_data)
    f.close()
    (status, stdout, stderr) = \
        self._hadoop_fs_shell(['-put', tmp_path, fixed_path])
    return status == 0

  def make_dir(self, path, permission=None):
    fixed_path = self._normalize_path(path)
    self._hadoop_fs_shell(['-mkdir', '-p', fixed_path])
    return True

  def copy(self, src, dst):
    fixed_src = self._normalize_path(src)
    fixed_dst = self._normalize_path(dst)
    (status, stdout, stderr) = \
        self._hadoop_fs_shell(['-cp', fixed_src, fixed_dst])
    assert status == 0, \
        'ABFS copy failed: ' + stderr + "; " + stdout
    assert self.exists(dst), \
        'ABFS copy failed: Destination file {dst} does not exist'\
            .format(dst=dst)

  def _inner_ls(self, path):
    fixed_path = self._normalize_path(path)
    (status, stdout, stderr) = self._hadoop_fs_shell(['-ls', fixed_path])
    # Trim the "Found X items" line and trailing new-line
    entries = stdout.split("\n")[1:-1]
    files = []
    for entry in entries:
      fields = re.split(" +", entry)
      files.append({
        'name': fields[7],
        'length': int(fields[4]),
        'mode': fields[0]
      })
    return files

  def ls(self, path):
    fixed_path = self._normalize_path(path)
    files = []
    for f in self._inner_ls(fixed_path):
      fname = f['name'].split("/")[-1]
      if not fname == '':
        files += [fname]
    return files

  def exists(self, path):
    fixed_path = self._normalize_path(path)
    (status, stdout, stderr) = self._hadoop_fs_shell(['-test', '-e', fixed_path])
    return status == 0

  def delete_file_dir(self, path, recursive=False):
    fixed_path = self._normalize_path(path)
    rm_command = ['-rm', fixed_path]
    if recursive:
      rm_command = ['-rm', '-r', fixed_path]
    (status, stdout, stderr) = self._hadoop_fs_shell(rm_command)
    return status == 0

  def get_all_file_sizes(self, path):
    """Returns a list of integers which are all the file sizes of files found
    under 'path'."""
    fixed_path = self._normalize_path(path)
    return [f['length'] for f in
        self._inner_ls(fixed_path) if f['mode'][0] == "-"]

  def _normalize_path(self, path):
    # Paths passed in may lack a leading slash
    return path if path.startswith('/') else '/' + path

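
ABFSClient shells out to the Hadoop CLI, so it works against whatever filesystem `hadoop fs` is configured for. A brief usage sketch follows; the paths are hypothetical examples, not taken from the commit.

    from tests.util.abfs_util import ABFSClient

    client = ABFSClient()
    client.make_dir("test-warehouse/abfs_smoke")          # leading '/' added automatically
    client.create_file("test-warehouse/abfs_smoke/hello.txt", "hello", overwrite=True)
    assert client.exists("test-warehouse/abfs_smoke/hello.txt")
    print(client.ls("test-warehouse/abfs_smoke"))         # ['hello.txt']
    client.delete_file_dir("test-warehouse/abfs_smoke", recursive=True)
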
@@ -30,6 +30,7 @@ IS_ISILON = FILESYSTEM == "isilon"
IS_LOCAL = FILESYSTEM == "local"
IS_HDFS = FILESYSTEM == "hdfs"
IS_ADLS = FILESYSTEM == "adls"
IS_ABFS = FILESYSTEM == "abfs"
IS_EC = os.getenv("ERASURE_CODING") == "true"
# This condition satisfies both the states where one can assume a default fs
# - The environment variable is set to an empty string.

@@ -44,7 +45,9 @@ ISILON_WEBHDFS_PORT = 8082
# S3 specific values
S3_BUCKET_NAME = os.getenv("S3_BUCKET")

-# ADLS specific values
+# ADLS / ABFS specific values
ABFS_ACCOUNT_NAME = os.getenv("azure_storage_account_name")
ABFS_CONTAINER_NAME = os.getenv("azure_storage_container_name")
ADLS_STORE_NAME = os.getenv("azure_data_lake_store_name")
ADLS_CLIENT_ID = os.getenv("azure_client_id")
ADLS_TENANT_ID = os.getenv("azure_tenant_id")