Update run-workload to gather, display, and archive runtime profiles for each workload query

Lenni Kuff
2013-03-18 13:41:51 -07:00
committed by Henry Robinson
parent 5ed84d7f65
commit 129c6473b9
6 changed files with 50 additions and 42 deletions

View File

@@ -131,7 +131,8 @@ def append_row_to_csv_file(csv_writer, query, query_name, result):
compression_str = 'none'
csv_writer.writerow([result.executor, result.workload, result.scale_factor,
query, query_name, result.file_format, compression_str,
avg_time, std_dev, options.num_clients, options.iterations])
avg_time, std_dev, options.num_clients, options.iterations,
result.execution_result.runtime_profile])
def enumerate_query_files(base_directory):
"""

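With the profile appended, each row written for a query now carries twelve columns. A minimal sketch of the layout, assuming the same '|' delimiter the results reader uses (sample values are invented):

import csv
import sys

csv_writer = csv.writer(sys.stdout, delimiter='|')
# executor, workload, scale_factor, query, query_name, file_format,
# compression, avg_time, std_dev, num_clients, iterations, runtime_profile
csv_writer.writerow(['impala', 'tpch', '300gb', 'select ...', 'TPCH-Q1',
                     'parquet', 'none', 3.21, 0.05, 1, 5, '<profile text>'])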
View File

@@ -43,7 +43,7 @@ class ImpalaBeeswaxException(Exception):
# Encapsulates a typical query result.
class QueryResult(object):
def __init__(self, query=None, success=False, data=None, schema=None,
time_taken=0, summary=''):
time_taken=0, summary='', runtime_profile=str()):
self.query = query
self.success = success
# Insert returns an int, convert into list to have a uniform data type.
@@ -55,6 +55,7 @@ class QueryResult(object):
self.time_taken = time_taken
self.summary = summary
self.schema = schema
self.runtime_profile = runtime_profile
def get_data(self):
return self.__format_data()
@@ -65,12 +66,13 @@ class QueryResult(object):
return ''
def __str__(self):
message = ('Success: %s'
'Took: %s s\n'
'Summary: %s\n'
'Data:\n%s'
% (self.success,self.time_taken,
self.summary, self.__format_data())
message = ('Summary: %s\n'
'Success: %s\n'
'Took: %s(s)\n'
'Data:\n%s\n'
'Runtime Profile:\n%s\n'
% (self.summary, self.success, self.time_taken,
self.__format_data(), self.runtime_profile)
)
return message
@@ -151,8 +153,13 @@ class ImpalaBeeswaxClient(object):
handle = self.__execute_query(query_string.strip())
result = self.fetch_results(query_string, handle)
result.time_taken = time.time() - start
# Don't include the time it takes to get the runtime profile in the execution time
result.runtime_profile = self.get_runtime_profile(handle)
return result
def get_runtime_profile(self, handle):
return self.__do_rpc(lambda: self.imp_service.GetRuntimeProfile(handle))
def execute_query_async(self, query_string):
"""
Executes a query asynchronously

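Taken together, the client-side flow is: run the query, stop the timer, then fetch the profile over a second RPC on the same handle. A minimal usage sketch (the module path, connect() call, and execute() entry point are assumptions; only time_taken, runtime_profile, and close_connection() appear in this diff):

from tests.beeswax.impala_beeswax import ImpalaBeeswaxClient  # path is a guess

client = ImpalaBeeswaxClient('localhost:21000')  # impalad address: placeholder
client.connect()                                 # assumed setup call
result = client.execute('select count(*) from tpch.lineitem')
print result.time_taken       # wall-clock seconds; the profile fetch is excluded
print result.runtime_profile  # text returned by the GetRuntimeProfile RPC
client.close_connection()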
View File

@@ -15,6 +15,7 @@ CREATE TABLE ExecutionResults (
workload_id BIGINT NOT NULL,
file_type_id BIGINT NOT NULL,
num_clients INT NOT NULL DEFAULT 1,
num_iterations INT NOT NULL DEFAULT 1,
cluster_name char(255),
executor_name char(255),
avg_time double NULL,
@@ -22,7 +23,8 @@ CREATE TABLE ExecutionResults (
run_date DATETIME,
version char(255),
notes TEXT,
is_official BOOLEAN DEFAULT FALSE, -- Is this an official result
profile TEXT, -- The query runtime profile
is_official BOOLEAN DEFAULT FALSE, -- True if this is an official result
PRIMARY KEY (result_id)
);

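Databases created from the previous schema are missing both new columns. A hypothetical migration sketch, assuming a MySQL-backed datastore reached through MySQLdb (all connection parameters are placeholders):

import MySQLdb

conn = MySQLdb.connect(host='localhost', user='perf', passwd='perf',
                       db='perf_results')
cursor = conn.cursor()
# Mirror the two columns the CREATE TABLE above now includes.
cursor.execute("ALTER TABLE ExecutionResults "
               "ADD COLUMN num_iterations INT NOT NULL DEFAULT 1")
cursor.execute("ALTER TABLE ExecutionResults ADD COLUMN profile TEXT")
conn.commit()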
View File

@@ -46,11 +46,11 @@ class PerfResultDataStore(object):
def insert_execution_result(self, query_id, workload_id, file_type_id, num_clients,
cluster_name, executor_name, avg_time, stddev, run_date, version, notes,
run_info_id, num_iterations, is_official=False):
run_info_id, num_iterations, runtime_profile, is_official=False):
""" Inserts a perf execution result record """
return self.__insert_execution_result(query_id, workload_id, file_type_id,
num_clients, cluster_name, executor_name, avg_time, stddev, run_date, version,
notes, run_info_id, num_iterations, is_official)
notes, run_info_id, num_iterations, runtime_profile, is_official)
def print_execution_results(self, run_info_id):
""" Prints results that were inserted for the given run_info_id """
@@ -106,14 +106,14 @@ class PerfResultDataStore(object):
@cursor_wrapper
def __insert_execution_result(self, query_id, workload_id, file_type_id, num_clients,
cluster_name, executor_name, avg_time, stddev, run_date, version, notes,
run_info_id, num_iterations, is_official, cursor):
run_info_id, num_iterations, runtime_profile, is_official, cursor):
result = cursor.execute("insert into ExecutionResults (run_info_id, query_id, "\
"workload_id, file_type_id, num_clients, cluster_name, executor_name, avg_time,"\
" stddev, run_date, version, notes, num_iterations, is_official) values "\
"(%d, %d, %d, %d, %d, '%s', '%s', %s, %s, '%s', '%s', '%s', %d, %s)" %\
" stddev, run_date, version, notes, num_iterations, profile, is_official) values"\
"(%d, %d, %d, %d, %d, '%s', '%s', %s, %s, '%s', '%s', '%s', %d, '%s', %s)" %\
(run_info_id, query_id, workload_id, file_type_id, num_clients, cluster_name,
executor_name, avg_time, stddev, run_date, version, notes, num_iterations,
is_official))
runtime_profile, is_official))
@cursor_wrapper
def __insert_query_info(self, name, query, cursor):

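Worth noting: runtime profiles are free-form text that routinely contains quotes and newlines, which the %-interpolated statement above does not escape. A sketch of the same insert using parameter binding instead, so the driver quotes each value (a variant, not what this commit does):

INSERT_SQL = ("insert into ExecutionResults (run_info_id, query_id, workload_id,"
              " file_type_id, num_clients, cluster_name, executor_name, avg_time,"
              " stddev, run_date, version, notes, num_iterations, profile,"
              " is_official) values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,"
              " %s, %s, %s, %s)")
# A parameter tuple lets the driver escape each value; embedded quotes in the
# profile text cannot break the statement.
cursor.execute(INSERT_SQL, (run_info_id, query_id, workload_id, file_type_id,
                            num_clients, cluster_name, executor_name, avg_time,
                            stddev, run_date, version, notes, num_iterations,
                            runtime_profile, is_official))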
View File

@@ -87,8 +87,14 @@ AVG_IDX = 7
STDDEV_IDX = 8
NUM_CLIENTS_IDX = 9
NUM_ITERS_IDX = 10
HIVE_AVG_IDX = 11
HIVE_STDDEV_IDX = 12
RUNTIME_PROFILE_IDX = 11
HIVE_AVG_IDX = 12
HIVE_STDDEV_IDX = 13
SPEEDUP_IDX = 14
# These are the column indexes that will be displayed as part of the perf result table.
TABLE_COL_IDXS = range(FILE_FORMAT_IDX, NUM_ITERS_IDX + 1) +\
[HIVE_AVG_IDX, HIVE_STDDEV_IDX, SPEEDUP_IDX]
# Formats a string so that it is wrapped across multiple lines with no single line
# being longer than the given width
@@ -155,10 +161,10 @@ def build_table(results, verbose, reference_results = None):
for query_group, group in groupby(results, key = sort_key):
output += 'Query: ' + wrap_text(query_group, TOTAL_WIDTH) + '\n'
table = texttable.Texttable(max_width=TOTAL_WIDTH)
table.set_deco(table.HEADER | table.VLINES | table.BORDER)
table.header(build_table_header(verbose))
table.set_deco(table.HEADER | table.VLINES | table.BORDER)
# Add reach result to the output table
# Add each result to the output table
for row in group:
full_row = list(row)
# Don't show the hive execution times in verbose mode.
@@ -186,7 +192,9 @@ def build_table(results, verbose, reference_results = None):
format_if_float(calculate_speedup(ref_row[AVG_IDX], full_row[AVG_IDX]))
full_row[AVG_IDX] = format_if_float(full_row[AVG_IDX])
full_row[AVG_IDX] = full_row[AVG_IDX] + ' (%sX)' % speedup
table.add_row(full_row[FILE_FORMAT_IDX:])
test_row = [col for i, col in enumerate(full_row) if i in TABLE_COL_IDXS]
table.add_row(test_row)
output += table.draw() + '\n'
return output, perf_changes
@@ -309,26 +317,12 @@ def read_csv_result_file(file_name):
results = []
for row in csv.reader(open(file_name, 'rb'), delimiter='|'):
# Backwards compatibility:
# Older result sets may not have num_clients, so default to 1. Older results also
# may not contain num_iterations, so fill that in if needed.
# TODO: This can be removed once all results are in the new format.
if len(row) == STDDEV_IDX + 1:
row.append('1')
row.append(get_num_iterations())
elif len(row) == NUM_CLIENTS_IDX + 1:
row.append(get_num_iterations())
# Older results may not have a runtime profile, so fill this in if it is missing.
if len(row) == NUM_ITERS_IDX + 1:
row.append("No profile available")
results.append(row)
return results
def get_num_iterations():
# This is for backwards compatibility only - older results may not contain the
# num_iterations record. In this case try to get it from the report description. If it
is not available there, default to 2 iterations, which is the minimum for all current
# runs.
description = options.report_description if options.report_description else str()
match = re.search(r'Iterations: (\d+)', description)
return 2 if match is None else match.group(1)
def filter_sort_results(results, workload, scale_factor, key):
filtered_res = [result for result in results if (
result[WORKLOAD_IDX] == workload and result[SCALE_FACTOR_IDX] == scale_factor)]
@@ -382,7 +376,8 @@ def write_results_to_datastore(results):
executor_name=executor, avg_time=avg_time, stddev=stddev,
run_date=current_date, version=options.build_version,
notes=options.report_description, run_info_id=run_info_id,
num_iterations=int(row[NUM_ITERS_IDX]), is_official=options.is_official)
num_iterations=int(row[NUM_ITERS_IDX]), runtime_profile=row[RUNTIME_PROFILE_IDX],
is_official=options.is_official)
def build_summary_header():
summary = "Execution Summary (%s)\n" % date.today()

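To make the index bookkeeping concrete, a sketch of how an old-format row gets padded up to the runtime-profile column (columns 0-6 follow the CSV writer in run-workload; sample values are invented):

STDDEV_IDX, NUM_CLIENTS_IDX, NUM_ITERS_IDX, RUNTIME_PROFILE_IDX = 8, 9, 10, 11

row = ['impala', 'tpch', '300gb', 'select ...', 'TPCH-Q1', 'parquet', 'none',
       '3.21', '0.05']                      # oldest format: ends at STDDEV_IDX
if len(row) == STDDEV_IDX + 1:
    row.append('1')                         # default num_clients
    row.append('2')                         # default num_iterations
if len(row) == NUM_ITERS_IDX + 1:
    row.append('No profile available')      # default runtime profile
assert len(row) == RUNTIME_PROFILE_IDX + 1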
View File

@@ -37,12 +37,14 @@ hive_result_regex = 'Time taken: (\d*).(\d*) seconds'
# Contains details about the execution result of a query
class QueryExecutionResult(object):
def __init__(self, avg_time=None, std_dev=None, data=None, note=None):
def __init__(self, avg_time=None, std_dev=None, data=None, note=None,
runtime_profile=str()):
self.avg_time = avg_time
self.std_dev = std_dev
self.__note = note
self.success = False
self.data = data
self.runtime_profile = runtime_profile
def set_result_note(self, note):
self.__note = note
@@ -191,7 +193,7 @@ def execute_using_impala_beeswax(query, query_options):
return exec_result
results.append(result)
# We only need to print the results for a successful run, not all.
LOG.debug('Data:\n%s\n' % results[0].get_data())
LOG.debug('Result:\n%s\n' % results[0])
client.close_connection()
# get rid of the client object
del client
@@ -199,7 +201,8 @@ def execute_using_impala_beeswax(query, query_options):
return construct_execution_result(query_options.iterations, results)
def construct_execution_result(iterations, results):
"""Calculate average running time and standard deviation.
"""
Calculate average running time and standard deviation.
The summary of the first result is used as the summary for the entire execution.
"""
@@ -208,7 +211,7 @@ def construct_execution_result(iterations, results):
exec_result.data = results[0].data
exec_result.beeswax_result = results[0]
exec_result.set_result_note(results[0].summary)
exec_result.runtime_profile = results[0].runtime_profile
# If running more than 2 iterations, throw the first result out. Don't throw away
# the first result if iterations = 2 to preserve the stddev calculation.
if iterations > 2:
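The discard-the-warm-up rule implies aggregation roughly like this sketch (the helper name is hypothetical and a population standard deviation is assumed):

import math

def summarize(times, iterations):
    # Drop the warm-up run only when enough samples remain for a stddev.
    if iterations > 2:
        times = times[1:]
    avg = sum(times) / float(len(times))
    variance = sum((t - avg) ** 2 for t in times) / float(len(times))
    return avg, math.sqrt(variance)

print summarize([5.0, 3.1, 3.0, 2.9], iterations=4)  # the 5.0 warm-up is dropped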