mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
The Beeswax protocol has been due for deprecation for a long time. This patch removes BEESWAX from create_client_protocol_dimension(). This limits the protocol dimension to [HS2, HS2_HTTP] by default. It is still possible to include BEESWAX again for testing by setting the DEFAULT_TEST_PROTOCOL env var to 'beeswax', for example: DEFAULT_TEST_PROTOCOL=beeswax impala-py.test custom_cluster/test_ipv6.py This patch does not disable the beeswax server yet. Tests that specifically target the beeswax protocol, such as test_beeswax.py, will continue to work. ImpalaTestSuite.beeswax_client also remains unchanged. Testing: Run the following command and confirm that the beeswax protocol is skipped. impala-py.test --collect-only --exploration=exhaustive \ custom_cluster/test_ipv6.py Change-Id: I3cff79f59305b5d44944804ed1f1b92838575495 Reviewed-on: http://gerrit.cloudera.org:8080/23076 Reviewed-by: Jason Fehr <jfehr@cloudera.com> Tested-by: Riza Suminto <riza.suminto@cloudera.com>
386 lines
15 KiB
Python
386 lines
15 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
# Common test dimensions and associated utility functions.
|
|
|
|
from __future__ import absolute_import, division, print_function
|
|
import copy
|
|
from itertools import product
|
|
import os
|
|
|
|
from builtins import range
|
|
import pytest
|
|
|
|
from tests.common.test_vector import (
|
|
assert_exec_option_key,
|
|
BEESWAX,
|
|
EXEC_OPTION,
|
|
HS2,
|
|
HS2_HTTP,
|
|
ImpalaTestDimension,
|
|
ImpalaTestVector,
|
|
PROTOCOL,
|
|
TABLE_FORMAT,
|
|
)
|
|
from tests.util.filesystem_utils import IS_HDFS
|
|
|
|
# Root directory holding the per-workload test vector files read by
# load_table_info_dimension(). It is looked up in the environment when this module is
# imported, so the import raises KeyError if IMPALA_WORKLOAD_DIR is not set.
WORKLOAD_DIR = os.environ['IMPALA_WORKLOAD_DIR']
|
|
|
|
# Translates a test dimension's file_format value into the argument that follows
# "STORED AS" (or "STORED BY") in generated SQL. HBase is the one entry that is a quoted
# storage-handler class name rather than a bare keyword.
FILE_FORMAT_TO_STORED_AS_MAP = dict(
  text='TEXTFILE',
  seq='SEQUENCEFILE',
  rc='RCFILE',
  orc='ORC',
  parquet='PARQUET',
  hudiparquet='HUDIPARQUET',
  avro='AVRO',
  hbase="'org.apache.hadoop.hive.hbase.HBaseStorageHandler'",
  kudu='KUDU',
  iceberg='ICEBERG',
  json='JSONFILE',
)
|
|
|
|
|
|
# Describes the table format targeted by a single test: the dataset plus the file format
# and compression settings. The exec options (num_nodes, etc.) used to run the query are
# carried by a separate exec_option test dimension.
class TableFormatInfo(object):
  """Dataset, file format, compression codec and compression type of one test vector."""
  KNOWN_FILE_FORMATS = ['text', 'seq', 'rc', 'parquet', 'orc', 'avro', 'hbase',
                        'kudu', 'iceberg', 'json']
  KNOWN_COMPRESSION_CODECS = ['none', 'snap', 'gzip', 'bzip', 'def', 'zstd', 'lz4']
  KNOWN_COMPRESSION_TYPES = ['none', 'block', 'record']
  # Codec expected for each compressible file format when a test only wants to exercise
  # a single compression (used by single_compression_constraint).
  DEFAULT_COMPRESSIONS_IN_TESTS = {
    'avro': 'snap',
    'json': 'none',  # some tables (e.g. date_tbl) were only created for none
    'seq': 'gzip',
    'rc': 'bzip',
    'text': 'none'  # use none for tables that are not written by Impala
  }

  def __init__(self, **kwargs):
    """Builds a table format from keyword arguments.

    Recognized keys are 'dataset' (default 'UNKNOWN'), 'file_format' (default 'text'),
    'compression_codec' and 'compression_type' (both default 'none'); any other key is
    ignored. Raises ValueError if the resulting combination is invalid.
    """
    self.dataset = kwargs.get('dataset', 'UNKNOWN')
    self.file_format = kwargs.get('file_format', 'text')
    self.compression_codec = kwargs.get('compression_codec', 'none')
    self.compression_type = kwargs.get('compression_type', 'none')
    self.__validate()

  def __validate(self):
    """Raises ValueError for unknown values or an inconsistent codec/type pair."""
    if self.file_format not in TableFormatInfo.KNOWN_FILE_FORMATS:
      raise ValueError('Unknown file format: %s' % self.file_format)
    if self.compression_codec not in TableFormatInfo.KNOWN_COMPRESSION_CODECS:
      raise ValueError('Unknown compression codec: %s' % self.compression_codec)
    if self.compression_type not in TableFormatInfo.KNOWN_COMPRESSION_TYPES:
      raise ValueError('Unknown compression type: %s' % self.compression_type)
    # 'none' must be used for both the codec and the type, or for neither of them.
    if (self.compression_codec == 'none' or self.compression_type == 'none') and \
        self.compression_codec != self.compression_type:
      raise ValueError('Invalid combination of compression codec/type: %s' % str(self))

  @staticmethod
  def create_from_string(dataset, table_format_string):
    """
    Parses a table format string and creates a table format info object from the string

    Expected input is file_format/compression_codec/[compression_type]. The
    compression_type is optional, defaulting to 'block' if the table is compressed
    or 'none' if the table is uncompressed. Surrounding whitespace is ignored.

    Raises ValueError if the string is None, does not have two or three '/'-separated
    parts, or describes an invalid format.
    """
    if table_format_string is None:
      raise ValueError('Table format string cannot be None')

    format_parts = table_format_string.strip().split('/')
    if not 2 <= len(format_parts) <= 3:
      raise ValueError('Invalid table format %s' % table_format_string)

    file_format, compression_codec = format_parts[:2]
    if len(format_parts) == 3:
      compression_type = format_parts[2]
    else:
      # Assume the default compression type is block (if the table is compressed).
      compression_type = 'none' if compression_codec == 'none' else 'block'

    return TableFormatInfo(dataset=dataset, file_format=file_format,
                           compression_codec=compression_codec,
                           compression_type=compression_type)

  def __str__(self):
    """Formats as 'file_format/codec/type', or 'file_format/none' when uncompressed."""
    compression_str = '%s/%s' % (self.compression_codec, self.compression_type)
    if self.compression_codec == 'none' and self.compression_type == 'none':
      compression_str = 'none'
    return '%s/%s' % (self.file_format, compression_str)

  def db_suffix(self):
    """Returns the suffix appended to the dataset's database name for this format.

    Uncompressed text uses no suffix, other uncompressed formats use '_<format>',
    record-compressed formats use '_<format>_record_<codec>', and everything else
    uses '_<format>_<codec>'.
    """
    if self.file_format == 'text' and self.compression_codec == 'none':
      return ''
    elif self.compression_codec == 'none':
      return '_%s' % (self.file_format)
    elif self.compression_type == 'record':
      return '_%s_record_%s' % (self.file_format, self.compression_codec)
    else:
      return '_%s_%s' % (self.file_format, self.compression_codec)
