From 6f1fe4ebe7ca9998218e0909b4ea6f8b2d27f31a Mon Sep 17 00:00:00 2001 From: Sailesh Mukil Date: Thu, 19 May 2016 16:34:41 -0700 Subject: [PATCH] IMPALA-3577, IMPALA-3486: Partitions on multiple filesystems break with S3_SKIP_INSERT_STAGING The HdfsTableSink usually creates an HDFS connection to the filesystem that the base table resides in. However, if we create a partition on a filesystem different from that of the base table and set S3_SKIP_INSERT_STAGING to "true", the table sink will try to write to a different filesystem with the wrong filesystem connector. This patch allows the table sink itself to work with different filesystems by replacing the single FS connector with a connector per partition. This also re-enables the multiple_filesystems test and modifies it to use the unique_database fixture so that parallel runs on the same bucket do not clash and fail. This patch also introduces a SECONDARY_FILESYSTEM environment variable, which the test reads to allow S3, Isilon and the local FS to be used as the secondary filesystem. All jobs with HDFS as the default filesystem need to set the appropriate environment variables for S3 and Isilon, i.e. the following: - export AWS_SECRET_ACCESS_KEY - export AWS_ACCESS_KEY_ID - export SECONDARY_FILESYSTEM (to whatever filesystem needs to be tested) TODO: SECONDARY_FILESYSTEM, FILESYSTEM_PREFIX and NAMENODE have a lot of similarities. Need to clean them up in a follow-up patch. Change-Id: Ib13b610eb9efb68c83894786cea862d7eae43aa7 Reviewed-on: http://gerrit.cloudera.org:8080/3146 Reviewed-by: Sailesh Mukil Tested-by: Internal Jenkins --- be/src/exec/hdfs-table-sink.cc | 37 +++++++++----- be/src/exec/hdfs-table-sink.h | 3 -- .../QueryTest/multiple-filesystems.test | 39 ++++++++------- tests/common/impala_test_suite.py | 9 ++-- tests/common/skip.py | 8 +-- tests/query_test/test_multiple_filesystems.py | 49 ++++++++++--------- tests/util/filesystem_utils.py | 6 +++ 7 files changed, 85 insertions(+), 66 deletions(-) diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc index 0a935eb77..9132638b9 100644 --- a/be/src/exec/hdfs-table-sink.cc +++ b/be/src/exec/hdfs-table-sink.cc @@ -138,8 +138,6 @@ Status HdfsTableSink::Prepare(RuntimeState* state) { PrintId(state->query_id(), "_")); RETURN_IF_ERROR(PrepareExprs(state)); - RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection( - staging_dir_, &hdfs_connection_)); mem_tracker_.reset(new MemTracker(profile(), -1, -1, profile()->name(), state->instance_mem_tracker())); @@ -296,16 +294,23 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state, // Check if tmp_hdfs_file_name exists.
const char* tmp_hdfs_file_name_cstr = output_partition->current_file_name.c_str(); - if (hdfsExists(hdfs_connection_, tmp_hdfs_file_name_cstr) == 0) { + + if (hdfsExists(output_partition->hdfs_connection, tmp_hdfs_file_name_cstr) == 0) { return Status(GetHdfsErrorMsg("Temporary HDFS file already exists: ", output_partition->current_file_name)); } uint64_t block_size = output_partition->partition_descriptor->block_size(); if (block_size == 0) block_size = output_partition->writer->default_block_size(); - output_partition->tmp_hdfs_file = hdfsOpenFile(hdfs_connection_, + output_partition->tmp_hdfs_file = hdfsOpenFile(output_partition->hdfs_connection, tmp_hdfs_file_name_cstr, O_WRONLY, 0, 0, block_size); + VLOG_FILE << "hdfsOpenFile() file=" << tmp_hdfs_file_name_cstr; + if (output_partition->tmp_hdfs_file == NULL) { + return Status(GetHdfsErrorMsg("Failed to open HDFS file for writing: ", + output_partition->current_file_name)); + } + if (IsS3APath(output_partition->current_file_name.c_str())) { // On S3A, the file cannot be stat'ed until after it's closed, and even so, the block // size reported will be just the filesystem default. So, remember the requested @@ -324,12 +329,6 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state, hdfsFreeFileInfo(info, 1); } - VLOG_FILE << "hdfsOpenFile() file=" << tmp_hdfs_file_name_cstr; - if (output_partition->tmp_hdfs_file == NULL) { - return Status(GetHdfsErrorMsg("Failed to open HDFS file for writing: ", - output_partition->current_file_name)); - } - ImpaladMetrics::NUM_FILES_OPEN_FOR_INSERT->Increment(1); COUNTER_ADD(files_created_counter_, 1); @@ -343,7 +342,8 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state, Status status = output_partition->writer->InitNewFile(); if (!status.ok()) { ClosePartitionFile(state, output_partition); - hdfsDelete(hdfs_connection_, output_partition->current_file_name.c_str(), 0); + hdfsDelete(output_partition->hdfs_connection, + output_partition->current_file_name.c_str(), 0); } return status; } @@ -384,7 +384,18 @@ Status HdfsTableSink::InitOutputPartition(RuntimeState* state, output_partition->partition_name = partition_name_ss.str(); BuildHdfsFileNames(partition_descriptor, output_partition); - output_partition->hdfs_connection = hdfs_connection_; + if (ShouldSkipStaging(state, output_partition)) { + // We will be writing to the final file if we're skipping staging, so get a connection + // to its filesystem. + RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection( + output_partition->final_hdfs_file_name_prefix, + &output_partition->hdfs_connection)); + } else { + // Else get a connection to the filesystem of the tmp file. 
+ RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection( + output_partition->tmp_hdfs_file_name_prefix, &output_partition->hdfs_connection)); + } + output_partition->partition_descriptor = &partition_descriptor; bool allow_unsupported_formats = @@ -620,7 +631,7 @@ Status HdfsTableSink::FinalizePartitionFile(RuntimeState* state, void HdfsTableSink::ClosePartitionFile(RuntimeState* state, OutputPartition* partition) { if (partition->tmp_hdfs_file == NULL) return; - int hdfs_ret = hdfsCloseFile(hdfs_connection_, partition->tmp_hdfs_file); + int hdfs_ret = hdfsCloseFile(partition->hdfs_connection, partition->tmp_hdfs_file); VLOG_FILE << "hdfsCloseFile() file=" << partition->current_file_name; if (hdfs_ret != 0) { state->LogError(ErrorMsg(TErrorCode::GENERAL, diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h index a06e66df6..99dcc898f 100644 --- a/be/src/exec/hdfs-table-sink.h +++ b/be/src/exec/hdfs-table-sink.h @@ -231,9 +231,6 @@ class HdfsTableSink : public DataSink { /// Current row from the current RowBatch to output TupleRow* current_row_; - /// Connection to hdfs, established in Open() and closed in Close(). - hdfsFS hdfs_connection_; - /// Row descriptor of row batches passed in Send(). Set in c'tor. const RowDescriptor& row_desc_; diff --git a/testdata/workloads/functional-query/queries/QueryTest/multiple-filesystems.test b/testdata/workloads/functional-query/queries/QueryTest/multiple-filesystems.test index 8b758faa0..0cc50af57 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/multiple-filesystems.test +++ b/testdata/workloads/functional-query/queries/QueryTest/multiple-filesystems.test @@ -3,7 +3,7 @@ # Verify various CREATE TABLE for unpartitioned tables on non-default # filesystem (e.g. S3A). create external table tinytable_like like functional.tinytable -location '$FILESYSTEM_PREFIX/test-warehouse/tinytable' +location '$SECONDARY_FILESYSTEM/multi_fs_tests/$DATABASE.db/tinytable' ---- RESULTS ==== ---- QUERY @@ -32,7 +32,7 @@ BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING ---- QUERY create external table tinytable_copy (a string, b string) row format delimited fields terminated by ',' -location '$FILESYSTEM_PREFIX/test-warehouse/tinytable' +location '$SECONDARY_FILESYSTEM/multi_fs_tests/$DATABASE.db/tinytable' ---- RESULTS ==== ---- QUERY @@ -58,22 +58,22 @@ INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIM ---- QUERY # Verify ADD PARTITION for non-default filesystem. 
alter table alltypes add partition(year=2009, month=1) -location '$FILESYSTEM_PREFIX/test-warehouse/alltypes_parquet/year=2009/month=1' +location '$SECONDARY_FILESYSTEM/multi_fs_tests/$DATABASE.db/alltypes_parquet/year=2009/month=1' ---- RESULTS ==== ---- QUERY alter table alltypes add partition(year=2009, month=2) -location '$FILESYSTEM_PREFIX/test-warehouse/alltypes_parquet/year=2009/month=2' +location '$SECONDARY_FILESYSTEM/multi_fs_tests/$DATABASE.db/alltypes_parquet/year=2009/month=2' ---- RESULTS ==== ---- QUERY alter table alltypes add partition(year=2010, month=1) -location '$FILESYSTEM_PREFIX/test-warehouse/alltypes_parquet/year=2010/month=1' +location '$SECONDARY_FILESYSTEM/multi_fs_tests/$DATABASE.db/alltypes_parquet/year=2010/month=1' ---- RESULTS ==== ---- QUERY alter table alltypes add partition(year=2010, month=2) -location '$FILESYSTEM_PREFIX/test-warehouse/alltypes_parquet/year=2010/month=2' +location '$SECONDARY_FILESYSTEM/multi_fs_tests/$DATABASE.db/alltypes_parquet/year=2010/month=2' ---- RESULTS ==== ---- QUERY @@ -117,17 +117,17 @@ INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIM ==== ---- QUERY # Verify tables with partitions that span multiple filesystems. -# Note: intentionally not using $FILESYSTEM_PREFIX so that the partition points +# Note: intentionally not using $SECONDARY_FILESYSTEM so that the partition points # to the default filesystem. alter table alltypes add partition(year=2010, month=3) -location '/test-warehouse/multi_fs_db.db/alltypes_parquet/year=2010/month=3' +location '/test-warehouse/alltypes_parquet/year=2010/month=3' ---- RESULTS ==== ---- QUERY -# Note: intentionally not using $FILESYSTEM_PREFIX so that the partition points +# Note: intentionally not using $SECONDARY_FILESYSTEM so that the partition points # to the default filesystem. alter table alltypes add partition(year=2010, month=4) -location '/test-warehouse/multi_fs_db.db/alltypes_parquet/year=2010/month=4' +location '/test-warehouse/alltypes_parquet/year=2010/month=4' ---- RESULTS ==== ---- QUERY @@ -173,7 +173,7 @@ drop table alltypes ---- QUERY # Verify CREATE TABLE for partitioned table on non-default filesystem. create external table alltypes like functional_parquet.alltypes -location '$FILESYSTEM_PREFIX/test-warehouse/multi_fs_db.db/alltypes' +location '$SECONDARY_FILESYSTEM/multi_fs_tests/$DATABASE.db/alltypes' ---- RESULTS ==== ---- QUERY @@ -184,29 +184,29 @@ INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIM ==== ---- QUERY # Verify tables with partitions that span multiple filesystems, split on the other partition. -# Note: intentionally not using $FILESYSTEM_PREFIX so that the partition points +# Note: intentionally not using $SECONDARY_FILESYSTEM so that the partition points # to the default filesystem. alter table alltypes add partition(year=2009, month=5) -location '/test-warehouse/multi_fs_db.db/alltypes_parquet/year=2009/month=5' +location '/test-warehouse/alltypes_parquet/year=2009/month=5' ---- RESULTS ==== ---- QUERY alter table alltypes add partition(year=2009, month=6) -location '$FILESYSTEM_PREFIX/test-warehouse/alltypes_parquet/year=2009/month=6' +location '$SECONDARY_FILESYSTEM/multi_fs_tests/$DATABASE.db/alltypes_parquet/year=2009/month=6' ---- RESULTS ==== ---- QUERY -# Note: intentionally not using $FILESYSTEM_PREFIX so that the partition points +# Note: intentionally not using $SECONDARY_FILESYSTEM so that the partition points # to the default filesystem. 
alter table alltypes add partition(year=2010, month=5) -location '/test-warehouse/multi_fs_db.db/alltypes_parquet/year=2010/month=5' +location '/test-warehouse/alltypes_parquet/year=2010/month=5' ---- RESULTS ==== ---- QUERY # This partition directory was dropped earlier, so this also verifies the partition # directory was not deleted. alter table alltypes add partition(year=2010, month=2) -location '$FILESYSTEM_PREFIX/test-warehouse/alltypes_parquet/year=2010/month=2' +location '$SECONDARY_FILESYSTEM/multi_fs_tests/$DATABASE.db/alltypes_parquet/year=2010/month=2' ---- RESULTS ==== ---- QUERY @@ -291,7 +291,7 @@ create table alltypes_multipart_insert like functional_parquet.alltypes ---- QUERY # ADD PARTITION on a non-default filesystem. alter table alltypes_multipart_insert add partition(year=2009, month=1) -location '$FILESYSTEM_PREFIX/test-warehouse/alltypes_multipart_insert/year=2009/month=1' +location '$SECONDARY_FILESYSTEM/multi_fs_tests/$DATABASE.db/alltypes_multipart_insert/year=2009/month=1' ---- RESULTS ==== ---- QUERY @@ -304,8 +304,9 @@ year=2009/month=1/: 310 ==== ---- QUERY # ADD PARTITION on the default filesystem. +# Point to unique database so we don't overwrite someone else's data. alter table alltypes_multipart_insert add partition(year=2009, month=2) -location '/test-warehouse/alltypes_multipart_insert/year=2009/month=2' +location '/test-warehouse/$DATABASE.db/alltypes_multipart_insert/year=2009/month=2' ---- RESULTS ==== ---- QUERY diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py index ca7e168a6..c12204cd2 100644 --- a/tests/common/impala_test_suite.py +++ b/tests/common/impala_test_suite.py @@ -42,7 +42,7 @@ from tests.performance.query_executor import JdbcQueryExecConfig from tests.performance.query_exec_functions import execute_using_jdbc from tests.util.hdfs_util import HdfsConfig, get_hdfs_client, get_hdfs_client_from_conf from tests.util.s3_util import S3Client -from tests.util.filesystem_utils import IS_S3, S3_BUCKET_NAME +from tests.util.filesystem_utils import IS_S3, IS_HDFS, S3_BUCKET_NAME # Imports required for Hive Metastore Client from hive_metastore import ThriftHiveMetastore @@ -118,7 +118,8 @@ class ImpalaTestSuite(BaseTestSuite): cls.impalad_test_service = cls.create_impala_service() cls.hdfs_client = cls.create_hdfs_client() - cls.filesystem_client = S3Client(S3_BUCKET_NAME) if IS_S3 else cls.hdfs_client + cls.s3_client = S3Client(S3_BUCKET_NAME) + cls.filesystem_client = cls.s3_client if IS_S3 else cls.hdfs_client @classmethod def teardown_class(cls): @@ -266,7 +267,9 @@ class ImpalaTestSuite(BaseTestSuite): query = QueryTestSectionReader.build_query(test_section['QUERY'] .replace('$GROUP_NAME', group_name) .replace('$IMPALA_HOME', IMPALA_HOME) - .replace('$FILESYSTEM_PREFIX', FILESYSTEM_PREFIX)) + .replace('$FILESYSTEM_PREFIX', FILESYSTEM_PREFIX) + .replace('$SECONDARY_FILESYSTEM', os.getenv("SECONDARY_FILESYSTEM") or str())) + if use_db: query = query.replace('$DATABASE', use_db) if 'QUERY_NAME' in test_section: LOG.info('Query Name: \n%s\n' % test_section['QUERY_NAME']) diff --git a/tests/common/skip.py b/tests/common/skip.py index e260eaeab..e434915f9 100644 --- a/tests/common/skip.py +++ b/tests/common/skip.py @@ -21,9 +21,9 @@ import re import os import pytest from functools import partial - from tests.common.environ import IMPALAD_BUILD, USING_OLD_AGGS_JOINS -from tests.util.filesystem_utils import IS_DEFAULT_FS, IS_S3, IS_ISILON, IS_LOCAL +from tests.util.filesystem_utils import IS_DEFAULT_FS, IS_S3, 
IS_ISILON, IS_LOCAL,\ + SECONDARY_FILESYSTEM class SkipIfS3: @@ -45,8 +45,6 @@ class SkipIfS3: reason="Tests rely on HDFS qualified paths, IMPALA-1872") class SkipIf: - # Some tests require a non-default filesystem to be present. - default_fs = pytest.mark.skipif(IS_DEFAULT_FS, reason="Non-default filesystem needed") skip_hbase = pytest.mark.skipif(pytest.config.option.skip_hbase, reason="--skip_hbase argument specified") not_default_fs = pytest.mark.skipif(not IS_DEFAULT_FS, @@ -54,6 +52,8 @@ class SkipIf: kudu_not_supported = pytest.mark.skipif(os.environ["KUDU_IS_SUPPORTED"] == "false", reason="Kudu is not supported") not_s3 = pytest.mark.skipif(not IS_S3, reason="S3 Filesystem needed") + no_secondary_fs = pytest.mark.skipif(not SECONDARY_FILESYSTEM, + reason="Secondary filesystem needed") class SkipIfIsilon: caching = pytest.mark.skipif(IS_ISILON, reason="SET CACHED not implemented for Isilon") diff --git a/tests/query_test/test_multiple_filesystems.py b/tests/query_test/test_multiple_filesystems.py index 926c247bc..f6bf7278e 100644 --- a/tests/query_test/test_multiple_filesystems.py +++ b/tests/query_test/test_multiple_filesystems.py @@ -2,21 +2,19 @@ # Validates table stored on the LocalFileSystem. # import pytest -from subprocess import check_call +import os +from subprocess import check_call, call from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.test_dimensions import create_single_exec_option_dimension -from tests.common.skip import SkipIf, SkipIfIsilon, SkipIfS3 -from tests.util.filesystem_utils import get_fs_path +from tests.common.skip import SkipIf +from tests.util.filesystem_utils import get_secondary_fs_path, S3_BUCKET_NAME, ISILON_NAMENODE -@SkipIf.default_fs # Run only when a non-default filesystem is available. -@SkipIfIsilon.untriaged # Missing coverage: Find out why this is failing. +@SkipIf.no_secondary_fs class TestMultipleFilesystems(ImpalaTestSuite): """ Tests that tables and queries can span multiple filesystems. """ - TEST_DB = 'multi_fs_db' - @classmethod def get_workload(self): return 'functional-query' @@ -30,21 +28,24 @@ class TestMultipleFilesystems(ImpalaTestSuite): v.get_value('table_format').file_format == 'text' and \ v.get_value('table_format').compression_codec == 'none') - def setup_method(self, method): - self.cleanup_db(self.TEST_DB) - # Note: Purposely creates database on the default filesystem. Do not specify location. - self.client.execute("create database %s" % self.TEST_DB) - self._populate_hdfs_partitions() + def _populate_secondary_fs_partitions(self, db_name): + # This directory may already exist. So we needn't mind if this call fails. + call(["hadoop", "fs", "-mkdir", get_secondary_fs_path("/multi_fs_tests/")], shell=False) + check_call(["hadoop", "fs", "-mkdir", + get_secondary_fs_path("/multi_fs_tests/%s.db/" % db_name)], shell=False) + check_call(["hadoop", "fs", "-cp", "/test-warehouse/alltypes_parquet/", + get_secondary_fs_path("/multi_fs_tests/%s.db/" % db_name)], shell=False) + check_call(["hadoop", "fs", "-cp", "/test-warehouse/tinytable/", + get_secondary_fs_path("/multi_fs_tests/%s.db/" % db_name)], shell=False) - def teardown_method(self, method): - self.cleanup_db(self.TEST_DB) - - def _populate_hdfs_partitions(self): - """ Copy some data to defaultFS HDFS filesystem so that the test can verify tables - that span the default (HDFS) and secondary filesystem (e.g. 
S3A).""" check_call(["hadoop", "fs", "-cp", - get_fs_path("/test-warehouse/alltypes_parquet"), - "/test-warehouse/%s.db/" % self.TEST_DB], shell=False) - - def test_local_filesystem(self, vector): - self.run_test_case('QueryTest/multiple-filesystems', vector, use_db=self.TEST_DB) + @pytest.mark.execute_serially + def test_multiple_filesystems(self, vector, unique_database): + try: + self._populate_secondary_fs_partitions(unique_database) + self.run_test_case('QueryTest/multiple-filesystems', vector, use_db=unique_database) + finally: + # We delete this from the secondary filesystem here because the database was created + # in HDFS but the queries will create this path in the secondary FS as well. So + # dropping the database will not delete the directory in the secondary FS. + check_call(["hadoop", "fs", "-rm", "-r", + get_secondary_fs_path("/multi_fs_tests/%s.db/" % unique_database)], shell=False) diff --git a/tests/util/filesystem_utils.py b/tests/util/filesystem_utils.py index d22ae00af..5aa9718df 100644 --- a/tests/util/filesystem_utils.py +++ b/tests/util/filesystem_utils.py @@ -16,10 +16,12 @@ import os FILESYSTEM_PREFIX = os.getenv("FILESYSTEM_PREFIX") or str() +SECONDARY_FILESYSTEM = os.getenv("SECONDARY_FILESYSTEM") or str() FILESYSTEM = os.getenv("TARGET_FILESYSTEM") IS_S3 = FILESYSTEM == "s3" IS_ISILON = FILESYSTEM == "isilon" IS_LOCAL = FILESYSTEM == "local" +IS_HDFS = FILESYSTEM == "hdfs" # This condition satisfies both the states where one can assume a default fs # - The environment variable is set to an empty string. # - The environment variable is unset (None) @@ -27,6 +29,7 @@ IS_LOCAL = FILESYSTEM == "local" IS_DEFAULT_FS = not FILESYSTEM_PREFIX or IS_LOCAL # Isilon specific values. +ISILON_NAMENODE = os.getenv("ISILON_NAMENODE") or str() ISILON_WEBHDFS_PORT = 8082 # S3 specific values @@ -35,4 +38,7 @@ S3_BUCKET_NAME = os.getenv("S3_BUCKET") def get_fs_path(path): return "%s%s" % (FILESYSTEM_PREFIX, path) +def get_secondary_fs_path(path): + return "%s%s" % (SECONDARY_FILESYSTEM, path) + WAREHOUSE = get_fs_path('/test-warehouse')
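
For reference, a minimal sketch of how the new SECONDARY_FILESYSTEM variable and the get_secondary_fs_path() helper added above fit together; the bucket name and database name below are hypothetical placeholders, not values from this patch:

    import os

    # A job testing S3A as the secondary filesystem would export something like
    # (the bucket name is a hypothetical placeholder):
    #   export AWS_ACCESS_KEY_ID=...
    #   export AWS_SECRET_ACCESS_KEY=...
    #   export SECONDARY_FILESYSTEM=s3a://my-test-bucket
    SECONDARY_FILESYSTEM = os.getenv("SECONDARY_FILESYSTEM") or str()

    def get_secondary_fs_path(path):
        # Same scheme as get_fs_path(): plain string prefixing, so the variable
        # must carry the full scheme and authority (e.g. s3a://my-test-bucket).
        return "%s%s" % (SECONDARY_FILESYSTEM, path)

    # With SECONDARY_FILESYSTEM=s3a://my-test-bucket this prints
    # "s3a://my-test-bucket/multi_fs_tests/some_db.db/"; with it unset,
    # the path resolves on the default filesystem.
    print(get_secondary_fs_path("/multi_fs_tests/some_db.db/"))

This mirrors how the test's $SECONDARY_FILESYSTEM substitution in multiple-filesystems.test produces partition locations on the secondary filesystem while bare /test-warehouse/... paths stay on the default one.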