mirror of
https://github.com/apache/impala.git
synced 2025-12-19 09:58:28 -05:00
Running exhaustive tests with the env var IMPALA_USE_PYTHON3_TESTS=true reveals some tests that require adjustment. This patch makes those adjustments, which mostly revolve around encoding differences and the string vs bytes distinction in Python3. It also switches the default to run pytest with Python3 by setting IMPALA_USE_PYTHON3_TESTS=true.

The details:
- Change the hash() function in conftest.py to crc32() to produce a deterministic hash. Hash randomization has been enabled by default since Python 3.3 (see https://docs.python.org/3/reference/datamodel.html#object.__hash__), which causes test sharding (like --shard_tests=1/2) to produce an inconsistent set of tests per shard.
- Always restart the minicluster during custom cluster tests if the --shard_tests argument is set, because the test order may change and test correctness can depend on whether a test runs on a fresh minicluster.
- Move one test case from delimited-latin-text.test to test_delimited_text.py for easier binary comparison.
- Add bytes_to_str() as a utility function to decode bytes in Python3. This is often needed when inspecting the return value of subprocess.check_output() as a string (a rough sketch of such a helper is shown below).
- Implement DataTypeMetaclass.__lt__ to substitute for DataTypeMetaclass.__cmp__, which is ignored in Python3 (see https://peps.python.org/pep-0207/).
- Fix the WEB_CERT_ERR difference in test_ipv6.py.
- Fix trivial integer parsing in test_restart_services.py.
- Fix various encoding issues in test_saml2_sso.py, test_shell_commandline.py, and test_shell_interactive.py.
- Change the timeout in Impala.for_each_impalad() from sys.maxsize to 2^31-1.
- Switch to binary comparison in test_iceberg.py where needed.
- Specify text mode when calling tempfile.NamedTemporaryFile().
- Simplify create_impala_shell_executable_dimension to skip testing the dev and python2 impala-shell when IMPALA_USE_PYTHON3_TESTS=true. The reason is that several UTF-8 related tests in test_shell_commandline.py break in the Python3 pytest + Python2 impala-shell combination. This skipping already happens automatically on build OSes without system Python2 available, such as RHEL9 (where the IMPALA_SYSTEM_PYTHON2 env var is empty).
- Remove an unused vector argument and fix some trivial flake8 issues.

Several tests require logic changes due to intermittent issues under Python3 pytest. These include:
- Add _run_query_with_client() in test_ranger.py to allow reusing a single Impala client for running several queries, and ensure clients are closed when the test is done.
- Mark several tests in test_ranger.py with SkipIfFS.hive because they run queries through beeline + HiveServer2, but the Ozone and S3 build environments do not start HiveServer2 by default.
- Increase the sleep period from 0.1 to 0.5 seconds per iteration in test_statestore.py and mark TestStatestore to execute serially, because TServer appears to shut down more slowly when run concurrently with other tests. Also handle the deprecation of Thread.setDaemon().
- Always set force_restart=True for each test method in TestLoggingCore, TestShellInteractiveReconnect, and TestQueryRetries to prevent them from reusing the minicluster from a previous test method. Some of these tests tear down the minicluster (kill impalad) and will produce a minidump if the metrics verifier for the next test fails to detect a healthy minicluster state.

Testing: Passed exhaustive tests with IMPALA_USE_PYTHON3_TESTS=true.
Change-Id: I401a93b6cc7bcd17f41d24e7a310e0c882a550d4
Reviewed-on: http://gerrit.cloudera.org:8080/23319
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
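For illustration, the two Python3 porting helpers described in the commit message might look roughly like the sketch below. The names, signatures, and ordering key are assumptions based on the commit message, not the exact Impala code.

# Hypothetical sketch of the bytes_to_str() helper mentioned above; the real utility
# lives in the Impala test code and may differ in detail.
def bytes_to_str(value, encoding='utf-8'):
  """Decode bytes (e.g. from subprocess.check_output()) to str; pass str through."""
  if isinstance(value, bytes):
    return value.decode(encoding)
  return value

# Illustrative pattern for replacing __cmp__ with a rich comparison: Python3 ignores
# __cmp__ (PEP 207), so ordering must come from methods like __lt__. The ordering key
# used here (the class name) is an assumption for the sketch.
class DataTypeMetaclass(type):
  def __lt__(cls, other):
    return cls.__name__ < other.__name__

With such a metaclass, sorted() over the data type classes works under Python3 without relying on __cmp__.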
738 lines
30 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# py.test configuration module
#
from __future__ import absolute_import, division, print_function
from builtins import map, range, zip
from impala.dbapi import connect as impala_connect
from kudu import connect as kudu_connect
from random import choice, sample
from string import ascii_lowercase, digits
from zlib import crc32
import contextlib
import logging
import os
import pytest
import sys

import tests.common
from impala_py_lib.helpers import find_all_files, is_core_dump
import tests.common.base_test_suite
from tests.common.environ import build_flavor_timeout
from tests.common.test_result_verifier import QueryTestResult
from tests.common.patterns import LOG_FORMAT, is_valid_impala_identifier
from tests.common.test_vector import BEESWAX, HS2, HS2_HTTP
from tests.comparison.db_connection import ImpalaConnection
from tests.util.filesystem_utils import FILESYSTEM, ISILON_WEBHDFS_PORT, WAREHOUSE

LOG = logging.getLogger('test_configuration')
VALID_TEST_PROTOCOLS = [BEESWAX, HS2, HS2_HTTP]

DEFAULT_CONN_TIMEOUT = 45
DEFAULT_EXPLORATION_STRATEGY = 'core'
DEFAULT_HDFS_XML_CONF = os.path.join(os.environ['HADOOP_CONF_DIR'], "hdfs-site.xml")
DEFAULT_HIVE_SERVER2 = 'localhost:11050'
DEFAULT_IMPALAD_HS2_PORT = '21050'
DEFAULT_IMPALAD_HS2_HTTP_PORT = '28000'
DEFAULT_STRICT_HS2_PORT = '11050'
DEFAULT_STRICT_HS2_HTTP_PORT = '10001'
DEFAULT_IMPALADS = "localhost:21000,localhost:21001,localhost:21002"
DEFAULT_KUDU_MASTER_HOSTS = os.getenv('KUDU_MASTER_HOSTS', '127.0.0.1')
DEFAULT_KUDU_MASTER_PORT = os.getenv('KUDU_MASTER_PORT', '7051')
DEFAULT_METASTORE_SERVER = 'localhost:9083'
DEFAULT_NAMENODE_ADDR = None
DEFAULT_TEST_PROTOCOL = os.getenv('DEFAULT_TEST_PROTOCOL', HS2)
if FILESYSTEM == 'isilon':
  DEFAULT_NAMENODE_ADDR = "{node}:{port}".format(node=os.getenv("ISILON_NAMENODE"),
                                                 port=ISILON_WEBHDFS_PORT)

# Timeout each individual test case after 2 hours, or 4 hours for slow builds
PYTEST_TIMEOUT_S = \
    build_flavor_timeout(2 * 60 * 60, slow_build_timeout=4 * 60 * 60)


def pytest_configure(config):
  """ Hook startup of pytest. Sets up log format and per-test timeout. """
  configure_logging()
  config.option.timeout = PYTEST_TIMEOUT_S


def configure_logging():
  # Use a "--" since most of our tests output SQL commands, and it's nice to
  # be able to copy-paste directly from the test output back into a shell to
  # try to reproduce a failure.
  #
  # This call only takes effect if it is the first call to logging.basicConfig().
  # For example, if some other library calls logging.basicConfig() at the global
  # level, then importing that library can render this call ineffective.
  logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)

  # Verify that the logging level is set to the correct value.
  rootLoggerLevel = logging.getLogger().getEffectiveLevel()
  print("rootLoggerLevel = {0}".format(logging.getLevelName(rootLoggerLevel)))
  assert (rootLoggerLevel == logging.INFO)


def pytest_addoption(parser):
  """Adds new command line options to py.test"""
  parser.addoption("--exploration_strategy", default=DEFAULT_EXPLORATION_STRATEGY,
                   help="Default exploration strategy for all tests. Valid values: core, "
                   "pairwise, exhaustive.")

  parser.addoption("--workload_exploration_strategy", default=None,
                   help="Override exploration strategy for specific workloads using the "
                   "format: workload:exploration_strategy. Ex: tpch:core,tpcds:pairwise.")

  parser.addoption("--impalad", default=DEFAULT_IMPALADS,
                   help="A comma-separated list of impalad Beeswax host:ports to target. "
                   "Note: Not all tests make use of all impalads; some tests target just "
                   "the first item in the list (it is considered the 'default').")

  parser.addoption("--impalad_hs2_port", default=DEFAULT_IMPALAD_HS2_PORT,
                   help="The impalad HiveServer2 port.")

  parser.addoption("--impalad_hs2_http_port", default=DEFAULT_IMPALAD_HS2_HTTP_PORT,
                   help="The impalad HiveServer2 HTTP port.")

  parser.addoption("--strict_hs2_port", default=DEFAULT_STRICT_HS2_PORT,
                   help="The strict HiveServer2 port (directly to hs2).")

  parser.addoption("--strict_hs2_http_port", default=DEFAULT_STRICT_HS2_HTTP_PORT,
                   help="The strict HiveServer2 HTTP port (directly to hs2).")

  parser.addoption("--metastore_server", default=DEFAULT_METASTORE_SERVER,
                   help="The Hive Metastore server host:port to connect to.")

  parser.addoption("--hive_server2", default=DEFAULT_HIVE_SERVER2,
                   help="Hive's HiveServer2 host:port to connect to.")

  parser.addoption("--kudu_master_hosts", default=DEFAULT_KUDU_MASTER_HOSTS,
                   help="Kudu master. Can be supplied as hostname, or hostname:port.")

  parser.addoption("--minicluster_xml_conf", default=DEFAULT_HDFS_XML_CONF,
                   help="The full path to the HDFS xml configuration file")

  parser.addoption("--namenode_http_address", default=DEFAULT_NAMENODE_ADDR,
                   help="The host:port for the HDFS Namenode's WebHDFS interface. Takes"
                   " precedence over any configuration read from --minicluster_xml_conf")

  parser.addoption("--update_results", action="store_true", default=False,
                   help="If set, will generate new results for all tests run instead of "
                   "verifying the results.")

  parser.addoption("--table_formats", dest="table_formats", default=None,
                   help="Override the test vectors and run only using the specified "
                   "table formats. Ex. --table_formats=seq/snap/block,text/none")

  parser.addoption("--scale_factor", dest="scale_factor", default=None,
                   help="If running on a cluster, specify the scale factor. "
                   "Ex. --scale_factor=500gb")

  # KERBEROS TODO: I highly doubt that the default is correct. Try "hive".
  parser.addoption("--hive_service_name", dest="hive_service_name",
                   default="Hive Metastore Server",
                   help="The principal service name for the hive metastore client when "
                   "using kerberos.")

  parser.addoption("--use_kerberos", action="store_true", default=False,
                   help="use kerberos transport for running tests")

  parser.addoption("--use_local_catalog", dest="use_local_catalog", action="store_true",
                   default=False, help="Run all tests against Impala configured with "
                   "LocalCatalog.")

  parser.addoption("--sanity", action="store_true", default=False,
                   help="Runs a single test vector from each test to provide a quick "
                   "sanity check at the cost of lower test coverage.")

  parser.addoption("--skip_hbase", action="store_true", default=False,
                   help="Skip HBase tests")

  parser.addoption("--testing_remote_cluster", action="store_true", default=False,
                   help=("Indicates that tests are being run against a remote cluster. "
                         "Some tests may be marked to skip or xfail on remote clusters."))

  parser.addoption("--shard_tests", default=None,
                   help="If set to N/M (e.g., 3/5), will split the tests into "
                   "M partitions and run the Nth partition. 1-indexed.")

  parser.addoption("--shell_executable", default=None,
                   help="Full path to the impala-shell executable. Useful for running "
                   "shell/ e2e tests against a version of the impala-shell that has been "
                   "installed as a python package in a separate virtualenv. The python "
                   "version in the target virtualenv does not need to match the "
                   "version used to execute the tests. "
                   "(See $IMPALA_HOME/bin/set-pythonpath.sh.)")

  parser.addoption("--calcite_report_mode", action="store_true", default=False,
                   help="Mode designed to provide coverage for the Calcite planner. "
                   "Produces a JSON file for each run_test_case() invocation and "
                   "continues past errors. These JSON files can be processed to produce "
                   "a report to detect improvements and protect against regressions.")

  parser.addoption("--calcite_report_output_dir", default=None,
                   help="Location to store the output JSON files for "
                   "calcite_report_mode. Defaults to ${IMPALA_LOGS_DIR}/calcite_report.")

  parser.addoption("--default_test_protocol", default=DEFAULT_TEST_PROTOCOL,
                   choices=VALID_TEST_PROTOCOLS,
                   help="Impala protocol to run tests with if a test does not specify any.")


def assert_impala_test_suite(request):
  assert issubclass(request.cls, tests.common.impala_test_suite.ImpalaTestSuite), (
    "This fixture is only applicable to ImpalaTestSuite subclasses."
  )


def pytest_assertrepr_compare(op, left, right):
  """
  Provides a hook for outputting type-specific assertion messages

  Expected to return a list of strings, where each string will be printed as a new line
  in the result output report on assertion failure.
  """
  if isinstance(left, QueryTestResult) and isinstance(right, QueryTestResult) and \
      op == "==":
    result = ['Comparing QueryTestResults (expected vs actual):']
    for l, r in zip(left.rows, right.rows):
      result.append("%s == %s" % (l, r) if l == r else "%s != %s" % (l, r))
    if len(left.rows) != len(right.rows):
      result.append('Number of rows returned (expected vs actual): '
                    '%d != %d' % (len(left.rows), len(right.rows)))

    # pytest has a bug/limitation where it will truncate long custom assertion messages
    # (it has a hardcoded string length limit of 80*8 characters). To ensure this info
    # isn't lost, always log the assertion message.
    LOG.error('\n'.join(result))
    return result

  # pytest supports printing the diff for a set equality check, but does not do
  # so well when we're doing a subset check. This handles that situation.
  if isinstance(left, set) and isinstance(right, set) and op == '<=':
    # If expected is not a subset of actual, print out the set difference.
    result = ['Items in expected results not found in actual results:']
    result.extend(list(left - right))
    result.append('Items in actual results:')
    result.extend(list(right))
    LOG.error('\n'.join(result))
    return result


def pytest_xdist_setupnodes(config, specs):
  """Hook that is called when setting up the xdist plugin"""
  # Force the xdist plugin to be quiet. In verbose mode it spews useless information.
  config.option.verbose = 0


def pytest_generate_tests(metafunc):
  """
  This is a hook to parameterize the tests based on the input vector.

  If a test has the 'vector' fixture specified, this code is invoked and it will build
  a set of test vectors to parameterize the test with.
  """
  # All Impala test classes must inherit BaseTestSuite.
  assert issubclass(metafunc.cls, tests.common.base_test_suite.BaseTestSuite)
  assert metafunc.cls.default_test_protocol() in VALID_TEST_PROTOCOLS
  if 'vector' in metafunc.fixturenames:
    metafunc.cls.add_test_dimensions()
    vectors = metafunc.cls.ImpalaTestMatrix.generate_test_vectors(
      metafunc.config.option.exploration_strategy)

    if len(vectors) == 0:
      LOG.warning("No test vectors generated for test '%s'. Check constraints and "
                  "input vectors" % metafunc.function.__name__)

    vector_names = list(map(str, vectors))
    # In the case this is a test result update or sanity run, select a single test vector
    # to run. This is okay for update_results because results are expected to be the same
    # for all test vectors.
    if metafunc.config.option.update_results or metafunc.config.option.sanity:
      vectors = vectors[0:1]
      vector_names = vector_names[0:1]
    metafunc.parametrize('vector', vectors, ids=vector_names)


@pytest.yield_fixture
def cleanup_generated_core_dumps(request):
  """
  A fixture to clean up core dumps intentionally generated by tests (for negative testing).

  Only core dumps generated by the decorated test function will be removed. Pre-existing
  cores that need to be triaged from prior test failures are retained.
  """
  possible_cores = find_all_files('*core*')
  pre_test_cores = set([f for f in possible_cores if is_core_dump(f)])

  yield  # Wait for test to execute

  possible_cores = find_all_files('*core*')
  post_test_cores = set([f for f in possible_cores if is_core_dump(f)])

  for f in (post_test_cores - pre_test_cores):
    LOG.info("Cleaned up {core} created by {test}".format(core=f, test=request.node.name))
    os.remove(f)


@pytest.fixture
def testid_checksum(request):
  """
  Return a hex string representing the CRC32 checksum of the parametrized test
  function's full pytest test ID. The full pytest ID includes relative path, module
  name, possible test class name, function name, and any parameters.

  This could be combined with some prefix in order to form identifiers unique to a
  particular test case.
  """
  # For an example of what a full pytest ID looks like, see below (written as Python
  # multi-line literal)
  #
  # ("query_test/test_cancellation.py::TestCancellationParallel::()::test_cancel_select"
  #  "[table_format: avro/snap/block | exec_option: {'disable_codegen': False, "
  #  "'abort_on_error': 1, 'exec_single_node_rows_threshold': 0, 'batch_size': 0, "
  #  "'num_nodes': 0} | query_type: SELECT | cancel_delay: 3 | action: WAIT | "
  #  "query: select l_returnflag from lineitem]")
  return '{0:x}'.format(crc32(request.node.nodeid.encode('utf-8')) & 0xffffffff)


@pytest.fixture
def unique_database(request, testid_checksum):
  """
  Return a database name unique to any test using the fixture. The fixture creates the
  database during setup, allows the test to know the database name, and drops the
  database after the test has completed.

  By default, the database name is a concatenation of the test function name and the
  testid_checksum. The database name prefix can be changed via parameter (see below).

  A good candidate for a test to use this fixture is a test that needs to have a
  test-local database or test-local tables that are created and destroyed at test run
  time. Because the fixture generates a unique name, tests using this fixture can be run
  in parallel as long as they don't need exclusion on other test-local resources.

  Sample usage:

    def test_something(self, vector, unique_database):
      # fixture creates database test_something_48A80F
      self.client.execute('DROP TABLE IF EXISTS `{0}`.`mytable`'.format(unique_database))
      # test does other stuff with the unique_database name as needed

  We also allow for parametrization:

    from tests.common.parametrize import UniqueDatabase

    @UniqueDatabase.parametrize(name_prefix='mydb', num_dbs=3, sync_ddl=True)
    def test_something(self, vector, unique_database):
      # fixture creates databases mydb_48A80F, mydb_48A80F2, mydb_48A80F3 with sync_ddl
      self.client.execute('DROP TABLE IF EXISTS `{0}`.`mytable`'.format(unique_database))
      # test does other stuff with the unique_database name as needed

  The supported parameters:

  name_prefix: string (defaults to test function __name__)
  - prefix to be used for the database name

  num_dbs: integer (defaults to 1)
  - number of unique databases to create
  - the name of the 2nd, 3rd, etc. databases are generated by appending "2", "3",
    etc., to the first database name (which does not have a "1" suffix)

  sync_ddl: boolean (defaults to False)
  - indicates whether the unique database should be created with sync_ddl

  For a similar DB-API 2 compliant connection/cursor that uses HS2 see the 'conn' and
  'unique_cursor' fixtures below.
  """

  # Test cases are at the function level, so no one should "accidentally" re-scope this.
  assert 'function' == request.scope, ('This fixture must have scope "function" since '
                                       'the fixture must guarantee unique per-test '
                                       'databases.')
  assert_impala_test_suite(request)

  db_name_prefix = request.function.__name__
  sync_ddl = False
  num_dbs = 1
  fixture_params = getattr(request, 'param', None)
  if fixture_params is not None:
    if "name_prefix" in fixture_params:
      db_name_prefix = fixture_params["name_prefix"]
    if "sync_ddl" in fixture_params:
      sync_ddl = fixture_params["sync_ddl"]
    if "num_dbs" in fixture_params:
      num_dbs = fixture_params["num_dbs"]

  first_db_name = '{0}_{1}'.format(db_name_prefix, testid_checksum)
  db_names = [first_db_name]
  for i in range(2, num_dbs + 1):
    db_names.append(first_db_name + str(i))
  for db_name in db_names:
    if not is_valid_impala_identifier(db_name):
      raise ValueError('Unique database name "{0}" is not a valid Impala identifier; '
                       'check test function name or any prefixes for long length or '
                       'invalid characters.'.format(db_name))

  def cleanup_database(client, db_name, must_exist):
    for i in range(2):
      try:
        result = client.execute('DROP DATABASE {0} `{1}` CASCADE'.format(
          "" if must_exist else "IF EXISTS", db_name))
        break
      except Exception as e:
        if i == 0:
          # Retry in case we hit IMPALA-14228.
          LOG.warn("Ignored cleanup failure once: " + str(e))
        else:
          raise e
    assert result.success
    # The database directory may not be removed if there are external tables in the
    # database when it is dropped. The external locations are not removed by cascade.
    # These preexisting files/directories can cause errors when tests run repeatedly or
    # use a data snapshot (see IMPALA-9702), so this forces cleanup of the database
    # directory.
    db_location = "{0}/{1}.db".format(WAREHOUSE, db_name).lstrip('/')
    request.instance.filesystem_client.delete_file_dir(db_location, recursive=True)

  def cleanup():
    with request.cls.create_impala_client(protocol=HS2) as client:
      client.set_configuration({'sync_ddl': sync_ddl})
      for db_name in db_names:
        cleanup_database(client, db_name, True)
        LOG.info('Dropped database "{0}" for test ID "{1}"'.format(
          db_name, str(request.node.nodeid)))

  request.addfinalizer(cleanup)

  with request.cls.create_impala_client(protocol=HS2) as client:
    client.set_configuration({'sync_ddl': sync_ddl})
    for db_name in db_names:
      cleanup_database(client, db_name, False)
      result = client.execute('CREATE DATABASE `{0}`'.format(db_name))
      assert result.success
      LOG.info('Created database "{0}" for test ID "{1}"'.format(
        db_name, str(request.node.nodeid)))
  return first_db_name


@pytest.fixture
def unique_role(request, testid_checksum):
  """Returns a unique role to any test using the fixture. The fixture does not create
  a role."""
  role_name_prefix = request.function.__name__
  fixture_params = getattr(request, 'param', None)
  if fixture_params is not None:
    if 'name_prefix' in fixture_params:
      role_name_prefix = fixture_params['name_prefix']
  return '{0}_{1}_role'.format(role_name_prefix, testid_checksum)


@pytest.fixture
def unique_name(request, testid_checksum):
  """Returns a unique name to any test using the fixture."""
  name_prefix = request.function.__name__
  fixture_params = getattr(request, 'param', None)
  if fixture_params is not None:
    if 'name_prefix' in fixture_params:
      name_prefix = fixture_params['name_prefix']
  return '{0}_{1}'.format(name_prefix, testid_checksum)


@pytest.yield_fixture
def kudu_client():
  """Provides a new Kudu client as a pytest fixture. The client only exists for the
  duration of the method it is used in.
  """
  kudu_master = pytest.config.option.kudu_master_hosts

  if "," in kudu_master:
    raise Exception("Multi-master not supported yet")
  if ":" in kudu_master:
    host, port = kudu_master.split(":")
  else:
    host, port = kudu_master, DEFAULT_KUDU_MASTER_PORT
  kudu_client = kudu_connect(host, port)
  yield kudu_client

  try:
    kudu_client.close()
  except Exception as e:
    LOG.warn("Error closing Kudu client: %s", e)


@pytest.yield_fixture(scope="class")
def conn(request):
  """Provides a new DB-API compliant connection to Impala as a pytest fixture. The
  same connection is used for all test methods in a class. The class may provide the
  following customizations:
  - get_db_name(): The name of the database to connect to.
  - auto_create_db(): If declared and the method returns True, the database will
    be created before tests run and dropped afterwards. If a database name is
    provided by get_db_name(), it must not exist. Classes that use both
    auto_create_db() and get_db_name() should generate a random name in
    get_db_name() and cache it.
  - get_conn_timeout(): The timeout, in seconds, to use for this connection.
  The returned connection will have a 'db_name' property.

  DEPRECATED:
    See the 'unique_database' fixture above to use Impala's custom python
    API instead of DB-API.
  """
  db_name = __call_cls_method_if_exists(request.cls, "get_db_name")
  use_unique_conn = __call_cls_method_if_exists(request.cls, "auto_create_db")
  timeout = __call_cls_method_if_exists(request.cls, "get_conn_timeout") or \
      DEFAULT_CONN_TIMEOUT
  if use_unique_conn:
    with __unique_conn(db_name=db_name, timeout=timeout) as conn:
      yield conn
  else:
    with __auto_closed_conn(db_name=db_name, timeout=timeout) as conn:
      yield conn


def __call_cls_method_if_exists(cls, method_name):
  """Returns the result of calling the method 'method_name' on class 'cls' if the class
  defines such a method, otherwise returns None.
  """
  method = getattr(cls, method_name, None)
  if method:
    return method()


@contextlib.contextmanager
def __unique_conn(db_name=None, timeout=DEFAULT_CONN_TIMEOUT):
  """Connects to Impala and creates a new database, then returns a connection to it.
  This is intended to be used in a "with" block. Upon exit, the database will be
  dropped. A database name can be provided by 'db_name'; a database by that name
  must not exist prior to calling this method.

  with __unique_conn() as conn:
    # Use conn
  # The database no longer exists and the conn is closed.

  The returned connection will have a 'db_name' property.

  DEPRECATED:
    See the 'unique_database' fixture above to use Impala's custom python
    API instead of DB-API.
  """
  if not db_name:
    db_name = choice(ascii_lowercase) + "".join(sample(ascii_lowercase + digits, 5))
  with __auto_closed_conn(timeout=timeout) as conn:
    with __auto_closed_cursor(conn) as cur:
      cur.execute("CREATE DATABASE %s" % db_name)
  with __auto_closed_conn(db_name=db_name, timeout=timeout) as conn:
    try:
      yield conn
    finally:
      try:
        with __auto_closed_cursor(conn) as cur:
          try:
            cur.execute("USE DEFAULT")
            cur.execute("DROP DATABASE IF EXISTS %s CASCADE" % db_name)
          except Exception as e:
            LOG.warn("Error dropping database: %s", e)
      except Exception as e:
        LOG.warn("Error creating a cursor: %s", e)


@contextlib.contextmanager
def __auto_closed_conn(db_name=None, timeout=DEFAULT_CONN_TIMEOUT):
  """Returns a connection to Impala. This is intended to be used in a "with" block.
  The connection will be closed upon exiting the block.

  The returned connection will have a 'db_name' property.

  DEPRECATED:
    See the 'unique_database' fixture above to use Impala's custom python
    API instead of DB-API.
  """
  default_impalad = pytest.config.option.impalad.split(',')[0]
  impalad_host = default_impalad.split(':')[0]
  hs2_port = pytest.config.option.impalad_hs2_port

  conn = impala_connect(host=impalad_host, port=hs2_port, database=db_name,
                        timeout=timeout)
  try:
    conn.db_name = db_name
    yield conn
  finally:
    try:
      conn.close()
    except Exception as e:
      LOG.warn("Error closing Impala connection: %s", e)


@pytest.yield_fixture
def cursor(conn):
  """Provides a new DB-API compliant cursor from a connection provided by the conn()
  fixture. The cursor only exists for the duration of the method it is used in.

  The returned cursor will have a 'conn' property. The 'conn' will have a 'db_name'
  property.

  DEPRECATED:
    See the 'unique_database' fixture above to use Impala's custom python
    API instead of DB-API.
  """
  with __auto_closed_cursor(conn) as cur:
    yield cur


@pytest.yield_fixture(scope="class")
def cls_cursor(conn):
  """Provides a new DB-API compliant cursor from a connection provided by the conn()
  fixture. The cursor exists for the duration of the class it is used in.

  The returned cursor will have a 'conn' property. The 'conn' will have a 'db_name'
  property.

  DEPRECATED:
    See the 'unique_database' fixture above to use Impala's custom python
    API instead of DB-API.
  """
  with __auto_closed_cursor(conn) as cur:
    yield cur


@pytest.yield_fixture
def unique_cursor():
  """Provides a new DB-API compliant cursor to a newly created Impala database. The
  cursor only exists for the duration of the method it is used in. The database will
  be dropped after the test executes.

  The returned cursor will have a 'conn' property. The 'conn' will have a 'db_name'
  property.

  DEPRECATED:
    See the 'unique_database' fixture above to use Impala's custom python
    API instead of DB-API.
  """
  with __unique_conn() as conn:
    with __auto_closed_cursor(conn) as cur:
      yield cur


@contextlib.contextmanager
def __auto_closed_cursor(conn):
  """Returns a cursor created from conn. This is intended to be used in a "with" block.
  The cursor will be closed upon exiting the block.
  """
  cursor = conn.cursor()
  cursor.conn = conn
  try:
    yield cursor
  finally:
    try:
      cursor.close()
    except Exception as e:
      LOG.warn("Error closing Impala cursor: %s", e)


@pytest.yield_fixture
def impala_testinfra_cursor():
  """
  Return ImpalaCursor object. Used for "tests of tests" for the infra for the query
  generator, stress test, etc.
  """
  # This differs from the cursors above, which return direct Impyla cursors. Tests that
  # use this fixture want to interact with the objects in
  # tests.comparison.db_connection, which need testing.
  with ImpalaConnection() as conn:
    cursor = conn.cursor()
    try:
      yield cursor
    finally:
      cursor.close()


@pytest.fixture(autouse=True, scope='session')
def validate_pytest_config():
  """
  Validate that pytest command line options make sense.
  """
  if pytest.config.option.testing_remote_cluster:
    local_prefixes = ('localhost', '127.', '0.0.0.0')
    if any(pytest.config.option.impalad.startswith(loc) for loc in local_prefixes):
      logging.error("--testing_remote_cluster can not be used with a local impalad")
      pytest.exit("Invalid pytest config option: --testing_remote_cluster")


@pytest.yield_fixture(autouse=True, scope='session')
def cluster_properties():
  """Set up test cluster properties for the test session"""
  # Don't import at top level to avoid circular dependency between conftest and
  # tests.common.environ, which uses command-line flags set up by conftest.
  from tests.common.environ import ImpalaTestClusterProperties
  cluster_properties = ImpalaTestClusterProperties.get_instance()
  yield cluster_properties


@pytest.fixture(autouse=True, scope='session')
def validate_python_version():
  """Check the Python runtime version before running any tests. Since Impala switched
  to the toolchain Python, which is at least v2.7, the tests will not run on a version
  below that.
  """
  assert sys.version_info > (2, 7), "Tests only support Python 2.7+"


@pytest.hookimpl(trylast=True)
def pytest_collection_modifyitems(items, config, session):
  """Hook to handle --shard_tests command line option.

  If set, this "deselects" a subset of tests, by hashing (using crc32())
  their id into buckets.
  """
  if not config.option.shard_tests:
    return

  num_items = len(items)
  this_shard, num_shards = list(map(int, config.option.shard_tests.split("/")))
  assert 0 <= this_shard <= num_shards
  if this_shard == num_shards:
    this_shard = 0

  items_selected, items_deselected = [], []
  for i in items:
    if crc32(i.nodeid.encode('utf-8')) % num_shards == this_shard:
      items_selected.append(i)
    else:
      items_deselected.append(i)
  config.hook.pytest_deselected(items=items_deselected)

  # We must modify the items list in place for it to take effect.
  items[:] = items_selected

  logging.info(
    "pytest shard selection enabled %s. Of %d items, selected %d items by hash.",
    config.option.shard_tests, num_items, len(items))


@pytest.hookimpl(trylast=True)
def pytest_runtest_logstart(nodeid, location):
  # Beeswax doesn't support commas or equals in configuration, so they are replaced.
  # Spaces are removed to make the string a little bit shorter.
  # The string is shortened so that it is entirely spit out by ThriftDebugString, rather
  # than being elided.
  tests.common.current_node = \
    nodeid.replace(",", ";").replace(" ", "").replace("=", "-")[0:255]

  # Store the unaltered nodeid as well
  tests.common.nodeid = nodeid