From 129c6473b957e5451b776a4e893f9649fa01a11c Mon Sep 17 00:00:00 2001
From: Lenni Kuff
Date: Mon, 18 Mar 2013 13:41:51 -0700
Subject: [PATCH] Update run-workload to gather, display, and archive runtime
 profiles for each workload query

---
 bin/run-workload.py                           |  3 +-
 tests/beeswax/impala_beeswax.py               | 21 ++++++----
 tests/benchmark/create_perf_result_schema.sql |  4 +-
 tests/benchmark/perf_result_datastore.py      | 12 +++---
 tests/benchmark/report-benchmark-results.py   | 41 ++++++++-----------
 tests/common/query_executor.py                | 11 +++--
 6 files changed, 50 insertions(+), 42 deletions(-)

diff --git a/bin/run-workload.py b/bin/run-workload.py
index c9741404a..a9443c358 100755
--- a/bin/run-workload.py
+++ b/bin/run-workload.py
@@ -131,7 +131,8 @@ def append_row_to_csv_file(csv_writer, query, query_name, result):
     compression_str = 'none'
   csv_writer.writerow([result.executor, result.workload, result.scale_factor,
                        query, query_name, result.file_format, compression_str,
-                       avg_time, std_dev, options.num_clients, options.iterations])
+                       avg_time, std_dev, options.num_clients, options.iterations,
+                       result.execution_result.runtime_profile])
 
 def enumerate_query_files(base_directory):
   """
diff --git a/tests/beeswax/impala_beeswax.py b/tests/beeswax/impala_beeswax.py
index fdb823344..b2ce8b507 100644
--- a/tests/beeswax/impala_beeswax.py
+++ b/tests/beeswax/impala_beeswax.py
@@ -43,7 +43,7 @@ class ImpalaBeeswaxException(Exception):
 # Encapsulates a typical query result.
 class QueryResult(object):
   def __init__(self, query=None, success=False, data=None, schema=None,
-               time_taken=0, summary=''):
+               time_taken=0, summary='', runtime_profile=str()):
     self.query = query
     self.success = success
     # Insert returns an int, convert into list to have a uniform data type.
@@ -55,6 +55,7 @@ class QueryResult(object):
     self.time_taken = time_taken
     self.summary = summary
     self.schema = schema
+    self.runtime_profile = runtime_profile
 
   def get_data(self):
     return self.__format_data()
@@ -65,12 +66,13 @@
       return ''
 
   def __str__(self):
-    message = ('Success: %s'
-               'Took: %s s\n'
-               'Summary: %s\n'
-               'Data:\n%s'
-               % (self.success,self.time_taken,
-                  self.summary, self.__format_data())
+    message = ('Summary: %s\n'
+               'Success: %s\n'
+               'Took: %s(s)\n'
+               'Data:\n%s\n'
+               'Runtime Profile:\n%s\n'
+               % (self.summary, self.success, self.time_taken,
+                  self.__format_data(), self.runtime_profile)
               )
     return message
 
@@ -151,8 +153,13 @@ class ImpalaBeeswaxClient(object):
     handle = self.__execute_query(query_string.strip())
     result = self.fetch_results(query_string, handle)
     result.time_taken = time.time() - start
+    # Don't include the time it takes to get the runtime profile in the execution time
+    result.runtime_profile = self.get_runtime_profile(handle)
     return result
 
+  def get_runtime_profile(self, handle):
+    return self.__do_rpc(lambda: self.imp_service.GetRuntimeProfile(handle))
+
   def execute_query_async(self, query_string):
     """
     Executes a query asynchronously
diff --git a/tests/benchmark/create_perf_result_schema.sql b/tests/benchmark/create_perf_result_schema.sql
index 68a37064d..1d0aa455c 100644
--- a/tests/benchmark/create_perf_result_schema.sql
+++ b/tests/benchmark/create_perf_result_schema.sql
@@ -15,6 +15,7 @@ CREATE TABLE ExecutionResults (
   workload_id BIGINT NOT NULL,
   file_type_id BIGINT NOT NULL,
   num_clients INT NOT NULL DEFAULT 1,
+  num_iterations INT NOT NULL DEFAULT 1,
   cluster_name char(255),
   executor_name char(255),
   avg_time double NULL,
@@ -22,7 +23,8 @@
   run_date DATETIME,
   version char(255),
   notes TEXT,
-  is_official BOOLEAN DEFAULT FALSE, -- Is this an official result
+  profile TEXT, -- The query runtime profile
+  is_official BOOLEAN DEFAULT FALSE, -- True if this is an official result
   PRIMARY KEY (result_id)
 );
 
diff --git a/tests/benchmark/perf_result_datastore.py b/tests/benchmark/perf_result_datastore.py
index 716349670..1213bee27 100755
--- a/tests/benchmark/perf_result_datastore.py
+++ b/tests/benchmark/perf_result_datastore.py
@@ -46,11 +46,11 @@ class PerfResultDataStore(object):
 
   def insert_execution_result(self, query_id, workload_id, file_type_id, num_clients,
       cluster_name, executor_name, avg_time, stddev, run_date, version, notes,
-      run_info_id, num_iterations, is_official=False):
+      run_info_id, num_iterations, runtime_profile, is_official=False):
     """ Inserts a perf execution result record """
     return self.__insert_execution_result(query_id, workload_id, file_type_id,
         num_clients, cluster_name, executor_name, avg_time, stddev, run_date, version,
-        notes, run_info_id, num_iterations, is_official)
+        notes, run_info_id, num_iterations, runtime_profile, is_official)
 
   def print_execution_results(self, run_info_id):
     """ Prints results that were inserted for the given run_info_id """
@@ -106,14 +106,14 @@ class PerfResultDataStore(object):
   @cursor_wrapper
   def __insert_execution_result(self, query_id, workload_id, file_type_id, num_clients,
       cluster_name, executor_name, avg_time, stddev, run_date, version, notes,
-      run_info_id, num_iterations, is_official, cursor):
+      run_info_id, num_iterations, runtime_profile, is_official, cursor):
     result = cursor.execute("insert into ExecutionResults (run_info_id, query_id, "\
         "workload_id, file_type_id, num_clients, cluster_name, executor_name, avg_time,"\
-        " stddev, run_date, version, notes, num_iterations, is_official) values "\
-        "(%d, %d, %d, %d, %d, '%s', '%s', %s, %s, '%s', '%s', '%s', %d, %s)" %\
+        " stddev, run_date, version, notes, num_iterations, profile, is_official) values"\
+        "(%d, %d, %d, %d, %d, '%s', '%s', %s, %s, '%s', '%s', '%s', %d, '%s', %s)" %\
         (run_info_id, query_id, workload_id, file_type_id, num_clients, cluster_name,
          executor_name, avg_time, stddev, run_date, version, notes, num_iterations,
-         is_official))
+         runtime_profile, is_official))
 
   @cursor_wrapper
   def __insert_query_info(self, name, query, cursor):
diff --git a/tests/benchmark/report-benchmark-results.py b/tests/benchmark/report-benchmark-results.py
index 8ba81d8ce..dc7d37c1d 100755
--- a/tests/benchmark/report-benchmark-results.py
+++ b/tests/benchmark/report-benchmark-results.py
@@ -87,8 +87,14 @@ AVG_IDX = 7
 STDDEV_IDX = 8
 NUM_CLIENTS_IDX = 9
 NUM_ITERS_IDX = 10
-HIVE_AVG_IDX = 11
-HIVE_STDDEV_IDX = 12
+RUNTIME_PROFILE_IDX = 11
+HIVE_AVG_IDX = 12
+HIVE_STDDEV_IDX = 13
+SPEEDUP_IDX = 14
+
+# These are the column indexes that will be displayed as part of the perf result table.
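+# Note that RUNTIME_PROFILE_IDX is not part of this range: the profile is a large
+# block of text, so it is archived in the result datastore and logged with each
+# query result rather than rendered in the summary table.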
+TABLE_COL_IDXS = range(FILE_FORMAT_IDX, NUM_ITERS_IDX + 1) +\
+                 [HIVE_AVG_IDX, HIVE_STDDEV_IDX, SPEEDUP_IDX]
 
 # Formats a string so that it is wrapped across multiple lines with no single line
 # being longer than the given width
@@ -155,10 +161,10 @@ def build_table(results, verbose, reference_results = None):
   for query_group, group in groupby(results, key = sort_key):
     output += 'Query: ' + wrap_text(query_group, TOTAL_WIDTH) + '\n'
     table = texttable.Texttable(max_width=TOTAL_WIDTH)
-    table.set_deco(table.HEADER | table.VLINES | table.BORDER)
     table.header(build_table_header(verbose))
+    table.set_deco(table.HEADER | table.VLINES | table.BORDER)
 
-    # Add reach result to the output table
+    # Add each result to the output table
     for row in group:
       full_row = list(row)
       # Don't show the hive execution times in verbose mode.
@@ -186,7 +192,9 @@
           format_if_float(calculate_speedup(ref_row[AVG_IDX], full_row[AVG_IDX]))
       full_row[AVG_IDX] = format_if_float(full_row[AVG_IDX])
       full_row[AVG_IDX] = full_row[AVG_IDX] + ' (%sX)' % speedup
-      table.add_row(full_row[FILE_FORMAT_IDX:])
+
+      test_row = [col for i, col in enumerate(full_row) if i in TABLE_COL_IDXS]
+      table.add_row(test_row)
     output += table.draw() + '\n'
   return output, perf_changes
 
@@ -309,26 +317,12 @@ def read_csv_result_file(file_name):
   results = []
   for row in csv.reader(open(file_name, 'rb'), delimiter='|'):
     # Backwards compatibility:
-    # Older results sets may not have num_clients, so default to 1. Older results also
-    # may not contain num_iterations, so fill that in if needed.
-    # TODO: This can be removed once all results are in the new format.
-    if len(row) == STDDEV_IDX + 1:
-      row.append('1')
-      row.append(get_num_iterations())
-    elif len(row) == NUM_CLIENTS_IDX + 1:
-      row.append(get_num_iterations())
+    # Older results may not have a runtime profile, so fill in a placeholder if missing.
+    if len(row) == NUM_ITERS_IDX + 1:
+      row.append("No profile available")
     results.append(row)
   return results
 
-def get_num_iterations():
-  # This is for backwards compatibility only - older results may not contain the
-  # num_iterations record. In this case try to get it from the report description. If it
-  # is not available there, default to 2 iterations which is the minimum for all current
-  # runs.
-  description = options.report_description if options.report_description else str()
-  match = re.search(r'Iterations: (\d+)', description)
-  return 2 if match is None else match.group(1)
-
 def filter_sort_results(results, workload, scale_factor, key):
   filtered_res = [result for result in results if (
       result[WORKLOAD_IDX] == workload and result[SCALE_FACTOR_IDX] == scale_factor)]
@@ -382,7 +376,8 @@ def write_results_to_datastore(results):
         executor_name=executor, avg_time=avg_time, stddev=stddev,
         run_date=current_date, version=options.build_version,
         notes=options.report_description, run_info_id=run_info_id,
-        num_iterations=int(row[NUM_ITERS_IDX]), is_official=options.is_official)
+        num_iterations=int(row[NUM_ITERS_IDX]), runtime_profile=row[RUNTIME_PROFILE_IDX],
+        is_official=options.is_official)
 
 def build_summary_header():
   summary = "Execution Summary (%s)\n" % date.today()
diff --git a/tests/common/query_executor.py b/tests/common/query_executor.py
index ccc566ebb..213eecea2 100644
--- a/tests/common/query_executor.py
+++ b/tests/common/query_executor.py
@@ -37,12 +37,14 @@ hive_result_regex = 'Time taken: (\d*).(\d*) seconds'
 
 # Contains details about the execution result of a query
 class QueryExecutionResult(object):
-  def __init__(self, avg_time=None, std_dev=None, data=None, note=None):
+  def __init__(self, avg_time=None, std_dev=None, data=None, note=None,
+               runtime_profile=str()):
     self.avg_time = avg_time
     self.std_dev = std_dev
     self.__note = note
     self.success = False
     self.data = data
+    self.runtime_profile = runtime_profile
 
   def set_result_note(self, note):
     self.__note = note
@@ -191,7 +193,7 @@ def execute_using_impala_beeswax(query, query_options):
       return exec_result
     results.append(result)
   # We only need to print the results for a successful run, not all.
-  LOG.debug('Data:\n%s\n' % results[0].get_data())
+  LOG.debug('Result:\n%s\n' % results[0])
   client.close_connection()
   # get rid of the client object
   del client
@@ -199,7 +201,8 @@ def execute_using_impala_beeswax(query, query_options):
   return construct_execution_result(query_options.iterations, results)
 
 def construct_execution_result(iterations, results):
-  """Calculate average running time and standard deviation.
+  """
+  Calculate average running time and standard deviation.
 
   The summary of the first result is used as the summary for the entire execution.
   """
@@ -208,7 +211,7 @@ def construct_execution_result(iterations, results):
   exec_result.data = results[0].data
   exec_result.beeswax_result = results[0]
   exec_result.set_result_note(results[0].summary)
-
+  exec_result.runtime_profile = results[0].runtime_profile
   # If running more than 2 iterations, throw the first result out. Don't throw away
   # the first result if iterations = 2 to preserve the stddev calculation.
   if iterations > 2:
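
--
For reference, a minimal sketch (not part of the patch) of the aggregation that
construct_execution_result performs over the per-iteration timings. The helper
name compute_timing_stats and the sample timings are hypothetical, invented for
illustration; the harness's own averaging helpers may differ in detail (for
example, sample vs. population standard deviation).

  import math

  def compute_timing_stats(times, iterations):
    # Mirrors the first-result handling above: with more than 2 iterations the
    # first (warm-up) run is thrown out so it does not skew the average; with
    # exactly 2 iterations both runs are kept so a stddev can still be computed.
    if iterations > 2:
      times = times[1:]
    avg = sum(times) / float(len(times))
    variance = sum((t - avg) ** 2 for t in times) / len(times)
    return avg, math.sqrt(variance)

  # Example: a slow warm-up run followed by two steady-state runs.
  print compute_timing_stats([4.1, 2.0, 2.2], iterations=3)  # ~(2.1, 0.1)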