mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
Recent testing showed that the pytests are not respecting the log level and format set in conftest.py's configure_logging(). It is using the default log level of WARNING and the default formatter. The issue is that logging.basicConfig() is only effective the first time it is called. The code in lib/python/impala_py_lib/helpers.py does a call to logging.basicConfig() at the global level, and conftest.py imports that file. This renders the call in configure_logging() ineffective. To avoid this type of confusion, logging.basicConfig() should only be called from the main() functions for libraries. This removes the call in lib/python/impala_py_lib (as it is not needed for a library without a main function). It also fixes up various other locations to move the logging.basicConfig() call to the main() function. Testing: - Ran the end to end tests and custom cluster tests - Confirmed the logging format - Added an assert in configure_logging() to test that the INFO log level is applied to the root logger. Change-Id: I5d91b7f910b3606c50bcba4579179a0bc8c20588 Reviewed-on: http://gerrit.cloudera.org:8080/16679 Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
689 lines
28 KiB
Python
689 lines
28 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
# py.test configuration module
|
|
#
|
|
from impala.dbapi import connect as impala_connect
|
|
from kudu import connect as kudu_connect
|
|
from random import choice, sample
|
|
from string import ascii_lowercase, digits
|
|
from zlib import crc32
|
|
import contextlib
|
|
import logging
|
|
import os
|
|
import pytest
|
|
import sys
|
|
|
|
import tests.common
|
|
from impala_py_lib.helpers import find_all_files, is_core_dump
|
|
from tests.common.environ import build_flavor_timeout
|
|
from common.test_result_verifier import QueryTestResult
|
|
from tests.common.patterns import is_valid_impala_identifier
|
|
from tests.comparison.db_connection import ImpalaConnection
|
|
from tests.util.filesystem_utils import FILESYSTEM, ISILON_WEBHDFS_PORT, WAREHOUSE
|
|
|
|
# Logger used throughout this module, and the log line layout that
# configure_logging() hands to logging.basicConfig().
LOG = logging.getLogger('test_configuration')
LOG_FORMAT = "-- %(asctime)s %(levelname)-8s %(threadName)s: %(message)s"

# Default values for the connection fixtures and for the command line options
# registered in pytest_addoption().
DEFAULT_CONN_TIMEOUT = 45
DEFAULT_EXPLORATION_STRATEGY = 'core'
# Note: indexing os.environ raises KeyError at import time if HADOOP_CONF_DIR is unset.
DEFAULT_HDFS_XML_CONF = os.path.join(os.environ['HADOOP_CONF_DIR'], "hdfs-site.xml")
DEFAULT_HIVE_SERVER2 = 'localhost:11050'
DEFAULT_IMPALAD_HS2_PORT = '21050'
DEFAULT_IMPALAD_HS2_HTTP_PORT = '28000'
DEFAULT_IMPALADS = "localhost:21000,localhost:21001,localhost:21002"
DEFAULT_KUDU_MASTER_HOSTS = os.getenv('KUDU_MASTER_HOSTS', '127.0.0.1')
DEFAULT_KUDU_MASTER_PORT = os.getenv('KUDU_MASTER_PORT', '7051')
DEFAULT_METASTORE_SERVER = 'localhost:9083'
# A namenode address is only pre-populated when the target filesystem is Isilon.
DEFAULT_NAMENODE_ADDR = (
    "{node}:{port}".format(node=os.getenv("ISILON_NAMENODE"), port=ISILON_WEBHDFS_PORT)
    if FILESYSTEM == 'isilon' else None)

# Timeout each individual test case after 2 hours, or 4 hours for slow builds
PYTEST_TIMEOUT_S = build_flavor_timeout(2 * 60 * 60, slow_build_timeout=4 * 60 * 60)
|
|
|
|
def pytest_configure(config):
  """pytest startup hook: configures logging, then installs the per-test timeout."""
  configure_logging()
  # Every individual test case is bounded by PYTEST_TIMEOUT_S seconds.
  config.option.timeout = PYTEST_TIMEOUT_S
