diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 0a935eb77..9132638b9 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -138,8 +138,6 @@ Status HdfsTableSink::Prepare(RuntimeState* state) {
       PrintId(state->query_id(), "_"));
 
   RETURN_IF_ERROR(PrepareExprs(state));
-  RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
-      staging_dir_, &hdfs_connection_));
   mem_tracker_.reset(new MemTracker(profile(), -1, -1, profile()->name(),
       state->instance_mem_tracker()));
 
@@ -296,16 +294,23 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state,
   // Check if tmp_hdfs_file_name exists.
   const char* tmp_hdfs_file_name_cstr =
       output_partition->current_file_name.c_str();
-  if (hdfsExists(hdfs_connection_, tmp_hdfs_file_name_cstr) == 0) {
+
+  if (hdfsExists(output_partition->hdfs_connection, tmp_hdfs_file_name_cstr) == 0) {
     return Status(GetHdfsErrorMsg("Temporary HDFS file already exists: ",
         output_partition->current_file_name));
   }
 
   uint64_t block_size = output_partition->partition_descriptor->block_size();
   if (block_size == 0) block_size = output_partition->writer->default_block_size();
 
-  output_partition->tmp_hdfs_file = hdfsOpenFile(hdfs_connection_,
+  output_partition->tmp_hdfs_file = hdfsOpenFile(output_partition->hdfs_connection,
       tmp_hdfs_file_name_cstr, O_WRONLY, 0, 0, block_size);
+  VLOG_FILE << "hdfsOpenFile() file=" << tmp_hdfs_file_name_cstr;
+  if (output_partition->tmp_hdfs_file == NULL) {
+    return Status(GetHdfsErrorMsg("Failed to open HDFS file for writing: ",
+        output_partition->current_file_name));
+  }
+
   if (IsS3APath(output_partition->current_file_name.c_str())) {
     // On S3A, the file cannot be stat'ed until after it's closed, and even so, the block
     // size reported will be just the filesystem default. So, remember the requested
@@ -324,12 +329,6 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state,
     hdfsFreeFileInfo(info, 1);
   }
 
-  VLOG_FILE << "hdfsOpenFile() file=" << tmp_hdfs_file_name_cstr;
-  if (output_partition->tmp_hdfs_file == NULL) {
-    return Status(GetHdfsErrorMsg("Failed to open HDFS file for writing: ",
-        output_partition->current_file_name));
-  }
-
   ImpaladMetrics::NUM_FILES_OPEN_FOR_INSERT->Increment(1);
   COUNTER_ADD(files_created_counter_, 1);
 
@@ -343,7 +342,8 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state,
   Status status = output_partition->writer->InitNewFile();
   if (!status.ok()) {
     ClosePartitionFile(state, output_partition);
-    hdfsDelete(hdfs_connection_, output_partition->current_file_name.c_str(), 0);
+    hdfsDelete(output_partition->hdfs_connection,
+        output_partition->current_file_name.c_str(), 0);
   }
   return status;
 }
@@ -384,7 +384,18 @@ Status HdfsTableSink::InitOutputPartition(RuntimeState* state,
   output_partition->partition_name = partition_name_ss.str();
   BuildHdfsFileNames(partition_descriptor, output_partition);
 
-  output_partition->hdfs_connection = hdfs_connection_;
+  if (ShouldSkipStaging(state, output_partition)) {
+    // We will be writing to the final file if we're skipping staging, so get a connection
+    // to its filesystem.
+    RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
+        output_partition->final_hdfs_file_name_prefix,
+        &output_partition->hdfs_connection));
+  } else {
+    // Else get a connection to the filesystem of the tmp file.
+    RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
+        output_partition->tmp_hdfs_file_name_prefix, &output_partition->hdfs_connection));
+  }
+
   output_partition->partition_descriptor = &partition_descriptor;
 
   bool allow_unsupported_formats =
@@ -620,7 +631,7 @@ Status HdfsTableSink::FinalizePartitionFile(RuntimeState* state,
 void HdfsTableSink::ClosePartitionFile(
     RuntimeState* state, OutputPartition* partition) {
   if (partition->tmp_hdfs_file == NULL) return;
-  int hdfs_ret = hdfsCloseFile(hdfs_connection_, partition->tmp_hdfs_file);
+  int hdfs_ret = hdfsCloseFile(partition->hdfs_connection, partition->tmp_hdfs_file);
   VLOG_FILE << "hdfsCloseFile() file=" << partition->current_file_name;
   if (hdfs_ret != 0) {
     state->LogError(ErrorMsg(TErrorCode::GENERAL,
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index a06e66df6..99dcc898f 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -231,9 +231,6 @@ class HdfsTableSink : public DataSink {
   /// Current row from the current RowBatch to output
   TupleRow* current_row_;
 
-  /// Connection to hdfs, established in Open() and closed in Close().
-  hdfsFS hdfs_connection_;
-
   /// Row descriptor of row batches passed in Send(). Set in c'tor.
   const RowDescriptor& row_desc_;
 
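[Reviewer note, not part of the patch] With the hunks above, the sink no longer holds a single hdfs_connection_ opened from staging_dir_ in Prepare(); InitOutputPartition() now asks HdfsFsCache for a connection keyed on the path each partition will actually write, the final location when staging is skipped and the tmp location otherwise. That is what lets one INSERT target partitions on different filesystems. A rough Python sketch of the idea, for orientation only; FsCache, connection_for_partition and the dict fields are hypothetical stand-ins, not Impala's HdfsFsCache API:

    from urlparse import urlparse  # Python 2, matching the test code in this patch


    class FsCache(object):
      """Caches one 'connection' per filesystem, keyed by scheme + authority."""

      def __init__(self):
        self._connections = {}

      def get_connection(self, path):
        parsed = urlparse(path)
        key = (parsed.scheme or "default", parsed.netloc)
        # Stand-in for an hdfsConnect()-style call; the real cache hands back an hdfsFS.
        return self._connections.setdefault(key, "fs-connection:%s://%s" % key)


    def connection_for_partition(cache, partition, skip_staging):
      # Mirrors InitOutputPartition(): use the final location's filesystem when staging
      # is skipped, otherwise the tmp (staging) location's filesystem.
      prefix = partition["final_prefix"] if skip_staging else partition["tmp_prefix"]
      return cache.get_connection(prefix)

The sketch only shows the keying and the final-vs-tmp decision; the real code returns an hdfsFS handle and propagates errors through Status.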
diff --git a/testdata/workloads/functional-query/queries/QueryTest/multiple-filesystems.test b/testdata/workloads/functional-query/queries/QueryTest/multiple-filesystems.test
index 8b758faa0..0cc50af57 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/multiple-filesystems.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/multiple-filesystems.test
@@ -3,7 +3,7 @@
 # Verify various CREATE TABLE for unpartitioned tables on non-default
 # filesystem (e.g. S3A).
 create external table tinytable_like like functional.tinytable
-location '$FILESYSTEM_PREFIX/test-warehouse/tinytable'
+location '$SECONDARY_FILESYSTEM/multi_fs_tests/$DATABASE.db/tinytable'
 ---- RESULTS
 ====
 ---- QUERY
@@ -32,7 +32,7 @@ BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
 ---- QUERY
 create external table tinytable_copy (a string, b string)
 row format delimited fields terminated by ','
-location '$FILESYSTEM_PREFIX/test-warehouse/tinytable'
+location '$SECONDARY_FILESYSTEM/multi_fs_tests/$DATABASE.db/tinytable'
 ---- RESULTS
 ====
 ---- QUERY
@@ -58,22 +58,22 @@ INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIM
 ---- QUERY
 # Verify ADD PARTITION for non-default filesystem.
 alter table alltypes add partition(year=2009, month=1)
-location '$FILESYSTEM_PREFIX/test-warehouse/alltypes_parquet/year=2009/month=1'
+location '$SECONDARY_FILESYSTEM/multi_fs_tests/$DATABASE.db/alltypes_parquet/year=2009/month=1'
 ---- RESULTS
 ====
 ---- QUERY
 alter table alltypes add partition(year=2009, month=2)
-location '$FILESYSTEM_PREFIX/test-warehouse/alltypes_parquet/year=2009/month=2'
+location '$SECONDARY_FILESYSTEM/multi_fs_tests/$DATABASE.db/alltypes_parquet/year=2009/month=2'
 ---- RESULTS
 ====
 ---- QUERY
 alter table alltypes add partition(year=2010, month=1)
-location '$FILESYSTEM_PREFIX/test-warehouse/alltypes_parquet/year=2010/month=1'
+location '$SECONDARY_FILESYSTEM/multi_fs_tests/$DATABASE.db/alltypes_parquet/year=2010/month=1'
 ---- RESULTS
 ====
 ---- QUERY
 alter table alltypes add partition(year=2010, month=2)
-location '$FILESYSTEM_PREFIX/test-warehouse/alltypes_parquet/year=2010/month=2'
+location '$SECONDARY_FILESYSTEM/multi_fs_tests/$DATABASE.db/alltypes_parquet/year=2010/month=2'
 ---- RESULTS
 ====
 ---- QUERY
@@ -117,17 +117,17 @@ INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIM
 ====
 ---- QUERY
 # Verify tables with partitions that span multiple filesystems.
-# Note: intentionally not using $FILESYSTEM_PREFIX so that the partition points
+# Note: intentionally not using $SECONDARY_FILESYSTEM so that the partition points
 # to the default filesystem.
 alter table alltypes add partition(year=2010, month=3)
-location '/test-warehouse/multi_fs_db.db/alltypes_parquet/year=2010/month=3'
+location '/test-warehouse/alltypes_parquet/year=2010/month=3'
 ---- RESULTS
 ====
 ---- QUERY
-# Note: intentionally not using $FILESYSTEM_PREFIX so that the partition points
+# Note: intentionally not using $SECONDARY_FILESYSTEM so that the partition points
 # to the default filesystem.
 alter table alltypes add partition(year=2010, month=4)
-location '/test-warehouse/multi_fs_db.db/alltypes_parquet/year=2010/month=4'
+location '/test-warehouse/alltypes_parquet/year=2010/month=4'
 ---- RESULTS
 ====
 ---- QUERY
@@ -173,7 +173,7 @@ drop table alltypes
 ---- QUERY
 # Verify CREATE TABLE for partitioned table on non-default filesystem.
 create external table alltypes like functional_parquet.alltypes
-location '$FILESYSTEM_PREFIX/test-warehouse/multi_fs_db.db/alltypes'
+location '$SECONDARY_FILESYSTEM/multi_fs_tests/$DATABASE.db/alltypes'
 ---- RESULTS
 ====
 ---- QUERY
@@ -184,29 +184,29 @@ INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIM
 ====
 ---- QUERY
 # Verify tables with partitions that span multiple filesystems, split on the other partition.
-# Note: intentionally not using $FILESYSTEM_PREFIX so that the partition points
+# Note: intentionally not using $SECONDARY_FILESYSTEM so that the partition points
 # to the default filesystem.
 alter table alltypes add partition(year=2009, month=5)
-location '/test-warehouse/multi_fs_db.db/alltypes_parquet/year=2009/month=5'
+location '/test-warehouse/alltypes_parquet/year=2009/month=5'
 ---- RESULTS
 ====
 ---- QUERY
 alter table alltypes add partition(year=2009, month=6)
-location '$FILESYSTEM_PREFIX/test-warehouse/alltypes_parquet/year=2009/month=6'
+location '$SECONDARY_FILESYSTEM/multi_fs_tests/$DATABASE.db/alltypes_parquet/year=2009/month=6'
 ---- RESULTS
 ====
 ---- QUERY
-# Note: intentionally not using $FILESYSTEM_PREFIX so that the partition points
+# Note: intentionally not using $SECONDARY_FILESYSTEM so that the partition points
 # to the default filesystem.
 alter table alltypes add partition(year=2010, month=5)
-location '/test-warehouse/multi_fs_db.db/alltypes_parquet/year=2010/month=5'
+location '/test-warehouse/alltypes_parquet/year=2010/month=5'
 ---- RESULTS
 ====
 ---- QUERY
 # This partition directory was dropped earlier, so this also verifies the partition
 # directory was not deleted.
 alter table alltypes add partition(year=2010, month=2)
-location '$FILESYSTEM_PREFIX/test-warehouse/alltypes_parquet/year=2010/month=2'
+location '$SECONDARY_FILESYSTEM/multi_fs_tests/$DATABASE.db/alltypes_parquet/year=2010/month=2'
 ---- RESULTS
 ====
 ---- QUERY
@@ -291,7 +291,7 @@ create table alltypes_multipart_insert like functional_parquet.alltypes
 ---- QUERY
 # ADD PARTITION on a non-default filesystem.
 alter table alltypes_multipart_insert add partition(year=2009, month=1)
-location '$FILESYSTEM_PREFIX/test-warehouse/alltypes_multipart_insert/year=2009/month=1'
+location '$SECONDARY_FILESYSTEM/multi_fs_tests/$DATABASE.db/alltypes_multipart_insert/year=2009/month=1'
 ---- RESULTS
 ====
 ---- QUERY
@@ -304,8 +304,9 @@ year=2009/month=1/: 310
 ====
 ---- QUERY
 # ADD PARTITION on the default filesystem.
+# Point to unique database so we don't overwrite someone else's data.
 alter table alltypes_multipart_insert add partition(year=2009, month=2)
-location '/test-warehouse/alltypes_multipart_insert/year=2009/month=2'
+location '/test-warehouse/$DATABASE.db/alltypes_multipart_insert/year=2009/month=2'
 ---- RESULTS
 ====
 ---- QUERY
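[Reviewer note, not part of the patch] Every non-default location in the .test file is now built from $SECONDARY_FILESYSTEM and $DATABASE, which the test harness substitutes at run time (see the impala_test_suite.py hunk below). A toy illustration of that substitution, with hypothetical values for the bucket and database name:

    query = "location '$SECONDARY_FILESYSTEM/multi_fs_tests/$DATABASE.db/tinytable'"
    resolved = (query.replace("$SECONDARY_FILESYSTEM", "s3a://impala-test-bucket")
        .replace("$DATABASE", "test_multi_fs_abc123"))
    assert resolved == \
        "location 's3a://impala-test-bucket/multi_fs_tests/test_multi_fs_abc123.db/tinytable'"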
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index ca7e168a6..c12204cd2 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -42,7 +42,7 @@ from tests.performance.query_executor import JdbcQueryExecConfig
 from tests.performance.query_exec_functions import execute_using_jdbc
 from tests.util.hdfs_util import HdfsConfig, get_hdfs_client, get_hdfs_client_from_conf
 from tests.util.s3_util import S3Client
-from tests.util.filesystem_utils import IS_S3, S3_BUCKET_NAME
+from tests.util.filesystem_utils import IS_S3, IS_HDFS, S3_BUCKET_NAME
 
 # Imports required for Hive Metastore Client
 from hive_metastore import ThriftHiveMetastore
@@ -118,7 +118,8 @@ class ImpalaTestSuite(BaseTestSuite):
 
     cls.impalad_test_service = cls.create_impala_service()
     cls.hdfs_client = cls.create_hdfs_client()
-    cls.filesystem_client = S3Client(S3_BUCKET_NAME) if IS_S3 else cls.hdfs_client
+    cls.s3_client = S3Client(S3_BUCKET_NAME)
+    cls.filesystem_client = cls.s3_client if IS_S3 else cls.hdfs_client
 
   @classmethod
   def teardown_class(cls):
@@ -266,7 +267,9 @@ class ImpalaTestSuite(BaseTestSuite):
       query = QueryTestSectionReader.build_query(test_section['QUERY']
           .replace('$GROUP_NAME', group_name)
           .replace('$IMPALA_HOME', IMPALA_HOME)
-          .replace('$FILESYSTEM_PREFIX', FILESYSTEM_PREFIX))
+          .replace('$FILESYSTEM_PREFIX', FILESYSTEM_PREFIX)
+          .replace('$SECONDARY_FILESYSTEM', os.getenv("SECONDARY_FILESYSTEM") or str()))
+
       if use_db: query = query.replace('$DATABASE', use_db)
       if 'QUERY_NAME' in test_section:
         LOG.info('Query Name: \n%s\n' % test_section['QUERY_NAME'])
diff --git a/tests/common/skip.py b/tests/common/skip.py
index e260eaeab..e434915f9 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -21,9 +21,9 @@ import re
 import os
 import pytest
 from functools import partial
-
 from tests.common.environ import IMPALAD_BUILD, USING_OLD_AGGS_JOINS
-from tests.util.filesystem_utils import IS_DEFAULT_FS, IS_S3, IS_ISILON, IS_LOCAL
+from tests.util.filesystem_utils import IS_DEFAULT_FS, IS_S3, IS_ISILON, IS_LOCAL,\
+  SECONDARY_FILESYSTEM
 
 
 class SkipIfS3:
@@ -45,8 +45,6 @@ class SkipIfS3:
       reason="Tests rely on HDFS qualified paths, IMPALA-1872")
 
 class SkipIf:
-  # Some tests require a non-default filesystem to be present.
-  default_fs = pytest.mark.skipif(IS_DEFAULT_FS, reason="Non-default filesystem needed")
   skip_hbase = pytest.mark.skipif(pytest.config.option.skip_hbase,
       reason="--skip_hbase argument specified")
   not_default_fs = pytest.mark.skipif(not IS_DEFAULT_FS,
@@ -54,6 +52,8 @@ class SkipIf:
   kudu_not_supported = pytest.mark.skipif(os.environ["KUDU_IS_SUPPORTED"] == "false",
       reason="Kudu is not supported")
   not_s3 = pytest.mark.skipif(not IS_S3, reason="S3 Filesystem needed")
+  no_secondary_fs = pytest.mark.skipif(not SECONDARY_FILESYSTEM,
+      reason="Secondary filesystem needed")
 
 class SkipIfIsilon:
   caching = pytest.mark.skipif(IS_ISILON, reason="SET CACHED not implemented for Isilon")
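[Reviewer note, not part of the patch] When SECONDARY_FILESYSTEM is unset it defaults to an empty string, so $SECONDARY_FILESYSTEM substitutes to nothing and the new SkipIf.no_secondary_fs marker keeps such tests from running against an unconfigured setup. A minimal sketch of gating a test the same way; the marker and test names below are hypothetical, the real usage is in test_multiple_filesystems.py that follows:

    import os
    import pytest

    SECONDARY_FILESYSTEM = os.getenv("SECONDARY_FILESYSTEM") or str()
    needs_secondary_fs = pytest.mark.skipif(not SECONDARY_FILESYSTEM,
        reason="Secondary filesystem needed")


    @needs_secondary_fs
    def test_spans_two_filesystems():
      # Would exercise tables whose partitions live on both the default and the
      # secondary filesystem, as QueryTest/multiple-filesystems does.
      pass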
diff --git a/tests/query_test/test_multiple_filesystems.py b/tests/query_test/test_multiple_filesystems.py
index 926c247bc..f6bf7278e 100644
--- a/tests/query_test/test_multiple_filesystems.py
+++ b/tests/query_test/test_multiple_filesystems.py
@@ -2,21 +2,19 @@
 # Validates table stored on the LocalFileSystem.
 #
 import pytest
-from subprocess import check_call
+import os
+from subprocess import check_call, call
 
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.test_dimensions import create_single_exec_option_dimension
-from tests.common.skip import SkipIf, SkipIfIsilon, SkipIfS3
-from tests.util.filesystem_utils import get_fs_path
+from tests.common.skip import SkipIf
+from tests.util.filesystem_utils import get_secondary_fs_path, S3_BUCKET_NAME, ISILON_NAMENODE
 
 
-@SkipIf.default_fs # Run only when a non-default filesystem is available.
-@SkipIfIsilon.untriaged # Missing coverage: Find out why this is failing.
+@SkipIf.no_secondary_fs
 class TestMultipleFilesystems(ImpalaTestSuite):
   """ Tests that tables and queries can span multiple filesystems. """
 
-  TEST_DB = 'multi_fs_db'
-
   @classmethod
   def get_workload(self):
     return 'functional-query'
@@ -30,21 +28,24 @@ class TestMultipleFilesystems(ImpalaTestSuite):
         v.get_value('table_format').file_format == 'text' and \
         v.get_value('table_format').compression_codec == 'none')
 
-  def setup_method(self, method):
-    self.cleanup_db(self.TEST_DB)
-    # Note: Purposely creates database on the default filesystem. Do not specify location.
-    self.client.execute("create database %s" % self.TEST_DB)
-    self._populate_hdfs_partitions()
+  def _populate_secondary_fs_partitions(self, db_name):
+    # This directory may already exist. So we needn't mind if this call fails.
+    call(["hadoop", "fs", "-mkdir", get_secondary_fs_path("/multi_fs_tests/")], shell=False)
+    check_call(["hadoop", "fs", "-mkdir",
+        get_secondary_fs_path("/multi_fs_tests/%s.db/" % db_name)], shell=False)
+    check_call(["hadoop", "fs", "-cp", "/test-warehouse/alltypes_parquet/",
+        get_secondary_fs_path("/multi_fs_tests/%s.db/" % db_name)], shell=False)
+    check_call(["hadoop", "fs", "-cp", "/test-warehouse/tinytable/",
+        get_secondary_fs_path("/multi_fs_tests/%s.db/" % db_name)], shell=False)
 
-  def teardown_method(self, method):
-    self.cleanup_db(self.TEST_DB)
-
-  def _populate_hdfs_partitions(self):
-    """ Copy some data to defaultFS HDFS filesystem so that the test can verify tables
-        that span the default (HDFS) and secondary filesystem (e.g. S3A)."""
-    check_call(["hadoop", "fs", "-cp",
-        get_fs_path("/test-warehouse/alltypes_parquet"),
-        "/test-warehouse/%s.db/" % self.TEST_DB], shell=False)
-
-  def test_local_filesystem(self, vector):
-    self.run_test_case('QueryTest/multiple-filesystems', vector, use_db=self.TEST_DB)
+  @pytest.mark.execute_serially
+  def test_multiple_filesystems(self, vector, unique_database):
+    try:
+      self._populate_secondary_fs_partitions(unique_database)
+      self.run_test_case('QueryTest/multiple-filesystems', vector, use_db=unique_database)
+    finally:
+      # We delete this from the secondary filesystem here because the database was created
+      # in HDFS but the queries will create this path in the secondary FS as well. So
+      # dropping the database will not delete the directory in the secondary FS.
+      check_call(["hadoop", "fs", "-rm", "-r",
+          get_secondary_fs_path("/multi_fs_tests/%s.db/" % unique_database)], shell=False)
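[Reviewer note, not part of the patch] The new test seeds the secondary filesystem under /multi_fs_tests/&lt;database&gt;.db/ and removes that directory in the finally block, because dropping the HDFS-resident database does not clean up the secondary filesystem. A stand-alone sketch of the copy step under an assumed database name, assuming the hadoop CLI is on PATH and SECONDARY_FILESYSTEM is exported:

    import os
    from subprocess import check_call

    prefix = os.getenv("SECONDARY_FILESYSTEM") or str()
    target = "%s/multi_fs_tests/%s.db/" % (prefix, "test_multi_fs_abc123")  # hypothetical db name
    check_call(["hadoop", "fs", "-mkdir", "-p", target])
    check_call(["hadoop", "fs", "-cp", "/test-warehouse/tinytable/", target])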
diff --git a/tests/util/filesystem_utils.py b/tests/util/filesystem_utils.py
index d22ae00af..5aa9718df 100644
--- a/tests/util/filesystem_utils.py
+++ b/tests/util/filesystem_utils.py
@@ -16,10 +16,12 @@ import os
 
 FILESYSTEM_PREFIX = os.getenv("FILESYSTEM_PREFIX") or str()
+SECONDARY_FILESYSTEM = os.getenv("SECONDARY_FILESYSTEM") or str()
 FILESYSTEM = os.getenv("TARGET_FILESYSTEM")
 IS_S3 = FILESYSTEM == "s3"
 IS_ISILON = FILESYSTEM == "isilon"
 IS_LOCAL = FILESYSTEM == "local"
+IS_HDFS = FILESYSTEM == "hdfs"
 
 # This condition satisfies both the states where one can assume a default fs
 # - The environment variable is set to an empty string.
 # - Tne environment variables is unset ( None )
@@ -27,6 +29,7 @@ IS_LOCAL = FILESYSTEM == "local"
 IS_DEFAULT_FS = not FILESYSTEM_PREFIX or IS_LOCAL
 
 # Isilon specific values.
+ISILON_NAMENODE = os.getenv("ISILON_NAMENODE") or str()
 ISILON_WEBHDFS_PORT = 8082
 
 # S3 specific values
@@ -35,4 +38,7 @@ S3_BUCKET_NAME = os.getenv("S3_BUCKET")
 def get_fs_path(path):
   return "%s%s" % (FILESYSTEM_PREFIX, path)
 
+def get_secondary_fs_path(path):
+  return "%s%s" % (SECONDARY_FILESYSTEM, path)
+
 WAREHOUSE = get_fs_path('/test-warehouse')
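[Reviewer note, not part of the patch] Usage sketch for the new helper; the environment value and database name are hypothetical, and this assumes the Impala python test environment so that tests.util is importable and SECONDARY_FILESYSTEM was exported before the process started:

    from tests.util.filesystem_utils import get_secondary_fs_path

    path = get_secondary_fs_path("/multi_fs_tests/my_db.db/tinytable")
    # With SECONDARY_FILESYSTEM=s3a://impala-test-bucket exported beforehand, path is
    #   's3a://impala-test-bucket/multi_fs_tests/my_db.db/tinytable'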