|
|
|
|
|
|
def create_table_format_dimension(workload, table_format_string):
  """Returns a TABLE_FORMAT dimension with the single format described by
  'table_format_string', bound to the dataset that 'workload' uses."""
  workload_dataset = get_dataset_from_workload(workload)
  table_format = TableFormatInfo.create_from_string(workload_dataset, table_format_string)
  return ImpalaTestDimension(TABLE_FORMAT, table_format)
|
|
|
|
|
|
def create_uncompressed_text_dimension(workload):
  """Single-value table format dimension: uncompressed text."""
  text_format = 'text/none'
  return create_table_format_dimension(workload, text_format)
|
|
|
|
|
|
def create_uncompressed_json_dimension(workload):
  """Single-value table format dimension: uncompressed JSON."""
  json_format = 'json/none'
  return create_table_format_dimension(workload, json_format)
|
|
|
|
|
|
def create_parquet_dimension(workload):
  """Single-value table format dimension: uncompressed Parquet."""
  parquet_format = 'parquet/none'
  return create_table_format_dimension(workload, parquet_format)
|
|
|
|
|
|
def create_orc_dimension(workload):
  """Single-value table format dimension: ORC with the 'def' codec."""
  orc_format = 'orc/def'
  return create_table_format_dimension(workload, orc_format)
