A few small fixes

Queries now return rows on both our small (query test) data set and the 10TB
data set. This change also fixes a problem with the PYTHONPATH not being set
properly and adds support for reporting query results using the geometric mean.

Change-Id: Ia432148d96645ecda3f63900b3bfbd29c706d886
Author: Lenni Kuff
Date: 2012-10-08 17:54:27 -07:00
Committed by: Henry Robinson
parent d2442a3b71
commit 231b66f37f
4 changed files with 55 additions and 17 deletions

.gitignore

@@ -5,8 +5,7 @@ thirdparty
 cscope.files
 cscope.out
 org.eclipse.jdt.core.prefs
-benchmark_results.csv
-reference_benchmark_results.csv
+*benchmark_results.csv*
 load-trevni-*-generated.sh
 load-*-generated.sql
 bin/version.info


@@ -57,8 +57,7 @@ export HBASE_CONF_DIR=$HIVE_CONF_DIR
 # set the python path for test modules and beeswax
 PYTHONPATH=$IMPALA_HOME:$IMPALA_HOME/shell/gen-py:$HIVE_HOME/lib/py
-PYTHONPATH=$IMPALA_HOME/thirdparty/python-thrift-0.7.0:$PYTHONPATH
-export PYTHONPATH
+export PYTHONPATH=$PYTHONPATH:$IMPALA_HOME/thirdparty/python-thrift-0.7.0
 # These arguments are, despite the name, passed to every JVM created
 # by an impalad. So they must point to the location of
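
The old lines prepended the bundled python-thrift-0.7.0 to PYTHONPATH, where it
could shadow every other copy of thrift; the fix appends it so earlier entries
keep precedence. A small diagnostic sketch (not part of the change) for checking
where thrift entries end up on the interpreter's path:

import sys

# PYTHONPATH entries are placed near the front of sys.path in order, so
# whichever matching entry appears first wins the import.
for path in sys.path:
  if 'thrift' in path.lower():
    print(path)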


@@ -20,7 +20,6 @@ import csv
 import math
 import os
 import sys
-from perf_result_datastore import PerfResultDataStore
 from datetime import date, datetime
 from itertools import groupby
 from optparse import OptionParser
@@ -103,7 +102,7 @@ def find_matching_row_in_reference_results(search_row, reference_results):
   for row in reference_results:
     if not row:
       continue;
-    if (row[QUERY_IDX] == search_row[QUERY_IDX] and
+    if (row[QUERY_NAME_IDX] == search_row[QUERY_NAME_IDX] and
         row[FILE_FORMAT_IDX] == search_row[FILE_FORMAT_IDX] and
         row[COMPRESSION_IDX] == search_row[COMPRESSION_IDX] and
         row[SCALE_FACTOR_IDX] == search_row[SCALE_FACTOR_IDX] and
@@ -120,6 +119,12 @@ def calculate_speedup(reference, actual):
 def calculate_impala_hive_speedup(row):
   return calculate_speedup(row[HIVE_AVG_IDX], row[AVG_IDX])
 
+def calculate_geomean(times):
+  """ Calculates the geometric mean of the given collection of numerics """
+  if len(times) > 0:
+    return (reduce(lambda x, y: float(x) * float(y), times)) ** (1.0 / len(times))
+  return 'N/A'
+
 def build_table_header(verbose):
   table_header = ['File Format', 'Compression', 'Avg(s)', 'StdDev(s)']
   if verbose:
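
A standalone sanity check of the new calculate_geomean helper above (a minimal
sketch; functools.reduce is imported explicitly so it runs under both Python 2
and Python 3):

from functools import reduce

def calculate_geomean(times):
  """ Calculates the geometric mean of the given collection of numerics """
  if len(times) > 0:
    return (reduce(lambda x, y: float(x) * float(y), times)) ** (1.0 / len(times))
  return 'N/A'

# The geometric mean of 2 and 8 is the square root of 16, i.e. exactly 4.
assert calculate_geomean([2, 8]) == 4.0
assert calculate_geomean([]) == 'N/A'

Multiplying every value before taking the nth root can overflow for very long
lists of large times; summing logarithms is the numerically safer variant,
though at benchmark-report scale the direct product is fine.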
@@ -158,6 +163,28 @@ def build_table(results, verbose, reference_results = None):
   output += "-" * TOTAL_WIDTH + '\n\n'
   return output
 
+def geometric_mean_execution_time(results):
+  """
+  Returns the geometric mean of the average execution times
+
+  Returns three sets of numbers - the mean of all the Impala times, the mean of the
+  Impala times that have matching hive results, and the mean of the hive results.
+  """
+
+  impala_avgs = []
+  impala_avgs_with_hive_match = []
+  hive_avgs = []
+  for row in results:
+    impala_avg, hive_avg = (row[AVG_IDX], row[HIVE_AVG_IDX])
+    if impala_avg != 'N/A':
+      impala_avgs.append(impala_avg)
+      if hive_avg != 'N/A':
+        impala_avgs_with_hive_match.append(impala_avg)
+        hive_avgs.append(hive_avg)
+  return calculate_geomean(impala_avgs),\
+         calculate_geomean(impala_avgs_with_hive_match),\
+         calculate_geomean(hive_avgs)
+
 # Returns the sum of the average execution times for the given result
 # collection
 def sum_avg_execution_time(results):
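
Why geometric_mean_execution_time tracks a separate list of Impala times that
have Hive matches (made-up numbers, not from any real run):

# (impala_avg, hive_avg) pairs; 'N/A' means that engine produced no result.
rows = [(1.5, 30.0), (2.0, 'N/A'), (4.0, 8.0)]

impala_avgs = [i for i, h in rows]                   # all rows
matched_impala = [i for i, h in rows if h != 'N/A']  # [1.5, 4.0]
hive_avgs = [h for i, h in rows if h != 'N/A']       # [30.0, 8.0]

Comparing the geometric mean of hive_avgs against that of matched_impala keeps
the speedup apples-to-apples: only queries both engines actually ran enter the
comparison.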
@@ -177,6 +204,13 @@ def sum_execution_time_by_key(results, key):
     execution_results[key] = (sum_avg_execution_time(group))
   return execution_results
 
+def geometric_mean_execution_time_by_key(results, key):
+  results.sort(key = key)
+  execution_results = dict()
+  for key, group in groupby(results, key=key):
+    execution_results[key] = (geometric_mean_execution_time(group))
+  return execution_results
+
 # Returns dictionary of column_value to sum of the average times grouped by the specified
 # column index
 def sum_execution_time_by_col_idx(results, column_index):
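
The by-key helpers sort before grouping because itertools.groupby only merges
adjacent runs of equal keys; a minimal illustration with made-up rows:

from itertools import groupby

rows = [('text', 1.0), ('parquet', 2.0), ('text', 3.0)]
rows.sort(key=lambda r: r[0])  # without the sort, 'text' would appear twice
for fmt, group in groupby(rows, key=lambda r: r[0]):
  print('%s %s' % (fmt, [v for _, v in group]))
# parquet [2.0]
# text [1.0, 3.0]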
@@ -191,9 +225,9 @@ def sum_execution_by_query(results):
 def sum_execution_by_compression(results):
   return sum_execution_time_by_col_idx(results, COMPRESSION_IDX)
 
-def sum_execution_by_file_format_compression(results):
+def geometric_mean_by_file_format_compression(results):
   key = lambda x: (x[FILE_FORMAT_IDX], x[COMPRESSION_IDX])
-  return sum_execution_time_by_key(results, key)
+  return geometric_mean_execution_time_by_key(results, key)
 
 # Writes perf tests results in a "fake" JUnit output format. The main use case for this
 # is so the Jenkins Perf plugin can be leveraged to report results. We create a few
@@ -336,12 +370,14 @@ if not options.no_output_table:
   summary += ' ' + build_padded_row_string(header, COL_WIDTH) + '\n'
 
   # Calculate execution details for each workload/scale factor
-  for file_format_compression, times in sum_execution_by_file_format_compression(
+  for file_format_compression, times in geometric_mean_by_file_format_compression(
       filtered_results).iteritems():
     file_format, compression = file_format_compression
-    impala_avg, hive_avg = times
-    impala_speedup = format_if_float(calculate_speedup(hive_avg, impala_avg)) +\
-      'X' if hive_avg != 0 else 'N/A'
+    impala_avg, impala_with_hive_match_avg, hive_avg = times
+    impala_speedup = format_if_float(
+        calculate_speedup(hive_avg, impala_with_hive_match_avg)) +\
+        'X' if hive_avg != 'N/A' else 'N/A'
     summary += ' ' + build_padded_row_string(
         [file_format, compression, impala_avg, impala_speedup], COL_WIDTH) + '\n'
   summary += '\n'
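
The guard changes from hive_avg != 0 to hive_avg != 'N/A' because the
geometric-mean helpers return the string 'N/A', not zero, when there is no Hive
data, and the speedup is now computed against the Impala mean restricted to
Hive-matched queries. A hypothetical helper mirroring that logic (the
hive-over-impala division direction is assumed from calculate_impala_hive_speedup
above):

def speedup_label(hive_avg, impala_avg):
  # Geomeans are floats when data exists and the string 'N/A' otherwise.
  if hive_avg == 'N/A' or impala_avg == 'N/A':
    return 'N/A'
  return '%.2fX' % (float(hive_avg) / float(impala_avg))

assert speedup_label('N/A', 1.5) == 'N/A'
assert speedup_label(30.0, 1.5) == '20.00X'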
@@ -362,4 +398,5 @@ if options.junit_output_file:
 
 if options.save_to_db:
   print 'Saving perf results to database'
+  from perf_result_datastore import PerfResultDataStore
   write_results_to_datastore(results)


@@ -274,10 +274,14 @@ def build_query(query_format_string, file_format, codec, compression_type,
   """
   database_name = database_name_to_use(workload, scale_factor)
   table_suffix = build_table_suffix(file_format, codec, compression_type)
-  # $TABLE is used as a token for table suffix in the queries. Here we insert the proper
-  # database name based on the workload and query.
-  replace_from = '(\w+\.){0,1}(?P<table_name>\w+)\$TABLE'
-  replace_by = '%s%s%s' % (database_name, r'\g<table_name>', table_suffix)
+  # $TABLE is used as a token for table suffix in the queries. Here we replace the token
+  # with the proper database name based on the workload and scale factor.
+  # There also may be cases where there is dbname.table_name without a
+  # $TABLE (in the case of insert).
+  replace_from =\
+      '(%(workload)s\.)(?P<table_name>\w+)(\$TABLE){0,1}' % {'workload': workload}
+  replace_by = '%s%s' % (database_name, r'\g<table_name>')
   return re.sub(replace_from, replace_by, query_format_string)
 
 def read_vector_file(file_name):
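
A standalone illustration of the new substitution (hypothetical query text;
database_name_to_use is assumed to return the database name with its trailing
separator, e.g. 'tpch10gb.'):

import re

workload = 'tpch'
database_name = 'tpch10gb.'  # assumed output of database_name_to_use
replace_from =\
    '(%(workload)s\.)(?P<table_name>\w+)(\$TABLE){0,1}' % {'workload': workload}
replace_by = '%s%s' % (database_name, r'\g<table_name>')

# The $TABLE token is rewritten...
print(re.sub(replace_from, replace_by, 'select count(*) from tpch.lineitem$TABLE'))
# -> select count(*) from tpch10gb.lineitem

# ...and so is a bare dbname.table_name, the insert-statement case the new
# comment calls out.
print(re.sub(replace_from, replace_by, 'insert overwrite tpch.orders select 1'))
# -> insert overwrite tpch10gb.orders select 1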
@@ -412,7 +416,6 @@ def execute_queries(query_map, workload, scale_factor, vector_row):
       query_name = query
       query_string = build_query(query.strip(), file_format, codec, compression_type,
                                  workload, scale_factor)
-      execution_result = QueryExecutionResult()
       if not options.skip_impala:
         summary += "Results Using Impala:\n"