|
|
|
|
|
|
def configure_logging():
  """Set up root logging at INFO level with LOG_FORMAT and verify that it took effect.

  LOG_FORMAT starts with "--" because most tests log SQL commands; the prefix makes the
  log decoration a SQL comment, so output can be pasted straight back into a shell when
  reproducing a failure.

  logging.basicConfig() only does something the first time it is called. If a library
  imported earlier already called it at module level, the call here is silently ignored,
  which is why the effective root level is printed and asserted afterwards.
  """
  logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)

  # Guard against an earlier basicConfig() call having won the race.
  effective_level = logging.getLogger().getEffectiveLevel()
  print("rootLoggerLevel = {0}".format(logging.getLevelName(effective_level)))
  assert effective_level == logging.INFO
|
|
|
|
|
|
def pytest_addoption(parser):
  """Registers Impala's custom command line options with py.test.

  'parser' is the option parser py.test passes to this hook; each option is added via
  parser.addoption(). Defaults come from the DEFAULT_* constants of this module where
  one exists.
  """
  parser.addoption("--exploration_strategy", default=DEFAULT_EXPLORATION_STRATEGY,
                   help="Default exploration strategy for all tests. Valid values: core, "
                   "pairwise, exhaustive.")

  parser.addoption("--workload_exploration_strategy", default=None,
                   help="Override exploration strategy for specific workloads using the "
                   "format: workload:exploration_strategy. Ex: tpch:core,tpcds:pairwise.")

  parser.addoption("--impalad", default=DEFAULT_IMPALADS,
                   help="A comma-separated list of impalad Beeswax host:ports to target. "
                   "Note: Not all tests make use of all impalad, some tests target just "
                   "the first item in the list (it is considered the 'default').")

  parser.addoption("--impalad_hs2_port", default=DEFAULT_IMPALAD_HS2_PORT,
                   help="The impalad HiveServer2 port.")

  parser.addoption("--impalad_hs2_http_port", default=DEFAULT_IMPALAD_HS2_HTTP_PORT,
                   help="The impalad HiveServer2 HTTP port.")

  parser.addoption("--metastore_server", default=DEFAULT_METASTORE_SERVER,
                   help="The Hive Metastore server host:port to connect to.")

  parser.addoption("--hive_server2", default=DEFAULT_HIVE_SERVER2,
                   help="Hive's HiveServer2 host:port to connect to.")

  parser.addoption("--kudu_master_hosts", default=DEFAULT_KUDU_MASTER_HOSTS,
                   help="Kudu master. Can be supplied as hostname, or hostname:port.")

  parser.addoption("--minicluster_xml_conf", default=DEFAULT_HDFS_XML_CONF,
                   help="The full path to the HDFS xml configuration file")

  parser.addoption("--namenode_http_address", default=DEFAULT_NAMENODE_ADDR,
                   help="The host:port for the HDFS Namenode's WebHDFS interface. Takes"
                   " precedence over any configuration read from --minicluster_xml_conf")

  parser.addoption("--update_results", action="store_true", default=False,
                   help="If set, will generate new results for all tests run instead of "
                   "verifying the results.")

  parser.addoption("--table_formats", dest="table_formats", default=None,
                   help="Override the test vectors and run only using the specified "
                   "table formats. Ex. --table_formats=seq/snap/block,text/none")

  # The two literals used to be joined without a separator ("...scale factorEx. ...").
  parser.addoption("--scale_factor", dest="scale_factor", default=None,
                   help="If running on a cluster, specify the scale factor. "
                   "Ex. --scale_factor=500gb")

  # KERBEROS TODO: I highly doubt that the default is correct. Try "hive".
  parser.addoption("--hive_service_name", dest="hive_service_name",
                   default="Hive Metastore Server",
                   help="The principal service name for the hive metastore client when "
                   "using kerberos.")

  parser.addoption("--use_kerberos", action="store_true", default=False,
                   help="use kerberos transport for running tests")

  parser.addoption("--use_local_catalog", dest="use_local_catalog", action="store_true",
                   default=False, help="Run all tests against Impala configured with "
                   "LocalCatalog.")

  parser.addoption("--sanity", action="store_true", default=False,
                   help="Runs a single test vector from each test to provide a quick "
                   "sanity check at the cost of lower test coverage.")

  parser.addoption("--skip_hbase", action="store_true", default=False,
                   help="Skip HBase tests")

  parser.addoption("--testing_remote_cluster", action="store_true", default=False,
                   help=("Indicates that tests are being run against a remote cluster. "
                         "Some tests may be marked to skip or xfail on remote clusters."))

  parser.addoption("--shard_tests", default=None,
                   help="If set to N/M (e.g., 3/5), will split the tests into "
                   "M partitions and run the Nth partition. 1-indexed.")

  # The first two literals used to be joined without a space ("runningshell/ e2e").
  parser.addoption("--shell_executable", default=None,
                   help="Full path to the impala-shell executable. Useful for running "
                   "shell/ e2e tests against a version of the impala-shell that has been "
                   "installed as a python package in a separate virtualenv. The python "
                   "version in the target virtualenv does not need to match the "
                   "version used to execute the tests, but if the tested environment is "
                   "using python 3, be sure to also set the environment variable "
                   "USE_THRIFT11_GEN_PY=true in the test env. "
                   "(See $IMPALA_HOME/bin/set-pythonpath.sh.)")
