From 6b09612e763aace6ec3ec22031e4e960b9a41e3d Mon Sep 17 00:00:00 2001 From: Joe McDonnell Date: Fri, 22 Mar 2019 14:16:31 -0700 Subject: [PATCH] IMPALA-8344: Add support for running the minicluster with S3Guard Some tests can fail on S3 because some S3 operations are only eventually consistent. S3Guard stores extra metadata in a DynamoDB table to solve several of these consistency issues. This adds support for running the minicluster on S3 with S3Guard. S3Guard is configured by the following environment variables: S3GUARD_ENABLED: defaults to false; set to true to enable S3Guard S3GUARD_DYNAMODB_TABLE: name of the DynamoDB table to use. This must be exclusively owned by this minicluster. The dataload scripts initialize this table and will purge entries if the table already exists. The table should be in the same region as the S3_BUCKET for the minicluster. S3GUARD_DYNAMODB_REGION: AWS region for S3GUARD_DYNAMODB_TABLE These environment variables only impact S3 configurations. The support comes from three pieces: 1. Configuration changes in core-site.xml to add the appropriate parameters. 2. Updates to dataload to initialize/purge the S3Guard DynamoDB table and import data appropriately. 3. Updates to tests to manipulate files through the HDFS command line rather than through S3 utilities. This takes the filesystem utility code for ABFS (which actually calls the HDFS command line), makes it generic, and uses it for S3Guard. Testing: - Ran multiple rounds of s3 tests - Aborted tests in the middle and restarted the s3 tests (to test the s3guard reinitialization code) Change-Id: I3c748529a494bb6e70fec96dc031523ff79bf61d Reviewed-on: http://gerrit.cloudera.org:8080/13020 Reviewed-by: Joe McDonnell Tested-by: Impala Public Jenkins Reviewed-by: Sahil Takiar --- bin/generate_xml_config.py | 5 +- bin/impala-config.sh | 13 ++ bin/jenkins/release_cloud_resources.sh | 50 ++++++ infra/python/deps/requirements.txt | 1 + testdata/bin/load-test-warehouse-snapshot.sh | 8 + .../common/etc/hadoop/conf/core-site.xml.py | 111 ++++++++++++++ .../common/etc/hadoop/conf/core-site.xml.tmpl | 145 ------------------ tests/common/impala_test_suite.py | 13 +- tests/query_test/test_scanners_fuzz.py | 2 +- tests/util/abfs_util.py | 113 -------------- tests/util/filesystem_utils.py | 1 + tests/util/hdfs_util.py | 124 ++++++++++++++- tests/util/s3_util.py | 103 ------------- 13 files changed, 316 insertions(+), 373 deletions(-) create mode 100755 bin/jenkins/release_cloud_resources.sh create mode 100644 testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py delete mode 100644 testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.tmpl delete mode 100644 tests/util/abfs_util.py delete mode 100644 tests/util/s3_util.py diff --git a/bin/generate_xml_config.py b/bin/generate_xml_config.py index 18e361599..af9c2e5c4 100755 --- a/bin/generate_xml_config.py +++ b/bin/generate_xml_config.py @@ -80,9 +80,10 @@ def dump_config(d, source_path, out): print >>out, dedent(header) for k, v in sorted(d.iteritems()): try: + k_new = _substitute_env_vars(k) if isinstance(v, int): v = str(v) - v = _substitute_env_vars(v) + v_new = _substitute_env_vars(v) except KeyError, e: raise Exception("failed environment variable substitution for value {k}: {e}" .format(k=k, e=e)) @@ -90,7 +91,7 @@ def dump_config(d, source_path, out): <property> <name>{name}</name> <value>{value}</value> - </property>""".format(name=xmlescape(k), value=xmlescape(v)) + </property>""".format(name=xmlescape(k_new), value=xmlescape(v_new)) print >>out, "</configuration>" diff --git a/bin/impala-config.sh b/bin/impala-config.sh index
b57d62564..fca708dee 100755 --- a/bin/impala-config.sh +++ b/bin/impala-config.sh @@ -284,6 +284,9 @@ export TARGET_FILESYSTEM="${TARGET_FILESYSTEM-hdfs}" export ERASURE_CODING="${ERASURE_CODING-false}" export FILESYSTEM_PREFIX="${FILESYSTEM_PREFIX-}" export S3_BUCKET="${S3_BUCKET-}" +export S3GUARD_ENABLED="${S3GUARD_ENABLED-false}" +export S3GUARD_DYNAMODB_TABLE="${S3GUARD_DYNAMODB_TABLE-}" +export S3GUARD_DYNAMODB_REGION="${S3GUARD_DYNAMODB_REGION-}" export azure_tenant_id="${azure_tenant_id-DummyAdlsTenantId}" export azure_client_id="${azure_client_id-DummyAdlsClientId}" export azure_client_secret="${azure_client_secret-DummyAdlsClientSecret}" @@ -401,6 +404,16 @@ if [ "${TARGET_FILESYSTEM}" = "s3" ]; then else echo "S3 access already validated" fi + # If using s3guard, verify that the dynamodb table and region are set + if [[ "${S3GUARD_ENABLED}" = "true" ]]; then + if [[ -z "${S3GUARD_DYNAMODB_TABLE}" || -z "${S3GUARD_DYNAMODB_REGION}" ]]; then + echo "When S3GUARD_ENABLED=true, S3GUARD_DYNAMODB_TABLE and + S3GUARD_DYNAMODB_REGION must be set" + echo "S3GUARD_DYNAMODB_TABLE: ${S3GUARD_DYNAMODB_TABLE}" + echo "S3GUARD_DYNAMODB_REGION: ${S3GUARD_DYNAMODB_REGION}" + return 1 + fi + fi elif [ "${TARGET_FILESYSTEM}" = "adls" ]; then # Basic error checking if [[ "${azure_client_id}" = "DummyAdlsClientId" ||\ diff --git a/bin/jenkins/release_cloud_resources.sh b/bin/jenkins/release_cloud_resources.sh new file mode 100755 index 000000000..a03f95579 --- /dev/null +++ b/bin/jenkins/release_cloud_resources.sh @@ -0,0 +1,50 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Release cloud resources (useful for Jenkins jobs). Should be called +# when the minicluster is shut down. The minicluster should not be used +# after this is called, as this removes the test warehouse data. + +set -euo pipefail +trap 'echo Error in $0 at line $LINENO: $(cd "'$PWD'" && awk "NR == $LINENO" $0)' ERR + +. ${IMPALA_HOME}/bin/impala-config.sh > /dev/null 2>&1 +: ${TEST_WAREHOUSE_DIR=/test-warehouse} + +# This is currently only implemented for s3. +# TODO: implement this for other cloud filesystems +# NOTE: Some environment variables referenced here are checked for validity in +# bin/impala-config.sh. Because this is releasing resources, we double check them here +# as well. +if [[ "${TARGET_FILESYSTEM}" == "s3" ]]; then + # For S3, S3_BUCKET should always be defined. + [[ -n "${S3_BUCKET}" ]] + if [[ "${S3GUARD_ENABLED}" == "true" ]]; then + # If S3GUARD_ENABLED == true, then S3GUARD_DYNAMODB_TABLE and S3GUARD_DYNAMODB_REGION + # must also be defined. Verify that before proceeding. + [[ -n "${S3GUARD_DYNAMODB_TABLE}" && -n "${S3GUARD_DYNAMODB_REGION}" ]] + echo "Cleaning up s3guard and deleting DynamoDB table ${S3GUARD_DYNAMODB_TABLE} ..."
+ hadoop s3guard destroy -meta "dynamodb://${S3GUARD_DYNAMODB_TABLE}" \ + -region "${S3GUARD_DYNAMODB_REGION}" + echo "Done cleaning up s3guard" + fi + # Remove the test warehouse + echo "Removing test warehouse from s3://${S3_BUCKET}${TEST_WAREHOUSE_DIR} ..." + aws s3 rm --recursive --quiet s3://${S3_BUCKET}${TEST_WAREHOUSE_DIR} + echo "Done removing test warehouse" +fi diff --git a/infra/python/deps/requirements.txt b/infra/python/deps/requirements.txt index fd07ca82b..fa0deb06a 100644 --- a/infra/python/deps/requirements.txt +++ b/infra/python/deps/requirements.txt @@ -23,6 +23,7 @@ # multiple times (though maybe they could be). allpairs == 2.0.1 +# TODO: boto3 is now unused, it can be removed. boto3 == 1.2.3 simplejson == 3.3.0 # For python version 2.6 botocore == 1.3.30 diff --git a/testdata/bin/load-test-warehouse-snapshot.sh b/testdata/bin/load-test-warehouse-snapshot.sh index cfec55816..c53cb0389 100755 --- a/testdata/bin/load-test-warehouse-snapshot.sh +++ b/testdata/bin/load-test-warehouse-snapshot.sh @@ -59,6 +59,14 @@ if [[ "$REPLY" =~ ^[Yy]$ ]]; then echo "Deleting pre-existing data in s3 failed, aborting." exit 1 fi + if [[ "${S3GUARD_ENABLED}" = "true" ]]; then + # Initialize the s3guard dynamodb table and clear it out. This is valid even if + # the table already exists. + hadoop s3guard init -meta "dynamodb://${S3GUARD_DYNAMODB_TABLE}" \ + -region "${S3GUARD_DYNAMODB_REGION}" + hadoop s3guard prune -seconds 1 -meta "dynamodb://${S3GUARD_DYNAMODB_TABLE}" \ + -region "${S3GUARD_DYNAMODB_REGION}" + fi else # Either isilon or hdfs, no change in procedure. if hadoop fs -test -d ${FILESYSTEM_PREFIX}${TEST_WAREHOUSE_DIR}; then diff --git a/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py b/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py new file mode 100644 index 000000000..6af28f460 --- /dev/null +++ b/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import os +import sys + +kerberize = os.environ.get('IMPALA_KERBERIZE') == '1' +target_filesystem = os.environ.get('TARGET_FILESYSTEM') + +compression_codecs = [ + 'org.apache.hadoop.io.compress.GzipCodec', + 'org.apache.hadoop.io.compress.DefaultCodec', + 'com.hadoop.compression.lzo.LzoCodec', + 'com.hadoop.compression.lzo.LzopCodec', + 'org.apache.hadoop.io.compress.BZip2Codec' +] + +auth_to_local_rules = [ + 'RULE:[2:$1@$0](authtest@REALM.COM)s/(.*)@REALM.COM/auth_to_local_user/', + 'RULE:[1:$1]', + 'RULE:[2:$1]', + 'DEFAULT' +] + +CONFIG = { + 'fs.defaultFS': '${DEFAULT_FS}', + 'dfs.replication': '${HDFS_REPLICATION}', + + # Compression codecs + 'io.compression.codecs': ",".join(compression_codecs), + 'io.compression.codec.lzo.class': 'com.hadoop.compression.lzo.LzoCodec', + + # Set up proxyuser + 'hadoop.proxyuser.${USER}.hosts': '*', + 'hadoop.proxyuser.${USER}.groups': '*', + + # Trash is enabled since some tests (in metadata/test_ddl.py) depend on it + # The trash interval is set to 1030 years to avoid checkpointing until 3000 AD + 'fs.trash.interval': 541728000, + + # AuthorizationTest depends on auth_to_local configs. These tests are run + # irrespective of whether kerberos is enabled. + 'hadoop.security.auth_to_local': '\n'.join(auth_to_local_rules), + + # Location of the KMS key provider + 'hadoop.security.key.provider.path': 'kms://http@${INTERNAL_LISTEN_HOST}:9600/kms', + + # Needed as long as multiple nodes are running on the same address. For Impala + # testing only. + 'yarn.scheduler.include-port-in-node-name': 'true', + + # ADLS configuration + # Note: This is needed even when not running on ADLS, because some frontend tests + # include ADLS paths that require initializing an ADLS filesystem. See + # ExplainTest.testScanNodeFsScheme(). + 'dfs.adls.oauth2.access.token.provider.type': 'ClientCredential', + 'dfs.adls.oauth2.client.id': '${azure_client_id}', + 'dfs.adls.oauth2.credential': '${azure_client_secret}', + 'dfs.adls.oauth2.refresh.url': + 'https://login.windows.net/${azure_tenant_id}/oauth2/token', + + # ABFS configuration + # Note: This is needed even when not running on ABFS for the same reason as for ADLS. + # See ExplainTest.testScanNodeFsScheme(). + 'fs.azure.account.auth.type': 'OAuth', + 'fs.azure.account.oauth.provider.type': + 'org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider', + 'fs.azure.account.oauth2.client.id': '${azure_client_id}', + 'fs.azure.account.oauth2.client.secret': '${azure_client_secret}', + 'fs.azure.account.oauth2.client.endpoint': + 'https://login.microsoftonline.com/${azure_tenant_id}/oauth2/token', + + # This property can be used in tests to ascertain that this core-site.xml from + # the classpath has been loaded.
(Ex: TestRequestPoolService) + 'impala.core-site.overridden': 'true', +} + +if target_filesystem == 's3': + CONFIG.update({'fs.s3a.connection.maximum': 1500}) + s3guard_enabled = os.environ.get("S3GUARD_ENABLED") == 'true' + if s3guard_enabled: + CONFIG.update({ + 'fs.s3a.metadatastore.impl': + 'org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore', + 'fs.s3a.s3guard.ddb.table': '${S3GUARD_DYNAMODB_TABLE}', + 'fs.s3a.s3guard.ddb.region': '${S3GUARD_DYNAMODB_REGION}', + }) + +if kerberize: + CONFIG.update({ + 'hadoop.security.authentication': 'kerberos', + 'hadoop.security.authorization': 'true', + 'hadoop.proxyuser.hive.hosts': '*', + 'hadoop.proxyuser.hive.groups': '*', + }) diff --git a/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.tmpl b/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.tmpl deleted file mode 100644 index 7eaff6fea..000000000 --- a/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.tmpl +++ /dev/null @@ -1,145 +0,0 @@ - - - - - - fs.defaultFS - ${DEFAULT_FS} - - - - dfs.replication - ${HDFS_REPLICATION} - - - - io.compression.codecs - org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec,org.apache.hadoop.io.compress.BZip2Codec - - - - io.compression.codec.lzo.class - com.hadoop.compression.lzo.LzoCodec - - - - hadoop.proxyuser.${USER}.hosts - * - - - - - fs.trash.interval - 541728000 - - - - - hadoop.security.auth_to_local - RULE:[2:$1@$0](authtest@REALM.COM)s/(.*)@REALM.COM/auth_to_local_user/ -RULE:[1:$1] -RULE:[2:$1] -DEFAULT - - - - hadoop.proxyuser.${USER}.groups - * - - - - - yarn.scheduler.include-port-in-node-name - true - - - - fs.s3a.connection.maximum - 1500 - - - - - hadoop.security.key.provider.path - kms://http@${INTERNAL_LISTEN_HOST}:9600/kms - - - - - hadoop.security.authentication - kerberos - - - - hadoop.security.authorization - true - - - - hadoop.proxyuser.hive.hosts - * - - - - hadoop.proxyuser.hive.groups - * - - - - - - dfs.adls.oauth2.access.token.provider.type - ClientCredential - - - - dfs.adls.oauth2.client.id - ${azure_client_id} - - - - dfs.adls.oauth2.credential - ${azure_client_secret} - - - - dfs.adls.oauth2.refresh.url - https://login.windows.net/${azure_tenant_id}/oauth2/token - - - - fs.azure.account.auth.type - OAuth - - - - fs.azure.account.oauth.provider.type - org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider - - - - fs.azure.account.oauth2.client.id - ${azure_client_id} - - - - fs.azure.account.oauth2.client.secret - ${azure_client_secret} - - - - fs.azure.account.oauth2.client.endpoint - https://login.microsoftonline.com/${azure_tenant_id}/oauth2/token - - - - - impala.core-site.overridden - true - - - diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py index fffad063e..bdd14dd5e 100644 --- a/tests/common/impala_test_suite.py +++ b/tests/common/impala_test_suite.py @@ -62,6 +62,7 @@ from tests.util.filesystem_utils import ( IS_ABFS, IS_ADLS, S3_BUCKET_NAME, + S3GUARD_ENABLED, ADLS_STORE_NAME, FILESYSTEM_PREFIX, FILESYSTEM_NAME) @@ -70,9 +71,8 @@ from tests.util.hdfs_util import ( HdfsConfig, get_hdfs_client, get_hdfs_client_from_conf, - NAMENODE) -from tests.util.s3_util import S3Client -from tests.util.abfs_util import ABFSClient + NAMENODE, + HadoopFsCommandLineClient) from tests.util.test_file_parser import ( QueryTestSectionReader, parse_query_test_file, @@ -172,9 +172,12 @@ class ImpalaTestSuite(BaseTestSuite): 
cls.hdfs_client = cls.create_hdfs_client() cls.filesystem_client = cls.hdfs_client if IS_S3: - cls.filesystem_client = S3Client(S3_BUCKET_NAME) + # S3Guard needs filesystem operations to go through the s3 connector. Use the + # HDFS command line client. + cls.filesystem_client = HadoopFsCommandLineClient("S3") elif IS_ABFS: - cls.filesystem_client = ABFSClient() + # ABFS is implemented via HDFS command line client + cls.filesystem_client = HadoopFsCommandLineClient("ABFS") elif IS_ADLS: cls.filesystem_client = ADLSClient(ADLS_STORE_NAME) diff --git a/tests/query_test/test_scanners_fuzz.py b/tests/query_test/test_scanners_fuzz.py index 9501775d8..a7a578c57 100644 --- a/tests/query_test/test_scanners_fuzz.py +++ b/tests/query_test/test_scanners_fuzz.py @@ -176,7 +176,7 @@ class TestScannersFuzzing(ImpalaTestSuite): # Copy all of the local files and directories to hdfs. to_copy = ["%s/%s" % (tmp_table_dir, file_or_dir) for file_or_dir in os.listdir(tmp_table_dir)] - check_call(['hdfs', 'dfs', '-copyFromLocal'] + to_copy + [fuzz_table_location]) + check_call(['hdfs', 'dfs', '-copyFromLocal', '-d'] + to_copy + [fuzz_table_location]) if "SCANNER_FUZZ_KEEP_FILES" not in os.environ: shutil.rmtree(tmp_table_dir) diff --git a/tests/util/abfs_util.py b/tests/util/abfs_util.py deleted file mode 100644 index 856788884..000000000 --- a/tests/util/abfs_util.py +++ /dev/null @@ -1,113 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# -# ABFS access utilities -# -# This file uses the Hadoop CLI to provide simple functions to the Impala test -# suite to whatever the default filesystem is - -import re -import subprocess -import tempfile - -from tests.util.filesystem_base import BaseFilesystem - - -class ABFSClient(BaseFilesystem): - - def _hadoop_fs_shell(self, command): - hadoop_command = ['hadoop', 'fs'] + command - process = subprocess.Popen(hadoop_command, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = process.communicate() - status = process.returncode - return (status, stdout, stderr) - - def create_file(self, path, file_data, overwrite=True): - fixed_path = self._normalize_path(path) - if not overwrite and self.exists(fixed_path): return False - f = tempfile.NamedTemporaryFile(delete=False) - tmp_path = f.name - f.write(file_data) - f.close() - (status, stdout, stderr) = \ - self._hadoop_fs_shell(['-put', tmp_path, fixed_path]) - return status == 0 - - def make_dir(self, path, permission=None): - fixed_path = self._normalize_path(path) - self._hadoop_fs_shell(['-mkdir', '-p', fixed_path]) - return True - - def copy(self, src, dst): - fixed_src = self._normalize_path(src) - fixed_dst = self._normalize_path(dst) - (status, stdout, stderr) = \ - self._hadoop_fs_shell(['-cp', fixed_src, fixed_dst]) - assert status == 0, \ - 'ABFS copy failed: ' + stderr + "; " + stdout - assert self.exists(dst), \ - 'ABFS copy failed: Destination file {dst} does not exist'\ - .format(dst=dst) - - def _inner_ls(self, path): - fixed_path = self._normalize_path(path) - (status, stdout, stderr) = self._hadoop_fs_shell(['-ls', fixed_path]) - # Trim the "Found X items" line and trailing new-line - entries = stdout.split("\n")[1:-1] - files = [] - for entry in entries: - fields = re.split(" +", entry) - files.append({ - 'name': fields[7], - 'length': int(fields[4]), - 'mode': fields[0] - }) - return files - - def ls(self, path): - fixed_path = self._normalize_path(path) - files = [] - for f in self._inner_ls(fixed_path): - fname = f['name'].split("/")[-1] - if not fname == '': - files += [fname] - return files - - def exists(self, path): - fixed_path = self._normalize_path(path) - (status, stdout, stderr) = self._hadoop_fs_shell(['-test', '-e', fixed_path]) - return status == 0 - - def delete_file_dir(self, path, recursive=False): - fixed_path = self._normalize_path(path) - rm_command = ['-rm', fixed_path] - if recursive: - rm_command = ['-rm', '-r', fixed_path] - (status, stdout, stderr) = self._hadoop_fs_shell(rm_command) - return status == 0 - - def get_all_file_sizes(self, path): - """Returns a list of integers which are all the file sizes of files found - under 'path'.""" - fixed_path = self._normalize_path(path) - return [f['length'] for f in - self._inner_ls(fixed_path) if f['mode'][0] == "-"] - - def _normalize_path(self, path): - # Paths passed in may lack a leading slash - return path if path.startswith('/') else '/' + path diff --git a/tests/util/filesystem_utils.py b/tests/util/filesystem_utils.py index 76212f6b3..5b39d3681 100644 --- a/tests/util/filesystem_utils.py +++ b/tests/util/filesystem_utils.py @@ -44,6 +44,7 @@ ISILON_WEBHDFS_PORT = 8082 # S3 specific values S3_BUCKET_NAME = os.getenv("S3_BUCKET") +S3GUARD_ENABLED = os.getenv("S3GUARD_ENABLED") == "true" # ADLS / ABFS specific values ABFS_ACCOUNT_NAME = os.getenv("azure_storage_account_name") diff --git a/tests/util/hdfs_util.py b/tests/util/hdfs_util.py index 3c3a45ec7..f015f1f34 100644 --- a/tests/util/hdfs_util.py +++ b/tests/util/hdfs_util.py 
@@ -19,15 +19,19 @@ import getpass import httplib +import os.path +import re import requests +import subprocess +import tempfile from os import environ -from os.path import join as join_path from pywebhdfs.webhdfs import PyWebHdfsClient, errors, _raise_pywebhdfs_exception from xml.etree.ElementTree import parse from tests.util.filesystem_base import BaseFilesystem from tests.util.filesystem_utils import FILESYSTEM_PREFIX + class HdfsConfig(object): """Reads an XML configuration file (produced by a mini-cluster) into a dictionary accessible via get()""" @@ -41,14 +45,16 @@ class HdfsConfig(object): def get(self, key): return self.conf.get(key) + # Configuration object for the configuration that the minicluster will use. -CORE_CONF = HdfsConfig(join_path(environ['HADOOP_CONF_DIR'], "core-site.xml")) +CORE_CONF = HdfsConfig(os.path.join(environ['HADOOP_CONF_DIR'], "core-site.xml")) # NAMENODE is the path prefix that should be used in results, since paths that come # out of Impala have been qualified. When running against the default filesystem, # this will be the same as fs.defaultFS. When running against a secondary filesystem, # this will be the same as FILESYSTEM_PREFIX. NAMENODE = FILESYSTEM_PREFIX or CORE_CONF.get('fs.defaultFS') + class PyWebHdfsClientWithChmod(PyWebHdfsClient, BaseFilesystem): def chmod(self, path, permission): """Set the permission of 'path' to 'permission' (specified as an octal string, e.g. @@ -140,6 +146,113 @@ class PyWebHdfsClientWithChmod(PyWebHdfsClient, BaseFilesystem): return False return True + +class HadoopFsCommandLineClient(BaseFilesystem): + """This client is a wrapper around the hadoop fs command line. This is useful for + filesystems that rely on the logic in the Hadoop connector. For example, S3 with + S3Guard needs all accesses to go through the S3 connector. This is also useful for + filesystems that are fully served by this limited set of functionality (ABFS uses + this). + """ + + def __init__(self, filesystem_type="HDFS"): + # The filesystem_type is used only for providing more specific error messages. + self.filesystem_type = filesystem_type + super(HadoopFsCommandLineClient, self).__init__() + + def _hadoop_fs_shell(self, command): + """Helper function wrapper around 'hadoop fs' takes in the arguments as a list.""" + hadoop_command = ['hadoop', 'fs'] + command + process = subprocess.Popen(hadoop_command, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = process.communicate() + status = process.returncode + return (status, stdout, stderr) + + def create_file(self, path, file_data, overwrite=True): + """Creates a temporary file with the specified file_data on the local filesystem, + then puts it into the specified path.""" + fixed_path = self._normalize_path(path) + if not overwrite and self.exists(fixed_path): return False + with tempfile.NamedTemporaryFile(delete=False) as tmp_file: + tmp_file.write(file_data) + (status, stdout, stderr) = \ + self._hadoop_fs_shell(['-put', tmp_file.name, fixed_path]) + return status == 0 + + def make_dir(self, path, permission=None): + """Create a directory at the specified path. 
Permissions are not supported.""" + fixed_path = self._normalize_path(path) + (status, stdout, stderr) = self._hadoop_fs_shell(['-mkdir', '-p', fixed_path]) + return status == 0 + + def copy(self, src, dst): + """Copy the source file to the destination.""" + fixed_src = self._normalize_path(src) + fixed_dst = self._normalize_path(dst) + (status, stdout, stderr) = \ + self._hadoop_fs_shell(['-cp', fixed_src, fixed_dst]) + assert status == 0, \ + '{0} copy failed: '.format(self.filesystem_type) + stderr + "; " + stdout + assert self.exists(dst), \ + '{fs_type} copy failed: Destination file {dst} does not exist'\ + .format(fs_type=self.filesystem_type, dst=dst) + + def _inner_ls(self, path): + """List names, lengths, and mode for files/directories under the specified path.""" + fixed_path = self._normalize_path(path) + (status, stdout, stderr) = self._hadoop_fs_shell(['-ls', fixed_path]) + # Trim the "Found X items" line and trailing new-line + entries = stdout.split("\n")[1:-1] + files = [] + for entry in entries: + fields = re.split(" +", entry) + files.append({ + 'name': fields[7], + 'length': int(fields[4]), + 'mode': fields[0] + }) + return files + + def ls(self, path): + """Returns a list of all file and directory names in 'path'""" + fixed_path = self._normalize_path(path) + files = [] + for f in self._inner_ls(fixed_path): + fname = os.path.basename(f['name']) + if not fname == '': + files += [fname] + return files + + def exists(self, path): + """Checks if a particular path exists""" + fixed_path = self._normalize_path(path) + (status, stdout, stderr) = self._hadoop_fs_shell(['-test', '-e', fixed_path]) + return status == 0 + + def delete_file_dir(self, path, recursive=False): + """Delete the file or directory given by the specified path. Recursive must be true + for directories.""" + fixed_path = self._normalize_path(path) + rm_command = ['-rm', fixed_path] + if recursive: + rm_command = ['-rm', '-r', fixed_path] + (status, stdout, stderr) = self._hadoop_fs_shell(rm_command) + return status == 0 + + def get_all_file_sizes(self, path): + """Returns a list of integers which are all the file sizes of files found + under 'path'.""" + fixed_path = self._normalize_path(path) + return [f['length'] for f in + self._inner_ls(fixed_path) if f['mode'][0] == "-"] + + def _normalize_path(self, path): + """Paths passed in may lack a leading slash. 
This adds a leading slash if it is + missing.""" + return path if path.startswith('/') else '/' + path + + def get_hdfs_client_from_conf(conf): """Returns a new HTTP client for an HDFS cluster using an HdfsConfig object""" hostport = conf.get('dfs.namenode.http-address') @@ -148,14 +261,17 @@ def get_hdfs_client_from_conf(conf): host, port = hostport.split(":") return get_hdfs_client(host=host, port=port) + def get_hdfs_client(host, port, user_name=getpass.getuser()): """Returns a new HTTP client for an HDFS cluster using an explict host:port pair""" return PyWebHdfsClientWithChmod(host=host, port=port, user_name=user_name) + def get_default_hdfs_config(): - core_site_path = join_path(environ.get('HADOOP_CONF_DIR'), 'core-site.xml') - hdfs_site_path = join_path(environ.get('HADOOP_CONF_DIR'), 'hdfs-site.xml') + core_site_path = os.path.join(environ.get('HADOOP_CONF_DIR'), 'core-site.xml') + hdfs_site_path = os.path.join(environ.get('HADOOP_CONF_DIR'), 'hdfs-site.xml') return HdfsConfig(core_site_path, hdfs_site_path) + def create_default_hdfs_client(): return get_hdfs_client_from_conf(get_default_hdfs_config()) diff --git a/tests/util/s3_util.py b/tests/util/s3_util.py deleted file mode 100644 index ec5cfdfcf..000000000 --- a/tests/util/s3_util.py +++ /dev/null @@ -1,103 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# S3 access utilities -# -# This file uses the boto3 client and provides simple functions to the Impala test suite -# to access Amazon S3. - -import boto3 -from tests.util.filesystem_base import BaseFilesystem - -class S3Client(BaseFilesystem): - - @classmethod - def __init__(self, bucket): - self.bucketname = bucket - self.s3 = boto3.resource('s3') - self.bucket = self.s3.Bucket(self.bucketname) - self.s3client = boto3.client('s3') - - def create_file(self, path, file_data, overwrite=True): - if not overwrite and self.exists(path): return False - self.s3client.put_object(Bucket=self.bucketname, Key=path, Body=file_data) - return True - - def make_dir(self, path, permission=None): - # This function is a no-op. S3 is a key-value store and does not have a directory - # structure. We can use a non existant path as though it already exists. - pass - - def copy(self, src, dst): - self.s3client.copy_object(Bucket=self.bucketname, - CopySource={'Bucket':self.bucketname, 'Key':src}, Key=dst) - assert self.exists(dst), \ - 'S3 copy failed: Destination file {dst} does not exist'.format(dst=dst) - - # Since S3 is a key-value store, it does not have a command like 'ls' for a directory - # structured filesystem. It lists everything under a path recursively. - # We have to manipulate its response to get an 'ls' like output. 
- def ls(self, path): - if not path.endswith('/'): - path += '/' - # Use '/' as a delimiter so that we don't get all keys under a path recursively. - response = self.s3client.list_objects( - Bucket=self.bucketname, Prefix=path, Delimiter='/') - dirs = [] - # Non-keys or "directories" will be listed as 'Prefix' under 'CommonPrefixes'. - if 'CommonPrefixes' in response: - dirs = [t['Prefix'] for t in response['CommonPrefixes']] - files = [] - # Keys or "files" will be listed as 'Key' under 'Contents'. - if 'Contents' in response: - files = [t['Key'] for t in response['Contents']] - files_and_dirs = [] - files_and_dirs.extend([d.split('/')[-2] for d in dirs]) - for f in files: - key = f.split("/")[-1] - if not key == '': - files_and_dirs += [key] - return files_and_dirs - - def get_all_file_sizes(self, path): - if not path.endswith('/'): - path += '/' - # Use '/' as a delimiter so that we don't get all keys under a path recursively. - response = self.s3client.list_objects( - Bucket=self.bucketname, Prefix=path, Delimiter='/') - if 'Contents' in response: - return [t['Size'] for t in response['Contents']] - return [] - - def exists(self, path): - response = self.s3client.list_objects(Bucket=self.bucketname,Prefix=path) - return response.get('Contents') is not None - - # Helper function which lists keys in a path. Should not be used by the tests directly. - def _list_keys(self, path): - if not self.exists(path): - return False - response = self.s3client.list_objects(Bucket=self.bucketname, Prefix=path) - contents = response.get('Contents') - return [c['Key'] for c in contents] - - def delete_file_dir(self, path, recursive=False): - if not self.exists(path): - return True - objects = [{'Key': k} for k in self._list_keys(path)] if recursive else path - self.s3client.delete_objects(Bucket=self.bucketname, Delete={'Objects':objects}) - return True
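
For reference, below is a minimal sketch of how the new environment variables described in the commit message fit together when running the minicluster against S3 with S3Guard. The bucket, table, and region values are hypothetical placeholders and are not part of this patch; the hadoop s3guard commands mirror the ones added above in testdata/bin/load-test-warehouse-snapshot.sh and bin/jenkins/release_cloud_resources.sh.

# Hypothetical example configuration; substitute your own bucket, table, and region.
export TARGET_FILESYSTEM=s3
export S3_BUCKET=my-impala-test-bucket            # placeholder bucket name
export S3GUARD_ENABLED=true
export S3GUARD_DYNAMODB_TABLE=my-impala-s3guard   # placeholder table, owned exclusively by this minicluster
export S3GUARD_DYNAMODB_REGION=us-west-2          # placeholder region; should match the bucket's region

# bin/impala-config.sh validates that the table and region are set when S3GUARD_ENABLED=true.
source "${IMPALA_HOME}/bin/impala-config.sh"

# Dataload (testdata/bin/load-test-warehouse-snapshot.sh) initializes the table and purges
# stale entries with commands equivalent to:
hadoop s3guard init -meta "dynamodb://${S3GUARD_DYNAMODB_TABLE}" -region "${S3GUARD_DYNAMODB_REGION}"
hadoop s3guard prune -seconds 1 -meta "dynamodb://${S3GUARD_DYNAMODB_TABLE}" -region "${S3GUARD_DYNAMODB_REGION}"

# When the minicluster is torn down, bin/jenkins/release_cloud_resources.sh destroys the
# metadata store and removes the test warehouse:
hadoop s3guard destroy -meta "dynamodb://${S3GUARD_DYNAMODB_TABLE}" -region "${S3GUARD_DYNAMODB_REGION}"
aws s3 rm --recursive --quiet "s3://${S3_BUCKET}/test-warehouse"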