|
|
|
|
|
|
def create_avro_snappy_dimension(workload):
  """Single-value table format dimension: block-compressed Avro with Snappy."""
  avro_format = 'avro/snap/block'
  return create_table_format_dimension(workload, avro_format)
|
|
|
|
|
|
def create_kudu_dimension(workload):
  """Single-value table format dimension: Kudu without compression."""
  kudu_format = 'kudu/none'
  return create_table_format_dimension(workload, kudu_format)
|
|
|
|
|
|
def default_client_protocol_dimension():
  """Protocol dimension holding only the protocol selected by the
  default_test_protocol pytest option."""
  selected_protocol = pytest.config.option.default_test_protocol
  return ImpalaTestDimension(PROTOCOL, selected_protocol)
|
|
|
|
|
|
def hs2_client_protocol_dimension():
  """Protocol dimension restricted to HS2."""
  hs2_only = (HS2,)
  return ImpalaTestDimension(PROTOCOL, *hs2_only)
|
|
|
|
|
|
def create_client_protocol_dimension():
  """Builds the client protocol dimension, in the order HS2, BEESWAX, HS2_HTTP.

  HS2 is always present. BEESWAX is present only when the default_test_protocol option
  equals BEESWAX. HS2_HTTP is present only when ssl.create_default_context exists.
  """
  wants_beeswax = pytest.config.option.default_test_protocol == BEESWAX
  # IMPALA-8864: the thrift HTTP client implementation depends on SSLContext, which
  # older Python versions do not provide, so HS2_HTTP is tested only when it exists.
  import ssl
  http_supported = hasattr(ssl, "create_default_context")
  protocols = ([HS2]
               + ([BEESWAX] if wants_beeswax else [])
               + ([HS2_HTTP] if http_supported else []))
  return ImpalaTestDimension(PROTOCOL, *protocols)
|
|
|
|
|
|
def create_client_protocol_http_transport():
  """Protocol dimension restricted to HS2 over the HTTP transport."""
  http_protocol = HS2_HTTP
  return ImpalaTestDimension(PROTOCOL, http_protocol)
|
|
|
|
|
|
def create_client_protocol_strict_dimension():
  """Returns the 'strict_hs2_protocol' dimension.

  Both False and True are tested only when the file system is HDFS, since that is where
  the Hive cluster is run; otherwise only False is used.
  """
  if not IS_HDFS:
    return create_client_protocol_no_strict_dimension()
  return ImpalaTestDimension('strict_hs2_protocol', False, True)
|
|
|
|
|
|
def create_client_protocol_no_strict_dimension():
  """Returns a 'strict_hs2_protocol' dimension whose only value is False."""
  strict_values = (False,)
  return ImpalaTestDimension('strict_hs2_protocol', *strict_values)
|
|
|
|
|
|
def default_protocol_or_parquet_constraint(v):
  """Constraint function that runs non-default test protocols only against uncompressed
  parquet, because file format and client protocol are orthogonal."""
  if v.get_protocol() == pytest.config.option.default_test_protocol:
    return True
  table_format = v.get_table_format()
  return (table_format.file_format == 'parquet'
          and table_format.compression_codec == 'none')
|
|
|
|
|
|
def default_protocol_or_text_constraint(v):
  """Constraint function that runs non-default test protocols only against uncompressed
  text, because file format and client protocol are orthogonal."""
  if v.get_protocol() == pytest.config.option.default_test_protocol:
    return True
  table_format = v.get_table_format()
  return (table_format.file_format == 'text'
          and table_format.compression_codec == 'none')
|
|
|
|
|
|
def orc_schema_resolution_constraint(v):
  """Allows every orc_schema_resolution value for ORC tables; any other file format
  only runs with orc_schema_resolution == 0."""
  if v.get_table_format().file_format == 'orc':
    return True
  return v.get_value('orc_schema_resolution') == 0
|
|
|
|
|
|
def single_compression_constraint(v):
  """Constraint that keeps a single compression per compressible file format.

  Formats listed in TableFormatInfo.DEFAULT_COMPRESSIONS_IN_TESTS only pass with their
  listed codec; formats not listed there always pass.
  """
  table_format = v.get_value('table_format')
  expected_codec = TableFormatInfo.DEFAULT_COMPRESSIONS_IN_TESTS.get(
      table_format.file_format)
  if expected_codec is None:
    return True
  return expected_codec == table_format.compression_codec