|
|
|
|
|
|
def pytest_assertrepr_compare(op, left, right):
  """
  Provides a hook for outputting type-specific assertion messages

  Two comparisons are handled:
    - QueryTestResult == QueryTestResult: rows are listed pairwise (expected vs actual).
    - set <= set: the expected items missing from the actual set are listed.

  Returns a list of strings, where each string will be printed as a new line in the
  result output report on assertion failure. Returns None for any other comparison.
  """
  # map(None, a, b) only works on Python 2. zip_longest (izip_longest on Python 2) pairs
  # the rows the same way, padding the shorter side with None.
  try:
    from itertools import zip_longest
  except ImportError:
    from itertools import izip_longest as zip_longest

  if isinstance(left, QueryTestResult) and isinstance(right, QueryTestResult) and \
      op == "==":
    result = ['Comparing QueryTestResults (expected vs actual):']
    for expected_row, actual_row in zip_longest(left.rows, right.rows):
      if expected_row == actual_row:
        result.append("%s == %s" % (expected_row, actual_row))
      else:
        result.append("%s != %s" % (expected_row, actual_row))
    if len(left.rows) != len(right.rows):
      result.append('Number of rows returned (expected vs actual): '
                    '%d != %d' % (len(left.rows), len(right.rows)))

    # pytest has a bug/limitation where it will truncate long custom assertion messages
    # (it has a hardcoded string length limit of 80*8 characters). To ensure this info
    # isn't lost, always log the assertion message.
    LOG.error('\n'.join(result))
    return result

  # pytest supports printing the diff for a set equality check, but does not do
  # so well when we're doing a subset check. This handles that situation.
  if isinstance(left, set) and isinstance(right, set) and op == '<=':
    # If expected is not a subset of actual, print out the set difference.
    result = ['Items in expected results not found in actual results:']
    result.extend(list(left - right))
    result.append('Items in actual results:')
    result.extend(list(right))
    LOG.error('\n'.join(result))
    return result
|
|
|
|
|
|
def pytest_xdist_setupnodes(config, specs):
  """xdist setup hook: silences the plugin, which is very noisy in verbose mode."""
  options = config.option
  options.verbose = 0
|
|
|
|
|
|
def pytest_generate_tests(metafunc):
  """
  This is a hook to parameterize the tests based on the input vector.

  If a test has the 'vector' fixture specified, this code is invoked and it will build
  a set of test vectors to parameterize the test with. Tests without the 'vector'
  fixture are left untouched.
  """
  if 'vector' not in metafunc.fixturenames:
    return

  metafunc.cls.add_test_dimensions()
  vectors = metafunc.cls.ImpalaTestMatrix.generate_test_vectors(
      metafunc.config.option.exploration_strategy)

  if len(vectors) == 0:
    # __name__ exists on both Python 2 and 3 (func_name is Python 2 only).
    LOG.warning("No test vectors generated for test '%s'. Check constraints and "
                "input vectors", metafunc.function.__name__)

  # Build a real list: on Python 3, map() returns an iterator that cannot be sliced.
  vector_names = [str(vector) for vector in vectors]
  # In the case this is a test result update or sanity run, select a single test vector
  # to run. This is okay for update_results because results are expected to be the same
  # for all test vectors.
  if metafunc.config.option.update_results or metafunc.config.option.sanity:
    vectors = vectors[0:1]
    vector_names = vector_names[0:1]
  metafunc.parametrize('vector', vectors, ids=vector_names)
