# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. from __future__ import absolute_import, division, print_function from builtins import range import math import os import pytest import random import struct import subprocess from os.path import join from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.test_dimensions import ( add_exec_option_dimension, create_single_exec_option_dimension) from tests.common.test_result_verifier import verify_query_result_is_equal from tests.common.test_vector import ImpalaTestDimension from tests.util.filesystem_utils import get_fs_path, WAREHOUSE # (file extension, table suffix) pairs compression_formats = [ ('.bz2', 'bzip'), ('.deflate', 'def'), ('.gz', 'gzip'), ('.snappy', 'snap'), ] compression_extensions = ['.bz2', '.deflate', '.gz', '.snappy'] class TestCompressedFormatsBase(ImpalaTestSuite): """ Base class to provide utility functions for testing support for compressed data files. """ @classmethod def add_test_dimensions(cls): super(TestCompressedFormatsBase, cls).add_test_dimensions() def _copy_and_query_compressed_file(self, unique_database, table_name, db_suffix, dest_extension, expected_error=None): """ This is a utility function to test the behavior for compressed file formats with different file extensions. It creates a new table in the unique_database as a copy of the provided table_name from the functional schema with the specified db_suffix. It copies the first file from the sourcce table to the new table as file_basename + dest_extension. It then runs a query on the new table. Unless expected_error is set, it expects the query to run successfully. """ # Calculate locations for the source table base_dir = WAREHOUSE src_table = "functional{0}.{1}".format(db_suffix, table_name) src_table_dir = join(base_dir, table_name + db_suffix) file_basename = self.filesystem_client.ls(src_table_dir)[0] src_file = join(src_table_dir, file_basename) # Calculate locations for the destination table dest_table_dir = "{2}/{0}.db/{1}".format(unique_database, table_name, WAREHOUSE) dest_table = "{0}.{1}".format(unique_database, table_name) dest_file = join(dest_table_dir, file_basename + dest_extension) # Use a specific location to avoid any interactions with Hive behavior changes. drop_cmd = "DROP TABLE IF EXISTS {0};".format(dest_table) create_cmd = "CREATE TABLE {0} LIKE {1} LOCATION \'{2}\';".format( dest_table, src_table, dest_table_dir) # Create the table and copy in the data file self.client.execute(drop_cmd) self.client.execute(create_cmd) self.filesystem_client.copy(src_file, dest_file, overwrite=True) # Try to read the compressed file with extension query = "select count(*) from {0};".format(dest_table) try: # Need to refresh the metadata to see the file copied in. self.client.execute("refresh {0}".format(dest_table)) result = self.execute_scalar(query) # Fail iff we expected an error assert expected_error is None, 'Query is expected to fail' assert result and int(result) > 0 except Exception as e: error_msg = str(e) print(error_msg) if expected_error is None or expected_error not in error_msg: print("Unexpected error:\n{0}".format(error_msg)) raise finally: self.client.execute(drop_cmd) self.filesystem_client.delete_file_dir(dest_file) class TestCompressedNonText(TestCompressedFormatsBase): """Tests behavior for compressed non-text formats (avro, rc, seq).""" @classmethod def add_test_dimensions(cls): super(TestCompressedNonText, cls).add_test_dimensions() cls.ImpalaTestMatrix.clear() cls.ImpalaTestMatrix.add_dimension( ImpalaTestDimension('file_format', *['rc', 'seq', 'avro'])) cls.ImpalaTestMatrix.add_dimension( ImpalaTestDimension('compression_format', *compression_formats)) def test_insensitivity_to_extension(self, vector, unique_database): """ Avro, RC, and Sequence files do not use the file extension to determine the type of compression. This verifies that they work regardless of the extension. """ file_format = vector.get_value('file_format') right_extension, suffix = vector.get_value('compression_format') # Avro is only loaded in a subset of the compression types. Bail out for # the ones that are not loaded. if file_format == 'avro' and suffix not in ['def', 'snap']: pytest.xfail('Avro is only created for Deflate and Snappy compression codecs') db_suffix = '_{0}_{1}'.format(file_format, suffix) # Pick one wrong extension randomly wrong_extensions = [ext for ext in compression_extensions if ext != right_extension] random.shuffle(wrong_extensions) wrong_extension = wrong_extensions[0] # Test with the "right" extension that matches the file's compression, one wrong # extension, and no extension. By default, Hive does not use a file extension. for ext in [right_extension, wrong_extension, ""]: self._copy_and_query_compressed_file( unique_database, 'tinytable', db_suffix, ext) class TestCompressedText(TestCompressedFormatsBase): """ Tests behavior for compressed text files, which determine the compression codec from the file extension. """ @classmethod def add_test_dimensions(cls): super(TestCompressedText, cls).add_test_dimensions() cls.ImpalaTestMatrix.clear() cls.ImpalaTestMatrix.add_dimension( ImpalaTestDimension('file_format', *['text', 'json'])) cls.ImpalaTestMatrix.add_dimension( ImpalaTestDimension('compression_format', *compression_formats)) def test_correct_extension(self, vector, unique_database): """ Text files use the file extension to determine the type of compression. By default, Hive creates files with the appropriate file extension. This verifies the positive case that the correct extension works. This is a somewhat trivial test. However, it runs with the core exploration strategy and runs on all filesystems. It verifies formats on core that are otherwise limited to exhaustive. That is important for coverage on non-HDFS filesystems like s3. """ file_format = vector.get_value('file_format') extension, suffix = vector.get_value('compression_format') db_suffix = '_{0}_{1}'.format(file_format, suffix) self._copy_and_query_compressed_file( unique_database, 'tinytable', db_suffix, extension) class TestUnsupportedTableWriters(ImpalaTestSuite): @classmethod def add_test_dimensions(cls): super(TestUnsupportedTableWriters, cls).add_test_dimensions() cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension()) # This class tests different formats, but doesn't use constraints. # The constraint added below is only to make sure that the test file runs once. cls.ImpalaTestMatrix.add_constraint(lambda v: (v.get_value('table_format').file_format == 'text' and v.get_value('table_format').compression_codec == 'none')) def test_error_message(self, vector, unique_database): # Tests that an appropriate error message is displayed for unsupported writers like # compressed text, avro and sequence. self.run_test_case('QueryTest/unsupported-writers', vector, unique_database) @pytest.mark.execute_serially class TestLargeCompressedFile(ImpalaTestSuite): """ Tests that Impala handles compressed files in HDFS larger than 1GB. This test creates a 2GB test data file and loads it into a table. """ TABLE_NAME = "large_compressed_file" TABLE_LOCATION = get_fs_path("/test-warehouse/large_compressed_file") """ Name the file with ".snappy" extension to let scanner treat it as a snappy block compressed file. """ FILE_NAME = "largefile.snappy" # Maximum uncompressed size of an outer block in a snappy block compressed file. CHUNK_SIZE = 1024 * 1024 * 1024 # Limit the max file size to 2GB or too much memory may be needed when # uncompressing the buffer. 2GB is sufficient to show that we support # size beyond maximum 32-bit signed value. MAX_FILE_SIZE = 2 * CHUNK_SIZE @classmethod def add_test_dimensions(cls): super(TestLargeCompressedFile, cls).add_test_dimensions() if cls.exploration_strategy() != 'exhaustive': pytest.skip("skipping if it's not exhaustive test.") cls.ImpalaTestMatrix.add_constraint(lambda v: (v.get_value('table_format').file_format == 'text' and v.get_value('table_format').compression_codec == 'snap')) def teardown_method(self, method): self.__drop_test_table() def __generate_file(self, file_name, file_size): """Generate file with random data and a specified size.""" # Read the payload compressed using snappy. The compressed payload # is generated from a string of 50176 bytes. payload_size = 50176 hdfs_cat = subprocess.Popen(["hadoop", "fs", "-cat", "%s/compressed_payload.snap" % WAREHOUSE], stdout=subprocess.PIPE) compressed_payload = hdfs_cat.stdout.read() compressed_size = len(compressed_payload) hdfs_cat.stdout.close() hdfs_cat.wait() # The layout of a snappy-block compressed file is one or more # of the following nested structure which is called "chunk" in # the code below: # # - # - one or more blocks of the following structure: # - # - # Number of nested structures described above. num_chunks = int(math.ceil(file_size / self.CHUNK_SIZE)) # Number of compressed snappy blocks per chunk. num_blocks_per_chunk = self.CHUNK_SIZE // (compressed_size + 4) # Total uncompressed size of a nested structure. total_chunk_size = num_blocks_per_chunk * payload_size hdfs_put = subprocess.Popen(["hdfs", "dfs", "-put", "-d", "-f", "-", file_name], stdin=subprocess.PIPE, bufsize=-1) for i in range(num_chunks): hdfs_put.stdin.write(struct.pack('>i', total_chunk_size)) for j in range(num_blocks_per_chunk): hdfs_put.stdin.write(struct.pack('>i', compressed_size)) hdfs_put.stdin.write(compressed_payload) hdfs_put.stdin.close() hdfs_put.wait() def test_query_large_file(self, vector): self.__create_test_table() dst_path = "%s/%s" % (self.TABLE_LOCATION, self.FILE_NAME) file_size = self.MAX_FILE_SIZE self.__generate_file(dst_path, file_size) self.client.execute("refresh %s" % self.TABLE_NAME) # Query the table self.client.execute("select * from %s limit 1" % self.TABLE_NAME) def __create_test_table(self): self.__drop_test_table() self.client.execute("CREATE TABLE %s (col string) " "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '%s'" % (self.TABLE_NAME, self.TABLE_LOCATION)) def __drop_test_table(self): self.client.execute("DROP TABLE IF EXISTS %s" % self.TABLE_NAME) class TestBzip2Streaming(ImpalaTestSuite): MAX_SCAN_RANGE_LENGTHS = [0, 5] @classmethod def add_test_dimensions(cls): super(TestBzip2Streaming, cls).add_test_dimensions() if cls.exploration_strategy() != 'exhaustive': pytest.skip("skipping if it's not exhaustive test.") add_exec_option_dimension( cls, 'max_scan_range_length', cls.MAX_SCAN_RANGE_LENGTHS) cls.ImpalaTestMatrix.add_constraint(lambda v: v.get_value('table_format').file_format == 'text' and v.get_value('table_format').compression_codec == 'bzip') def test_bzip2_streaming(self, vector): self.run_test_case('QueryTest/text-bzip-scan', vector) class TestReadZtsdLibCompressedFile(ImpalaTestSuite): """ Test that file compressed by zstd standard library can be read by Impala """ COMPRESSED_TABLE_NAME = "zstdlib_compressed_table" UNCOMPRESSED_TABLE_NAME = "uncompressed_table" COMPRESSED_TABLE_LOCATION = get_fs_path("/test-warehouse/zstdlib_compressed_file") UNCOMPRESSED_TABLE_LOCATION = get_fs_path("/test-warehouse/uncompressed_file") COMPRESSED_LOCAL_FILE_PATH = "testdata/data/text_large_zstd.zst" UNCOMPRESSED_LOCAL_FILE_PATH = "testdata/data/text_large_zstd.txt" IMPALA_HOME = os.environ['IMPALA_HOME'] @classmethod def add_test_dimensions(cls): super(TestReadZtsdLibCompressedFile, cls).add_test_dimensions() if cls.exploration_strategy() != 'exhaustive': pytest.skip('runs only in exhaustive') cls.ImpalaTestMatrix.add_constraint(lambda v: (v.get_value('table_format').file_format == 'text' and v.get_value('table_format').compression_codec == 'zstd')) def __generate_file(self, local_file_location, table_location): """ Make directory in HDFS and copy zstd standard library compressed file to HDFS Lib compressed file extension has to be zst to be readable. Copy original uncompressed file to HDFS as well for row comparison. """ source_local_file = os.path.join(self.IMPALA_HOME, local_file_location) subprocess.check_call(["hadoop", "fs", "-put", source_local_file, table_location]) def __create_test_table(self, table_name, location): self.client.execute("DROP TABLE IF EXISTS %s" % table_name) self.client.execute("CREATE TABLE %s (col string) LOCATION '%s'" % (table_name, location)) def test_query_large_file(self): self.__create_test_table(self.COMPRESSED_TABLE_NAME, self.COMPRESSED_TABLE_LOCATION) self.__create_test_table(self.UNCOMPRESSED_TABLE_NAME, self.UNCOMPRESSED_TABLE_LOCATION) self.__generate_file(self.COMPRESSED_LOCAL_FILE_PATH, self.COMPRESSED_TABLE_LOCATION) self.__generate_file(self.UNCOMPRESSED_LOCAL_FILE_PATH, self.UNCOMPRESSED_TABLE_LOCATION) self.client.execute("refresh %s" % self.COMPRESSED_TABLE_NAME) self.client.execute("refresh %s" % self.UNCOMPRESSED_TABLE_NAME) # Read from compressed table result = self.client.execute("select count(*) from %s" % self.COMPRESSED_TABLE_NAME) result_uncompressed = self.client.execute("select count(*) from %s" % self.UNCOMPRESSED_TABLE_NAME) assert int(result.get_data()) == int(result_uncompressed.get_data()) # Read top 10k rows from compressed table and uncompressed table, compare results base_result = self.execute_query_expect_success(self.client, "select * from {0} order by col limit 10000".format(self.UNCOMPRESSED_TABLE_NAME)) test_result = self.execute_query_expect_success(self.client, "select * from {0} order by col limit 10000".format(self.COMPRESSED_TABLE_NAME)) verify_query_result_is_equal(test_result.data, base_result.data)