|
|
|
|
|
|
# Common sets of values for the exec option vectors.

# num_nodes candidates, named after the planner they exercise.
SINGLE_NODE_ONLY = [1]
ALL_NODES_ONLY = [0]
# Test SingleNode and Distributed Planners
ALL_CLUSTER_SIZES = [0, 1]

# batch_size candidates: only 0 is used.
ALL_BATCH_SIZES = [0]

# disable_codegen candidates: run with codegen both disabled and enabled.
ALL_DISABLE_CODEGEN_OPTIONS = [True, False]
|
|
|
|
|
|
def create_single_exec_option_dimension(num_nodes=0, disable_codegen_rows_threshold=5000):
  """Creates an exec_option dimension that will produce a single test vector.

  Every option is given exactly one candidate value: codegen stays enabled, batch_size
  is 0, and 'num_nodes' / 'disable_codegen_rows_threshold' come from the arguments.
  """
  return create_exec_option_dimension(
      batch_sizes=[0],
      cluster_sizes=[num_nodes],
      disable_codegen_options=[False],
      # Make sure codegen kicks in for functional.alltypes.
      disable_codegen_rows_threshold_options=[disable_codegen_rows_threshold])
|
|
|
|
|
|
# TODO IMPALA-12394: switch to ALL_CLUSTER_SIZES
def create_exec_option_dimension(cluster_sizes=ALL_NODES_ONLY,
                                 disable_codegen_options=ALL_DISABLE_CODEGEN_OPTIONS,
                                 batch_sizes=ALL_BATCH_SIZES,
                                 sync_ddl=None, exec_single_node_option=(0,),
                                 # We already run with codegen on and off explicitly -
                                 # don't need automatic toggling.
                                 disable_codegen_rows_threshold_options=(0,),
                                 debug_action_options=None):
  """Builds an exec_option dimension from per-option sequences of candidate values.

  Each argument is a sequence of values for one query option:
    cluster_sizes -> num_nodes
    disable_codegen_options -> disable_codegen
    batch_sizes -> batch_size
    exec_single_node_option -> exec_single_node_rows_threshold
    disable_codegen_rows_threshold_options -> disable_codegen_rows_threshold
    sync_ddl -> sync_ddl (omitted when None)
    debug_action_options -> debug_action (omitted when None)
  'abort_on_error' and 'test_replan' are always 1. Every combination of the values
  becomes one exec option dict (see create_exec_option_dimension_from_dict).
  """
  # The literal defaults above are tuples rather than lists so that no mutable default
  # object is shared between calls; the values are only iterated, so any sequence works.
  exec_option_dimensions = {
    'abort_on_error': [1],
    'exec_single_node_rows_threshold': exec_single_node_option,
    'batch_size': batch_sizes,
    'disable_codegen': disable_codegen_options,
    'disable_codegen_rows_threshold': disable_codegen_rows_threshold_options,
    'num_nodes': cluster_sizes,
    'test_replan': [1]}

  if sync_ddl is not None:
    exec_option_dimensions['sync_ddl'] = sync_ddl
  if debug_action_options is not None:
    exec_option_dimensions['debug_action'] = debug_action_options
  return create_exec_option_dimension_from_dict(exec_option_dimensions)
|
|
|
|
|
|
def create_exec_option_dimension_from_dict(exec_option_dimensions):
  """
  Builds a query exec option test dimension.

  'exec_option_dimensions' maps each query option name to the values to test for it.
  Every combination of those values (their cartesian product) becomes one exec option
  dictionary, and each dictionary is one value of the returned exec option dimension.
  Each dictionary can then be used to control Impala query execution behavior.

  TODO: In the future we could generate these values using pairwise to reduce total
  execution time.
  """
  option_names = sorted(exec_option_dimensions)
  for option_name in option_names:
    assert_exec_option_key(option_name)

  candidate_lists = [exec_option_dimensions[option_name] for option_name in option_names]
  # One dict per combination, keyed by option name.
  option_dicts = [dict(zip(option_names, combo)) for combo in product(*candidate_lists)]

  return ImpalaTestDimension(EXEC_OPTION, *option_dicts)