|
|
|
|
|
|
@pytest.yield_fixture
def cleanup_generated_core_dumps(request):
  """
  Fixture that deletes the core dumps a test produced on purpose (negative testing).

  The set of core dumps is recorded before and after the test; only files that appear
  in the second snapshot but not the first are removed. Cores that existed beforehand
  (e.g. from earlier failures that still need triage) are left in place.
  """
  def snapshot_cores():
    # find_all_files() yields candidates by name; is_core_dump() filters them.
    return set([path for path in find_all_files('*core*') if is_core_dump(path)])

  cores_before = snapshot_cores()

  yield  # The test runs here.

  cores_after = snapshot_cores()
  for core_path in (cores_after - cores_before):
    LOG.info("Cleaned up {core} created by {test}".format(
        core=core_path, test=request.node.name))
    os.remove(core_path)
|
|
|
|
|
|
@pytest.fixture
def testid_checksum(request):
  """
  Return a hex string representing the CRC32 checksum of the parametrized test
  function's full pytest test ID. The full pytest ID includes relative path, module
  name, possible test class name, function name, and any parameters.

  This could be combined with some prefix in order to form identifiers unique to a
  particular test case.
  """
  # For an example of what a full pytest ID looks like, see below (written as Python
  # multi-line literal)
  #
  # ("query_test/test_cancellation.py::TestCancellationParallel::()::test_cancel_select"
  #  "[table_format: avro/snap/block | exec_option: {'disable_codegen': False, "
  #  "'abort_on_error': 1, 'exec_single_node_rows_threshold': 0, 'batch_size': 0, "
  #  "'num_nodes': 0} | query_type: SELECT | cancel_delay: 3 | action: WAIT | "
  #  "query: select l_returnflag from lineitem]")
  node_id = request.node.nodeid
  # crc32() needs bytes on Python 3. On Python 2 a plain str already is bytes, so only
  # text (unicode) ids are encoded.
  if not isinstance(node_id, bytes):
    node_id = node_id.encode('utf-8')
  # The mask keeps the value unsigned, since crc32() can return a negative int on
  # Python 2.
  return '{0:x}'.format(crc32(node_id) & 0xffffffff)
