# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import, division, print_function
from builtins import range
import math
import os
import pytest
import random
import struct
import subprocess
from os.path import join

from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.test_dimensions import (
    add_exec_option_dimension,
    create_single_exec_option_dimension)
from tests.common.test_result_verifier import verify_query_result_is_equal
from tests.common.test_vector import ImpalaTestDimension
from tests.util.filesystem_utils import get_fs_path, WAREHOUSE


# (file extension, table suffix) pairs
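# e.g. ('.gz', 'gzip'): the data files carry the '.gz' extension and the pre-loaded
# source tables live in databases such as functional_text_gzip (see db_suffix below).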
compression_formats = [
  ('.bz2', 'bzip'),
  ('.deflate', 'def'),
  ('.gz', 'gzip'),
  ('.snappy', 'snap'),
  ]

compression_extensions = ['.bz2', '.deflate', '.gz', '.snappy']


class TestCompressedFormatsBase(ImpalaTestSuite):
  """
  Base class to provide utility functions for testing support for compressed
  data files.
  """
  @classmethod
  def add_test_dimensions(cls):
    super(TestCompressedFormatsBase, cls).add_test_dimensions()

  def _copy_and_query_compressed_file(self, unique_database, table_name, db_suffix,
      dest_extension, expected_error=None):
    """
    Utility function to test the behavior of compressed file formats with different
    file extensions. It creates a new table in unique_database as a copy of the given
    table_name from the functional schema with the specified db_suffix, copies the
    first file of the source table into the new table as file_basename +
    dest_extension, and then runs a query on the new table. Unless expected_error is
    set, it expects the query to run successfully.
    """
    # Calculate locations for the source table
    base_dir = WAREHOUSE
    src_table = "functional{0}.{1}".format(db_suffix, table_name)
    src_table_dir = join(base_dir, table_name + db_suffix)
    file_basename = self.filesystem_client.ls(src_table_dir)[0]
    src_file = join(src_table_dir, file_basename)

    # Calculate locations for the destination table
    dest_table_dir = "{2}/{0}.db/{1}".format(unique_database, table_name, WAREHOUSE)
    dest_table = "{0}.{1}".format(unique_database, table_name)
    dest_file = join(dest_table_dir, file_basename + dest_extension)

    # Use a specific location to avoid any interactions with Hive behavior changes.
    drop_cmd = "DROP TABLE IF EXISTS {0};".format(dest_table)
    create_cmd = "CREATE TABLE {0} LIKE {1} LOCATION \'{2}\';".format(
        dest_table, src_table, dest_table_dir)

    # Create the table and copy in the data file
    self.client.execute(drop_cmd)
    self.client.execute(create_cmd)
    self.filesystem_client.copy(src_file, dest_file, overwrite=True)
    # Try to read the compressed file with extension
    query = "select count(*) from {0};".format(dest_table)
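    # If expected_error is set, the query below is expected to fail with an error
    # message containing expected_error; otherwise it must succeed and return a
    # non-zero row count.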
    try:
      # Need to refresh the metadata to see the file copied in.
      self.client.execute("refresh {0}".format(dest_table))
      result = self.execute_scalar(query)
      # Fail iff we expected an error
      assert expected_error is None, 'Query is expected to fail'
      assert result and int(result) > 0
    except Exception as e:
      error_msg = str(e)
      print(error_msg)
      if expected_error is None or expected_error not in error_msg:
        print("Unexpected error:\n{0}".format(error_msg))
        raise
    finally:
      self.client.execute(drop_cmd)
      self.filesystem_client.delete_file_dir(dest_file)


class TestCompressedNonText(TestCompressedFormatsBase):
  """Tests behavior for compressed non-text formats (avro, rc, seq)."""

  @classmethod
  def add_test_dimensions(cls):
    super(TestCompressedNonText, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.clear()
    cls.ImpalaTestMatrix.add_dimension(
        ImpalaTestDimension('file_format', *['rc', 'seq', 'avro']))
    cls.ImpalaTestMatrix.add_dimension(
        ImpalaTestDimension('compression_format', *compression_formats))

  def test_insensitivity_to_extension(self, vector, unique_database):
    """
    Avro, RC, and Sequence files do not use the file extension to determine the
    type of compression. This verifies that they work regardless of the
    extension.
    """
    file_format = vector.get_value('file_format')
    right_extension, suffix = vector.get_value('compression_format')
    # Avro is only loaded in a subset of the compression types. Bail out for
    # the ones that are not loaded.
    if file_format == 'avro' and suffix not in ['def', 'snap']:
      pytest.xfail('Avro is only created for Deflate and Snappy compression codecs')
    db_suffix = '_{0}_{1}'.format(file_format, suffix)

    # Pick one wrong extension randomly
    wrong_extensions = [ext for ext in compression_extensions if ext != right_extension]
    random.shuffle(wrong_extensions)
    wrong_extension = wrong_extensions[0]
    # Test with the "right" extension that matches the file's compression, one wrong
    # extension, and no extension. By default, Hive does not use a file extension.
    for ext in [right_extension, wrong_extension, ""]:
      self._copy_and_query_compressed_file(
          unique_database, 'tinytable', db_suffix, ext)


class TestCompressedText(TestCompressedFormatsBase):
  """
  Tests behavior for compressed text files, which determine the compression codec from
  the file extension.
  """

  @classmethod
  def add_test_dimensions(cls):
    super(TestCompressedText, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.clear()
    cls.ImpalaTestMatrix.add_dimension(
        ImpalaTestDimension('file_format', *['text', 'json']))
    cls.ImpalaTestMatrix.add_dimension(
        ImpalaTestDimension('compression_format', *compression_formats))

  def test_correct_extension(self, vector, unique_database):
    """
    Text files use the file extension to determine the type of compression.
    By default, Hive creates files with the appropriate file extension.
    This verifies the positive case that the correct extension works.

    This is a somewhat trivial test. However, it runs with the core exploration
    strategy and runs on all filesystems. It verifies formats on core that are
    otherwise limited to exhaustive. That is important for coverage on non-HDFS
    filesystems like S3.
    """
    file_format = vector.get_value('file_format')
    extension, suffix = vector.get_value('compression_format')
    db_suffix = '_{0}_{1}'.format(file_format, suffix)
    self._copy_and_query_compressed_file(
        unique_database, 'tinytable', db_suffix, extension)


class TestUnsupportedTableWriters(ImpalaTestSuite):
  @classmethod
  def add_test_dimensions(cls):
    super(TestUnsupportedTableWriters, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
    # This class exercises several formats within the test file itself, so it does not
    # depend on the table_format dimension. The constraint below only ensures that the
    # test file runs exactly once.
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        (v.get_value('table_format').file_format == 'text'
         and v.get_value('table_format').compression_codec == 'none'))

  def test_error_message(self, vector, unique_database):
    # Tests that an appropriate error message is displayed for unsupported writers like
    # compressed text, avro and sequence.
    self.run_test_case('QueryTest/unsupported-writers', vector, unique_database)


@pytest.mark.execute_serially
class TestLargeCompressedFile(ImpalaTestSuite):
  """
  Tests that Impala handles compressed files in HDFS larger than 1GB.
  This test creates a 2GB test data file and loads it into a table.
  """
  TABLE_NAME = "large_compressed_file"
  TABLE_LOCATION = get_fs_path("/test-warehouse/large_compressed_file")
  # Name the file with the ".snappy" extension so the scanner treats it as
  # a snappy block compressed file.
  FILE_NAME = "largefile.snappy"
  # Maximum uncompressed size of an outer block in a snappy block compressed file.
  CHUNK_SIZE = 1024 * 1024 * 1024
  # Limit the max file size to 2GB or too much memory may be needed when
  # uncompressing the buffer. 2GB is sufficient to show that we support
  # sizes beyond the maximum 32-bit signed value.
  MAX_FILE_SIZE = 2 * CHUNK_SIZE

  @classmethod
  def add_test_dimensions(cls):
    super(TestLargeCompressedFile, cls).add_test_dimensions()

    if cls.exploration_strategy() != 'exhaustive':
      pytest.skip('runs only in exhaustive')
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        (v.get_value('table_format').file_format == 'text'
         and v.get_value('table_format').compression_codec == 'snap'))

  def teardown_method(self, method):
    self.__drop_test_table()

  def __generate_file(self, file_name, file_size):
    """Generate a snappy-block compressed file of approximately the specified size."""

    # Read the payload compressed using snappy. The compressed payload
    # is generated from a string of 50176 bytes.
    payload_size = 50176
    hdfs_cat = subprocess.Popen(["hadoop", "fs", "-cat",
        "%s/compressed_payload.snap" % WAREHOUSE], stdout=subprocess.PIPE)
    compressed_payload = hdfs_cat.stdout.read()
    compressed_size = len(compressed_payload)
    hdfs_cat.stdout.close()
    hdfs_cat.wait()

    # The layout of a snappy-block compressed file is one or more
    # of the following nested structure, which is called a "chunk" in
    # the code below:
    #
    # - <big endian 32-bit value encoding the uncompressed size>
    # - one or more blocks of the following structure:
    #   - <big endian 32-bit value encoding the compressed size>
    #   - <raw bits compressed by snappy algorithm>

    # Number of nested structures described above.
    num_chunks = int(math.ceil(file_size / self.CHUNK_SIZE))
    # Number of compressed snappy blocks per chunk.
    num_blocks_per_chunk = self.CHUNK_SIZE // (compressed_size + 4)
    # Total uncompressed size of a nested structure.
    total_chunk_size = num_blocks_per_chunk * payload_size
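    # Each chunk is written as a 4-byte uncompressed-size header followed by
    # num_blocks_per_chunk blocks of (4-byte size + compressed payload), so a chunk
    # occupies roughly CHUNK_SIZE bytes on disk and the whole file comes out close to
    # the requested file_size.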

    hdfs_put = subprocess.Popen(["hdfs", "dfs", "-put", "-d", "-f", "-", file_name],
        stdin=subprocess.PIPE, bufsize=-1)
    for i in range(num_chunks):
      hdfs_put.stdin.write(struct.pack('>i', total_chunk_size))
      for j in range(num_blocks_per_chunk):
        hdfs_put.stdin.write(struct.pack('>i', compressed_size))
        hdfs_put.stdin.write(compressed_payload)
    hdfs_put.stdin.close()
    hdfs_put.wait()

  def test_query_large_file(self, vector):
    self.__create_test_table()
    dst_path = "%s/%s" % (self.TABLE_LOCATION, self.FILE_NAME)
    file_size = self.MAX_FILE_SIZE
    self.__generate_file(dst_path, file_size)
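    # The file was written directly into the table's storage location, so refresh the
    # table metadata before querying it.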
    self.client.execute("refresh %s" % self.TABLE_NAME)

    # Query the table
    self.client.execute("select * from %s limit 1" % self.TABLE_NAME)

  def __create_test_table(self):
    self.__drop_test_table()
    self.client.execute("CREATE TABLE %s (col string) "
        "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '%s'"
        % (self.TABLE_NAME, self.TABLE_LOCATION))

  def __drop_test_table(self):
    self.client.execute("DROP TABLE IF EXISTS %s" % self.TABLE_NAME)


class TestBzip2Streaming(ImpalaTestSuite):
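  # max_scan_range_length query option values to exercise: 0 leaves the scan range
  # length unlimited (the default), while 5 forces very small scan ranges.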
  MAX_SCAN_RANGE_LENGTHS = [0, 5]

  @classmethod
  def add_test_dimensions(cls):
    super(TestBzip2Streaming, cls).add_test_dimensions()

    if cls.exploration_strategy() != 'exhaustive':
      pytest.skip('runs only in exhaustive')
    add_exec_option_dimension(
        cls, 'max_scan_range_length', cls.MAX_SCAN_RANGE_LENGTHS)
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('table_format').file_format == 'text'
        and v.get_value('table_format').compression_codec == 'bzip')

  def test_bzip2_streaming(self, vector):
    self.run_test_case('QueryTest/text-bzip-scan', vector)


class TestReadZtsdLibCompressedFile(ImpalaTestSuite):
  """
  Tests that a file compressed with the standard zstd library can be read by Impala.
  """
  COMPRESSED_TABLE_NAME = "zstdlib_compressed_table"
  UNCOMPRESSED_TABLE_NAME = "uncompressed_table"
  COMPRESSED_TABLE_LOCATION = get_fs_path("/test-warehouse/zstdlib_compressed_file")
  UNCOMPRESSED_TABLE_LOCATION = get_fs_path("/test-warehouse/uncompressed_file")
  COMPRESSED_LOCAL_FILE_PATH = "testdata/data/text_large_zstd.zst"
  UNCOMPRESSED_LOCAL_FILE_PATH = "testdata/data/text_large_zstd.txt"
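  # The .zst file is expected to be the zstd-compressed form of the .txt file, so the
  # two tables created from them below should return identical rows.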

  IMPALA_HOME = os.environ['IMPALA_HOME']

  @classmethod
  def add_test_dimensions(cls):
    super(TestReadZtsdLibCompressedFile, cls).add_test_dimensions()

    if cls.exploration_strategy() != 'exhaustive':
      pytest.skip('runs only in exhaustive')
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        (v.get_value('table_format').file_format == 'text'
         and v.get_value('table_format').compression_codec == 'zstd'))

  def __generate_file(self, local_file_location, table_location):
    """
    Copy a local data file into the given table location in HDFS. The zstd-compressed
    file must keep its .zst extension to be readable; the original uncompressed file is
    copied the same way so its rows can be compared against the compressed table.
    """
    source_local_file = os.path.join(self.IMPALA_HOME, local_file_location)
    subprocess.check_call(["hadoop", "fs", "-put", source_local_file, table_location])

  def __create_test_table(self, table_name, location):
    self.client.execute("DROP TABLE IF EXISTS %s" % table_name)
    self.client.execute("CREATE TABLE %s (col string) LOCATION '%s'"
        % (table_name, location))

  def test_query_large_file(self):
    self.__create_test_table(self.COMPRESSED_TABLE_NAME,
        self.COMPRESSED_TABLE_LOCATION)
    self.__create_test_table(self.UNCOMPRESSED_TABLE_NAME,
        self.UNCOMPRESSED_TABLE_LOCATION)
    self.__generate_file(self.COMPRESSED_LOCAL_FILE_PATH,
        self.COMPRESSED_TABLE_LOCATION)
    self.__generate_file(self.UNCOMPRESSED_LOCAL_FILE_PATH,
        self.UNCOMPRESSED_TABLE_LOCATION)
    self.client.execute("refresh %s" % self.COMPRESSED_TABLE_NAME)
    self.client.execute("refresh %s" % self.UNCOMPRESSED_TABLE_NAME)

    # Read from compressed table
    result = self.client.execute("select count(*) from %s" % self.COMPRESSED_TABLE_NAME)
    result_uncompressed = self.client.execute("select count(*) from %s" %
        self.UNCOMPRESSED_TABLE_NAME)
    assert int(result.get_data()) == int(result_uncompressed.get_data())

    # Read top 10k rows from compressed table and uncompressed table, compare results
    base_result = self.execute_query_expect_success(self.client,
        "select * from {0} order by col limit 10000".format(self.UNCOMPRESSED_TABLE_NAME))
    test_result = self.execute_query_expect_success(self.client,
        "select * from {0} order by col limit 10000".format(self.COMPRESSED_TABLE_NAME))
    verify_query_result_is_equal(test_result.data, base_result.data)