diff --git a/tests/beeswax/impala_beeswax.py b/tests/beeswax/impala_beeswax.py index 37159f77f..f432cc6fd 100755 --- a/tests/beeswax/impala_beeswax.py +++ b/tests/beeswax/impala_beeswax.py @@ -17,10 +17,16 @@ import shlex import traceback import getpass import re +import prettytable from beeswaxd import BeeswaxService from beeswaxd.BeeswaxService import QueryState from datetime import datetime +try: + # If Exec Summary is not implemented in Impala, this cannot be imported + from ExecStats.ttypes import TExecStats +except ImportError: + pass from ImpalaService import ImpalaService from ImpalaService.ImpalaService import TImpalaQueryOptions, TResetTableReq from tests.util.thrift_util import create_transport @@ -57,6 +63,7 @@ class ImpalaBeeswaxResult(object): self.summary = kwargs.get('summary', str()) self.schema = kwargs.get('schema', None) self.runtime_profile = kwargs.get('runtime_profile', str()) + self.exec_summary = kwargs.get('exec_summary', None) def get_data(self): return self.__format_data() @@ -154,7 +161,7 @@ class ImpalaBeeswaxClient(object): start = time.time() start_time = datetime.now() handle = self.__execute_query(query_string.strip()) - result = self.fetch_results(query_string, handle) + result = self.fetch_results(query_string, handle) result.time_taken = time.time() - start result.start_time = start_time # Don't include the time it takes to get the runtime profile in the execution time @@ -164,8 +171,125 @@ class ImpalaBeeswaxClient(object): # the handle twice. if self.__get_query_type(query_string) != 'insert': self.close_query(handle) + result.exec_summary = self.get_exec_summary(handle) return result + def get_exec_summary(self, handle): + """Calls GetExecSummary() for the last query handle""" + try: + summary = self.__do_rpc(lambda: self.imp_service.GetExecSummary(handle)) + except ImpalaBeeswaxException: + summary = None + + if summary is None or summary.nodes is None: + return None + # If exec summary is not implemented in Impala, this function returns, so we do not + # get the function __build_summary_table which requires TExecStats to be imported. + + output = [] + self.__build_summary_table(summary, 0, False, 0, output) + return output + + def __build_summary_table(self, summary, idx, is_fragment_root, indent_level, output): + """NOTE: This was taken impala_shell.py. This method will be a placed in a library + that is shared between impala_shell and this file. + + Direct translation of Coordinator::PrintExecSummary() to recursively build a list + of rows of summary statistics, one per exec node + + summary: the TExecSummary object that contains all the summary data + + idx: the index of the node to print + + is_fragment_root: true if the node to print is the root of a fragment (and therefore + feeds into an exchange) + + indent_level: the number of spaces to print before writing the node's label, to give + the appearance of a tree. The 0th child of a node has the same indent_level as its + parent. All other children have an indent_level of one greater than their parent. + + output: the list of rows into which to append the rows produced for this node and its + children. + + Returns the index of the next exec node in summary.exec_nodes that should be + processed, used internally to this method only. 
+ """ + attrs = ["latency_ns", "cpu_time_ns", "cardinality", "memory_used"] + + # Initialise aggregate and maximum stats + agg_stats, max_stats = TExecStats(), TExecStats() + for attr in attrs: + setattr(agg_stats, attr, 0) + setattr(max_stats, attr, 0) + + node = summary.nodes[idx] + for stats in node.exec_stats: + for attr in attrs: + val = getattr(stats, attr) + if val is not None: + setattr(agg_stats, attr, getattr(agg_stats, attr) + val) + setattr(max_stats, attr, max(getattr(max_stats, attr), val)) + + if len(node.exec_stats) > 0: + avg_time = agg_stats.latency_ns / len(node.exec_stats) + else: + avg_time = 0 + + # If the node is a broadcast-receiving exchange node, the cardinality of rows produced + # is the max over all instances (which should all have received the same number of + # rows). Otherwise, the cardinality is the sum over all instances which process + # disjoint partitions. + if node.is_broadcast and is_fragment_root: + cardinality = max_stats.cardinality + else: + cardinality = agg_stats.cardinality + + est_stats = node.estimated_stats + + label_prefix = "" + if indent_level > 0: + label_prefix = "|" + if is_fragment_root: + label_prefix += " " * indent_level + else: + label_prefix += "--" * indent_level + + row = {} + row["prefix"] = label_prefix + row["operator"] = node.label + row["num_hosts"] = len(node.exec_stats) + row["avg_time"] = avg_time + row["max_time"] = max_stats.latency_ns + row["num_rows"] = cardinality + row["est_num_rows"] = est_stats.cardinality + row["peak_mem"] = max_stats.memory_used + row["est_peak_mem"] = est_stats.memory_used + row["detail"] = node.label_detail + output.append(row) + + try: + sender_idx = summary.exch_to_sender_map[idx] + # This is an exchange node, so the sender is a fragment root, and should be printed + # next. 
+ self.__build_summary_table(summary, sender_idx, True, indent_level, output) + except (KeyError, TypeError): + # Fall through if idx not in map, or if exch_to_sender_map itself is not set + pass + + idx += 1 + if node.num_children > 0: + first_child_output = [] + idx = \ + self.__build_summary_table(summary, idx, False, indent_level, first_child_output) + for child_idx in xrange(1, node.num_children): + # All other children are indented (we only have 0, 1 or 2 children for every exec + # node at the moment) + idx = self.__build_summary_table(summary, idx, False, indent_level + 1, output) + output += first_child_output + return idx + + + def get_runtime_profile(self, handle): return self.__do_rpc(lambda: self.imp_service.GetRuntimeProfile(handle)) diff --git a/tests/benchmark/perf_result_datastore.py b/tests/benchmark/perf_result_datastore.py index 20370919c..ebf47d012 100755 --- a/tests/benchmark/perf_result_datastore.py +++ b/tests/benchmark/perf_result_datastore.py @@ -20,9 +20,9 @@ class PerfResultDataStore(object): (username, password, host, database_name) self.connection = MySQLdb.connect(host, username, password, database_name) - def get_file_format_id(self, file_format, compression): + def get_file_format_id(self, file_format, compression_codec, compression_type): """ Gets the file_format_id for the fiven file_format/compression codec""" - return self.__get_file_format_id(file_format, compression) + return self.__get_file_format_id(file_format, compression_codec, compression_type) def get_query_id(self, query_name, query): """ Gets the query_id for the given query name and query text """ @@ -70,12 +70,8 @@ class PerfResultDataStore(object): # Internal methods @cursor_wrapper - def __get_file_format_id(self, file_format, compression, cursor): + def __get_file_format_id(self, file_format, compression_codec, compression_type, cursor): """ Gets the file_format_id for the fiven file_format/compression codec""" - if compression == 'none': - compression_codec, compression_type = ['none', 'none'] - else: - compression_codec, compression_type = compression.split('/') result = cursor.execute("select file_type_id from FileType where format=%s and "\ "compression_codec=%s and compression_type=%s", (file_format, compression_codec, compression_type)) diff --git a/tests/benchmark/report-benchmark-results.py b/tests/benchmark/report-benchmark-results.py index 24debd3fa..83dce8345 100755 --- a/tests/benchmark/report-benchmark-results.py +++ b/tests/benchmark/report-benchmark-results.py @@ -1,444 +1,791 @@ #!/usr/bin/env python -# Copyright (c) 2012 Cloudera, Inc. All rights reserved. +# Copyright (c) 2014 Cloudera, Inc. All rights reserved. # # This script provides help with parsing and reporting of perf results. It currently # provides three main capabilities: # 1) Printing perf results to console in 'pretty' format # 2) Comparing two perf result sets together and displaying comparison results to console # 3) Outputting the perf results in JUnit format which is useful for plugging in to -# Jenkins perf reporting. -# -# The input to this script is a benchmark result CSV file which should be generated using -# the 'run-workload.py' script. The input CSV file has the format: -# ||||| -# ||| -# -# TODO: Minimize the logic in this script so it doesn't get any more complex. Additional -# reporting will be enabled when perf results are stored in a database as well as CSV -# files. -import csv +# Jenkins perf reporting. 
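For reference, ImpalaBeeswaxClient.get_exec_summary() added above returns the summary as a plain list of dicts (one per exec node), so downstream code can consume it without any Thrift types. A minimal sketch of rendering such rows with prettytable; only the keys come from __build_summary_table(), the row values are invented for illustration:

import prettytable

rows = [
    {'prefix': '', 'operator': '05:EXCHANGE', 'num_hosts': 1, 'avg_time': 1.2e6,
     'max_time': 1.5e6, 'num_rows': 1000, 'est_num_rows': 900, 'peak_mem': 524288,
     'est_peak_mem': 1048576, 'detail': 'UNPARTITIONED'},
    {'prefix': '|--', 'operator': '00:SCAN HDFS', 'num_hosts': 3, 'avg_time': 2.1e9,
     'max_time': 2.4e9, 'num_rows': 600000, 'est_num_rows': 580000,
     'peak_mem': 45 * 2 ** 20, 'est_peak_mem': 64 * 2 ** 20, 'detail': 'tpch.lineitem'},
]

table = prettytable.PrettyTable(['Operator', '#Hosts', 'Avg Time (ns)', '#Rows'])
for row in rows:
  table.add_row([row['prefix'] + row['operator'], row['num_hosts'],
                 row['avg_time'], row['num_rows']])
print(table)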
+ +# By default in Python if you divide an int by another int (5 / 2), the result will also +# be an int (2). The following line changes this behavior so that float will be returned +# if necessary (2.5). +from __future__ import division + import difflib +import json import math import os -import re -import sys -import texttable +import prettytable +from collections import defaultdict from datetime import date, datetime -from itertools import groupby from optparse import OptionParser -from tests.util.calculation_util import calculate_geomean, calculate_tval +from tests.util.calculation_util import calculate_tval, calculate_avg, calculate_stddev +from time import gmtime, strftime + +# String constants +AVG = 'avg' +AVG_TIME = 'avg_time' +AVG_TIME_CHANGE = 'avg_time_change' +AVG_TIME_CHANGE_TOTAL = 'avg_time_change_total' +CLIENT_NAME = 'client_name' +COMPRESSION_CODEC = 'compression_codec' +COMPRESSION_TYPE = 'compression_type' +DETAIL = 'detail' +EST_NUM_ROWS = 'est_num_rows' +EST_PEAK_MEM = 'est_peak_mem' +EXECUTOR_NAME = 'executor_name' +EXEC_SUMMARY = 'exec_summary' +FILE_FORMAT = 'file_format' +ITERATIONS = 'iterations' +MAX_TIME = 'max_time' +MAX_TIME_CHANGE = 'max_time_change' +NAME = 'name' +NUM_CLIENTS = 'num_clients' +NUM_HOSTS = 'num_hosts' +NUM_ROWS = 'num_rows' +OPERATOR = 'operator' +PEAK_MEM = 'peak_mem' +PEAK_MEM_CHANGE = 'peak_mem_change' +PREFIX = 'prefix' +QUERY = 'query' +QUERY_STR = 'query_str' +RESULT_LIST = 'result_list' +RUNTIME_PROFILE = 'runtime_profile' +SCALE_FACTOR = 'scale_factor' +STDDEV = 'stddev' +STDDEV_TIME = 'stddev_time' +TEST_VECTOR = 'test_vector' +TIME_TAKEN = 'time_taken' +TOTAL = 'total' +WORKLOAD_NAME = 'workload_name' parser = OptionParser() parser.add_option("--input_result_file", dest="result_file", - default=os.environ['IMPALA_HOME'] + '/benchmark_results.csv', - help="The input CSV file with benchmark results") + default=os.environ['IMPALA_HOME'] + '/benchmark_results.json', + help="The input JSON file with benchmark results") parser.add_option("--reference_result_file", dest="reference_result_file", - default=os.environ['IMPALA_HOME'] + '/reference_benchmark_results.csv', - help="The input CSV file with reference benchmark results") -parser.add_option("--hive_result_file", dest="hive_result_file", - default=os.environ['IMPALA_HOME'] + '/hive_benchmark_results.csv', - help="The input CSV file with the hive reference benchmark results") + default=os.environ['IMPALA_HOME'] + '/reference_benchmark_results.json', + help="The input JSON file with reference benchmark results") parser.add_option("--junit_output_file", dest="junit_output_file", default='', - help='If set, outputs results in Junit format to the specified file') + help='If set, outputs results in Junit format to the specified file') parser.add_option("--no_output_table", dest="no_output_table", action="store_true", - default= False, help='Outputs results in table format to the console') + default= False, help='Outputs results in table format to the console') parser.add_option("--report_description", dest="report_description", default=None, - help='Optional description for the report.') + help='Optional description for the report.') parser.add_option("--cluster_name", dest="cluster_name", default='UNKNOWN', - help="Name of the cluster the results are from (ex. Bolt)") + help="Name of the cluster the results are from (ex. 
Bolt)") parser.add_option("--verbose", "-v", dest="verbose", action="store_true", - default= False, help='Outputs to console with with increased verbosity') + default= False, help='Outputs to console with with increased verbosity') parser.add_option("--build_version", dest="build_version", default='UNKNOWN', - help="Build/version info about the Impalad instance results are from.") + help="Build/version info about the Impalad instance results are from.") parser.add_option("--lab_run_info", dest="lab_run_info", default='UNKNOWN', - help="Information about the lab run (name/id) that published "\ - "the results.") + help="Information about the lab run (name/id) that published "\ + "the results.") parser.add_option("--tval_threshold", dest="tval_threshold", default=None, - type="float", help="The ttest t-value at which a performance change "\ - "will be flagged as sigificant.") + type="float", help="The ttest t-value at which a performance change "\ + "will be flagged as sigificant.") parser.add_option("--min_percent_change_threshold", - dest="min_percent_change_threshold", default=5.0, - type="float", help="Any performance changes below this threshold" \ - " will not be classified as significant. If the user specifies an" \ - " empty value, the threshold will be set to 0") + dest="min_percent_change_threshold", default=5.0, + type="float", help="Any performance changes below this threshold" \ + " will not be classified as significant. If the user specifies an" \ + " empty value, the threshold will be set to 0") parser.add_option("--max_percent_change_threshold", - dest="max_percent_change_threshold", default=20.0, - type="float", help="Any performance changes above this threshold"\ - " will be classified as significant. If the user specifies an" \ - " empty value, the threshold will be set to the system's maxint") + dest="max_percent_change_threshold", default=20.0, + type="float", help="Any performance changes above this threshold"\ + " will be classified as significant. If the user specifies an" \ + " empty value, the threshold will be set to the system's maxint") parser.add_option("--allowed_latency_diff_secs", - dest="allowed_latency_diff_secs", default=0.0, type="float", - help="If specified, only a timing change that differs by more than\ - this value will be considered significant.") + dest="allowed_latency_diff_secs", default=0.0, type="float", + help="If specified, only a timing change that differs by more than\ + this value will be considered significant.") # These parameters are specific to recording results in a database. 
This is optional parser.add_option("--save_to_db", dest="save_to_db", action="store_true", - default= False, help='Saves results to the specified database.') + default= False, help='Saves results to the specified database.') parser.add_option("--is_official", dest="is_official", action="store_true", - default= False, help='Indicates this is an official perf run result') + default= False, help='Indicates this is an official perf run result') parser.add_option("--db_host", dest="db_host", default='localhost', - help="Machine hosting the database") + help="Machine hosting the database") parser.add_option("--db_name", dest="db_name", default='perf_results', - help="Name of the perf database.") + help="Name of the perf database.") parser.add_option("--db_username", dest="db_username", default='hiveuser', - help="Username used to connect to the database.") + help="Username used to connect to the database.") parser.add_option("--db_password", dest="db_password", default='password', - help="Password used to connect to the the database.") + help="Password used to connect to the the database.") options, args = parser.parse_args() -# Disable thresholds -if options.min_percent_change_threshold == None: - options.min_percent_change_threshold = 0.0 -if options.max_percent_change_threshold == None: - options.max_percent_change_threshold = sys.maxint +def get_dict_from_json(filename): + """Given a JSON file, return a nested dictionary. -if options.min_percent_change_threshold >= options.max_percent_change_threshold: - print "Minimun threshold must always be greater than the maximum threshold" - exit(1) + Everything in this file is based on the nested dictionary data structure. The dictionary + is structured as follows: Top level maps to workload. Each workload maps to queries. + Each query maps to file_format. Each file format is contains a key "result_list" that + maps to a list of QueryResult (look at query.py) dictionaries. The compute stats method + add additional keys such as "avg" or "stddev" here. -VERBOSE = options.verbose -COL_WIDTH = 18 -TOTAL_WIDTH = 135 if VERBOSE else 110 + Here's how the keys are structred: + To get a workload, the key looks like this: + (('workload_name', 'tpch'), ('scale_factor', '300gb')) + Each workload has a key that looks like this: + (('name', 'TPCH_Q10')) + Each Query has a key like this: + (('file_format', 'text'), ('compression_codec', 'zip'), + ('compression_type', 'block')) -# These are the indexes in the input row for each column value -EXECUTOR_IDX = 0 -WORKLOAD_IDX = 1 -SCALE_FACTOR_IDX = 2 -QUERY_NAME_IDX = 3 -QUERY_IDX = 4 -FILE_FORMAT_IDX = 5 -COMPRESSION_IDX = 6 -AVG_IDX = 7 -STDDEV_IDX = 8 -NUM_CLIENTS_IDX = 9 -NUM_ITERS_IDX = 10 -RUNTIME_PROFILE_IDX = 11 -HIVE_AVG_IDX = 12 -HIVE_STDDEV_IDX = 13 -SPEEDUP_IDX = 14 + This is useful for finding queries in a certain category and computing stats -# These are the column indexes that will be displayed as part of the perf result table. -TABLE_COL_IDXS = range(FILE_FORMAT_IDX, NUM_ITERS_IDX + 1) +\ - [HIVE_AVG_IDX, HIVE_STDDEV_IDX, SPEEDUP_IDX] + Args: + filename (str): path to the JSON file -# Formats a string so that is is wrapped across multiple lines with no single line -# being longer than the given width -def wrap_text(text, width): - return '\n'.join([text[width * i : width * (i + 1)] \ - for i in xrange(int(math.ceil(1.0 * len(text) / width)))]) - -# Formats float values to have two decimal places. 
If the input string is not a float -# then the original value is returned -def format_if_float(float_str): - try: - return "%0.2f" % float(float_str) - except (ValueError, TypeError): - return str(float_str) - -# Returns a string representation of the row with columns padded by the -# the given column width -def build_padded_row_string(row, column_width): - return ''.join([format_if_float(col).ljust(column_width) for col in row]) - -def find_matching_row_in_reference_results(search_row, reference_results): - for row in reference_results: - if not row: - continue; - if (row[QUERY_NAME_IDX] == search_row[QUERY_NAME_IDX] and - row[FILE_FORMAT_IDX] == search_row[FILE_FORMAT_IDX] and - row[COMPRESSION_IDX] == search_row[COMPRESSION_IDX] and - row[SCALE_FACTOR_IDX] == search_row[SCALE_FACTOR_IDX] and - row[NUM_CLIENTS_IDX] == search_row[NUM_CLIENTS_IDX] and - row[WORKLOAD_IDX] == search_row[WORKLOAD_IDX]): - return row - return None - -def calculate_speedup(reference, actual): - if actual != 'N/A' and reference != 'N/A' and actual != 0: - return float(reference) / float(actual); - else: - return 'N/A' - -def calculate_impala_hive_speedup(row): - return calculate_speedup(row[HIVE_AVG_IDX], row[AVG_IDX]) - -def calculate_geomean_wrapper(times): - """Wrapper around calculate_geomean that returns 'N/A' if the collection is empty""" - if len(times) == 0: - return 'N/A' - return calculate_geomean(times) - -def build_table_header(verbose): - table_header =\ - ['File Format', 'Compression', 'Avg(s)', 'StdDev(s)', 'Num Clients', 'Iters'] - if verbose: - table_header += ['Hive Avg(s)', 'Hive StdDev(s)'] - return table_header + ['Speedup (vs Hive)'] - -def build_table(results, verbose, reference_results = None): - """ Builds a table of query execution results, grouped by query name """ - output = str() - perf_changes = str() - - # Group the results by query name - sort_key = lambda x: (x[QUERY_NAME_IDX]) - results.sort(key = sort_key) - for query_group, group in groupby(results, key = sort_key): - output += 'Query: ' + wrap_text(query_group, TOTAL_WIDTH) + '\n' - table = texttable.Texttable(max_width=TOTAL_WIDTH) - table.header(build_table_header(verbose)) - table.set_deco(table.HEADER | table.VLINES | table.BORDER) - - # Add each result to the output table - for row in group: - full_row = list(row) - # Don't show the hive execution times in verbose mode. - if not VERBOSE: - del full_row[HIVE_STDDEV_IDX] - del full_row[HIVE_AVG_IDX] - - # Show Impala speedup over Hive - full_row.append(format_if_float(calculate_impala_hive_speedup(row)) + 'X') - - # If a reference result was specified, search for the matching record and display - # the speedup versus the reference. - if reference_results is not None: - ref_row = find_matching_row_in_reference_results(row, reference_results) - - # Found a matching row in the reference results, format and display speedup - # information and check for significant performance changes, if enabled. 
- if ref_row is not None: - was_change_significant, is_regression =\ - check_perf_change_significance(full_row, ref_row) - diff_args = [full_row, ref_row] - if was_change_significant: - perf_changes += build_perf_change_str(full_row, ref_row, is_regression) - try: - generate_runtime_profile_diff(full_row, ref_row, was_change_significant, - is_regression) - except Exception as e: - print 'Could not generate an html diff: %s' % e - - speedup =\ - format_if_float(calculate_speedup(ref_row[AVG_IDX], full_row[AVG_IDX])) - full_row[AVG_IDX] = format_if_float(full_row[AVG_IDX]) - full_row[AVG_IDX] = full_row[AVG_IDX] + ' (%sX)' % speedup - - test_row = [col for i, col in enumerate(full_row) if i in TABLE_COL_IDXS] - table.add_row(test_row) - - output += table.draw() + '\n' - return output, perf_changes - -def generate_runtime_profile_diff(current_row, ref_row, was_change_significant, - is_regression): - """Generate an html diff of the runtime profiles. - - Generates a diff of the baseline vs current runtime profile to $IMPALA_HOME/results - in html format. The diff file is tagged with the relavent query information - and whether its an improvement or a regression ( if applicable ) + returns: + dict: a nested dictionary with grouped queries """ + + def add_result(query_result): + """Add query to the dictionary. + + Automatically finds the path in the nested dictionary and adds the result to the + appropriate list. + + TODO: This method is hard to reason about, so it needs to be made more streamlined. + """ + + def get_key(level_num): + """Build a key for a particular nesting level. + + The key is built by extracting the appropriate values from query_result. + """ + + level = list() + # In the outer layer, we group by workload name and scale factor + level.append([('query', 'workload_name'), ('query', 'scale_factor')]) + # In the middle layer, we group by query name + level.append([('query', 'name')]) + # In the inner layer, we group by file format and compression type + level.append([('query', 'test_vector', 'file_format'), + ('query', 'test_vector', 'compression_codec'), + ('query', 'test_vector', 'compression_type')]) + + key = [] + + def get_nested_val(path): + """given a path to a variable in query result, extract the value. + + For example, to extract compression_type from the query_result, we need to follow + the this path in the nested dictionary: + "query_result" -> "query" -> "test_vector" -> "compression_type" + """ + cur = query_result + for step in path: + cur = cur[step] + return cur + + for path in level[level_num]: + key.append((path[-1], get_nested_val(path))) + + return tuple(key) + + # grouped is the nested dictionary defined in the outer function get_dict_from_json. + # It stores all the results grouped by query name and other parameters. + cur = grouped + # range(3) because there are 3 levels of nesting, as defined in get_key + for level_num in range(3): + cur = cur[get_key(level_num)] + cur[RESULT_LIST].append(query_result) + + with open(filename, "r") as f: + data = json.load(f) + grouped = defaultdict( lambda: defaultdict( + lambda: defaultdict(lambda: defaultdict(list)))) + for workload_name, workload in data.items(): + for query_result in workload: + add_result(query_result) + return grouped + +def calculate_time_stats(grouped): + """Adds statistics to the nested dictionary. We are calculating the average runtime + and Standard Deviation for each query type. 
+ """ + + for workload_scale in grouped: + for query_name in grouped[workload_scale]: + for file_format in grouped[workload_scale][query_name]: + result_list = grouped[workload_scale][query_name][file_format][RESULT_LIST] + avg = calculate_avg( + [query_results[TIME_TAKEN] for query_results in result_list]) + dev = calculate_stddev( + [query_results[TIME_TAKEN] for query_results in result_list]) + num_clients = max( + int(query_results[CLIENT_NAME]) for query_results in result_list) + iterations = len(result_list) + + grouped[workload_scale][query_name][file_format][AVG] = avg + grouped[workload_scale][query_name][file_format][STDDEV] = dev + grouped[workload_scale][query_name][file_format][NUM_CLIENTS] = num_clients + grouped[workload_scale][query_name][file_format][ITERATIONS] = iterations + +def calculate_workload_file_format_runtimes(grouped): + """Calculate average time for each workload and scale factor, for each file format and + compression. + + This returns a new dictionary with avarage times. + + Here's an example of how this dictionary is structured: + dictionary-> + (('workload', 'tpch'), ('scale', '300gb'))-> + (('file_format','parquet'), ('compression_codec','zip'), ('compression_type','block'))-> + 'avg' + + We also have access to the list of QueryResult associated with each file_format + + The difference between this dictionary and grouped_queries is that query name is missing + after workload. + """ + new_dict = defaultdict(lambda: defaultdict(lambda: defaultdict(list))) + + # First populate the dictionary with query results + for workload_scale, workload in grouped.items(): + for query_name, file_formats in workload.items(): + for file_format, results in file_formats.items(): + new_dict[workload_scale][file_format][RESULT_LIST].extend(results[RESULT_LIST]) + + # Do the average calculation. Standard deviation could also be calculated here + for workload_scale in new_dict: + for file_format in new_dict[workload_scale]: + avg = calculate_avg([query_results[TIME_TAKEN] + for query_results in new_dict[workload_scale][file_format][RESULT_LIST]]) + new_dict[workload_scale][file_format][AVG] = avg + return new_dict + +def build_perf_change_str(result, ref_result, regression): + """Build a performance change string""" + + perf_change_type = "regression" if regression else "improvement" + query = result[RESULT_LIST][0][QUERY] + + query_name = query[NAME] + file_format = query[TEST_VECTOR][FILE_FORMAT] + compression_codec = query[TEST_VECTOR][COMPRESSION_CODEC] + compression_type = query[TEST_VECTOR][COMPRESSION_TYPE] + + template = ("\nSignificant perf {perf_change_type} detected: " + "{query_name} [{file_format}/{compression_codec}/{compression_type}] " + "({ref_avg:.3f}s -> {avg:.3f}s)") + return template.format( + perf_change_type = perf_change_type, + query_name = query_name, + file_format = file_format, + compression_codec = compression_codec, + compression_type = compression_type, + ref_avg = ref_result[AVG], + avg = result[AVG]) + +def prettyprint(val, units, divisor): + """ Print a value in human readable format along with it's unit. + + We start at the leftmost unit in the list and keep dividing the value by divisor until + the value is less than divisor. The value is then printed along with the unit type. + + Args: + val (int or float): Value to be printed. + units (list of str): Unit names for different sizes. + divisor (float): ratio between two consecutive units. 
+ """ + for unit in units: + if abs(val) < divisor: + if unit == units[0]: + return "%d%s" % (val, unit) + else: + return "%3.2f%s" % (val, unit) + val /= divisor + +def prettyprint_bytes(byte_val): + return prettyprint(byte_val, ['B', 'KB', 'MB', 'GB', 'TB'], 1024.0) + +def prettyprint_values(unit_val): + return prettyprint(unit_val, ["", "K", "M", "B"], 1000.0) + +def prettyprint_time(time_val): + return prettyprint(time_val, ["ns", "us", "ms", "s"], 1000.0) + +def prettyprint_percent(percent_val): + return '{0:+.2%}'.format(percent_val) + +class CombinedExecSummaries(object): + """All execution summaries for each query are combined into this object. + + The overall average time is calculated for each node by averaging the average time + from each execution summary. The max time time is calculated by getting the max time + of max times. + + This object can be compared to another one and ExecSummaryComparison can be generated. + + Args: + exec_summaries (list of list of dict): A list of exec summaries (list of dict is how + it is received from the beeswax client. + + Attributes: + rows (list of dict): each dict represents a row in the summary table. Each row in rows + is a dictionary. Each dictionary has the following keys: + prefix (str) + operator (str) + num_hosts (int) + num_rows (int) + est_num_rows (int) + detail (str) + avg_time (float): averge of average times in all the execution summaries + stddev_time: standard deviation of times in all the execution summaries + max_time: maximum of max times in all the execution summaries + peak_mem (int) + est_peak_mem (int) + """ + + def __init__(self, exec_summaries): + # We want to make sure that all execution summaries have the same structure before + # we can combine them. If not, err_str will contain the reason why we can't combine + # the exec summaries. + ok, err_str = self.__check_exec_summary_schema(exec_summaries) + self.error_str = err_str + + self.rows = [] + if ok: + self.__build_rows(exec_summaries) + + def __build_rows(self, exec_summaries): + + first_exec_summary = exec_summaries[0] + + for row_num, row in enumerate(first_exec_summary): + combined_row = {} + # Copy fixed values from the first exec summary + for key in [PREFIX, OPERATOR, NUM_HOSTS, NUM_ROWS, EST_NUM_ROWS, DETAIL]: + combined_row[key] = row[key] + + avg_times = [exec_summary[row_num][AVG_TIME] for exec_summary in exec_summaries] + max_times = [exec_summary[row_num][MAX_TIME] for exec_summary in exec_summaries] + peak_mems = [exec_summary[row_num][PEAK_MEM] for exec_summary in exec_summaries] + est_peak_mems = [exec_summary[row_num][EST_PEAK_MEM] + for exec_summary in exec_summaries] + + # Set the calculated values + combined_row[AVG_TIME] = calculate_avg(avg_times) + combined_row[STDDEV_TIME] = calculate_stddev(avg_times) + combined_row[MAX_TIME] = max(max_times) + combined_row[PEAK_MEM] = max(peak_mems) + combined_row[EST_PEAK_MEM] = max(est_peak_mems) + self.rows.append(combined_row) + + def is_same_schema(self, reference): + """Check if the reference CombinedExecSummaries summary has the same schema as this + one. (For example, the operator names are the same for each node). + + The purpose of this is to check if it makes sense to combine this object with a + reference one to produce ExecSummaryComparison. + + Args: + reference (CombinedExecSummaries): comparison + + Returns: + bool: True if the schama's are similar enough to be compared, False otherwise. 
+ """ + + if len(self.rows) != len(reference.rows): return False + + for row_num, row in enumerate(self.rows): + ref_row = reference.rows[row_num] + + if row[OPERATOR] != ref_row[OPERATOR]: + return False + + return True + + def __str__(self): + if self.error_str: return self.error_str + + table = prettytable.PrettyTable( + ["Operator", + "#Hosts", + "Avg Time", + "Std Dev", + "Max Time", + "#Rows", + "Est #Rows"]) + table.align = 'l' + + for row in self.rows: + table_row = [ row[PREFIX] + row[OPERATOR], + prettyprint_values(row[NUM_HOSTS]), + prettyprint_time(row[AVG_TIME]), + prettyprint_time(row[STDDEV_TIME]), + prettyprint_time(row[MAX_TIME]), + prettyprint_values(row[NUM_ROWS]), + prettyprint_values(row[EST_NUM_ROWS])] + table.add_row(table_row) + + return str(table) + + @property + def total_runtime(self): + return sum([row[AVG_TIME] for row in self.rows]) + + def __check_exec_summary_schema(self, exec_summaries): + """Check if all given exec summaries have the same structure. + + This method is called to check if it is possible a single CombinedExecSummaries from + the list of exec_summaries. (For example all exec summaries must have the same + number of nodes.) + + This method is somewhat similar to is_same_schema. The difference is that + is_same_schema() checks if two CombinedExecSummaries have the same structure and this + method checks if all exec summaries in the list have the same structure. + + Args: + exec_summaries (list of dict): each dict represents an exec_summary + + Returns: + (bool, str): True if all exec summaries have the same structure, otherwise False + followed by a string containing the explanation. + """ + + err = 'Summaries cannot be combined: ' + + if len(exec_summaries) < 1: + return False, err + 'no exec summaries Found' + + first_exec_summary = exec_summaries[0] + if len(first_exec_summary) < 1: + return False, err + 'exec summary contains no nodes' + + for exec_summary in exec_summaries: + if len(exec_summary) != len(first_exec_summary): + return False, err + 'different number of nodes in exec summaries' + + for row_num, row in enumerate(exec_summary): + comp_row = first_exec_summary[row_num] + if row[OPERATOR] != comp_row[OPERATOR]: + return False, err + 'different operator' + + return True, str() + +class ExecSummaryComparison(object): + """Represents a comparison between two CombinedExecSummaries. + + Args: + combined_summary (CombinedExecSummaries): current summary. + ref_combined_summary (CombinedExecSummaries): reference summaries. + + Attributes: + rows (list of dict): Each dict represents a single row. Each dict has the following + keys: + prefix (str) + operator (str) + num_hosts (int) + avg_time (float) + stddev_time (float) + avg_time_change (float): % change in avg time compared to reference + avg_time_change_total (float): % change in avg time compared to total of the query + max_time (float) + max_time_change (float): % change in max time compared to reference + peak_mem (int) + peak_mem_change (float): % change compared to reference + num_rows (int) + est_num_rows (int) + est_peak_mem (int) + detail (str) + combined_summary (CombinedExecSummaries): original combined summary + ref_combined_summary (CombinedExecSummaries): original reference combined summary. + If the comparison cannot be constructed, these summaries can be printed. + + Another possible way to implement this is to generate this object when we call + CombinedExecSummaries.compare(reference). 
+ """ + + def __init__(self, combined_summary, ref_combined_summary): + + # Store the original summaries, in case we can't build a comparison + self.combined_summary = combined_summary + self.ref_combined_summary = ref_combined_summary + + # If some error happened during calculations, store it here + self.error_str = str() + + self.rows = [] + self.__build_rows() + + def __build_rows(self): + + if self.combined_summary.is_same_schema(self.ref_combined_summary): + for i, row in enumerate(self.combined_summary.rows): + ref_row = self.ref_combined_summary.rows[i] + + comparison_row = {} + for key in [PREFIX, OPERATOR, NUM_HOSTS, AVG_TIME, STDDEV_TIME, MAX_TIME, PEAK_MEM, + NUM_ROWS, EST_NUM_ROWS, EST_PEAK_MEM, DETAIL]: + comparison_row[key] = row[key] + + comparison_row[AVG_TIME_CHANGE] = self.__calculate_change( + row[AVG_TIME], ref_row[AVG_TIME], ref_row[AVG_TIME]) + + comparison_row[AVG_TIME_CHANGE_TOTAL] = self.__calculate_change( + row[AVG_TIME], ref_row[AVG_TIME], self.ref_combined_summary.total_runtime) + + comparison_row[MAX_TIME_CHANGE] = self.__calculate_change( + row[MAX_TIME], ref_row[MAX_TIME], ref_row[MAX_TIME]) + + comparison_row[PEAK_MEM_CHANGE] = self.__calculate_change( + row[PEAK_MEM], ref_row[PEAK_MEM], ref_row[PEAK_MEM]) + + self.rows.append(comparison_row) + else: + self.error_str = 'Execution summary structures are different' + + def __str__(self): + """Construct a PrettyTable containing the comparison""" + if self.error_str: + # If the summary comparison could not be constructed, output both summaries + output = self.error_str + '\n' + output += 'Execution Summary: \n' + output += str(self.combined_summary) + '\n' + output += 'Reference Execution Summary: \n' + output += str(self.ref_combined_summary) + return output + + table = prettytable.PrettyTable( + ["Operator", + "#Hosts", + "Avg Time", + "Std Dev", + "Avg Change", + "Tot Change", + "Max Time", + "Max Change", + "#Rows", + "Est #Rows"]) + table.align = 'l' + + for row in self.rows: + table_row = [ row[PREFIX] + row[OPERATOR], + prettyprint_values(row[NUM_HOSTS]), + prettyprint_time(row[AVG_TIME]), + prettyprint_time(row[STDDEV_TIME]), + prettyprint_percent(row[AVG_TIME_CHANGE]), + prettyprint_percent(row[AVG_TIME_CHANGE_TOTAL]), + prettyprint_time(row[MAX_TIME]), + prettyprint_percent(row[MAX_TIME_CHANGE]), + prettyprint_values(row[NUM_ROWS]), + prettyprint_values(row[EST_NUM_ROWS]) ] + + table.add_row(table_row) + + return str(table) + + def __calculate_change(self, val, ref_val, compare_val): + """Calculate how big the change in val compared to ref_val is compared to total""" + if ref_val == 0: + return 0 + change = abs(val - ref_val) / compare_val + return change if val > ref_val else -change + +def save_runtime_diffs(results, ref_results, change_significant, is_regression): + """Given results and reference results, generate and output an HTML file + containing the Runtime Profile diff. + """ + diff = difflib.HtmlDiff(wrapcolumn=90, linejunk=difflib.IS_LINE_JUNK) - file_name_prefix = "%s-%s-%s-%s" % (current_row[QUERY_NAME_IDX], - current_row[SCALE_FACTOR_IDX], - current_row[FILE_FORMAT_IDX], - current_row[COMPRESSION_IDX]) - if was_change_significant: - file_name_prefix += '-regression' if is_regression else '-improvement' - file_name = '%s.html' % file_name_prefix - # Some compressions codecs have a `/`, which is not a valid file name character. 
- file_name = file_name.replace('/', '-') + + # We are comparing last queries in each run because they should have the most + # stable performance (unlike the first queries) + runtime_profile = results[RESULT_LIST][-1][RUNTIME_PROFILE] + ref_runtime_profile = ref_results[RESULT_LIST][-1][RUNTIME_PROFILE] + + template = ("{prefix}-{query_name}-{scale_factor}-{file_format}-{compression_codec}" + "-{compression_type}") + + query = results[RESULT_LIST][-1][QUERY] + + # Neutral - no improvement or regression + prefix = 'neu' + if change_significant: + prefix = 'reg' if is_regression else 'imp' + + file_name = template.format( + prefix = prefix, + query_name = query[NAME], + scale_factor = query[SCALE_FACTOR], + file_format = query[TEST_VECTOR][FILE_FORMAT], + compression_codec = query[TEST_VECTOR][COMPRESSION_CODEC], + compression_type = query[TEST_VECTOR][COMPRESSION_TYPE]) + + # Go into results dir dir_path = os.path.join(os.environ["IMPALA_HOME"], 'results') - # If dir_path does not exist, create a directory. If it does exist - # and is not a directory, remove the file and create a directory. + if not os.path.exists(dir_path): - os.mkdirs(dir_path) + os.mkdir(dir_path) elif not os.path.isdir(dir_path): raise RuntimeError("Unable to create $IMPALA_HOME/results, results file exists") - file_path = os.path.join(dir_path, file_name) - html_diff = diff.make_file(ref_row[RUNTIME_PROFILE_IDX].splitlines(1), - current_row[RUNTIME_PROFILE_IDX].splitlines(1), - fromdesc="Baseline Runtime Profile", todesc="Current Runtime Profile") - with open(file_path, 'w+') as f: - f.write(html_diff) -def build_perf_change_str(row, ref_row, regression): - perf_change_type = "regression" if regression else "improvement" - return "Significant perf %s detected: %s [%s/%s] (%ss -> %ss)\n" %\ - (perf_change_type, row[QUERY_NAME_IDX], row[FILE_FORMAT_IDX], row[COMPRESSION_IDX], - format_if_float(ref_row[AVG_IDX]), format_if_float(row[AVG_IDX])) + runtime_profile_file_name = file_name + "-runtime_profile.html" -def check_perf_change_significance(row, ref_row): - # Cast values to the proper types - ref_stddev = 0.0 if ref_row[STDDEV_IDX] == 'N/A' else float(ref_row[STDDEV_IDX]) - stddev = 0.0 if row[STDDEV_IDX] == 'N/A' else float(row[STDDEV_IDX]) - avg, ref_avg = map(float, [row[AVG_IDX], ref_row[AVG_IDX]]) - iters, ref_iters = map(int, [row[NUM_ITERS_IDX], ref_row[NUM_ITERS_IDX]]) - stddevs_are_zero = (ref_stddev == 0.0) and (stddev == 0.0) - percent_difference = abs(ref_avg - avg) * 100 / ref_avg - # If the result is within the allowed_latency_diff_secs, mark it as insignificant. 
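Putting the template and prefix logic above together, a significant regression for a hypothetical TPC-H query would be written to a file named like this (all query/vector values invented):

template = ("{prefix}-{query_name}-{scale_factor}-{file_format}-{compression_codec}"
            "-{compression_type}")
file_name = template.format(prefix='reg', query_name='TPCH_Q10', scale_factor='300gb',
                            file_format='parquet', compression_codec='none',
                            compression_type='none')
print(file_name + "-runtime_profile.html")
# reg-TPCH_Q10-300gb-parquet-none-none-runtime_profile.html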
- if abs(ref_avg - avg) < options.allowed_latency_diff_secs: + runtime_profile_file_path = os.path.join(dir_path, runtime_profile_file_name) + + runtime_profile_diff = diff.make_file( + ref_runtime_profile.splitlines(), + runtime_profile.splitlines(), + fromdesc = "Baseline Runtime Profile", + todesc = "Current Runtime Profile") + + with open(runtime_profile_file_path, 'w+') as f: + f.write(runtime_profile_diff) + +def build_exec_summary_str(results, ref_results): + exec_summaries = [result[EXEC_SUMMARY] for result in results[RESULT_LIST]] + ref_exec_summaries = [result[EXEC_SUMMARY] for result in ref_results[RESULT_LIST]] + + if None in exec_summaries or None in ref_exec_summaries: + return 'Unable to construct exec summary comparison\n' + + combined_summary = CombinedExecSummaries(exec_summaries) + ref_combined_summary = CombinedExecSummaries(ref_exec_summaries) + + comparison = ExecSummaryComparison(combined_summary, ref_combined_summary) + + return str(comparison) + '\n' + +def compare_time_stats(grouped, ref_grouped): + """Given two nested dictionaries generated by get_dict_from_json, after running + calculate_time_stats on both, compare the performance of the given run to a reference + run. + + A string will be returned with instances where there is a significant performance + difference + """ + out_str = str() + all_exec_summaries = str() + for workload_scale_key, workload in grouped.items(): + for query_name, file_formats in workload.items(): + for file_format, results in file_formats.items(): + ref_results = ref_grouped[workload_scale_key][query_name][file_format] + change_significant, is_regression = check_perf_change_significance( + results, ref_results) + + if change_significant: + out_str += build_perf_change_str(results, ref_results, is_regression) + '\n' + out_str += build_exec_summary_str(results, ref_results) + + try: + save_runtime_diffs(results, ref_results, change_significant, is_regression) + except Exception as e: + print 'Could not generate an html diff: %s' % e + + return out_str + +def check_perf_change_significance(stat, ref_stat): + absolute_difference = abs(ref_stat[AVG] - stat[AVG]) + percent_difference = abs(ref_stat[AVG] - stat[AVG]) * 100 / ref_stat[AVG] + stddevs_are_zero = (ref_stat[STDDEV] == 0) and (stat[STDDEV] == 0) + if absolute_difference < options.allowed_latency_diff_secs: return False, False - # If result is within min_percent_change_threshold of the baseline, - # mark it as insignificant and ignore the t-test. if percent_difference < options.min_percent_change_threshold: return False, False - # If the average is more than max_percent_change_threshold of the baseline, - # ignore the t-test and mark it as significant. - elif percent_difference > options.max_percent_change_threshold: - return True, ref_avg - avg < 0 - # If both stddev and ref_stddev are 0, the t-test is meaningless, and causes a divide - # by zero exception. - elif options.tval_threshold and not stddevs_are_zero: - tval = calculate_tval(avg, stddev, iters, ref_avg, ref_stddev, ref_iters) - # TODO: Currently, this doesn't take into account the degrees of freedom - # (number of iterations). In the future the regression threshold could be updated to - # specify the confidence interval, and based on the tval result we can lookup whether - # we are in/not in that interval. 
+ if percent_difference > options.max_percent_change_threshold: + return True, ref_stat[AVG] < stat[AVG] + if options.tval_threshold and not stddevs_are_zero: + tval = calculate_tval(stat[AVG], stat[STDDEV], stat[ITERATIONS], + ref_stat[AVG], ref_stat[STDDEV], ref_stat[ITERATIONS]) return abs(tval) > options.tval_threshold, tval > options.tval_threshold return False, False -def geometric_mean_execution_time(results): - """ - Returns the geometric mean of the average execution times +def build_summary_header(): + summary = "Execution Summary ({0})\n".format(date.today()) + if options.report_description: + summary += 'Run Description: {0}\n'.format(options.report_description) + if options.cluster_name: + summary += '\nCluster Name: {0}\n'.format(options.cluster_name) + if options.build_version: + summary += 'Impala Build Version: {0}\n'.format(options.build_version) + if options.lab_run_info: + summary += 'Lab Run Info: {0}\n'.format(options.lab_run_info) + return summary - Returns three sets of numbers - the mean of all the Impala times, the mean of the - Impala times that have matching hive results, and the mean of the hive results. - """ - impala_avgs = [] - impala_avgs_with_hive_match = [] - hive_avgs = [] - for row in results: - impala_avg, hive_avg = (row[AVG_IDX], row[HIVE_AVG_IDX]) - if impala_avg != 'N/A': - impala_avgs.append(float(impala_avg)) - if hive_avg != 'N/A': - impala_avgs_with_hive_match.append(float(impala_avg)) - hive_avgs.append(float(hive_avg)) +def get_summary_str(workload_ff): + """This prints a table containing the average run time per file format""" + summary_str = str() + summary_str += build_summary_header() + '\n' - return calculate_geomean_wrapper(impala_avgs),\ - calculate_geomean_wrapper(impala_avgs_with_hive_match),\ - calculate_geomean_wrapper(hive_avgs) + for workload_scale in workload_ff: + summary_str += "{0} / {1} \n".format(workload_scale[0][1], workload_scale[1][1]) + table = prettytable.PrettyTable(["File Format", "Compression", "Impala Avg"]) + table.align = 'l' + table.float_format = '.3' + for file_format in workload_ff[workload_scale]: + ff = file_format[0][1] + compression = file_format[1][1] + " / " + file_format[2][1] + avg = str(workload_ff[workload_scale][file_format][AVG]) + table.add_row([ff, compression, avg]) + summary_str += str(table) + '\n' + return summary_str -# Returns the sum of the average execution times for the given result -# collection -def sum_avg_execution_time(results): - impala_time = 0 - hive_time = 0 - for row in results: - impala_time += float(row[AVG_IDX]) if str(row[AVG_IDX]) != 'N/A' else 0 - hive_time += float(row[HIVE_AVG_IDX]) if str(row[HIVE_AVG_IDX]) != 'N/A' else 0 - return impala_time, hive_time +def get_stats_str(grouped): + stats_str = str() + for workload_scale_key, workload in grouped.items(): + stats_str += "Workload / Scale Factor: {0} / {1}".format(workload_scale_key[0][1], + workload_scale_key[1][1]) + for query_name, file_formats in workload.items(): + stats_str += "\n\nQuery: {0} \n".format(query_name[0][1]) + table = prettytable.PrettyTable( + ["File Format", "Compression", "Avg(s)", "StdDev(s)", "Num Clients", "Iters"]) + table.align = 'l' + table.float_format = '.3' + for file_format, results in file_formats.items(): + table_row = [] + # File Format + table_row.append(file_format[0][1]) + # Compression + table_row.append(file_format[1][1] + " / " + file_format[2][1]) + table_row.append(results[AVG]) + table_row.append(results[STDDEV]) + table_row.append(results[NUM_CLIENTS]) + 
table_row.append(results[ITERATIONS]) + table.add_row(table_row) + stats_str += str(table) + return stats_str -# Returns dictionary of column_value to sum of the average times grouped by the specified -# key function -def sum_execution_time_by_key(results, key): - results.sort(key = key) - execution_results = dict() - for key, group in groupby(results, key=key): - execution_results[key] = (sum_avg_execution_time(group)) - return execution_results +def all_query_results(grouped): + for workload_scale_key, workload in grouped.items(): + for query_name, file_formats in workload.items(): + for file_format, results in file_formats.items(): + yield(results) -def geometric_mean_execution_time_by_key(results, key): - results.sort(key = key) - execution_results = dict() - for key, group in groupby(results, key=key): - execution_results[key] = geometric_mean_execution_time(group) - return execution_results -# Returns dictionary of column_value to sum of the average times grouped by the specified -# column index -def sum_execution_time_by_col_idx(results, column_index): - return sum_execution_time_by_key(results, key=lambda x: x[column_index]) - -def sum_execution_by_file_format(results): - return sum_execution_time_by_col_idx(results, FILE_FORMAT_IDX) - -def sum_execution_by_query(results): - return sum_execution_time_by_col_idx(results, QUERY_IDX) - -def sum_execution_by_compression(results): - return sum_execution_time_by_col_idx(results, COMPRESSION_IDX) - -def geometric_mean_by_file_format_compression(results): - key = lambda x: (x[FILE_FORMAT_IDX], x[COMPRESSION_IDX]) - return geometric_mean_execution_time_by_key(results, key) - -# Writes perf tests results in a "fake" JUnit output format. The main use case for this -# is so the Jenkins Perf plugin can be leveraged to report results. We create a few -# "fake" tests that are actually just aggregating the execution times in different ways. -# For example, create tests that have the aggregate execution time for each file format -# so we can see if a perf regression happens in this area. -def write_junit_output_file(results, output_file): - test_case_format = '' - - lines = [''] - for file_format, time in sum_execution_by_file_format(results).iteritems(): - lines.append(test_case_format % (format_if_float(time), 'sum_avg_' + file_format)) - - for compression, time in sum_execution_by_compression(results).iteritems(): - lines.append(test_case_format % (format_if_float(time), 'sum_avg_' + compression)) - - for query, time in sum_execution_by_query(results).iteritems(): - lines.append(test_case_format % (format_if_float(time), 'sum_avg_' + query)) - - total_tests = len(lines) - sum_avg = format_if_float(sum_avg_execution_time(results)) - lines[0] = lines[0] % (sum_avg, total_tests) - lines.append('') - output_file.write('\n'.join(lines)) - -# read results file in CSV format, then copies to a list and returns the value -def read_csv_result_file(file_name): - results = [] - - # The default field size limit is too small to read big runtime profiles. Set - # the limit to an artibrarily large value. - csv.field_size_limit(sys.maxint) - for row in csv.reader(open(file_name, 'rb'), delimiter='|'): - # Backwards compatibility: - # Older results may not have runtime profile, so fill this in if detected. 
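The cascade in check_perf_change_significance() above is easy to lose among the removed lines; here is the same ordering of checks, condensed and with the t-test branch left out, run on invented timings and the default thresholds (0.0s latency, 5% minimum, 20% maximum):

def is_significant(avg, ref_avg, allowed_latency_diff_secs=0.0,
                   min_percent=5.0, max_percent=20.0):
  # Returns (significant, regression), following the order of checks in the patch,
  # minus the t-test fallback for mid-sized changes.
  absolute_difference = abs(ref_avg - avg)
  percent_difference = absolute_difference * 100 / ref_avg
  if absolute_difference < allowed_latency_diff_secs:
    return False, False
  if percent_difference < min_percent:
    return False, False
  if percent_difference > max_percent:
    return True, ref_avg < avg  # regression when the new average is slower
  return False, False           # between the thresholds the t-test decides

print(is_significant(12.5, 10.0))  # (True, True)   25% slower -> flagged as a regression
print(is_significant(10.2, 10.0))  # (False, False) 2% is below the 5% floor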
- if len(row) == NUM_ITERS_IDX + 1: - row.append("No profile available") - results.append(row) - return results - -def filter_sort_results(results, workload, scale_factor, key): - filtered_res = [result for result in results if ( - result[WORKLOAD_IDX] == workload and result[SCALE_FACTOR_IDX] == scale_factor)] - return sorted(filtered_res, key=sort_key) - -def scale_factor_name(scale_factor): - return scale_factor if scale_factor else 'default' - -def merge_hive_results(results, hive_results): - new_results = [] - for row in results: - matching_row = find_matching_row_in_reference_results(row, hive_results) - if matching_row is not None: - new_results.append(row + [matching_row[AVG_IDX], matching_row[STDDEV_IDX]]) - else: - new_results.append(row + ['N/A', 'N/A']) - return new_results - -def write_results_to_datastore(results): +def write_results_to_datastore(grouped): """ Saves results to a database """ + from perf_result_datastore import PerfResultDataStore + print 'Saving perf results to database' current_date = datetime.now() data_store = PerfResultDataStore(host=options.db_host, username=options.db_username, password=options.db_password, database_name=options.db_name) run_info_id = data_store.insert_run_info(options.lab_run_info) - for row in results: - # We ignore everything after the stddev column - executor, workload, scale_factor, query_name, query, file_format,\ - compression, avg_time, stddev = row[0:STDDEV_IDX + 1] + for results in all_query_results(grouped): + first_query_result = results[RESULT_LIST][0] + executor_name = first_query_result[EXECUTOR_NAME] + workload = first_query_result[QUERY][WORKLOAD_NAME] + scale_factor = first_query_result[QUERY][SCALE_FACTOR] + query_name = first_query_result[QUERY][NAME] + query = first_query_result[QUERY][QUERY_STR] + file_format = first_query_result[QUERY][TEST_VECTOR][FILE_FORMAT] + compression_codec = first_query_result[QUERY][TEST_VECTOR][COMPRESSION_CODEC] + compression_type = first_query_result[QUERY][TEST_VECTOR][COMPRESSION_TYPE] + avg_time = results[AVG] + stddev = results[STDDEV] + num_clients = results[NUM_CLIENTS] + num_iterations = results[ITERATIONS] + runtime_profile = first_query_result[RUNTIME_PROFILE] - # Instead of storing 'N/A' in the database we want to store NULL - avg_time = avg_time if avg_time and avg_time != 'N/A' else 'NULL' - stddev = stddev if stddev and stddev != 'N/A' else 'NULL' - - file_type_id = data_store.get_file_format_id(file_format, compression) + file_type_id = data_store.get_file_format_id( + file_format, compression_codec, compression_type) if file_type_id is None: print 'Skipping unkown file type: %s / %s' % (file_format, compression) continue @@ -452,103 +799,54 @@ def write_results_to_datastore(results): query_id = data_store.insert_query_info(query_name, query) data_store.insert_execution_result( - query_id=query_id, workload_id=workload_id, file_type_id=file_type_id, - num_clients=int(row[NUM_CLIENTS_IDX]), cluster_name=options.cluster_name, - executor_name=executor, avg_time=avg_time, stddev=stddev, - run_date=current_date, version=options.build_version, - notes=options.report_description, run_info_id=run_info_id, - num_iterations=int(row[NUM_ITERS_IDX]), runtime_profile=row[RUNTIME_PROFILE_IDX], - is_official=options.is_official) + query_id = query_id, + workload_id = workload_id, + file_type_id = file_type_id, + num_clients = num_clients, + cluster_name = options.cluster_name, + executor_name = executor_name, + avg_time = avg_time, + stddev = stddev, + run_date = current_date, + version 
= options.build_version, + notes = options.report_description, + run_info_id = run_info_id, + num_iterations = num_iterations, + runtime_profile = runtime_profile, + is_official = options.is_official) -def build_summary_header(): - summary = "Execution Summary (%s)\n" % date.today() - if options.report_description: - summary += 'Run Description: %s\n' % options.report_description - if options.cluster_name: - summary += '\nCluster Name: %s\n' % options.cluster_name - if options.build_version: - summary += 'Impala Build Version: %s\n' % options.build_version - if options.lab_run_info: - summary += 'Lab Run Info: %s\n' % options.lab_run_info - return summary +if __name__ == "__main__": + """Workflow: + 1. Build a nested dictionary for the current result JSON and reference result JSON. + 2. Calculate runtime statistics for each query for both results and reference results. + 5. Save performance statistics to the performance database. + 3. Construct a string with a an overview of workload runtime and detailed performance + comparison for queries with significant performance change. + """ + # Generate a dictionary based on the JSON file + grouped = get_dict_from_json(options.result_file) -reference_results = list() -hive_reference_results = list() -results = list() -perf_changes_detected = True -if os.path.isfile(options.result_file): - results = read_csv_result_file(options.result_file) -else: - print 'Results file: ' + options.result_file + ' not found.' - sys.exit(1) + try: + # Generate a dictionary based on the reference JSON file + ref_grouped = get_dict_from_json(options.reference_result_file) + except Exception as e: + # If reference result file could not be read we can still continue. The result can + # be saved to the performance database. + print 'Could not read reference result file: %s' % e + ref_grouped = None -if os.path.isfile(options.hive_result_file): - hive_reference_results = read_csv_result_file(options.hive_result_file) -else: - print 'Hive result file: ' + options.hive_result_file + ' not found' + # Calculate average runtime and stddev for each query type + calculate_time_stats(grouped) + if ref_grouped is not None: + calculate_time_stats(ref_grouped) -# We want to marge hive results, even if they are empty, so row indexes stay the same. -results = merge_hive_results(results, hive_reference_results) + if options.save_to_db: write_results_to_datastore(grouped) -if os.path.isfile(options.reference_result_file): - reference_results = read_csv_result_file(options.reference_result_file) -else: - print 'No Impala reference result file found.' 
+ summary_str = get_summary_str(calculate_workload_file_format_runtimes(grouped)) + stats_str = get_stats_str(grouped) + comparison_str = 'No Comparison' if ref_grouped is None else compare_time_stats( + grouped, ref_grouped) -if not options.no_output_table: - summary, table_output = str(), str() - - sort_key = lambda k: (k[WORKLOAD_IDX], k[SCALE_FACTOR_IDX]) - results_sorted = sorted(results, key=sort_key) - - summary += build_summary_header() - if results: - summary += 'Num Clients: %s' % results[0][NUM_CLIENTS_IDX] - summary += "\nWorkload / Scale Factor\n\n" - - # First step is to break the result down into groups or workload/scale factor - for workload_scale_factor, group in groupby(results_sorted, key=sort_key): - workload, scale_factor = workload_scale_factor - summary += '%s / %s\n' % (workload, scale_factor_name(scale_factor)) - - # Based on the current workload/scale factor grouping, filter and sort results - filtered_results = filter_sort_results(results, workload, scale_factor, sort_key) - header = ['File Format', 'Compression', 'Impala Avg(s)', 'Impala Speedup (vs Hive)'] - summary += ' ' + build_padded_row_string(header, COL_WIDTH) + '\n' - - # Calculate execution details for each workload/scale factor - for file_format_compression, times in geometric_mean_by_file_format_compression( - filtered_results).iteritems(): - file_format, compression = file_format_compression - impala_avg, impala_with_hive_match_avg, hive_avg = times - impala_speedup = format_if_float( - calculate_speedup(hive_avg, impala_with_hive_match_avg)) +\ - 'X' if hive_avg != 'N/A' else 'N/A' - - summary += ' ' + build_padded_row_string( - [file_format, compression, impala_avg, impala_speedup], COL_WIDTH) + '\n' - summary += '\n' - - table_output += "-" * TOTAL_WIDTH + '\n' - table_output += "-- Workload / Scale Factor: %s / %s\n" %\ - (workload, scale_factor_name(scale_factor)) - table_output += "-" * TOTAL_WIDTH + '\n' - - # Build a table with detailed execution results for the workload/scale factor - output, perf_changes = build_table(filtered_results, VERBOSE, reference_results) - table_output += output + '\n' - if perf_changes: - perf_changes_detected = True - summary += '\n'.join([' !! ' + l for l in perf_changes.split('\n') if l]) + '\n\n' - print summary, table_output - print 'Total Avg Execution Time: ' + str(sum_avg_execution_time(results)[0]) - -if options.junit_output_file: - write_junit_output_file(results, open(options.junit_output_file, 'w')) - -if options.save_to_db: - print 'Saving perf results to database' - from perf_result_datastore import PerfResultDataStore - write_results_to_datastore(results) - -exit(911 if perf_changes_detected else 0) + print summary_str + print stats_str + print comparison_str diff --git a/tests/common/query.py b/tests/common/query.py index d0a02983c..441b63f2b 100644 --- a/tests/common/query.py +++ b/tests/common/query.py @@ -18,7 +18,18 @@ from tests.util.test_file_parser import QueryTestSectionReader # TODO: This interface needs to be more robust; At the moment, it has two users with # completely different uses (the benchmark suite and the impala test suite) class Query(object): - """Represents a query and all the information neede to execute it""" + """Represents a query and all the information neede to execute it + + Attributes: + query_str (str): The SQL query string. + name (str): query name? + scale_factor (str): for example 300gb, used to determine the database. + test_vector (?): Specifies some parameters + results (list of ?): ? 
+    workload_name (str): for example tpch, tpcds, visa (used to determine directory)
+    db (str): the database to run against; derived from the test vector and scale factor
+    table_format_str (str): file_format/compression_codec/compression_type from the test vector
+  """
   def __init__(self, **kwargs):
     self.query_str = kwargs.get('query_str')
     self.name = kwargs.get('name')
@@ -41,6 +52,7 @@ class Query(object):
             self.db == other.db)
 
   def __build_query(self):
+    """Populates db, query_str and table_format_str."""
     self.db = QueryTestSectionReader.get_db_name(self.test_vector, self.scale_factor)
     self.query_str = QueryTestSectionReader.build_query(self.query_str.strip())
     self.table_format_str = '%s/%s/%s' % (self.test_vector.file_format,
@@ -56,16 +68,27 @@ class Query(object):
 class QueryResult(object):
   """Contains the results of a query execution.
 
-  A query execution results contains the following fields:
-  query - The query object
-  time_taken - Time taken to execute the query
-  start_time - The time at which the client submits the query.
-  data - Query results
-  client_name - The thread id
-  runtime_profile - Saved runtime profile of the query's execution.
-  query_error - Empty string if the query succeeded. Error returned by the client if
-  it failed.
+  Parameters:
+    Required:
+      query (Query): The query object associated with this result.
+      start_time (datetime): Timestamp at the start of execution.
+      query_config (BeeswaxQueryExecConfig)
+      client_name (int): The thread id.
+
+    Optional:
+      time_taken (float): Time taken to execute the query.
+      summary (str): Query execution summary (e.g. "returned 10 rows").
+      data (list of str): Query results returned by Impala.
+      runtime_profile (str): Saved runtime profile of the query's execution.
+      exec_summary (TExecSummary)
+      success (bool): True if the execution was successful.
+
+  Attributes (modified by another class):
+    query_error (str): Empty string if the query succeeded. Error returned by the client
+      if it failed.
+    executor_name (str)
   """
+
   def __init__(self, query, **kwargs):
     self.query = query
     self.time_taken = kwargs.get('time_taken', 0.0)
@@ -75,6 +98,7 @@ class QueryResult(object):
     self.query_config = kwargs.get('query_config')
     self.client_name = kwargs.get('client_name')
     self.runtime_profile = kwargs.get('runtime_profile', str())
+    self.exec_summary = kwargs.get('exec_summary', str())
     self.success = kwargs.get('success', False)
     self.query_error = str()
     self.executor_name = str()
diff --git a/tests/common/query_executor.py b/tests/common/query_executor.py
index 80249a0f6..a77323e9a 100644
--- a/tests/common/query_executor.py
+++ b/tests/common/query_executor.py
@@ -50,13 +50,22 @@ hive_result_regex = 'Time taken: (\d*).(\d*) seconds'
 
 ## TODO: Split executors into their own modules.
 class QueryExecConfig(object):
-  """Base Class for Execution Configs"""
+  """Base Class for Execution Configs
+
+  Attributes:
+    plugin_runner (PluginRunner)
+  """
   def __init__(self, plugin_runner=None):
     self.plugin_runner = plugin_runner
 
 class ImpalaQueryExecConfig(QueryExecConfig):
-  """Base class for Impala query execution config"""
+  """Base class for Impala query execution config
+
+  Attributes:
+    impalad (str): address of the impalad in host:port format
+  """
+
   def __init__(self, plugin_runner=None, impalad='localhost:21000'):
     super(ImpalaQueryExecConfig, self).__init__(plugin_runner=plugin_runner)
     self._impalad = impalad
@@ -71,8 +80,14 @@ class ImpalaQueryExecConfig(QueryExecConfig):
 
 class JdbcQueryExecConfig(ImpalaQueryExecConfig):
-  """Impala query execution config for jdbc"""
+  """Impala query execution config for jdbc
+
+  Attributes:
+    transport (str): the transport passed to the JDBC client via its -t flag
+ """ + JDBC_CLIENT_PATH = os.path.join(os.environ['IMPALA_HOME'], 'bin/run-jdbc-client.sh') + def __init__(self, plugin_runner=None, impalad='localhost:21050', transport=None): super(JdbcQueryExecConfig, self).__init__(plugin_runner=plugin_runner, impalad=impalad) @@ -87,9 +102,20 @@ class JdbcQueryExecConfig(ImpalaQueryExecConfig): return JdbcQueryExecConfig.JDBC_CLIENT_PATH + ' -i "%s" -t %s' % (self._impalad, self.transport) - class BeeswaxQueryExecConfig(ImpalaQueryExecConfig): - """Impala query execution config for beeswax""" + """Impala query execution config for beeswax + + Args: + use_kerberos (boolean) + exec_options (str): String formatted as "opt1:val1;opt2:val2" + impalad (str): address of impalad : + plugin_runner (?): ? + + Attributes: + use_kerberos (boolean) + exec_options (dict str -> str): execution options + """ + def __init__(self, use_kerberos=False, exec_options=None, impalad='localhost:21000', plugin_runner=None): super(BeeswaxQueryExecConfig, self).__init__(plugin_runner=plugin_runner, @@ -99,7 +125,12 @@ class BeeswaxQueryExecConfig(ImpalaQueryExecConfig): self.__build_options(exec_options) def __build_options(self, exec_options): - """Read the exec_options into a dictionary""" + """Read the exec_options into self.exec_options + + Args: + exec_options (str): String formatted as "opt1:val1;opt2:val2" + """ + if exec_options: # exec_options are seperated by ; on the command line options = exec_options.split(';') @@ -121,13 +152,27 @@ class HiveQueryExecConfig(QueryExecConfig): class QueryExecutor(object): - def __init__(self, name, query, func, config, exit_on_error): - """ - Executes a query. + """Executes a query. - The query_exec_func needs to be a function that accepts a QueryExecOption parameter - and returns a QueryResult. - """ + Args: + name (str): eg. "hive" + query (str): string containing SQL query to be executed + func (function): Function that accepts a QueryExecOption parameter and returns a + QueryResult. Eg. execute_using_impala_beeswax + config (QueryExecOption) + exit_on_error (boolean): Exit right after an error encountered. + + Attributes: + exec_func (function): Function that accepts a QueryExecOption parameter and returns a + QueryResult. + exec_config (QueryExecOption) + query (str): string containing SQL query to be executed + exit_on_error (boolean): Exit right after an error encountered. + executor_name (str): eg. "hive" + result (QueryResult): Contains the result after execute method is called. + """ + + def __init__(self, name, query, func, config, exit_on_error): self.exec_func = func self.exec_config = config self.query = query @@ -163,7 +208,15 @@ class QueryExecutor(object): return self.__result def establish_beeswax_connection(query, query_config): - """Establish a connection to the user specified impalad""" + """Establish a connection to the user specified impalad. + + Args: + query_config (QueryExecConfig) + + Returns: + (boolean, ImpalaBeeswaxClient): True if successful + """ + # TODO: Make this generic, for hive etc. use_kerberos = query_config.use_kerberos client = ImpalaBeeswaxClient(query_config.impalad, use_kerberos=use_kerberos) @@ -177,8 +230,16 @@ def establish_beeswax_connection(query, query_config): def execute_using_impala_beeswax(query, query_config): """Executes a query using beeswax. - A new client is created per query, then destroyed. Returns QueryResult() + A new client is created per query, then destroyed. + + Args: + query (str): string containing the query to be executed. 
+    query_config (QueryExecConfig)
+
+  Returns:
+    QueryResult
   """
+
   # Create a client object to talk to impalad
   exec_result = QueryResult(query, query_config=query_config)
   plugin_runner = query_config.plugin_runner
@@ -203,23 +264,52 @@ def execute_using_impala_beeswax(query, query_config):
   return construct_exec_result(result, exec_result)
 
 def build_context(query, query_config):
+  """Build the context for the plugin_runner based on the query config.
+
+  TODO: Why not pass the QueryExecConfig to plugins directly?
+
+  Args:
+    query (Query)
+    query_config (QueryExecConfig)
+
+  Returns:
+    dict str -> str
+  """
+
   context = vars(query_config)
   context['query'] = query
   return context
 
 def construct_exec_result(result, exec_result):
+  """Transform an ImpalaBeeswaxResult object to a QueryResult object.
+
+  Args:
+    result (ImpalaBeeswaxResult): Transfers data from here.
+    exec_result (QueryResult): Transfers data to here.
+
+  Returns:
+    QueryResult
   """
-  Transform an ImpalaBeeswaxResult object to a QueryResult object.
-  """
+
+  # Return immediately if the query failed.
   if not result.success:
     return exec_result
   exec_result.success = True
-  for attr in ['data', 'runtime_profile', 'start_time', 'time_taken', 'summary']:
+  attrs = ['data', 'runtime_profile', 'start_time',
+      'time_taken', 'summary', 'exec_summary']
+  for attr in attrs:
    setattr(exec_result, attr, getattr(result, attr))
  return exec_result
 
 def execute_shell_cmd(cmd):
-  """Executes a command in the shell, pipes the output to local variables"""
+  """Executes a command in the shell, pipes the output to local variables.
+
+  Args:
+    cmd (str): Command to be executed.
+
+  Returns:
+    (str, str, str): return code, stdout, stderr
+  """
+
  LOG.debug('Executing: %s' % (cmd,))
  # Popen needs a list as its first parameter.
  # The first element is the command, with the rest being arguments.
diff --git a/tests/common/scheduler.py b/tests/common/scheduler.py
index 2f411cca8..bc98c5879 100644
--- a/tests/common/scheduler.py
+++ b/tests/common/scheduler.py
@@ -34,12 +34,21 @@ LOG.setLevel(level=logging.DEBUG)
 
 class Scheduler(object):
   """Schedules the submission of workloads across one of more clients.
 
-  A workload execution expects the following arguments:
-  query_executors: A list of initialized query executor objects.
-  shuffle: Change the order of execution of queries in a workload. By default, the queries
-  are executed sorted by query name.
-  num_clients: The degree of parallelism.
-  impalads: A list of impalads to connect to. Ignored when the executor is hive.
+  Args:
+    query_executors (list of QueryExecutor): the objects should be initialized.
+    shuffle (boolean): If True, change the order of execution of queries in a workload.
+      By default, the queries are executed sorted by query name.
+    num_clients (int): Number of concurrent clients.
+    impalads (list of str): A list of impalads to connect to. Ignored when the executor
+      is hive.
+
+  Attributes:
+    query_executors (list of QueryExecutor): initialized query executors
+    shuffle (boolean): shuffle query executors
+    iterations (int): number of iterations ALL query executors will run
+    query_iterations (int): number of times each query executor will execute
+    impalads (list of str): list of impalads for execution. It is rotated after each execution.
+ num_clients (int): Number of concurrent clients """ def __init__(self, **kwargs): self.query_executors = kwargs.get('query_executors') @@ -77,7 +86,12 @@ class Scheduler(object): return self.impalads[-1] def __run_queries(self, thread_num): - """Runs the list of query executors""" + """This method is run by every thread concurrently. + + Args: + thread_num (int): Thread number. Used for setting the client name in the result. + """ + # each thread gets its own copy of query_executors query_executors = deepcopy(sorted(self.query_executors, key=lambda x: x.query.name)) for j in xrange(self.iterations): diff --git a/tests/common/workload.py b/tests/common/workload.py index 8313a4eff..1ab03c1e4 100644 --- a/tests/common/workload.py +++ b/tests/common/workload.py @@ -27,7 +27,18 @@ class Workload(object): A workload is the internal representation for the set of queries on a dataset. It consists of the dataset name, and a mapping of query names to query strings. + + Args: + name (str): workload name. (Eg. tpch) + query_name_filters (list of str): List of regular expressions used for matching query + names + + Attributes: + name (str): workload name (Eg. tpch) + __query_map (dict): contains a query name -> string mapping; mapping of query name to + section (ex. "TPCH-Q10" -> "select * from...") """ + WORKLOAD_DIR = os.environ['IMPALA_WORKLOAD_DIR'] def __init__(self, name, query_name_filters=None): @@ -82,7 +93,15 @@ class Workload(object): Transform all the queries in the workload's query map to query objects based on the input test vector and scale factor. + + Args: + test_vector (?): query vector + scale_factor (str): eg. "300gb" + + Returns: + (list of Query): these will be consumed by ? """ + queries = list() for query_name, query_str in self.__query_map.iteritems(): queries.append(Query(name=query_name, diff --git a/tests/common/workload_runner.py b/tests/common/workload_runner.py index 168f75f5b..3805225ae 100755 --- a/tests/common/workload_runner.py +++ b/tests/common/workload_runner.py @@ -37,6 +37,19 @@ class WorkloadRunner(object): Internally, for each workload, this module looks up and parses that workload's query files and reads the workload's test vector to determine what combination(s) of file format / compression to run with. + + Args: + workload (Workload) + scale_factor (str): eg. "300gb" + config (WorkloadConfig) + + Attributes: + workload (Workload) + scale_factor (str): eg. "300gb" + config (WorkloadConfig) + exit_on_error (boolean) + results (list of QueryResult) + __test_vectors (list of ?) """ def __init__(self, workload, scale_factor, config): self.workload = workload @@ -106,7 +119,7 @@ class WorkloadRunner(object): self.exit_on_error) query_executors.append(query_executor) # Initialize the scheduler. 
-    scheduler= Scheduler(query_executors=query_executors,
+    scheduler = Scheduler(query_executors=query_executors,
                          shuffle=self.config.shuffle_queries,
                          iterations=self.config.workload_iterations,
                          query_iterations=self.config.query_iterations,
diff --git a/tests/util/plugin_runner.py b/tests/util/plugin_runner.py
index 07ec46991..2277414b5 100644
--- a/tests/util/plugin_runner.py
+++ b/tests/util/plugin_runner.py
@@ -36,38 +36,38 @@ class PluginRunner(object):
   '''
 
   def __init__(self, plugin_infos):
-    self._available_modules = self._get_plugin_modules()
-    self._get_plugins_from_modules(plugin_infos)
+    self.__available_modules = self.__get_plugin_modules()
+    self.__get_plugins_from_modules(plugin_infos)
 
   @property
   def plugins(self):
-    return self._plugins
+    return self.__plugins
 
   def __getstate__(self):
     state = self.__dict__.copy()
-    del state['_available_modules']
+    del state['_PluginRunner__available_modules']
     return state
 
-  def _get_plugin_modules(self):
+  def __get_plugin_modules(self):
     ''' Gets all the modules in the directory and imports them'''
     modules = pkgutil.iter_modules(path=[PLUGIN_DIR])
     available_modules = []
     for loader, mod_name, ispkg in modules:
       yield __import__("tests.benchmark.plugins.%s" % mod_name, fromlist=[mod_name])
 
-  def _get_plugins_from_modules(self, plugin_infos):
+  def __get_plugins_from_modules(self, plugin_infos):
     '''Look for user specified plugins in the available modules.'''
-    self._plugins = []
+    self.__plugins = []
     plugin_names = []
-    for module in self._available_modules:
+    for module in self.__available_modules:
       for plugin_info in plugin_infos:
-        plugin_name, scope = self._get_plugin_info(plugin_info)
+        plugin_name, scope = self.__get_plugin_info(plugin_info)
         plugin_names.append(plugin_name)
         if hasattr(module, plugin_name):
-          self._plugins.append(getattr(module, plugin_name)(scope=scope.lower()))
+          self.__plugins.append(getattr(module, plugin_name)(scope=scope.lower()))
 
     # The plugin(s) that could not be loaded are captured in the set difference
     # between plugin_names and self.__plugins
-    plugins_found = [p.__name__ for p in self._plugins]
+    plugins_found = [p.__name__ for p in self.__plugins]
     LOG.debug("Plugins found: %s" % ', '.join(plugins_found))
     plugins_not_found = set(plugin_names).difference(plugins_found)
     # If the user's entered a plugin that does not exist, raise an error.
@@ -75,7 +75,7 @@ class PluginRunner(object): msg = "Plugin(s) not found: %s" % (','.join(list(plugins_not_found))) raise RuntimeError, msg - def _get_plugin_info(self, plugin_info): + def __get_plugin_info(self, plugin_info): info = plugin_info.split(':') if len(info) == 1: return info[0], 'query' @@ -85,20 +85,20 @@ class PluginRunner(object): raise ValueError("Plugin names specified in the form [:]") def print_plugin_names(self): - for p in self._plugins: + for p in self.__plugins: LOG.debug("Plugin: %s, Scope: %s" % (p.__name__, p.scope)) def run_plugins_pre(self, context=None, scope=None): - if len(self._plugins) == 0: return + if len(self.__plugins) == 0: return if context: context['scope'] = scope - for p in self._plugins: + for p in self.__plugins: if not scope or p.scope == scope.lower(): LOG.debug('Running pre-hook for %s at scope %s' % (p.__name__, scope)) p.run_pre_hook(context=context) def run_plugins_post(self, context=None, scope=None): - if len(self._plugins) == 0: return - for p in self._plugins: + if len(self.__plugins) == 0: return + for p in self.__plugins: if not scope or p.scope == scope.lower(): LOG.debug('Running post-hook for %s at scope %s' % (p.__name__, scope)) p.run_post_hook(context=context)
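For reference, the double-underscore attributes introduced in the PluginRunner change are
name-mangled by Python, so the key that __getstate__ must remove from __dict__ is the
mangled one, not the literal '__available_modules'. A minimal, self-contained sketch
(illustrative only, not part of the patch) of that behaviour:

class PluginRunner(object):
  def __init__(self):
    # Stored in __dict__ under the mangled key '_PluginRunner__available_modules'.
    self.__available_modules = ['plugin_a', 'plugin_b']

  def __getstate__(self):
    state = self.__dict__.copy()
    # del state['__available_modules'] would raise KeyError; the mangled key must be
    # used so the attribute is excluded when the object is pickled or deep-copied.
    del state['_PluginRunner__available_modules']
    return state

runner = PluginRunner()
print(runner.__dict__.keys())  # contains '_PluginRunner__available_modules'
print(runner.__getstate__())   # {} - the module list is excluded from the state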