|
|
|
|
|
|
@pytest.fixture
def unique_database(request, testid_checksum):
  """
  Create one or more databases named uniquely for the requesting test, return the name
  of the first one, and drop them all again when the test has finished.

  The first database is called '<prefix>_<testid_checksum>', where the prefix defaults
  to the test function's __name__. Since the names are unique per test, tests relying
  on this fixture for test-local databases/tables can run in parallel, provided they do
  not need exclusive access to other test-local resources.

  Sample usage:

    def test_something(self, vector, unique_database):
      # fixture creates database test_something_48A80F
      self.client.execute('DROP TABLE IF EXISTS `{0}`.`mytable`'.format(unique_database))
      # test does other stuff with the unique_database name as needed

  Parametrized usage:

    from tests.common.parametrize import UniqueDatabase

    @UniqueDatabase.parametrize(name_prefix='mydb', num_dbs=3, sync_ddl=True)
    def test_something(self, vector, unique_database):
      # fixture creates databases mydb_48A80F, mydb_48A80F2, mydb_48A80F3 with sync_ddl
      self.client.execute('DROP TABLE IF EXISTS `{0}`.`mytable`'.format(unique_database))
      # test does other stuff with the unique_database name as needed

  Supported parameters (read from request.param when present):

    name_prefix: string (defaults to test function __name__)
      - prefix to be used for the database name

    num_dbs: integer (defaults to 1)
      - number of unique databases to create
      - databases 2, 3, ... are named by appending "2", "3", ... to the first database
        name (the first one carries no "1" suffix)

    sync_ddl: boolean (defaults to False)
      - whether the databases are created and dropped with the sync_ddl query option

  Raises ValueError if any generated name is not a valid Impala identifier.

  For a similar DB-API 2 compliant connection/cursor that uses HS2 see the 'conn' and
  'unique_cursor' fixtures below.
  """
  # Test cases are at the function level, so no one should "accidentally" re-scope this.
  scope_error = ('This fixture must have scope "function" since '
                 'the fixture must guarantee unique per-test '
                 'databases.')
  assert 'function' == request.scope, scope_error

  # Start from the defaults and overlay whatever the parametrization supplied.
  settings = {"name_prefix": request.function.__name__, "sync_ddl": False, "num_dbs": 1}
  overrides = getattr(request, 'param', None)
  if overrides is not None:
    for key in ("name_prefix", "sync_ddl", "num_dbs"):
      if key in overrides:
        settings[key] = overrides[key]
  sync_ddl = settings["sync_ddl"]

  first_db_name = '{0}_{1}'.format(settings["name_prefix"], testid_checksum)
  db_names = [first_db_name] + [
      first_db_name + str(suffix) for suffix in range(2, settings["num_dbs"] + 1)]

  for candidate in db_names:
    if is_valid_impala_identifier(candidate):
      continue
    raise ValueError('Unique database name "{0}" is not a valid Impala identifer; check'
                     ' test function name or any prefixes for long length or invalid '
                     'characters.'.format(candidate))

  def run_query(*query_args):
    # Runs a statement through the test instance's client and expects it to succeed.
    return request.instance.execute_query_expect_success(
        request.instance.client, *query_args)

  def drop_database(name, must_exist):
    existence_clause = "" if must_exist else "IF EXISTS"
    run_query('DROP DATABASE {0} `{1}` CASCADE'.format(existence_clause, name),
              {'sync_ddl': sync_ddl})
    # The database directory may not be removed if there are external tables in the
    # database when it is dropped. The external locations are not removed by cascade.
    # These preexisting files/directories can cause errors when tests run repeatedly or
    # use a data snapshot (see IMPALA-9702), so this forces cleanup of the database
    # directory.
    location = "{0}/{1}.db".format(WAREHOUSE, name).lstrip('/')
    request.instance.filesystem_client.delete_file_dir(location, recursive=True)

  def drop_all_databases():
    # Make sure we don't try to drop the current session database
    run_query("use default")
    for name in db_names:
      drop_database(name, True)
      LOG.info('Dropped database "{0}" for test ID "{1}"'.format(
          name, str(request.node.nodeid)))

  # Register the teardown before creating anything.
  request.addfinalizer(drop_all_databases)

  for name in db_names:
    # Clear out any leftover database of the same name before creating it afresh.
    drop_database(name, False)
    run_query('CREATE DATABASE `{0}`'.format(name), {'sync_ddl': sync_ddl})
    LOG.info('Created database "{0}" for test ID "{1}"'.format(
        name, str(request.node.nodeid)))
  return first_db_name
|
|
|
|
|
|
@pytest.fixture
def unique_role(request, testid_checksum):
  """Builds a role name of the form '<prefix>_<testid_checksum>_role' for the test.

  The prefix is the test function's __name__, unless request.param supplies a
  'name_prefix'. Only the name is returned; the fixture does not create a role."""
  prefix = request.function.__name__
  overrides = getattr(request, 'param', None)
  if overrides is not None and 'name_prefix' in overrides:
    prefix = overrides['name_prefix']
  return '{0}_{1}_role'.format(prefix, testid_checksum)
|
|
|
|
|
|
@pytest.fixture
def unique_name(request, testid_checksum):
  """Builds a name of the form '<prefix>_<testid_checksum>' for the requesting test.

  The prefix is the test function's __name__, unless request.param supplies a
  'name_prefix'."""
  prefix = request.function.__name__
  overrides = getattr(request, 'param', None)
  if overrides is not None and 'name_prefix' in overrides:
    prefix = overrides['name_prefix']
  return '{0}_{1}'.format(prefix, testid_checksum)
