#!/usr/bin/env impala-python

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

'''This module will run random queries against existing databases and compare the
results.

'''
# TODO: IMPALA-4600: refactor this module
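
# Example invocation (a sketch; the exact logging, connection, and db-name flags are
# defined in tests/comparison/cli_options.py, so check that module for their spelling):
#
#   impala-python <this module> --test-db-type=IMPALA --ref-db-type=POSTGRESQL \
#       --query-count=100 --profile=default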

from copy import deepcopy
from decimal import Decimal
from itertools import izip
from logging import getLogger
from math import isinf, isnan
from os import getenv, symlink, unlink
from os.path import join as join_path
from random import choice, randint
from string import ascii_lowercase, digits
from subprocess import call
from tempfile import gettempdir
from threading import current_thread, Thread
from time import time

from tests.comparison.db_types import BigInt
from tests.comparison.db_connection import (
    DbCursor,
    IMPALA,
    HIVE,
    MYSQL,
    ORACLE,
    POSTGRESQL)
from tests.comparison.model_translator import SqlWriter
from tests.comparison.query import (
    FromClause,
    InsertClause,
    InsertStatement,
    Query,
    StatementExecutionMode,
    SelectClause,
    SelectItem)
from tests.comparison.query_flattener import QueryFlattener
from tests.comparison.statement_generator import get_generator
from tests.comparison import db_connection

LOG = getLogger(__name__)


class QueryResultComparator(object):
  '''Used for comparing the results of a Query across two databases'''

  # Used when comparing FLOAT values
  EPSILON = 0.1

  # The DECIMAL values will be rounded before comparison
  DECIMAL_PLACES = 2

  def __init__(self, query_profile, ref_conn,
               test_conn, query_timeout_seconds, flatten_dialect=None):
    '''The test_conn/ref_conn arguments should be instances of DbConnection.'''
    ref_cursor = ref_conn.cursor()
    test_cursor = test_conn.cursor()

    self.ref_conn = ref_conn
    self.ref_sql_writer = SqlWriter.create(
        dialect=ref_conn.db_type, nulls_order_asc=query_profile.nulls_order_asc())
    self.test_conn = test_conn
    self.test_sql_writer = SqlWriter.create(dialect=test_conn.db_type)

    self.query_executor = QueryExecutor(
        [ref_cursor, test_cursor],
        [self.ref_sql_writer, self.test_sql_writer],
        query_timeout_seconds=query_timeout_seconds,
        flatten_dialect=flatten_dialect)

  @property
  def test_db_type(self):
    return self.test_conn.db_type

  @property
  def ref_db_type(self):
    return self.ref_conn.db_type

  def compare_query_results(self, query):
    '''Execute the query, compare the data, and return a ComparisonResult, which
    summarizes the outcome.
    '''
    comparison_result = ComparisonResult(query, self.test_db_type, self.ref_db_type)
    (ref_sql, ref_exception, ref_data_set, ref_cursor_description), (test_sql,
        test_exception, test_data_set, test_cursor_description) = \
        self.query_executor.fetch_query_results(query)

    comparison_result.ref_sql = ref_sql
    comparison_result.test_sql = test_sql

    if ref_exception:
      comparison_result.exception = ref_exception
      error_message = str(ref_exception)
      if 'Year is out of valid range: 1400..9999' in error_message:
        # This comes from Postgresql. Overflow errors will be ignored.
        comparison_result.exception = TypeOverflow(error_message)
      LOG.debug('%s encountered an error running query: %s',
                self.ref_conn.db_type, ref_exception, exc_info=True)
      return comparison_result

    if test_exception:
      # "known errors" will be ignored
      error_message = str(test_exception)
      known_error = None

      if self.test_db_type is db_connection.IMPALA:
        if 'Expressions in the ORDER BY clause must not be constant' in error_message \
            or 'Expressions in the PARTITION BY clause must not be constant' \
            in error_message:
          # It's too much work to avoid this bug. Just ignore it if it comes up.
          known_error = KnownError('IMPALA-1354')
        elif 'GROUP BY expression must not contain aggregate functions' in error_message \
            or 'select list expression not produced by aggregation output' in error_message:
          known_error = KnownError('IMPALA-1423')
        elif ('max(' in error_message or 'min(' in error_message) \
            and 'only supported with an UNBOUNDED PRECEDING start bound' in error_message:
          # This analytic isn't supported and ignoring this here is much easier than not
          # generating the query...
          known_error = KnownError('MAX UNBOUNDED PRECISION')
        elif 'IN and/or EXISTS subquery predicates are not supported in binary predicates' \
            in error_message:
          known_error = KnownError('IMPALA-1418')
        elif 'Unsupported predicate with subquery' in error_message:
          known_error = KnownError('IMPALA-1950')
        elif 'RIGHT OUTER JOIN type with no equi-join' in error_message:
          known_error = KnownError('IMPALA-3063')
        elif 'Operation is in ERROR_STATE' in error_message:
          known_error = KnownError('Mem limit exceeded')
      elif self.test_db_type is db_connection.HIVE:
        if 'ParseException line' in error_message and 'missing ) at' in \
            error_message and query.select_clause and \
            query.select_clause.analytic_items:
          known_error = KnownError("HIVE-14871")

      if known_error:
        comparison_result.exception = known_error
      else:
        comparison_result.exception = test_exception
        LOG.debug('%s encountered an error running query: %s',
                  self.test_conn.db_type, test_exception, exc_info=True)
      return comparison_result

    comparison_result.ref_row_count = len(ref_data_set)
    comparison_result.test_row_count = len(test_data_set)
    comparison_result.query_resulted_in_data = (comparison_result.test_row_count > 0 or
                                                comparison_result.ref_row_count > 0)
    if comparison_result.ref_row_count != comparison_result.test_row_count:
      return comparison_result

    # Standardize data (round FLOATs) in each column, and sort the data set
    for data_set in (ref_data_set, test_data_set):
      for row_idx, row in enumerate(data_set):
        data_set[row_idx] = []
        for col_idx, col in enumerate(row):
          data_set[row_idx].append(self.standardize_data(col,
              ref_cursor_description[col_idx], test_cursor_description[col_idx]))
      # TODO: If the query has an ORDER BY clause, sorting should only be done within
      #       subsets of rows that have the same order by values.
      data_set.sort(cmp=self.row_sort_cmp)

    found_data = False  # Will be set to True if the result contains non-zero/NULL data
    for row_idx, (ref_row, test_row) in enumerate(izip(ref_data_set, test_data_set)):
      for col_idx, (ref_val, test_val) in enumerate(izip(ref_row, test_row)):
        if ref_val or test_val:  # Ignores zeros, ex "SELECT COUNT(*) ... WHERE FALSE"
          found_data = True
        if self.vals_are_equal(ref_val, test_val):
          continue
        if isinstance(test_val, int) \
            and isinstance(ref_val, (int, float, Decimal)) \
            and abs(ref_val) > BigInt.MAX:
          # Impala will return incorrect results if the val is greater than max BigInt
          comparison_result.exception = KnownError('IMPALA-865')
        elif isinstance(test_val, float) \
            and (isinf(test_val) or isnan(test_val)):
          # In some cases, Impala gives NaNs and Infs instead of NULLs
          comparison_result.exception = KnownError('IMPALA-724')
        comparison_result.ref_row = ref_row
        comparison_result.test_row = test_row
        comparison_result.mismatch_at_row_number = row_idx + 1
        comparison_result.mismatch_at_col_number = col_idx + 1
        return comparison_result
    comparison_result.query_resulted_in_data = found_data

    # If we're here, it means ref and test data sets are equal for a DML statement.
    if isinstance(query, (InsertStatement,)):
      comparison_result.modified_rows_count = test_data_set[0][0]

    return comparison_result

  def standardize_data(self, data, ref_col_description, test_col_description):
    '''Return a val that is suitable for comparison.'''
    # For float data we need to round, otherwise differences in precision will cause
    # errors.
    if isinstance(data, float):
      return round(data, self.DECIMAL_PLACES)
    if isinstance(data, Decimal):
      if ref_col_description[5] is not None and test_col_description[5] is not None:
        return round(data, min(ref_col_description[5], test_col_description[5]))
    return data
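
  # Note: index 5 of a PEP 249 cursor.description entry
  # (name, type_code, display_size, internal_size, precision, scale, null_ok) is the
  # column's scale, so the Decimal branch above rounds both sides to the smaller of
  # the two declared scales, e.g. DECIMAL(9, 4) vs DECIMAL(9, 2) compares at 2 places.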

  def row_sort_cmp(self, ref_row, test_row):
    '''Comparison used for sorting. Despite the argument names, both arguments are
    rows from the same data set; this is only used as a sort comparator.'''
    for ref_val, test_val in izip(ref_row, test_row):
      if ref_val is None and test_val is not None:
        return -1
      if ref_val is not None and test_val is None:
        return 1
      result = cmp(ref_val, test_val)
      if result:
        return result
    return 0
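
  # For example, sorting [(1, 2), (None, 9), (1, None)] with this comparator yields
  # [(None, 9), (1, None), (1, 2)]: a NULL (None) value sorts before any other value.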

  def vals_are_equal(self, ref, test):
    '''Return True if the two values are considered equal. Floats are considered
    equal if the difference between them is very small.'''
    if ref == test:
      return True
    # For some reason Postgresql will return Decimals when using some aggregate
    # functions such as AVG().
    if isinstance(ref, (float, Decimal)) and isinstance(test, float):
      return self.floats_are_equal(ref, test)
    LOG.debug("Values differ, reference: %s (%s), test: %s (%s)",
              ref, type(ref),
              test, type(test))
    return False

  def floats_are_equal(self, ref, test):
    '''Compare two floats, treating them as equal if their relative difference is
    below EPSILON.'''
    ref = round(ref, self.DECIMAL_PLACES)
    test = round(test, self.DECIMAL_PLACES)
    diff = abs(ref - test)
    if ref * test == 0:
      return diff < self.EPSILON
    result = diff / (abs(ref) + abs(test)) < self.EPSILON
    if not result:
      LOG.debug("Floats differ, diff: %s, |reference|: %s, |test|: %s",
                diff, abs(ref), abs(test))
    return result
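
  # Worked example: ref=100.004 and test=100.006 round to 100.0 and 100.01, giving
  # diff=0.01 and a relative difference of 0.01 / (100.0 + 100.01) ~= 5e-5, which is
  # below EPSILON, so the values compare as equal. When either rounded value is 0 the
  # relative check degenerates, so an absolute check against EPSILON is used instead.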


class QueryExecutor(object):
  '''Concurrently executes queries'''

  # TODO: Set to false while IMPALA-3336 is a problem. Disabling random query options
  #       seems to reduce IMPALA-3336 occurrences.
  ENABLE_RANDOM_QUERY_OPTIONS = False

  # If the number of rows * cols is greater than this val, then the comparison will
  # be aborted. Raising this value also raises the risk of python being OOM killed. At
  # 10M python would get OOM killed occasionally even on a physical machine with 32GB
  # RAM.
  TOO_MUCH_DATA = 1000 * 1000

  def __init__(self, cursors, sql_writers, query_timeout_seconds, flatten_dialect=None):
    '''cursors should be a list of db_connector.Cursors.

    sql_writers should be a list of model_translator.SqlWriters, with translators in
    the same order as cursors in "cursors".
    '''
    self.query_timeout_seconds = query_timeout_seconds
    self.cursors = cursors
    self.sql_writers = sql_writers
    self.query_logs = list()
    # SQL dialect for which the queries should be flattened
    self.flatten_dialect = flatten_dialect

    for cursor in cursors:
      # A list of all queries attempted
      query_log_path = gettempdir() + '/test_query_log_%s_%s.sql' \
          % (cursor.db_type.lower(), time())
      self.query_logs.append(open(query_log_path, 'w'))
      link = gettempdir() + '/test_query_log_%s.sql' % cursor.db_type.lower()
      try:
        unlink(link)
      except OSError as e:
        if 'No such file' not in str(e):
          raise e
      try:
        symlink(query_log_path, link)
      except OSError as e:
        # TODO: Figure out what the error message is when there is a race condition
        #       and ignore it.
        raise e

    # In case the query will be executed as a "CREATE TABLE <name> AS ..." or
    # "CREATE VIEW <name> AS ...", this will be the value of "<name>".
    self._table_or_view_name = None

  def set_impala_query_options(self, cursor):
    opts = """
        SET MEM_LIMIT={mem_limit};
        SET BATCH_SIZE={batch_size};
        SET DISABLE_CODEGEN={disable_codegen};
        SET DISABLE_OUTERMOST_TOPN={disable_outermost_topn};
        SET DISABLE_ROW_RUNTIME_FILTERING={disable_row_runtime_filtering};
        SET DISABLE_STREAMING_PREAGGREGATIONS={disable_streaming_preaggregations};
        SET DISABLE_UNSAFE_SPILLS={disable_unsafe_spills};
        SET EXEC_SINGLE_NODE_ROWS_THRESHOLD={exec_single_node_rows_threshold};
        SET BUFFER_POOL_LIMIT={buffer_pool_limit};
        SET MAX_IO_BUFFERS={max_io_buffers};
        SET MAX_SCAN_RANGE_LENGTH={max_scan_range_length};
        SET NUM_NODES={num_nodes};
        SET NUM_SCANNER_THREADS={num_scanner_threads};
        SET OPTIMIZE_PARTITION_KEY_SCANS={optimize_partition_key_scans};
        SET RANDOM_REPLICA={random_replica};
        SET REPLICA_PREFERENCE={replica_preference};
        SET RUNTIME_BLOOM_FILTER_SIZE={runtime_bloom_filter_size};
        SET RUNTIME_FILTER_MODE={runtime_filter_mode};
        SET RUNTIME_FILTER_WAIT_TIME_MS={runtime_filter_wait_time_ms};
        SET SCAN_NODE_CODEGEN_THRESHOLD={scan_node_codegen_threshold}""".format(
        mem_limit=randint(1024 ** 3, 10 * 1024 ** 3),
        batch_size=randint(1, 4096),
        disable_codegen=choice((0, 1)),
        disable_outermost_topn=choice((0, 1)),
        disable_row_runtime_filtering=choice((0, 1)),
        disable_streaming_preaggregations=choice((0, 1)),
        disable_unsafe_spills=choice((0, 1)),
        exec_single_node_rows_threshold=randint(1, 100000000),
        buffer_pool_limit=randint(1, 100000000),
        max_io_buffers=randint(1, 100000000),
        max_scan_range_length=randint(1, 100000000),
        num_nodes=randint(3, 3),
        num_scanner_threads=randint(1, 100),
        optimize_partition_key_scans=choice((0, 1)),
        random_replica=choice((0, 1)),
        replica_preference=choice(("CACHE_LOCAL", "DISK_LOCAL", "REMOTE")),
        runtime_bloom_filter_size=randint(4096, 16777216),
        runtime_filter_mode=choice(("OFF", "LOCAL", "GLOBAL")),
        runtime_filter_wait_time_ms=randint(1, 100000000),
        scan_node_codegen_threshold=randint(1, 100000000))
    LOG.debug(opts)
    for opt in opts.strip().split(";"):
      cursor.execute(opt)
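
  # Splitting the formatted blob on ';' executes one statement at a time, e.g.
  # "SET MEM_LIMIT=2147483648" followed by "SET BATCH_SIZE=512" (illustrative values;
  # the actual numbers are drawn from the randint/choice ranges above).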

  def fetch_query_results(self, query):
    '''Concurrently execute the query using each cursor and return a list of tuples
    containing the result information for each cursor. The tuple format is
    (<sql>, <exception or None>, <data set or None>, <cursor description or None>).

    If query_timeout_seconds is reached and the connection is killable then the
    query will be cancelled and the connection reset. Otherwise the query will
    continue to run in the background.

    "query" should be an instance of query.Query.
    '''
    if query.execution in (StatementExecutionMode.CREATE_TABLE_AS,
                           StatementExecutionMode.CREATE_VIEW_AS):
      self._table_or_view_name = self._create_random_table_name()
    elif isinstance(query, (InsertStatement,)):
      self._table_or_view_name = query.dml_table.name

    query_threads = list()
    for sql_writer, cursor, log_file in izip(
        self.sql_writers, self.cursors, self.query_logs
    ):
      if self.ENABLE_RANDOM_QUERY_OPTIONS and cursor.db_type == IMPALA:
        self.set_impala_query_options(cursor)
      query_thread = Thread(
          target=self._fetch_sql_results,
          args=[query, cursor, sql_writer, log_file],
          name='{db_type}-exec-{id_}'.format(
              db_type=cursor.db_type, id_=id(query)))
      query_thread.daemon = True
      query_thread.sql = ''
      query_thread.data_set = None
      query_thread.cursor_description = None
      query_thread.exception = None
      query_thread.start()
      query_threads.append(query_thread)

    end_time = time() + self.query_timeout_seconds
    for query_thread, cursor in izip(query_threads, self.cursors):
      join_time = end_time - time()
      if join_time > 0:
        query_thread.join(join_time)
      if query_thread.is_alive():
        # Kill connection and reconnect to return cursor to initial state.
        if cursor.conn.supports_kill:
          LOG.debug('Attempting to kill connection')
          cursor.conn.kill()
          LOG.debug('Killed connection')
        try:
          # TODO: Sometimes this takes a very long time causing the program to appear to
          #       hang. Maybe this should be done in another thread so a timeout can be
          #       applied?
          cursor.close()
        except Exception as e:
          LOG.info('Error closing cursor: %s', e)
        cursor.reconnect()
        query_thread.exception = QueryTimeout(
            'Query timed out after %s seconds' % self.query_timeout_seconds)

      if (query.execution in (StatementExecutionMode.CREATE_TABLE_AS,
                              StatementExecutionMode.DML_TEST)):
        cursor.drop_table(self._table_or_view_name)
      elif query.execution == StatementExecutionMode.CREATE_VIEW_AS:
        cursor.drop_view(self._table_or_view_name)

    return [(query_thread.sql, query_thread.exception, query_thread.data_set,
             query_thread.cursor_description) for query_thread in query_threads]

  def _fetch_sql_results(self, query, cursor, sql_writer, log_file):
    '''Execute the query using the cursor and set the result or exception on the local
    thread.
    '''
    try:
      log_file.write('/***** Start Query *****/\n')
      if sql_writer.DIALECT == self.flatten_dialect:
        # Converts the query model for the flattened version of the data. This is for
        # testing of Impala nested types support.
        query = deepcopy(query)
        QueryFlattener().flatten(query)
      if query.execution == StatementExecutionMode.CREATE_TABLE_AS:
        setup_sql = sql_writer.write_create_table_as(query, self._table_or_view_name)
        query_sql = 'SELECT * FROM ' + self._table_or_view_name
      elif query.execution == StatementExecutionMode.CREATE_VIEW_AS:
        setup_sql = sql_writer.write_create_view(query, self._table_or_view_name)
        query_sql = 'SELECT * FROM ' + self._table_or_view_name
      elif isinstance(query, (InsertStatement,)):
        setup_sql = sql_writer.write_query(query)
        # TODO: improve validation (IMPALA-4599). This is good enough for looking for
        #       crashes on DML statements
        query_sql = 'SELECT COUNT(*) FROM ' + self._table_or_view_name
      else:
        setup_sql = None
        query_sql = sql_writer.write_query(query)
      if setup_sql:
        LOG.debug("Executing on %s:\n%s", cursor.db_type, setup_sql)
        current_thread().sql = setup_sql + ';\n'
        log_file.write(setup_sql + ';\n')
        log_file.flush()
        cursor.execute(setup_sql)
      LOG.debug("Executing on %s:\n%s", cursor.db_type, query_sql)
      current_thread().sql += query_sql
      log_file.write(query_sql + ';\n')
      log_file.write('/***** End Query *****/\n')
      log_file.flush()
      cursor.execute(query_sql)
      col_count = len(cursor.description)
      batch_size = max(10000 / col_count, 1)
      row_limit = self.TOO_MUCH_DATA / col_count
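      # For example, a 5-column result fetches in batches of 2,000 rows and aborts
      # once more than 200,000 rows (1M cells / 5 columns) have been accumulated.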
      data_set = list()
      current_thread().data_set = data_set
      current_thread().cursor_description = cursor.description
      LOG.debug("Fetching results from %s", cursor.db_type)
      while True:
        batch = cursor.fetchmany(batch_size)
        data_set.extend(batch)
        if len(batch) < batch_size:
          if cursor.db_type == IMPALA:
            impala_log = cursor.get_log()
            if 'Expression overflowed, returning NULL' in impala_log:
              raise TypeOverflow('Numeric overflow; data may not match')
          break
        if len(data_set) > row_limit:
          raise DataLimitExceeded('Too much data')
      if isinstance(query, (InsertStatement,)):
        LOG.debug('Total row count for {0}: {1}'.format(
            cursor.db_type, str(data_set)))
    except Exception as e:
      current_thread().exception = e

  def _create_random_table_name(self):
    char_choices = ascii_lowercase
    chars = list()
    for idx in xrange(4):  # will result in ~1M combinations
      if idx == 1:
        char_choices += '_' + digits
      chars.append(choice(char_choices))
    return 'qgen_' + ''.join(chars)
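
  # The first character is drawn from 26 lowercase letters and the remaining three
  # from 37 characters (lowercase, underscore, digits): 26 * 37**3 ~= 1.3M names,
  # e.g. 'qgen_k7_q' or 'qgen_abc1'.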


class ComparisonResult(object):
  '''Represents the outcome of running one query on the test and reference databases
  and comparing the results.'''

  def __init__(self, query, test_db_type, ref_db_type):
    self.query = query
    self.test_db_type = test_db_type
    self.ref_db_type = ref_db_type
    self.ref_sql = None
    self.test_sql = None
    self.query_resulted_in_data = False
    self.ref_row_count = None
    self.test_row_count = None
    self.mismatch_at_row_number = None
    self.mismatch_at_col_number = None
    self.ref_row = None  # The reference row where the mismatch happened
    self.test_row = None  # The test row where the mismatch happened
    self.exception = None
    self.modified_rows_count = None
    self._error_message = None

  @property
  def error(self):
    if not self._error_message:
      if self.exception:
        self._error_message = str(self.exception)
      elif (self.ref_row_count or self.test_row_count) and \
          self.ref_row_count != self.test_row_count:
        self._error_message = 'Row counts do not match: %s %s rows vs %s %s rows' \
            % (self.test_row_count,
               self.test_db_type,
               self.ref_row_count,
               self.ref_db_type)
      elif self.mismatch_at_row_number is not None:
        # Write a row like "[a, b, <<c>>, d]" where c is a bad value
        test_row = '[' + ', '.join(
            '<<' + str(val) + '>>' if idx == self.mismatch_at_col_number - 1 else str(val)
            for idx, val in enumerate(self.test_row)
        ) + ']'
        ref_row = '[' + ', '.join(
            '<<' + str(val) + '>>' if idx == self.mismatch_at_col_number - 1 else str(val)
            for idx, val in enumerate(self.ref_row)
        ) + ']'
        self._error_message = \
            'Column %s in row %s does not match: %s %s row vs %s %s row' \
            % (self.mismatch_at_col_number,
               self.mismatch_at_row_number,
               test_row,
               self.test_db_type,
               ref_row,
               self.ref_db_type)
    return self._error_message

  @property
  def is_known_error(self):
    return isinstance(self.exception, KnownError)

  @property
  def query_timed_out(self):
    return isinstance(self.exception, QueryTimeout)


QueryTimeout = type('QueryTimeout', (Exception, ), {})
TypeOverflow = type('TypeOverflow', (Exception, ), {})
DataLimitExceeded = type('DataLimitExceeded', (Exception, ), {})
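
# The three type() calls above are a compact way to declare empty Exception
# subclasses; each is equivalent to, e.g.:
#
#   class QueryTimeout(Exception):
#     pass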


class KnownError(Exception):

  def __init__(self, jira_url):
    Exception.__init__(self, 'Known issue: ' + jira_url)
    self.jira_url = jira_url


class FrontendExceptionSearcher(object):

  def __init__(self, query_profile, ref_conn, test_conn):
    '''query_profile should be an instance of one of the profiles in query_profile.py'''
    self.query_profile = query_profile
    self.ref_conn = ref_conn
    self.test_conn = test_conn
    self.ref_sql_writer = SqlWriter.create(dialect=ref_conn.db_type)
    self.test_sql_writer = SqlWriter.create(dialect=test_conn.db_type)
    with ref_conn.cursor() as ref_cursor:
      with test_conn.cursor() as test_cursor:
        self.common_tables = DbCursor.describe_common_tables([ref_cursor, test_cursor])
        if not self.common_tables:
          raise Exception("Unable to find a common set of tables in both databases")

  def search(self, number_of_test_queries):

    def on_ref_db_error(e, sql):
      LOG.warn("Error generating explain plan for reference db:\n%s\n%s" % (e, sql))

    def on_test_db_error(e, sql):
      LOG.error("Error generating explain plan for test db:\n%s" % sql)
      raise e

    for idx in xrange(number_of_test_queries):
      LOG.info("Explaining query #%s" % (idx + 1))
      statement_type = self.query_profile.choose_statement()
      statement_generator = get_generator(statement_type)(self.query_profile)
      if issubclass(statement_type, (InsertStatement,)):
        dml_table = self.query_profile.choose_table(self.common_tables)
      else:
        dml_table = None
      query = statement_generator.generate_statement(
          self.common_tables, dml_table=dml_table)
      if not self._explain_query(self.ref_conn, self.ref_sql_writer, query,
                                 on_ref_db_error):
        continue
      self._explain_query(self.test_conn, self.test_sql_writer, query,
                          on_test_db_error)

  def _explain_query(self, conn, writer, query, exception_handler):
    sql = writer.write_query(query)
    try:
      with conn.cursor() as cursor:
        cursor.execute("EXPLAIN %s" % sql)
      return True
    except Exception as e:
      exception_handler(e, sql)
      return False


class QueryResultDiffSearcher(object):
  '''This class uses the query generator (query_generator.py) along with the
  query profile (query_profile.py) to randomly generate queries, execute them
  on the reference and test databases, and compare the results.
  '''

  # Sometimes things get into a bad state and the same error loops forever
  ABORT_ON_REPEAT_ERROR_COUNT = 2

  COPY_TABLE_SUFFIX = '__qgen_copy'

  def __init__(self, query_profile, ref_conn, test_conn):
    '''query_profile should be an instance of one of the profiles in query_profile.py'''
    self.query_profile = query_profile
    self.ref_conn = ref_conn
    self.test_conn = test_conn
    with ref_conn.cursor() as ref_cursor:
      with test_conn.cursor() as test_cursor:
        self.common_tables = DbCursor.describe_common_tables([ref_cursor, test_cursor])
        if not self.common_tables:
          raise Exception("Unable to find a common set of tables in both databases")

  def _concurrently_copy_table(self, src_table):
    """
    Given a Table object, create another Table with the same schema and return the new
    Table object. The schema will be created in both the test and reference databases.

    The data is then copied in both the ref and test databases using threads.
    """
    with self.test_conn.cursor() as test_cursor:
      test_cursor.execute('SHOW CREATE TABLE {0}'.format(src_table.name))
      (create_table_sql,) = test_cursor.fetchall()[0]
      new_table_name = src_table.name + self.COPY_TABLE_SUFFIX
      create_table_sql = create_table_sql.replace(src_table.name, new_table_name, 1)
      test_cursor.drop_table(new_table_name)
      test_cursor.execute(create_table_sql)
      new_table = test_cursor.describe_table(new_table_name)
    with self.ref_conn.cursor() as ref_cursor:
      ref_cursor.drop_table(new_table_name)
      ref_cursor.create_table(new_table)

    copy_select_query = Query()
    copy_select_query.select_clause = SelectClause(
        [SelectItem(col) for col in src_table.cols])
    copy_select_query.from_clause = FromClause(src_table)

    if new_table.primary_keys:
      conflict_action = InsertClause.CONFLICT_ACTION_IGNORE
    else:
      conflict_action = InsertClause.CONFLICT_ACTION_DEFAULT

    table_copy_statement = InsertStatement(
        insert_clause=InsertClause(new_table, conflict_action=conflict_action),
        select_query=copy_select_query, execution=StatementExecutionMode.DML_SETUP)

    result = self.query_result_comparator.compare_query_results(table_copy_statement)
    if result.error:
      raise Exception('setup SQL to copy table failed: {0}'.format(result.error))
    self._dml_table_size = result.modified_rows_count

    return new_table

  def search(self, number_of_test_queries, stop_on_result_mismatch, stop_on_crash,
             query_timeout_seconds):
    '''Returns an instance of SearchResults, which is a summary report. This method
    oversees the generation, execution, and comparison of queries.

    number_of_test_queries should be an integer indicating the maximum number of
    queries to generate and execute.
    '''
    start_time = time()
    self.query_result_comparator = QueryResultComparator(
        self.query_profile, self.ref_conn, self.test_conn, query_timeout_seconds)
    query_count = 0
    queries_resulted_in_data_count = 0
    mismatch_count = 0
    query_timeout_count = 0
    known_error_count = 0
    test_crash_count = 0
    last_error = None
    repeat_error_count = 0
    count_effective_dml_statements = 0
    count_rows_affected_by_dml = 0

    while number_of_test_queries > query_count:
      statement_type = self.query_profile.choose_statement()
      statement_generator = get_generator(statement_type)(self.query_profile)
      dml_table = None
      if issubclass(statement_type, (InsertStatement,)):
        dml_choice_src_table = self.query_profile.choose_table(self.common_tables)
        # Copy the table we want to INSERT/UPSERT INTO. Do this for the following
        # reasons:
        #
        # 1. If we don't copy, the tables will get larger and larger.
        # 2. If we want to avoid tables getting larger and larger, we have to come up
        #    with some threshold about when to cut and start over.
        # 3. If we keep INSERT/UPSERTing into tables and finally find a crash, we have
        #    to replay all previous INSERT/UPSERTs again. Those INSERTs may not produce
        #    the same rows as before. To maximize the chance of bug reproduction, run
        #    every INSERT/UPSERT on a pristine table.
        dml_table = self._concurrently_copy_table(dml_choice_src_table)
      statement = statement_generator.generate_statement(
          self.common_tables, dml_table=dml_table)
      if isinstance(statement, Query):
        # We can re-write statement execution here to possibly be a CREATE TABLE AS
        # SELECT or CREATE VIEW AS SELECT.
        statement.execution = self.query_profile.get_query_execution()
      query_count += 1
      LOG.info('Running query #%s', query_count)
      result = self.query_result_comparator.compare_query_results(statement)
      if result.query_resulted_in_data:
        queries_resulted_in_data_count += 1
      if result.modified_rows_count:
        count_effective_dml_statements += 1
        count_rows_affected_by_dml += abs(
            result.modified_rows_count - self._dml_table_size)
      if isinstance(result.exception, DataLimitExceeded) \
          or isinstance(result.exception, TypeOverflow):
        continue
      if result.error:
        # TODO: These first two come from psycopg2, the postgres driver. Maybe we should
        #       try a different driver? Or maybe the usage of the driver isn't correct.
        #       Anyhow ignore these failures.
        if 'division by zero' in result.error \
            or 'out of range' in result.error:
          LOG.debug('Ignoring error: %s', result.error)
          query_count -= 1
          continue

        if result.is_known_error:
          known_error_count += 1
        elif result.query_timed_out:
          query_timeout_count += 1
        else:
          mismatch_count += 1

        print('---Test Query---\n')
        print(result.test_sql + '\n')
        print('---Reference Query---\n')
        print(result.ref_sql + '\n')
        print('---Error---\n')
        print(result.error + '\n')
        print('------\n')

        if 'Could not connect' in result.error \
            or "Couldn't open transport for" in result.error:
          if stop_on_crash:
            break
          # Assume Impala crashed and try restarting
          test_crash_count += 1
          LOG.info('Restarting Impala')
          impalad_args = [
              '-convert_legacy_hive_parquet_utc_timestamps=true',
          ]
          impala_restart_cmd = [
              join_path(getenv('IMPALA_HOME'), 'bin/start-impala-cluster.py'),
              '--log_dir={0}'.format(getenv('LOG_DIR', "/tmp/")),
              '--impalad_args="{0}"'.format(' '.join(impalad_args)),
          ]
          call(impala_restart_cmd)
          self.test_conn.reconnect()
          self.query_result_comparator.test_cursor = self.test_conn.cursor()
          result = self.query_result_comparator.compare_query_results(statement)
          if result.error:
            LOG.info('Restarting Impala')
            call(impala_restart_cmd)
            self.test_conn.reconnect()
            self.query_result_comparator.test_cursor = self.test_conn.cursor()
          else:
            break

        if stop_on_result_mismatch and \
            not (result.is_known_error or result.query_timed_out):
          break

        if last_error == result.error \
            and not (result.is_known_error or result.query_timed_out):
          repeat_error_count += 1
          if repeat_error_count == self.ABORT_ON_REPEAT_ERROR_COUNT:
            break
        else:
          last_error = result.error
          repeat_error_count = 0
      else:
        if result.query_resulted_in_data:
          LOG.info('Results matched (%s rows)', result.test_row_count)
        else:
          LOG.info('Query did not produce meaningful data')
        last_error = None
        repeat_error_count = 0

    return SearchResults(
        query_count,
        queries_resulted_in_data_count,
        mismatch_count,
        query_timeout_count,
        known_error_count,
        test_crash_count,
        time() - start_time,
        count_effective_dml_statements,
        count_rows_affected_by_dml)


class SearchResults(object):
  '''This class holds information about the outcome of a search run.'''

  def __init__(self,
               query_count,
               queries_resulted_in_data_count,
               mismatch_count,
               query_timeout_count,
               known_error_count,
               test_crash_count,
               run_time_in_seconds,
               count_effective_dml_statements,
               count_rows_affected_by_dml
               ):
    # Approx number of queries run, some queries may have been ignored
    self.query_count = query_count
    self.queries_resulted_in_data_count = queries_resulted_in_data_count
    # Number of queries that had an error or result mismatch
    self.mismatch_count = mismatch_count
    self.query_timeout_count = query_timeout_count
    self.known_error_count = known_error_count
    self.test_crash_count = test_crash_count
    self.run_time_in_seconds = run_time_in_seconds
    # Number of DML statements that actually modified tables
    self.count_effective_dml_statements = count_effective_dml_statements
    # Total number of rows modified by DML statements
    self.count_rows_affected_by_dml = count_rows_affected_by_dml

  def __str__(self):
    '''Returns the string representation of the results.'''
    mins, secs = divmod(self.run_time_in_seconds, 60)
    hours, mins = divmod(mins, 60)
    hours = int(hours)
    mins = int(mins)
    if hours:
      run_time = '%s hours and %s minutes' % (hours, mins)
    else:
      secs = int(secs)
      run_time = '%s seconds' % secs
      if mins:
        run_time = '%s mins and ' % mins + run_time
    summary_params = self.__dict__
    summary_params['run_time'] = run_time
    return (
        '%(mismatch_count)s mismatches found after running %(query_count)s queries in '
        '%(run_time)s.\n'
        '%(queries_resulted_in_data_count)s of %(query_count)s queries produced results.'
        '\n'
        '%(count_effective_dml_statements)s of %(query_count)s statements modified a '
        'total of %(count_rows_affected_by_dml)s rows\n'
        '%(test_crash_count)s crashes occurred.\n'
        '%(known_error_count)s queries were excluded from the mismatch count because '
        'they are known errors.\n'
        '%(query_timeout_count)s queries timed out and were excluded from all counts.') \
        % summary_params


if __name__ == '__main__':
  import sys
  from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser

  from tests.comparison import cli_options
  from tests.comparison.query_profile import PROFILES

  parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
  cli_options.add_logging_options(parser)
  cli_options.add_db_name_option(parser)
  cli_options.add_cluster_options(parser)
  cli_options.add_connection_option_groups(parser)
  cli_options.add_timeout_option(parser)

  parser.add_argument('--test-db-type', default=IMPALA,
      choices=(HIVE, IMPALA, MYSQL, ORACLE, POSTGRESQL),
      help='The type of the test database to use. Ex: IMPALA.')
  parser.add_argument('--ref-db-type', default=POSTGRESQL,
      choices=(MYSQL, ORACLE, POSTGRESQL),
      help='The type of the ref database to use. Ex: POSTGRESQL.')
  parser.add_argument('--stop-on-mismatch', default=False, action='store_true',
      help='Exit immediately upon finding a discrepancy in a query result.')
  parser.add_argument('--stop-on-crash', default=False, action='store_true',
      help='Exit immediately if Impala crashes.')
  parser.add_argument('--query-count', default=1000000, type=int,
      help='Exit after running the given number of queries.')
  parser.add_argument('--exclude-types', default='',
      help='A comma separated list of data types to exclude while generating queries.')
  parser.add_argument('--explain-only', action='store_true',
      help="Don't run the queries, only explain them, to see if there was an error "
      "in planning.")
  profiles = dict()
  for profile in PROFILES:
    profile_name = profile.__name__
    if profile_name.endswith('Profile'):
      profile_name = profile_name[:-1 * len('Profile')]
    profiles[profile_name.lower()] = profile
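  # For example, a profile class named DefaultProfile registers under the choice
  # name 'default', which is why '--profile' below defaults to 'default'.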
  parser.add_argument('--profile', default='default',
      choices=(sorted(profiles.keys())),
      help='Determines the mix of SQL features to use during query generation.')
  # TODO: Seed the random query generator for repeatable queries?

  args = parser.parse_args()
  cli_options.configure_logging(
      args.log_level, debug_log_file=args.debug_log_file, log_thread_name=True)
  cluster = cli_options.create_cluster(args)

  ref_conn = cli_options.create_connection(args, args.ref_db_type, db_name=args.db_name)
  if args.test_db_type == IMPALA:
    test_conn = cluster.impala.connect(db_name=args.db_name)
  elif args.test_db_type == HIVE:
    test_conn = cluster.hive.connect(db_name=args.db_name)
  else:
    test_conn = cli_options.create_connection(
        args, args.test_db_type, db_name=args.db_name)
  # Create an instance of the chosen profile class (e.g. DefaultProfile)
  query_profile = profiles[args.profile]()
  if args.explain_only:
    searcher = FrontendExceptionSearcher(query_profile, ref_conn, test_conn)
    searcher.search(args.query_count)
  else:
    diff_searcher = QueryResultDiffSearcher(query_profile, ref_conn, test_conn)
    query_timeout_seconds = args.timeout
    search_results = diff_searcher.search(
        args.query_count, args.stop_on_mismatch, args.stop_on_crash,
        query_timeout_seconds)
    print(search_results)
    sys.exit(search_results.mismatch_count)