|
|
|
|
|
|
def add_exec_option_dimension(test_suite, key, values):
  """
  Registers a new exec option dimension on the ImpalaTestSuite 'test_suite'.

  'key' must be a query option known to Impala that has not been declared before, and
  'values' must be a list of more than one element. Constraints that depend on 'key'
  should read it with
    vector.get_value('key')
  rather than
    vector.get_value('exec_option')['key']
  """
  new_dimension = ImpalaTestDimension(key, *values)
  test_suite.ImpalaTestMatrix.add_exec_option_dimension(new_dimension)
|
|
|
|
|
|
def add_mandatory_exec_option(test_suite, key, value):
  """
  Sets 'key=value' in every exec option of the ImpalaTestSuite 'test_suite' without
  changing how many tests run. 'key' must not have been declared before.
  """
  matrix = test_suite.ImpalaTestMatrix
  matrix.add_mandatory_exec_option(key, value)
|
|
|
|
|
|
def extend_exec_option_dimension(test_suite, key, value):
  """
  Doubles the exec option dimension of the ImpalaTestSuite 'test_suite'.

  Every existing exec option value is kept, and a shallow copy of each is appended with
  'key' set to 'value', so twice as many tests run. 'key' must not have been declared
  before.
  """
  matrix = test_suite.ImpalaTestMatrix
  matrix.assert_unique_exec_option_key(key)
  exec_dim = matrix.dimensions["exec_option"]
  extra_values = []
  for existing in exec_dim:
    clone = ImpalaTestVector.Value(existing.name, copy.copy(existing.value))
    clone.value[key] = value
    extra_values.append(clone)
  exec_dim.extend(extra_values)
  matrix.add_dimension(exec_dim)
|
|
|
|
|
|
def get_dataset_from_workload(workload):
  """Returns the dataset of the first table format listed in the workload's exhaustive
  test vector file."""
  # TODO: We need a better way to define the workload -> dataset mapping so we can
  # extract it without reading the actual test vector file
  exhaustive_formats = load_table_info_dimension(workload, 'exhaustive')
  first_format = exhaustive_formats[0].value
  return first_format.dataset
|
|
|
|
|
|
def load_table_info_dimension(workload_name, exploration_strategy, file_formats=None,
                              compression_codecs=None):
  """Loads test vector corresponding to the given workload and exploration strategy.

  Reads WORKLOAD_DIR/<workload_name>/<workload_name>_<exploration_strategy>.csv. Blank
  lines and lines starting with '#' are ignored. Every other line is a comma-separated
  list of 'key: value' pairs passed as keyword arguments to TableFormatInfo.

  If 'file_formats' is not None, lines whose file_format is not in it are skipped; if
  'compression_codecs' is not None, lines whose compression_codec is not in it are
  skipped.

  Returns a TABLE_FORMAT dimension with one TableFormatInfo per kept line.
  Raises RuntimeError if the vector file does not exist.
  """
  test_vector_file = os.path.join(
      WORKLOAD_DIR, workload_name, '%s_%s.csv' % (workload_name, exploration_strategy))

  if not os.path.isfile(test_vector_file):
    raise RuntimeError('Vector file not found: ' + test_vector_file)

  vector_values = []

  with open(test_vector_file, 'r') as vector_file:
    for line in vector_file:
      stripped = line.strip()
      # Skip comments and blank lines (e.g. a trailing empty line). A blank line would
      # otherwise fail to unpack into a key/value pair below.
      if not stripped or stripped.startswith('#'):
        continue

      # Extract each test vector and add them to a dictionary. Split each item on its
      # first ':' only, so that a value may itself contain ':'.
      vals = dict((key.strip(), value.strip()) for key, value in
                  (item.split(':', 1) for item in stripped.split(',')))

      # If only loading specific file formats skip anything that doesn't match
      if file_formats is not None and vals['file_format'] not in file_formats:
        continue
      if compression_codecs is not None and \
          vals['compression_codec'] not in compression_codecs:
        continue
      vector_values.append(TableFormatInfo(**vals))

  return ImpalaTestDimension(TABLE_FORMAT, *vector_values)
|
|
|
|
|
|
def is_supported_insert_format(table_format):
  """Returns True if 'table_format' is a supported Impala INSERT format.

  Only uncompressed text and uncompressed parquet qualify.
  """
  if table_format.compression_codec != 'none':
    return False
  return table_format.file_format in ('text', 'parquet')
|