|
|
|
|
|
|
@pytest.yield_fixture
def kudu_client():
  """Provides a new Kudu client as a pytest fixture. The client only exists for the
  duration of the method it is used in.

  The master address comes from the --kudu_master_hosts option, given as 'host' or
  'host:port'; DEFAULT_KUDU_MASTER_PORT is used when no port is given. Raises Exception
  if a comma-separated list of masters is supplied.
  """
  kudu_master = pytest.config.option.kudu_master_hosts

  if "," in kudu_master:
    raise Exception("Multi-master not supported yet")
  if ":" in kudu_master:
    host, port = kudu_master.split(":")
  else:
    host, port = kudu_master, DEFAULT_KUDU_MASTER_PORT
  client = kudu_connect(host, port)
  yield client

  # Closing is best effort: a failure here is logged rather than failing the test.
  try:
    client.close()
  except Exception as e:
    # Logger.warn is a deprecated alias of Logger.warning.
    LOG.warning("Error closing Kudu client: %s", e)
|
|
|
|
|
|
@pytest.yield_fixture(scope="class")
def conn(request):
  """Class-scoped fixture yielding a DB-API compliant connection to Impala; all test
  methods of a class share the same connection. The test class can customize it by
  defining any of these methods:
    - get_db_name(): The name of the database to connect to.
    - auto_create_db(): If declared and the method returns True, the database will
      be created before tests run and dropped afterwards. If a database name is
      provided by get_db_name(), it must not exist. Classes that use both
      auto_create_db() and get_db_name() should generate a random name in
      get_db_name() and cache it.
    - get_conn_timeout(): The timeout, in seconds, to use for this connection
      (DEFAULT_CONN_TIMEOUT when absent or falsy).
  The returned connection will have a 'db_name' property.

  DEPRECATED:
    See the 'unique_database' fixture above to use Impala's custom python
    API instead of DB-API.
  """
  test_cls = request.cls
  db_name = __call_cls_method_if_exists(test_cls, "get_db_name")
  wants_fresh_db = __call_cls_method_if_exists(test_cls, "auto_create_db")
  timeout = __call_cls_method_if_exists(test_cls, "get_conn_timeout")
  if not timeout:
    timeout = DEFAULT_CONN_TIMEOUT

  # Both helpers are context managers with the same keyword arguments; they differ only
  # in whether a database is created and dropped around the connection.
  open_connection = __unique_conn if wants_fresh_db else __auto_closed_conn
  with open_connection(db_name=db_name, timeout=timeout) as connection:
    yield connection
|
|
|
|
|
|
def __call_cls_method_if_exists(cls, method_name):
  """Calls the attribute 'method_name' of 'cls' without arguments and returns its result.
  Returns None when 'cls' has no such attribute (or the attribute is falsy).
  """
  candidate = getattr(cls, method_name, None)
  return candidate() if candidate else None
|
|
|
|
|
|
@contextlib.contextmanager
def __unique_conn(db_name=None, timeout=DEFAULT_CONN_TIMEOUT):
  """Connects to Impala and creates a new database, then returns a connection to it.
  This is intended to be used in a "with" block. Upon exit, the database will be
  dropped. A database name can be provided by 'db_name', a database by that name
  must not exist prior to calling this method. Without 'db_name', a random six
  character name (a letter followed by five letters/digits) is generated.

  with __unique_conn() as conn:
    # Use conn
  # The database no longer exists and the conn is closed.

  The returned connection will have a 'db_name' property.

  DEPRECATED:
    See the 'unique_database' fixture above to use Impala's custom python
    API instead of DB-API.
  """
  if not db_name:
    db_name = choice(ascii_lowercase) + "".join(sample(ascii_lowercase + digits, 5))
  # A first, short-lived connection is used only to create the database.
  with __auto_closed_conn(timeout=timeout) as conn:
    with __auto_closed_cursor(conn) as cur:
      cur.execute("CREATE DATABASE %s" % db_name)
  with __auto_closed_conn(db_name=db_name, timeout=timeout) as conn:
    try:
      yield conn
    finally:
      # Cleanup is best effort: failures are logged, never raised, so they cannot mask
      # an exception coming out of the "with" body.
      try:
        with __auto_closed_cursor(conn) as cur:
          try:
            # Switch away first so the database being dropped is not the current one.
            cur.execute("USE DEFAULT")
            cur.execute("DROP DATABASE IF EXISTS %s CASCADE" % db_name)
          except Exception as e:
            # Logger.warn is a deprecated alias of Logger.warning.
            LOG.warning("Error dropping database: %s", e)
      except Exception as e:
        LOG.warning("Error creating a cursor: %s", e)
