Mirror of https://github.com/apache/impala.git
IMPALA-10005: Fix Snappy decompression for non-block filesystems

Snappy-compressed text always uses THdfsCompression::SNAPPY_BLOCKED type
compression in the backend. However, for non-block filesystems, the frontend
is incorrectly passing THdfsCompression::SNAPPY instead. On debug builds,
this leads to a DCHECK when trying to read Snappy-compressed text. On
release builds, it fails to decompress the data. This fixes the frontend to
always pass THdfsCompression::SNAPPY_BLOCKED for Snappy-compressed text.

This reworks query_test/test_compressed_formats.py to provide better
coverage:
 - Changed the RC and Seq test cases to verify that the file extension
   doesn't matter. Added Avro to this case as well.
 - Fixed the text case to use appropriate extensions (fixing IMPALA-9004)
 - Changed the utility function so it doesn't use Hive. This allows it to
   be enabled on non-HDFS filesystems like S3.
 - Changed the test to use unique_database and allow parallel execution.
 - Changed the test to run in the core job, so it now has coverage on the
   usual S3 test configuration. It is reasonably quick (1-2 minutes) and
   runs in parallel.

Testing:
 - Exhaustive job
 - Core s3 job
 - Changed the frontend to force it to use the code for non-block
   filesystems (i.e. the TFileSplitGeneratorSpec code) and verified that it
   is now able to read Snappy-compressed text.

Change-Id: I0879f2fc0bf75bb5c15cecb845ece46a901601ac
Reviewed-on: http://gerrit.cloudera.org:8080/16278
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Sahil Takiar <stakiar@cloudera.com>
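To make the frontend bug concrete, here is a minimal Python sketch of the
intended extension-to-codec selection. The THdfsCompression member names
(GZIP, BZIP2, DEFLATE, SNAPPY, SNAPPY_BLOCKED) are real Thrift enum values
in Impala, but the dictionary and helper function below are hypothetical,
invented for this sketch; the actual fix lives in the Java frontend's
scan-range generation for non-block filesystems.

# Hypothetical sketch of codec selection for compressed text files.
# For text, the backend scanner only accepts the BLOCKED Snappy variant;
# returning plain 'SNAPPY' trips a DCHECK on debug builds and fails to
# decompress on release builds.
EXTENSION_TO_TEXT_CODEC = {
    '.bz2': 'BZIP2',
    '.deflate': 'DEFLATE',
    '.gz': 'GZIP',
    '.snappy': 'SNAPPY_BLOCKED',  # never plain 'SNAPPY' for text scans
}

def text_codec_for(file_name):
    """Pick the decompression codec for a text file from its extension."""
    for ext, codec in EXTENSION_TO_TEXT_CODEC.items():
        if file_name.endswith(ext):
            return codec
    return 'NONE'  # no recognized extension means uncompressed text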
@@ -18,13 +18,12 @@
 import math
 import os
 import pytest
+import random
 import struct
 import subprocess
 from os.path import join
 
-from tests.common.environ import EXTERNAL_WAREHOUSE_DIR
 from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
 from tests.common.test_dimensions import create_single_exec_option_dimension
 from tests.common.test_result_verifier import verify_query_result_is_equal
 from tests.common.test_vector import ImpalaTestDimension
@@ -33,25 +32,19 @@ from tests.util.filesystem_utils import get_fs_path
 
 # (file extension, table suffix) pairs
 compression_formats = [
-  ('.bz2',     'bzip'),
+  ('.bz2', 'bzip'),
   ('.deflate', 'def'),
-  ('.gz',      'gzip'),
-  ('.snappy',  'snap'),
+  ('.gz', 'gzip'),
+  ('.snappy', 'snap'),
 ]
 
+compression_extensions = ['.bz2', '.deflate', '.gz', '.snappy']
 
-# Missing Coverage: Compressed data written by Hive is queriable by Impala on a non-hdfs
-# filesystem.
-@SkipIfS3.hive
-@SkipIfABFS.hive
-@SkipIfADLS.hive
-@SkipIfIsilon.hive
-@SkipIfLocal.hive
-class TestCompressedFormats(ImpalaTestSuite):
+
+class TestCompressedFormatsBase(ImpalaTestSuite):
   """
-  Tests that we support compressed RC, sequence and text files and that unsupported
-  formats fail gracefully (see IMPALA-14: Files with .gz extension reported as 'not
-  supported').
+  Base class to provide utility functions for testing support for compressed
+  data files.
   """
   @classmethod
   def get_workload(self):
@@ -59,64 +52,43 @@ class TestCompressedFormats(ImpalaTestSuite):
 
   @classmethod
   def add_test_dimensions(cls):
-    super(TestCompressedFormats, cls).add_test_dimensions()
-    cls.ImpalaTestMatrix.clear()
-    cls.ImpalaTestMatrix.add_dimension(\
-        ImpalaTestDimension('file_format', *['rc', 'seq', 'text']))
-    cls.ImpalaTestMatrix.add_dimension(\
-        ImpalaTestDimension('compression_format', *compression_formats))
-    if cls.exploration_strategy() == 'core':
-      # Don't run on core. This test is very slow and we are unlikely
-      # to regress here.
-      cls.ImpalaTestMatrix.add_constraint(lambda v: False);
+    super(TestCompressedFormatsBase, cls).add_test_dimensions()
 
-  @pytest.mark.execute_serially
-  def test_compressed_formats(self, vector):
-    file_format = vector.get_value('file_format')
-    extension, suffix = vector.get_value('compression_format')
-    if file_format in ['rc', 'seq']:
-      # Test that {gzip,snappy,bzip,deflate}-compressed
-      # {RC,sequence,text} files are supported.
-      db_suffix = '_%s_%s' % (file_format, suffix)
-      self._copy_and_query_compressed_file(
-        'tinytable', db_suffix, suffix, '000000_0', extension)
-    elif file_format is 'text':
-      pytest.xfail('IMPALA-9004: TestCompressedFormats is broken for text files')
-    else:
-      assert False, "Unknown file_format: %s" % file_format
-
-  # TODO: switch to using hive metastore API rather than hive shell.
-  def _copy_and_query_compressed_file(self, table_name, db_suffix, compression_codec,
-      file_name, extension, expected_error=None):
-    # We want to create a test table with a compressed file that has a file
-    # extension. We'll do this by making a copy of an existing table with hive.
+  def _copy_and_query_compressed_file(self, unique_database, table_name, db_suffix,
+      file_basename, src_extension, dest_extension, expected_error=None):
+    """
+    This is a utility function to test the behavior for compressed file formats
+    with different file extensions. It creates a new table in the unique_database
+    as a copy of the provided table_name from the functional schema with the
+    specified db_suffix. It copies file_basename + src_extension to the new
+    table as file_basename + dest_extension. It then runs a query on the
+    new table. Unless expected_error is set, it expects the query to run successfully.
+    """
     # Calculate locations for the source table
     base_dir = '/test-warehouse'
-    src_table = 'functional%s.%s' % (db_suffix, table_name)
-    src_table_dir = "%s%s" % (table_name, db_suffix)
-    src_table_dir = join(base_dir, src_table_dir)
-    src_file = join(src_table_dir, file_name)
+    src_table = "functional{0}.{1}".format(db_suffix, table_name)
+    src_table_dir = join(base_dir, table_name + db_suffix)
+    src_file = join(src_table_dir, file_basename + src_extension)
 
-    # Make sure destination table uses suffix, even if use_suffix=False, so
-    # unique tables are created for each compression format
-    # In Hive3+ create table like behavior is still in discussion, add location
-    # to avoid impact on Impala test.
-    dest_base_dir = '/{0}'.format(EXTERNAL_WAREHOUSE_DIR)
-    dest_table = '%s_%s_copy' % (table_name, compression_codec)
-    dest_table_dir = join(dest_base_dir, dest_table)
-    dest_file = join(dest_table_dir, file_name + extension)
+    # Calculate locations for the destination table
+    dest_table_dir = "/test-warehouse/{0}.db/{1}".format(unique_database, table_name)
+    dest_table = "{0}.{1}".format(unique_database, table_name)
+    dest_file = join(dest_table_dir, file_basename + dest_extension)
 
-    drop_cmd = 'DROP TABLE IF EXISTS %s;' % (dest_table)
-    hive_cmd = drop_cmd + 'CREATE TABLE %s LIKE %s LOCATION \'%s\';' % \
-        (dest_table, src_table, dest_table_dir)
+    # Use a specific location to avoid any interactions with Hive behavior changes.
+    drop_cmd = "DROP TABLE IF EXISTS {0};".format(dest_table)
+    create_cmd = "CREATE TABLE {0} LIKE {1} LOCATION \'{2}\';".format(
+        dest_table, src_table, dest_table_dir)
 
-    # Create the table
-    self.run_stmt_in_hive(hive_cmd)
+    # Create the table and copy in the data file
+    self.client.execute(drop_cmd)
+    self.client.execute(create_cmd)
     self.filesystem_client.copy(src_file, dest_file, overwrite=True)
     # Try to read the compressed file with extension
-    query = 'select count(*) from %s' % dest_table
+    query = "select count(*) from {0};".format(dest_table)
     try:
-      # Need to invalidate the metadata because the table was created external to Impala.
-      self.client.execute("invalidate metadata %s" % dest_table)
+      # Need to refresh the metadata to see the file copied in.
+      self.client.execute("refresh {0}".format(dest_table))
      result = self.execute_scalar(query)
      # Fail iff we expected an error
      assert expected_error is None, 'Query is expected to fail'
@@ -125,12 +97,84 @@ class TestCompressedFormats(ImpalaTestSuite):
       error_msg = str(e)
-      print error_msg
       if expected_error is None or expected_error not in error_msg:
-        print "Unexpected error:\n%s", error_msg
+        print("Unexpected error:\n{0}".format(error_msg))
         raise
     finally:
-      self.run_stmt_in_hive(drop_cmd)
+      self.client.execute(drop_cmd)
       self.filesystem_client.delete_file_dir(dest_file)
 
 
+class TestCompressedNonText(TestCompressedFormatsBase):
+  """Tests behavior for compressed non-text formats (avro, rc, seq)."""
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestCompressedNonText, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.clear()
+    cls.ImpalaTestMatrix.add_dimension(
+      ImpalaTestDimension('file_format', *['rc', 'seq', 'avro']))
+    cls.ImpalaTestMatrix.add_dimension(
+      ImpalaTestDimension('compression_format', *compression_formats))
+
+  def test_insensitivity_to_extension(self, vector, unique_database):
+    """
+    Avro, RC, and Sequence files do not use the file extension to determine the
+    type of compression. This verifies that they work regardless of the
+    extension.
+    """
+    file_format = vector.get_value('file_format')
+    right_extension, suffix = vector.get_value('compression_format')
+    # Avro is only loaded in a subset of the compression types. Bail out for
+    # the ones that are not loaded.
+    if file_format == 'avro' and suffix not in ['def', 'snap']:
+      pytest.xfail('Avro is only created for Deflate and Snappy compression codecs')
+    db_suffix = '_{0}_{1}'.format(file_format, suffix)
+
+    # Pick one wrong extension randomly
+    wrong_extensions = [ext for ext in compression_extensions if ext != right_extension]
+    random.shuffle(wrong_extensions)
+    wrong_extension = wrong_extensions[0]
+    # Test with the "right" extension that matches the file's compression, one wrong
+    # extension, and no extension. By default, Hive does not use a file extension.
+    src_extension = ""
+    for ext in [right_extension, wrong_extension, ""]:
+      self._copy_and_query_compressed_file(
+        unique_database, 'tinytable', db_suffix, '000000_0', src_extension, ext)
+
+
+class TestCompressedText(TestCompressedFormatsBase):
+  """
+  Tests behavior for compressed text files, which determine the compression codec from
+  the file extension.
+  """
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestCompressedText, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.clear()
+    cls.ImpalaTestMatrix.add_dimension(
+      ImpalaTestDimension('file_format', *['text']))
+    cls.ImpalaTestMatrix.add_dimension(
+      ImpalaTestDimension('compression_format', *compression_formats))
+
+  def test_correct_extension(self, vector, unique_database):
+    """
+    Text files use the file extension to determine the type of compression.
+    By default, Hive creates files with the appropriate file extension.
+    This verifies the positive case that the correct extension works.
+
+    This is a somewhat trivial test. However, it runs with the core exploration
+    strategy and runs on all filesystems. It verifies formats on core that are
+    otherwise limited to exhaustive. That is important for coverage on non-HDFS
+    filesystems like s3.
+    """
+    file_format = vector.get_value('file_format')
+    extension, suffix = vector.get_value('compression_format')
+    db_suffix = '_{0}_{1}'.format(file_format, suffix)
+    self._copy_and_query_compressed_file(
+      unique_database, 'tinytable', db_suffix, '000000_0', extension, extension)
+
+
 class TestUnsupportedTableWriters(ImpalaTestSuite):
   @classmethod
   def get_workload(cls):
@@ -143,7 +187,7 @@ class TestUnsupportedTableWriters(ImpalaTestSuite):
     # This class tests different formats, but doesn't use constraints.
     # The constraint added below is only to make sure that the test file runs once.
     cls.ImpalaTestMatrix.add_constraint(lambda v:
-        (v.get_value('table_format').file_format =='text' and
+        (v.get_value('table_format').file_format == 'text' and
         v.get_value('table_format').compression_codec == 'none'))
 
   def test_error_message(self, vector, unique_database):
@@ -151,6 +195,7 @@ class TestUnsupportedTableWriters(ImpalaTestSuite):
     # compressed text, avro and sequence.
     self.run_test_case('QueryTest/unsupported-writers', vector, unique_database)
 
+
 @pytest.mark.execute_serially
 class TestLargeCompressedFile(ImpalaTestSuite):
   """
@@ -182,7 +227,7 @@ class TestLargeCompressedFile(ImpalaTestSuite):
     if cls.exploration_strategy() != 'exhaustive':
       pytest.skip("skipping if it's not exhaustive test.")
     cls.ImpalaTestMatrix.add_constraint(lambda v:
-        (v.get_value('table_format').file_format =='text' and
+        (v.get_value('table_format').file_format == 'text' and
         v.get_value('table_format').compression_codec == 'snap'))
 
   def teardown_method(self, method):
@@ -228,24 +273,25 @@ class TestLargeCompressedFile(ImpalaTestSuite):
     hdfs_put.wait()
 
   def test_query_large_file(self, vector):
-    self.__create_test_table();
+    self.__create_test_table()
    dst_path = "%s/%s" % (self.TABLE_LOCATION, self.FILE_NAME)
    file_size = self.MAX_FILE_SIZE
    self.__generate_file(dst_path, file_size)
    self.client.execute("refresh %s" % self.TABLE_NAME)
 
    # Query the table
-    result = self.client.execute("select * from %s limit 1" % self.TABLE_NAME)
+    self.client.execute("select * from %s limit 1" % self.TABLE_NAME)
 
  def __create_test_table(self):
    self.__drop_test_table()
-    self.client.execute("CREATE TABLE %s (col string) " \
+    self.client.execute("CREATE TABLE %s (col string) "
      "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '%s'"
      % (self.TABLE_NAME, self.TABLE_LOCATION))
 
  def __drop_test_table(self):
    self.client.execute("DROP TABLE IF EXISTS %s" % self.TABLE_NAME)
 
+
 class TestBzip2Streaming(ImpalaTestSuite):
   MAX_SCAN_RANGE_LENGTHS = [0, 5]
 
@@ -261,8 +307,8 @@ class TestBzip2Streaming(ImpalaTestSuite):
       pytest.skip("skipping if it's not exhaustive test.")
     cls.ImpalaTestMatrix.add_dimension(
       ImpalaTestDimension('max_scan_range_length', *cls.MAX_SCAN_RANGE_LENGTHS))
-    cls.ImpalaTestMatrix.add_constraint(lambda v:\
-        v.get_value('table_format').file_format == 'text' and\
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('table_format').file_format == 'text' and
         v.get_value('table_format').compression_codec == 'bzip')
 
   def test_bzip2_streaming(self, vector):
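A hypothetical usage sketch of the reworked helper: the subclass, test name,
and expected_error substring below are invented for illustration; only
_copy_and_query_compressed_file, its argument order, the '_text_gzip'
db_suffix convention, and the '000000_0' file basename come from the diff
above.

class TestCompressedTextWrongExtension(TestCompressedFormatsBase):

  def test_mismatched_extension_fails(self, vector, unique_database):
    # Copy tinytable's gzip-compressed text file, but store the copy with a
    # misleading .snappy extension. Text scans pick the codec from the
    # extension, so the read should fail; passing expected_error asserts
    # that the failure is a clean error rather than a crash. The exact
    # error substring here is a guess.
    self._copy_and_query_compressed_file(
        unique_database, 'tinytable', '_text_gzip', '000000_0', '.gz', '.snappy',
        expected_error='Snappy')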