|
|
|
|
|
|
@contextlib.contextmanager
def __auto_closed_conn(db_name=None, timeout=DEFAULT_CONN_TIMEOUT):
  """Returns a connection to Impala. This is intended to be used in a "with" block.
  The connection will be closed upon exiting the block.

  The host is taken from the first entry of the --impalad option and the port from
  --impalad_hs2_port. 'db_name' and 'timeout' are passed on to the connect call.

  The returned connection will have a 'db_name' property.

  DEPRECATED:
    See the 'unique_database' fixture above to use Impala's custom python
    API instead of DB-API.
  """
  # The first impalad in the list is the default one; only its host part is used here.
  default_impalad = pytest.config.option.impalad.split(',')[0]
  impalad_host = default_impalad.split(':')[0]
  hs2_port = pytest.config.option.impalad_hs2_port

  conn = impala_connect(host=impalad_host, port=hs2_port, database=db_name,
                        timeout=timeout)
  try:
    conn.db_name = db_name
    yield conn
  finally:
    # Closing is best effort: a failure is logged instead of raised.
    try:
      conn.close()
    except Exception as e:
      # Logger.warn is a deprecated alias of Logger.warning.
      LOG.warning("Error closing Impala connection: %s", e)
|
|
|
|
|
|
@pytest.yield_fixture
def cursor(conn):
  """Function-scoped fixture yielding a new DB-API compliant cursor opened on the
  connection supplied by the conn() fixture. The cursor is closed once the test method
  using it has finished.

  The returned cursor will have a 'conn' property. The 'conn' will have a 'db_name'
  property.

  DEPRECATED:
    See the 'unique_database' fixture above to use Impala's custom python
    API instead of DB-API.
  """
  with __auto_closed_cursor(conn) as method_cursor:
    yield method_cursor
|
|
|
|
|
|
@pytest.yield_fixture(scope="class")
def cls_cursor(conn):
  """Class-scoped fixture yielding a new DB-API compliant cursor opened on the
  connection supplied by the conn() fixture. The cursor stays open for the duration of
  the class it is used in.

  The returned cursor will have a 'conn' property. The 'conn' will have a 'db_name'
  property.

  DEPRECATED:
    See the 'unique_database' fixture above to use Impala's custom python
    API instead of DB-API.
  """
  with __auto_closed_cursor(conn) as class_cursor:
    yield class_cursor
|
|
|
|
|
|
@pytest.yield_fixture
def unique_cursor():
  """Function-scoped fixture yielding a new DB-API compliant cursor on a newly created
  Impala database. The cursor only lives for the test method using it, and the
  database is dropped after the test executes.

  The returned cursor will have a 'conn' property. The 'conn' will have a 'db_name'
  property.

  DEPRECATED:
    See the 'unique_database' fixture above to use Impala's custom python
    API instead of DB-API.
  """
  with __unique_conn() as fresh_conn:
    with __auto_closed_cursor(fresh_conn) as fresh_cursor:
      yield fresh_cursor
|
|
|
|
|
|
@contextlib.contextmanager
def __auto_closed_cursor(conn):
  """Returns a cursor created from conn. This is intended to be used in a "with" block.
  The cursor will be closed upon exiting the block.

  The connection is attached to the cursor as its 'conn' attribute.
  """
  cur = conn.cursor()
  cur.conn = conn
  try:
    yield cur
  finally:
    # Closing is best effort: a failure is logged instead of raised.
    try:
      cur.close()
    except Exception as e:
      # Logger.warn is a deprecated alias of Logger.warning.
      LOG.warning("Error closing Impala cursor: %s", e)
|
|
|
|
|
|
@pytest.yield_fixture
def impala_testinfra_cursor():
  """
  Yield a cursor obtained from tests.comparison.db_connection.ImpalaConnection. Meant
  for the "tests of tests" covering the infra behind the query generator, stress test,
  etc.
  """
  # Unlike the cursor fixtures above, which hand out direct Impyla cursors, tests using
  # this fixture want to exercise the objects in tests.comparison.db_connection, which
  # need testing themselves.
  with ImpalaConnection() as infra_conn:
    infra_cursor = infra_conn.cursor()
    try:
      yield infra_cursor
    finally:
      infra_cursor.close()
|
|
|
|
|
|
@pytest.fixture(autouse=True, scope='session')
def validate_pytest_config():
  """
  Session-wide sanity check of the pytest command line options: aborts the run when
  --testing_remote_cluster is combined with an --impalad value that starts with a
  local address.
  """
  if not pytest.config.option.testing_remote_cluster:
    return

  local_prefixes = ('localhost', '127.', '0.0.0.0')
  # str.startswith() accepts a tuple and matches if any of the prefixes matches.
  if pytest.config.option.impalad.startswith(local_prefixes):
    logging.error("--testing_remote_cluster can not be used with a local impalad")
    pytest.exit("Invalid pytest config option: --testing_remote_cluster")
|
|
|
|
|
|
@pytest.yield_fixture(autouse=True, scope='session')
def cluster_properties():
  """Session-wide fixture yielding the ImpalaTestClusterProperties instance."""
  # Don't import at top level to avoid circular dependency between conftest and
  # tests.common.environ, which uses command-line flags set up by conftest.
  from tests.common.environ import ImpalaTestClusterProperties
  properties = ImpalaTestClusterProperties.get_instance()
  yield properties
|
|
|
|
|
|
@pytest.fixture(autouse=True, scope='session')
def validate_python_version():
  """Session-wide check of the Python runtime, done before any test runs. Impala's
  toolchain Python is at least v2.7, so the tests refuse to run on anything older.
  """
  minimum_version = (2, 7)
  # sys.version_info has more than two fields, so any 2.7.x release compares greater.
  assert sys.version_info > minimum_version, "Tests only support Python 2.7+"
|
|
|
|
|
|
@pytest.hookimpl(trylast=True)
def pytest_collection_modifyitems(items, config, session):
  """Hook to handle --shard_tests command line option.

  If set (as "N/M"), this "deselects" a subset of tests, by hashing their id into M
  buckets and keeping only the tests of bucket N. N == M selects bucket 0. 'items' is
  modified in place and the dropped tests are reported through pytest_deselected.
  """
  if not config.option.shard_tests:
    return

  num_items = len(items)
  this_shard, num_shards = map(int, config.option.shard_tests.split("/"))
  # Requiring num_shards >= 1 turns "N/0" into an assertion failure rather than a
  # ZeroDivisionError further down.
  assert num_shards >= 1 and 0 <= this_shard <= num_shards
  if this_shard == num_shards:
    this_shard = 0

  def shard_of(nodeid):
    # The built-in hash() of a string is randomized per process on Python 3, so the
    # shards of one test run (each a separate process) would not agree on the
    # partitioning. CRC32 gives the same bucket in every process.
    raw_id = nodeid if isinstance(nodeid, bytes) else nodeid.encode('utf-8')
    return (crc32(raw_id) & 0xffffffff) % num_shards

  items_selected, items_deselected = [], []
  for item in items:
    if shard_of(item.nodeid) == this_shard:
      items_selected.append(item)
    else:
      items_deselected.append(item)
  config.hook.pytest_deselected(items=items_deselected)

  # We must modify the items list in place for it to take effect.
  items[:] = items_selected

  logging.info("pytest shard selection enabled %s. Of %d items, selected %d items by hash.",
               config.option.shard_tests, num_items, len(items))
|
|
|
|
|
|
@pytest.hookimpl(trylast=True)
def pytest_runtest_logstart(nodeid, location):
  """Stores a sanitized, shortened copy of the test's node id in
  tests.common.current_node."""
  # Beeswax doesn't support commas or equals in configuration, so they are replaced.
  # Spaces are removed to make the string a little bit shorter.
  sanitized = nodeid
  for old, new in ((",", ";"), (" ", ""), ("=", "-")):
    sanitized = sanitized.replace(old, new)
  # The string is shortened so that it is entirely spit out by ThriftDebugString, rather
  # than being elided.
  tests.common.current_node = sanitized[0:255]
|