Updated Impala performance schema and test vector generation

This change updates the Impala performance schema and test vector generation
techniques. It also migrates the existing Ruby benchmark scripts to Python. The
change has a few parts:

1) Conversion of test vector generation and benchmark statement generation from
Ruby to Python. As part of this, the benchmark test vector and dimension files
are now written in CSV format (Python doesn't have built-in YAML support).
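
For reference, each row of a vector file is simply
<file_format>,<data_set>,<codec>,<compression_type>, and the new scripts read it
with Python's csv module (a sketch of the read_csv_vector_file helper that
appears in the new scripts):

  import csv

  def read_csv_vector_file(file_name):
    # Each row is one test vector: file_format, data_set, codec, compression_type.
    results = []
    with open(file_name, 'rb') as vector_file:
      for row in csv.reader(vector_file, delimiter=','):
        results.append(row)
    return results

  # e.g. read_csv_vector_file('benchmark_core.csv')[0]
  #   -> ['text', 'grep1gb', 'none', 'none']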

2) Standardized the naming of benchmark tables (to somewhat match the query
tests). In general the form is (see the sketch below):
* If file_format=text and compression=none, do not use a table suffix
* Abbreviate sequence file as 'seq', rc file as 'rc', etc.
* If using BLOCK compression don't append anything to the table name; if using
  'record' compression, append 'record'
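
A minimal Python sketch of the resulting suffix logic (mirroring the
build_table_suffix helper added to run_query and generate_benchmark_statements.py
in this change):

  def build_table_suffix(file_format, codec, compression_type):
    # text/none is the base case and gets no suffix at all.
    if file_format == 'text' and codec == 'none':
      return ''
    # Uncompressed non-text formats get just the abbreviated format name.
    elif codec == 'none':
      return '_%s' % file_format
    # 'record' compression is called out explicitly; BLOCK adds nothing extra.
    elif compression_type == 'record':
      return '_%s_%s_record' % (file_format, codec)
    else:
      return '_%s_%s' % (file_format, codec)

For example, 'uservisits' + build_table_suffix('seq', 'snap', 'record') gives
'uservisits_seq_snap_record', and 'grep1gb' + build_table_suffix('rc', 'def',
'block') gives 'grep1gb_rc_def'.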

3) Created a new way of adding new schemas: the benchmark_schema_template.sql
file. The generate_benchmark_statements.py script reads this file in and breaks
it up into sections. The section format is:
====
Data Set Name
---
BASE table name
---
CREATE STATEMENT Template
---
INSERT ... SELECT * format
---
LOAD Base statement
---
LOAD STATEMENT Format

Where the BASE table is a table that the other file format/compression
combinations can be generated from. Its data generally comes from local files.
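
The generate_benchmark_statements.py script splits the template on these
markers. Roughly (a sketch of its parse_benchmark_file function, which expects
each '===' section to contain five sub-sections separated by '---'):

  import collections

  def parse_benchmark_file(file_name):
    statements = collections.defaultdict(list)
    with open(file_name) as template:
      for section in template.read().split('==='):
        # Sub-sections: data set name, base table name, CREATE statement,
        # INSERT ... SELECT * statement, and LOAD statement.
        sub_sections = section.split('---')
        if len(sub_sections) == 5:
          data_set = sub_sections[0].strip()
          statements[data_set].append([s.strip() for s in sub_sections[1:]])
    return statements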

The thinking is that if the data files already exist in HDFS then we can just
use them directly rather than issue an INSERT ... SELECT * statement. The
generate_benchmark_statements.py script has been updated to use this new
template and to query HDFS for each table to determine how it should be
created. It then outputs a file called load-benchmark-*-generated.sql.
Since this file is generated dynamically we can remove the old benchmark
statement files.
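
A minimal sketch of the HDFS check (does_dir_exist_in_hdfs is the helper from
the new script; the needs_data_load wrapper is illustrative, and
'/test-warehouse' is just the script's --hive_warehouse_dir default):

  import os
  import subprocess

  def does_dir_exist_in_hdfs(path):
    # 'hadoop fs -test -e' returns 0 when the path exists in HDFS.
    return subprocess.call(['hadoop', 'fs', '-test', '-e', path]) == 0

  def needs_data_load(warehouse_dir, table_name):
    # If the table's directory already exists in the warehouse, assume the data
    # files are already there and skip generating an INSERT/LOAD for that table.
    return not does_dir_exist_in_hdfs(os.path.join(warehouse_dir, table_name))

  # e.g. needs_data_load('/test-warehouse', 'grep1gb_seq_snap')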

4) This has been hooked into load-benchmark-data.sh, and run_query has been
updated to use the new format as well.
Author: Lenni Kuff
Date: 2012-07-05 17:44:58 -07:00
Parent: e4d3fa74fb
Commit: 0da77037e3
26 changed files with 560 additions and 812 deletions

.gitignore

@@ -8,6 +8,8 @@ org.eclipse.jdt.core.prefs
benchmark_results.csv
reference_benchmark_results.csv
testdata/data/test-warehouse
testdata/bin/create-benchmark-*-generated.sql
testdata/bin/load-benchmark-*-generated.sql
pprof.out

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env bash
# Copyright (c) 2011 Cloudera, Inc. All rights reserved.
# Copyright (c) 2012 Cloudera, Inc. All rights reserved.
#
# Script that creates schema and loads data into hive for running benchmarks.
# By default the script will load the base data for the "core" scenario.
# If 'pairwise' is specified as a parameter the pairwise combinations of workload
@@ -14,26 +14,34 @@ bin=`cd "$bin"; pwd`
set -e
exploration_strategy=core
if [ $1 ]; then
exploration_strategy=$1
fi
BENCHMARK_SCRIPT_DIR=$IMPALA_HOME/testdata/bin
function execute_hive_query_from_file {
hive_args="-hiveconf hive.root.logger=WARN,console -v -f"
"$HIVE_HOME/bin/hive" $hive_args $1
hive_args="-hiveconf hive.root.logger=WARN,console -v -f"
"$HIVE_HOME/bin/hive" $hive_args $1
}
execute_hive_query_from_file "$IMPALA_HOME/testdata/bin/create-benchmark.sql"
execute_hive_query_from_file "$IMPALA_HOME/testdata/bin/load-benchmark.sql"
pushd "$IMPALA_HOME/testdata/bin";
./generate_benchmark_statements.py --exploration_strategy $exploration_strategy
popd
if [ "$1" = "exhaustive" ]
then
execute_hive_query_from_file "$IMPALA_HOME/testdata/bin/create-benchmark-exhaustive-generated.sql"
execute_hive_query_from_file "$IMPALA_HOME/testdata/bin/load-benchmark-exhaustive-generated.sql"
elif [ "$1" = "pairwise" ]
then
execute_hive_query_from_file "$IMPALA_HOME/testdata/bin/create-benchmark-pairwise-generated.sql"
execute_hive_query_from_file "$IMPALA_HOME/testdata/bin/load-benchmark-pairwise-generated-sql"
if [ "$exploration_strategy" = "exhaustive" ]; then
execute_hive_query_from_file "$BENCHMARK_SCRIPT_DIR/create-benchmark-exhaustive-generated.sql"
execute_hive_query_from_file "$BENCHMARK_SCRIPT_DIR/load-benchmark-exhaustive-generated.sql"
elif [ "$exploration_strategy" = "pairwise" ]; then
execute_hive_query_from_file "$BENCHMARK_SCRIPT_DIR/create-benchmark-pairwise-generated.sql"
execute_hive_query_from_file "$BENCHMARK_SCRIPT_DIR/load-benchmark-pairwise-generated-sql"
elif [ "$exploration_strategy" = "core" ]; then
execute_hive_query_from_file "$BENCHMARK_SCRIPT_DIR/create-benchmark-core-generated.sql"
execute_hive_query_from_file "$BENCHMARK_SCRIPT_DIR/load-benchmark-core-generated.sql"
else
execute_hive_query_from_file "$IMPALA_HOME/testdata/bin/create-benchmark-core-generated.sql"
execute_hive_query_from_file "$IMPALA_HOME/testdata/bin/load-benchmark-core-generated.sql"
echo "Invalid exploration strategy: $exploration_strategy"
exit 1
fi
$IMPALA_HOME/testdata/bin/generate-block-ids.sh

View File

@@ -38,19 +38,19 @@ then
# Run sample queries - outputs .gcda files
be/build/release/service/runquery -query="\
select count(field) from grep1gb_text_none where field like '%xyz%';\
select count(field) from grep1gb_rc_file_none where field like '%xyz%';\
select count(field) from grep1gb_sequence_file_snappy where field like '%xyz%';\
select sourceIP, SUM(adRevenue) FROM uservisits_web_sequence_file_none \
select count(field) from grep1gb where field like '%xyz%';\
select count(field) from grep1gb_rc_file where field like '%xyz%';\
select count(field) from grep1gb_seq_snap where field like '%xyz%';\
select sourceIP, SUM(adRevenue) FROM uservisits_web_seq \
GROUP by sourceIP order by SUM(adRevenue) desc limit 10;\
select sourceIP, SUM(adRevenue) FROM uservisits_web_text_none \
select sourceIP, SUM(adRevenue) FROM uservisits \
GROUP by sourceIP order by SUM(adRevenue) desc limit 10;\
select sourceIP, SUM(adRevenue) FROM uservisits_web_rc_file_none GROUP by sourceIP \
select sourceIP, SUM(adRevenue) FROM uservisits_web_rc GROUP by sourceIP \
order by SUM(adRevenue) desc limit 10;select sourceIP, SUM(adRevenue) \
FROM uservisits_web_text_none \
FROM uservisits \
GROUP by sourceIP order by SUM(adRevenue) desc limit 10;\
select uv.sourceip, avg(r.pagerank), sum(uv.adrevenue) as totalrevenue \
from uservisits_web_text_none uv join rankings_web_text_none r on \
from uservisits uv join rankings r on \
(r.pageurl = uv.desturl) \
where uv.visitdate > '1999-01-01' and uv.visitdate < '2000-01-01' \
group by uv.sourceip order by totalrevenue desc limit 1"\

View File

@@ -226,6 +226,7 @@ def run_query(query, prime_buffer_cache, iterations):
if not run_success:
print "Query did not run succesfully"
print "Failed Query: %s\n" % (query)
query_output.seek(0)
query_err.seek(0)
for line in query_output.readlines():
@@ -252,12 +253,29 @@ def run_query(query, prime_buffer_cache, iterations):
return [output, execution_result]
def choose_input_vector_file_name(exploration_strategy):
return "benchmark_%s.vector" % exploration_strategy
return "benchmark_%s.csv" % exploration_strategy
def build_query(
query_format_string, exploration_strategy, data_set, file_format, compression):
table_name = "%s_%s_%s" % (data_set, file_format, compression)
return query_format_string % {'table_name': table_name}
def build_table_suffix(file_format, codec, compression_type):
if file_format == 'text' and codec == 'none':
return ''
elif codec == 'none':
return '_%s' % (file_format)
elif compression_type == 'record':
return '_%s_%s_record' % (file_format, codec)
else:
return '_%s_%s' % (file_format, codec)
def build_query(query_format_string, exploration_strategy, data_set,
file_format, codec, compression_type):
table_suffix = build_table_suffix(file_format, codec, compression_type)
return query_format_string % {'table_suffix': table_suffix}
def read_csv_vector_file(file_name):
results = []
with open(file_name, 'rb') as vector_file:
for row in csv.reader(vector_file, delimiter=','):
results.append(row)
return results
os.chdir(os.environ['IMPALA_BE_DIR'])
@@ -267,30 +285,30 @@ os.chdir(os.environ['IMPALA_BE_DIR'])
# TODO: it would be good if this table also contained the expected numbers and
# automatically flag regressions. How do we reconcile the fact we are running on
# different machines?
queries = {'grep1GB': [
["select count(*) from %(table_name)s", 1, 5],
["select count(field) from %(table_name)s", 0, 5],
["select count(field) from %(table_name)s where field like '%%xyz%%'", 0, 5]
queries = {'grep1gb': [
["select count(*) from grep1gb%(table_suffix)s", 1, 5],
["select count(field) from grep1gb%(table_suffix)s", 0, 5],
["select count(field) from grep1gb%(table_suffix)s where field like '%%xyz%%'", 0, 5]
],
'web': [
["select uv.sourceip, avg(r.pagerank), sum(uv.adrevenue) as totalrevenue "\
"from uservisits_%(table_name)s uv join rankings_%(table_name)s r on "\
"from uservisits%(table_suffix)s uv join rankings%(table_suffix)s r on "\
"(r.pageurl = uv.desturl) where uv.visitdate > '1999-01-01' and uv.visitdate "\
"< '2000-01-01' group by uv.sourceip order by totalrevenue desc limit 1", 1, 5],
["select sourceIP, SUM(adRevenue) FROM uservisits_%(table_name)s GROUP by sourceIP "\
["select sourceIP, SUM(adRevenue) FROM uservisits%(table_suffix)s GROUP by sourceIP "\
"order by SUM(adRevenue) desc limit 10", 1, 5],
["select pageRank, pageURL from rankings_%(table_name)s where pageRank > 10 "\
["select pageRank, pageURL from rankings%(table_suffix)s where pageRank > 10 "\
"order by pageRank limit 100", 1, 5],
["select count(*) from rankings_%(table_name)s where "\
["select count(*) from rankings%(table_suffix)s where "\
"pageRank > 10 && pageRank < 25", 1, 5],
["select avg(adRevenue) from uservisits_%(table_name)s", 1, 5],
["select avg(adRevenue) from uservisits_%(table_name)s "\
["select avg(adRevenue) from uservisits%(table_suffix)s", 1, 5],
["select avg(adRevenue) from uservisits%(table_suffix)s "\
"where visitdate > '1999-07-01' and visitdate < '1999-12-31'", 1, 5],
],
'grep10GB': [
["select count(field) from %(table_name)s where field like '%%xyz%%'", 0, 1]
'grep10gb': [
["select count(field) from grep10gb%(table_suffix)s where field like '%%xyz%%'", 0, 1]
]
}
@@ -310,16 +328,17 @@ def write_to_csv(result_map, output_csv_file):
# Run all queries
if (len(options.query) == 0):
vector_file = open(
'../testdata/bin/' + choose_input_vector_file_name(options.exploration_strategy))
vector_file_path = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/',
choose_input_vector_file_name(options.exploration_strategy))
vector = read_csv_vector_file(vector_file_path)
output = ""
result_map = collections.defaultdict(list)
for line in vector_file:
file_format, data_set, compression = line.split()[:3]
for row in vector:
file_format, data_set, codec, compression_type = row[:4]
for query in queries[data_set]:
query_string = build_query(query[0], options.exploration_strategy, data_set,
file_format, compression)
file_format, codec, compression_type)
result = run_query(query_string, query[1], query[2])
output += result[0]
print result[0]
@@ -333,10 +352,9 @@ if (len(options.query) == 0):
if options.verbose != 0:
print "--------------------------------------------------------------------------"
execution_detail = QueryExecutionDetail(file_format, compression, execution_result,
execution_detail = QueryExecutionDetail(file_format, codec, execution_result,
hive_execution_result)
result_map[query[0]].append(execution_detail)
vector_file.close()
print "\nResults saving to: " + options.results_csv_file
write_to_csv(result_map, options.results_csv_file)

testdata/bin/benchmark_core.csv (new file, 8 lines)

@@ -0,0 +1,8 @@
text,grep1gb,none,none
text,grep10gb,none,none
text,web,none,none
seq,grep1gb,bzip,none
seq,web,snap,record
seq,web,none,none
rc,grep1gb,def,block
rc,web,none,none

View File

@@ -1,8 +0,0 @@
text grep1GB none
text grep10GB none
text web none
sequence_file grep1GB snappy
sequence_file web snappy
sequence_file web none
rc_file grep1GB none
rc_file web none

testdata/bin/benchmark_dimensions.csv (new file, 4 lines)

@@ -0,0 +1,4 @@
text,seq,rc
grep1gb,grep10gb,web
none,def,gzip,bzip,snap
none,block,record

View File

@@ -1,3 +0,0 @@
file_format: [text, sequence_file, rc_file]
data_set: [grep1GB, grep10GB, web]
compression: [none, default, gzip, bzip2, snappy]

testdata/bin/benchmark_exhaustive.csv (new file, 69 lines)

@@ -0,0 +1,69 @@
text,grep1gb,none,none
text,grep10gb,none,none
text,web,none,none
seq,grep1gb,none,none
seq,grep1gb,def,none
seq,grep1gb,def,block
seq,grep1gb,def,record
seq,grep1gb,gzip,none
seq,grep1gb,gzip,block
seq,grep1gb,gzip,record
seq,grep1gb,bzip,none
seq,grep1gb,bzip,block
seq,grep1gb,bzip,record
seq,grep1gb,snap,none
seq,grep1gb,snap,block
seq,grep1gb,snap,record
seq,grep10gb,none,none
seq,grep10gb,def,none
seq,grep10gb,def,block
seq,grep10gb,def,record
seq,grep10gb,gzip,none
seq,grep10gb,gzip,block
seq,grep10gb,gzip,record
seq,grep10gb,bzip,none
seq,grep10gb,bzip,block
seq,grep10gb,bzip,record
seq,grep10gb,snap,none
seq,grep10gb,snap,block
seq,grep10gb,snap,record
seq,web,none,none
seq,web,def,none
seq,web,def,block
seq,web,def,record
seq,web,gzip,none
seq,web,gzip,block
seq,web,gzip,record
seq,web,bzip,none
seq,web,bzip,block
seq,web,bzip,record
seq,web,snap,none
seq,web,snap,block
seq,web,snap,record
rc,grep1gb,none,none
rc,grep1gb,def,none
rc,grep1gb,def,block
rc,grep1gb,gzip,none
rc,grep1gb,gzip,block
rc,grep1gb,bzip,none
rc,grep1gb,bzip,block
rc,grep1gb,snap,none
rc,grep1gb,snap,block
rc,grep10gb,none,none
rc,grep10gb,def,none
rc,grep10gb,def,block
rc,grep10gb,gzip,none
rc,grep10gb,gzip,block
rc,grep10gb,bzip,none
rc,grep10gb,bzip,block
rc,grep10gb,snap,none
rc,grep10gb,snap,block
rc,web,none,none
rc,web,def,none
rc,web,def,block
rc,web,gzip,none
rc,web,gzip,block
rc,web,bzip,none
rc,web,bzip,block
rc,web,snap,none
rc,web,snap,block

View File

@@ -1,18 +0,0 @@
text grep1GB none
sequence_file grep1GB none
text grep10GB none
sequence_file grep10GB none
text web none
sequence_file web none
sequence_file grep1GB default
sequence_file grep10GB default
sequence_file web default
sequence_file grep1GB gzip
sequence_file grep10GB gzip
sequence_file web gzip
sequence_file grep1GB bzip2
sequence_file grep10GB bzip2
sequence_file web bzip2
sequence_file grep1GB snappy
sequence_file grep10GB snappy
sequence_file web snappy

testdata/bin/benchmark_pairwise.csv (new file, 7 lines)

@@ -0,0 +1,7 @@
text,grep1gb,none,none
seq,grep10gb,def,none
rc,web,gzip,none
rc,grep10gb,bzip,block
seq,grep1gb,snap,block
text,web,none,none
text,grep10gb,none,none

View File

@@ -1,8 +0,0 @@
text grep1GB none
text grep10GB none
text web none
sequence_file grep10GB none
sequence_file web default
sequence_file grep1GB gzip
sequence_file grep10GB bzip2
sequence_file grep10GB snappy

View File

@@ -0,0 +1,114 @@
# Copyright (c) 2012 Cloudera, Inc. All rights reserved.
# This file is used to define schema templates for generating and loading data for
# Impala tests. The goal is to provide a single place to define a table + data files
# and have the schema and data load statements generated for each combination of file
# format, compression, etc. The way this works is by specifying how to create a
# 'base table'. The base table can be used to generate tables in other file formats
# by performing the defined INSERT / SELECT INTO statement. Each new table using the
# file format/compression combination needs to have a unique name, so all the
# statements are parameterized on table name.
# This file is read in by the 'generate_benchmark_statements.py' script to
# generate all the schemas for the Impala benchmark tests.
#
# Each table is defined as a new section in this file with the following format:
# === <- Start new section
# Data set name - Used to group sets of tables together
# --- <- End sub-section
# Base table name
# --- <- End sub-section
# DROP TABLE statement
# CREATE TABLE statement - Statement to drop and create a table
# --- <- End sub-section
# INSERT/SELECT * - The INSERT/SELECT * command for loading from the base table
# --- <- End sub-section
# LOAD from LOCAL - How to load data for the base table
# === <- End section
===
grep1gb
---
grep1gb
---
DROP TABLE %(table_name)s;
CREATE EXTERNAL TABLE %(table_name)s (field string) partitioned by (chunk int) stored as %(file_format)s
LOCATION '${hiveconf:hive.metastore.warehouse.dir}/%(table_name)s';
ALTER TABLE %(table_name)s ADD PARTITION (chunk=0);
ALTER TABLE %(table_name)s ADD PARTITION (chunk=1);
ALTER TABLE %(table_name)s ADD PARTITION (chunk=2);
ALTER TABLE %(table_name)s ADD PARTITION (chunk=3);
ALTER TABLE %(table_name)s ADD PARTITION (chunk=4);
ALTER TABLE %(table_name)s ADD PARTITION (chunk=5);
---
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
FROM %(base_table_name)s INSERT OVERWRITE TABLE %(table_name)s PARTITION(chunk) SELECT *;
---
LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/hive_benchmark/grep1GB/part-00000' OVERWRITE INTO TABLE %(table_name)s PARTITION(chunk=0);
LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/hive_benchmark/grep1GB/part-00001' OVERWRITE INTO TABLE %(table_name)s PARTITION(chunk=1);
LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/hive_benchmark/grep1GB/part-00002' OVERWRITE INTO TABLE %(table_name)s PARTITION(chunk=2);
LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/hive_benchmark/grep1GB/part-00003' OVERWRITE INTO TABLE %(table_name)s PARTITION(chunk=3);
LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/hive_benchmark/grep1GB/part-00004' OVERWRITE INTO TABLE %(table_name)s PARTITION(chunk=4);
LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/hive_benchmark/grep1GB/part-00005' OVERWRITE INTO TABLE %(table_name)s PARTITION(chunk=5);
===
grep10gb
---
grep10gb
---
DROP TABLE %(table_name)s;
CREATE EXTERNAL TABLE %(table_name)s (field string) partitioned by (chunk int) stored as %(file_format)s
LOCATION '${hiveconf:hive.metastore.warehouse.dir}/%(table_name)s';
ALTER TABLE %(table_name)s ADD PARTITION (chunk=0);
ALTER TABLE %(table_name)s ADD PARTITION (chunk=1);
ALTER TABLE %(table_name)s ADD PARTITION (chunk=2);
ALTER TABLE %(table_name)s ADD PARTITION (chunk=3);
ALTER TABLE %(table_name)s ADD PARTITION (chunk=4);
ALTER TABLE %(table_name)s ADD PARTITION (chunk=5);
---
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
FROM %(base_table_name)s INSERT OVERWRITE TABLE %(table_name)s PARTITION(chunk) SELECT *;
---
LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/hive_benchmark/grep10GB/part-00000' OVERWRITE INTO TABLE %(table_name)s PARTITION(chunk=0);
LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/hive_benchmark/grep10GB/part-00001' OVERWRITE INTO TABLE %(table_name)s PARTITION(chunk=1);
LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/hive_benchmark/grep10GB/part-00002' OVERWRITE INTO TABLE %(table_name)s PARTITION(chunk=2);
LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/hive_benchmark/grep10GB/part-00003' OVERWRITE INTO TABLE %(table_name)s PARTITION(chunk=3);
LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/hive_benchmark/grep10GB/part-00004' OVERWRITE INTO TABLE %(table_name)s PARTITION(chunk=4);
LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/hive_benchmark/grep10GB/part-00005' OVERWRITE INTO TABLE %(table_name)s PARTITION(chunk=5);
===
web
---
rankings
---
DROP TABLE %(table_name)s;
CREATE EXTERNAL TABLE %(table_name)s (
pageRank int,
pageURL string,
avgDuration int)
row format delimited fields terminated by '|' stored as %(file_format)s
LOCATION '${hiveconf:hive.metastore.warehouse.dir}/%(table_name)s/Rankings.dat';
---
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
FROM %(base_table_name)s INSERT OVERWRITE TABLE %(table_name)s SELECT *;
---
LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/hive_benchmark/html1GB/Rankings.dat' OVERWRITE INTO TABLE %(table_name)s;
===
web
---
uservisits
---
DROP TABLE %(table_name)s;
CREATE EXTERNAL TABLE %(table_name)s (
sourceIP string,
destURL string,
visitDate string,
adRevenue float,
userAgent string,
cCode string,
lCode string,
sKeyword string,
avgTimeOnSite int)
row format delimited fields terminated by '|' stored as %(file_format)s
LOCATION '${hiveconf:hive.metastore.warehouse.dir}/%(table_name)s/UserVisits.dat';
---
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
FROM %(base_table_name)s INSERT OVERWRITE TABLE %(table_name)s SELECT *;
---
LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/hive_benchmark/html1GB/UserVisits.dat' OVERWRITE INTO TABLE %(table_name)s;
===

View File

@@ -1,3 +0,0 @@
text grep1GB none
text grep10GB none
text web none

View File

@@ -1,66 +0,0 @@
DROP TABLE IF EXISTS grep1GB_sequence_file_snappy;
CREATE TABLE grep1GB_sequence_file_snappy (field string) partitioned by (chunk int) stored as sequencefile;
DROP TABLE IF EXISTS Rankings_web_sequence_file_snappy;
CREATE TABLE Rankings_web_sequence_file_snappy (
pageRank int,
pageURL string,
avgDuration int)
row format delimited fields terminated by '|' stored as sequencefile;
DROP TABLE IF EXISTS UserVisits_web_sequence_file_snappy;
CREATE TABLE UserVisits_web_sequence_file_snappy (
sourceIP string,
destURL string,
visitDate string,
adRevenue float,
userAgent string,
cCode string,
lCode string,
sKeyword string,
avgTimeOnSite int)
row format delimited fields terminated by '|' stored as sequencefile;
DROP TABLE IF EXISTS Rankings_web_sequence_file_none;
CREATE TABLE Rankings_web_sequence_file_none (
pageRank int,
pageURL string,
avgDuration int)
row format delimited fields terminated by '|' stored as sequencefile;
DROP TABLE IF EXISTS UserVisits_web_sequence_file_none;
CREATE TABLE UserVisits_web_sequence_file_none (
sourceIP string,
destURL string,
visitDate string,
adRevenue float,
userAgent string,
cCode string,
lCode string,
sKeyword string,
avgTimeOnSite int)
row format delimited fields terminated by '|' stored as sequencefile;
DROP TABLE IF EXISTS grep1GB_rc_file_none;
CREATE TABLE grep1GB_rc_file_none (field string) partitioned by (chunk int) stored as rcfile;
DROP TABLE IF EXISTS Rankings_web_rc_file_none;
CREATE TABLE Rankings_web_rc_file_none (
pageRank int,
pageURL string,
avgDuration int)
row format delimited fields terminated by '|' stored as rcfile;
DROP TABLE IF EXISTS UserVisits_web_rc_file_none;
CREATE TABLE UserVisits_web_rc_file_none (
sourceIP string,
destURL string,
visitDate string,
adRevenue float,
userAgent string,
cCode string,
lCode string,
sKeyword string,
avgTimeOnSite int)
row format delimited fields terminated by '|' stored as rcfile;

View File

@@ -1,130 +0,0 @@
DROP TABLE IF EXISTS grep1GB_sequence_file_none;
CREATE TABLE grep1GB_sequence_file_none (field string) partitioned by (chunk int) stored as sequencefile;
DROP TABLE IF EXISTS grep10GB_sequence_file_none;
CREATE TABLE grep10GB_sequence_file_none (field string) partitioned by (chunk int) stored as sequencefile;
DROP TABLE IF EXISTS Rankings_web_sequence_file_none;
CREATE TABLE Rankings_web_sequence_file_none (
pageRank int,
pageURL string,
avgDuration int)
row format delimited fields terminated by '|' stored as sequencefile;
DROP TABLE IF EXISTS UserVisits_web_sequence_file_none;
CREATE TABLE UserVisits_web_sequence_file_none (
sourceIP string,
destURL string,
visitDate string,
adRevenue float,
userAgent string,
cCode string,
lCode string,
sKeyword string,
avgTimeOnSite int)
row format delimited fields terminated by '|' stored as sequencefile;
DROP TABLE IF EXISTS grep1GB_sequence_file_default;
CREATE TABLE grep1GB_sequence_file_default (field string) partitioned by (chunk int) stored as sequencefile;
DROP TABLE IF EXISTS grep10GB_sequence_file_default;
CREATE TABLE grep10GB_sequence_file_default (field string) partitioned by (chunk int) stored as sequencefile;
DROP TABLE IF EXISTS Rankings_web_sequence_file_default;
CREATE TABLE Rankings_web_sequence_file_default (
pageRank int,
pageURL string,
avgDuration int)
row format delimited fields terminated by '|' stored as sequencefile;
DROP TABLE IF EXISTS UserVisits_web_sequence_file_default;
CREATE TABLE UserVisits_web_sequence_file_default (
sourceIP string,
destURL string,
visitDate string,
adRevenue float,
userAgent string,
cCode string,
lCode string,
sKeyword string,
avgTimeOnSite int)
row format delimited fields terminated by '|' stored as sequencefile;
DROP TABLE IF EXISTS grep1GB_sequence_file_gzip;
CREATE TABLE grep1GB_sequence_file_gzip (field string) partitioned by (chunk int) stored as sequencefile;
DROP TABLE IF EXISTS grep10GB_sequence_file_gzip;
CREATE TABLE grep10GB_sequence_file_gzip (field string) partitioned by (chunk int) stored as sequencefile;
DROP TABLE IF EXISTS Rankings_web_sequence_file_gzip;
CREATE TABLE Rankings_web_sequence_file_gzip (
pageRank int,
pageURL string,
avgDuration int)
row format delimited fields terminated by '|' stored as sequencefile;
DROP TABLE IF EXISTS UserVisits_web_sequence_file_gzip;
CREATE TABLE UserVisits_web_sequence_file_gzip (
sourceIP string,
destURL string,
visitDate string,
adRevenue float,
userAgent string,
cCode string,
lCode string,
sKeyword string,
avgTimeOnSite int)
row format delimited fields terminated by '|' stored as sequencefile;
DROP TABLE IF EXISTS grep1GB_sequence_file_bzip2;
CREATE TABLE grep1GB_sequence_file_bzip2 (field string) partitioned by (chunk int) stored as sequencefile;
DROP TABLE IF EXISTS grep10GB_sequence_file_bzip2;
CREATE TABLE grep10GB_sequence_file_bzip2 (field string) partitioned by (chunk int) stored as sequencefile;
DROP TABLE IF EXISTS Rankings_web_sequence_file_bzip2;
CREATE TABLE Rankings_web_sequence_file_bzip2 (
pageRank int,
pageURL string,
avgDuration int)
row format delimited fields terminated by '|' stored as sequencefile;
DROP TABLE IF EXISTS UserVisits_web_sequence_file_bzip2;
CREATE TABLE UserVisits_web_sequence_file_bzip2 (
sourceIP string,
destURL string,
visitDate string,
adRevenue float,
userAgent string,
cCode string,
lCode string,
sKeyword string,
avgTimeOnSite int)
row format delimited fields terminated by '|' stored as sequencefile;
DROP TABLE IF EXISTS grep1GB_sequence_file_snappy;
CREATE TABLE grep1GB_sequence_file_snappy (field string) partitioned by (chunk int) stored as sequencefile;
DROP TABLE IF EXISTS grep10GB_sequence_file_snappy;
CREATE TABLE grep10GB_sequence_file_snappy (field string) partitioned by (chunk int) stored as sequencefile;
DROP TABLE IF EXISTS Rankings_web_sequence_file_snappy;
CREATE TABLE Rankings_web_sequence_file_snappy (
pageRank int,
pageURL string,
avgDuration int)
row format delimited fields terminated by '|' stored as sequencefile;
DROP TABLE IF EXISTS UserVisits_web_sequence_file_snappy;
CREATE TABLE UserVisits_web_sequence_file_snappy (
sourceIP string,
destURL string,
visitDate string,
adRevenue float,
userAgent string,
cCode string,
lCode string,
sKeyword string,
avgTimeOnSite int)
row format delimited fields terminated by '|' stored as sequencefile;

View File

@@ -1,32 +0,0 @@
DROP TABLE IF EXISTS grep10GB_sequence_file_none;
CREATE TABLE grep10GB_sequence_file_none (field string) partitioned by (chunk int) stored as sequencefile;
DROP TABLE IF EXISTS Rankings_web_sequence_file_default;
CREATE TABLE Rankings_web_sequence_file_default (
pageRank int,
pageURL string,
avgDuration int)
row format delimited fields terminated by '|' stored as sequencefile;
DROP TABLE IF EXISTS UserVisits_web_sequence_file_default;
CREATE TABLE UserVisits_web_sequence_file_default (
sourceIP string,
destURL string,
visitDate string,
adRevenue float,
userAgent string,
cCode string,
lCode string,
sKeyword string,
avgTimeOnSite int)
row format delimited fields terminated by '|' stored as sequencefile;
DROP TABLE IF EXISTS grep1GB_sequence_file_gzip;
CREATE TABLE grep1GB_sequence_file_gzip (field string) partitioned by (chunk int) stored as sequencefile;
DROP TABLE IF EXISTS grep10GB_sequence_file_bzip2;
CREATE TABLE grep10GB_sequence_file_bzip2 (field string) partitioned by (chunk int) stored as sequencefile;
DROP TABLE IF EXISTS grep10GB_sequence_file_snappy;
CREATE TABLE grep10GB_sequence_file_snappy (field string) partitioned by (chunk int) stored as sequencefile;

View File

@@ -1,29 +0,0 @@
DROP TABLE IF EXISTS Grep1GB_text_none;
CREATE TABLE Grep1GB_text_none (
field string)
partitioned by (chunk int);
DROP TABLE IF EXISTS Grep10GB_text_none;
CREATE TABLE Grep10GB_text_none (
field string)
partitioned by (chunk int);
DROP TABLE IF EXISTS Rankings_web_text_none;
CREATE TABLE Rankings_web_text_none (
pageRank int,
pageURL string,
avgDuration int)
row format delimited fields terminated by '|' stored as textfile;
DROP TABLE IF EXISTS UserVisits_web_text_none;
CREATE TABLE UserVisits_web_text_none (
sourceIP string,
destURL string,
visitDate string,
adRevenue float,
userAgent string,
cCode string,
lCode string,
sKeyword string,
avgTimeOnSite int)
row format delimited fields terminated by '|' stored as textfile;

testdata/bin/generate_benchmark_statements.py (new executable file, 194 lines)

@@ -0,0 +1,194 @@
#!/usr/bin/env python
# Copyright (c) 2012 Cloudera, Inc. All rights reserved.
# This script generates the "CREATE TABLE", "INSERT", and "LOAD" statements for loading
# benchmark data and writes them to create-benchmark*-generated.sql and
# load-benchmark*-generated.sql.
#
# The statements that are generated are based on an input test vector
# (read from a file) that describes the coverage desired. For example, currently
# we want to run benchmarks with different data sets, across different file types, and
# with different compression algorithms set. To improve data loading performance this
# script will generate an INSERT INTO statement to generate the data if the file doesn't
# already exist in HDFS. If the file does already exist in HDFS then we simply issue a
# LOAD statement which is much faster.
#
# The input test vectors are generated via the 'generate_test_vectors.py' so
# ensure that script has been run (or the test vector files already exist) before
# running this script.
#
# Note: This statement generation is assuming the following data loading workflow:
# 1) Load all the data in the specified source table
# 2) Create tables for the new file formats and compression types
# 3) Run INSERT OVERWRITE TABLE SELECT * from the source table into the new tables
# or LOAD directly if the file already exists in HDFS.
import collections
import csv
import math
import os
import random
import subprocess
import sys
from itertools import product
from optparse import OptionParser
parser = OptionParser()
parser.add_option("--exploration_strategy", dest="exploration_strategy", default="core",
help="The exploration strategy for benchmark gen: 'core', "\
"'pairwise', or 'exhaustive'")
parser.add_option("--hive_warehouse_dir", dest="hive_warehouse_dir",
default="/test-warehouse",
help="The HDFS path to the base Hive test warehouse directory")
(options, args) = parser.parse_args()
COMPRESSION_TYPE = "SET mapred.output.compression.type=%s;"
COMPRESSION_ENABLED = "SET hive.exec.compress.output=%s;"
COMPRESSION_CODEC =\
"SET mapred.output.compression.codec=org.apache.hadoop.io.compress.%s;"
SET_DYNAMIC_PARTITION_STATEMENT = "SET hive.exec.dynamic.partition=true;"
SET_PARTITION_MODE_NONSTRICT_STATEMENT = "SET hive.exec.dynamic.partition.mode=nonstrict;"
FILE_FORMAT_IDX = 0
DATA_SET_IDX = 1
CODEC_IDX = 2
COMPRESSION_TYPE_IDX = 3
COMPRESSION_MAP = {'def': 'DefaultCodec',
'gzip': 'GzipCodec',
'bzip': 'BZip2Codec',
'snap': 'SnappyCodec',
'none': ''
}
FILE_FORMAT_MAP = {'text': 'TEXTFILE',
'seq': 'SEQUENCEFILE',
'rc': 'RCFILE'
}
class SqlGenerationStatement:
def __init__(self, base_table_name, create, insert, load_local):
self.base_table_name = base_table_name.strip()
self.create = create.strip()
self.insert = insert.strip()
self.load_local = load_local.strip()
def build_create_statement(table_template, table_suffix, file_format):
return table_template % {'table_name': table_suffix,
'file_format': FILE_FORMAT_MAP[file_format]
}
def build_compression_codec_statement(codec, compression_type):
compression_codec = COMPRESSION_MAP[codec]
if compression_codec:
return COMPRESSION_TYPE % compression_type.upper() + '\n' +\
COMPRESSION_CODEC % compression_codec
else:
return ''
def build_codec_enabled_statement(codec):
compression_enabled = 'false' if codec == 'none' else 'true'
return COMPRESSION_ENABLED % compression_enabled
def build_insert_into_statement(insert, base_table_name, table_name):
statement = SET_PARTITION_MODE_NONSTRICT_STATEMENT + "\n"
statement += SET_DYNAMIC_PARTITION_STATEMENT + "\n"
statement += insert % {'base_table_name': base_table_name, 'table_name': table_name}
return statement
def build_insert(insert, table_name, base_table_name, codec, compression_type):
output = build_codec_enabled_statement(codec) + "\n"
output += build_compression_codec_statement(codec, compression_type) + "\n"
output += build_insert_into_statement(insert, base_table_name, table_name) + "\n"
return output
def build_load_statement(load_template, table_suffix):
return load_template % {'table_name': table_suffix}
def build_table_suffix(file_format, codec, compression_type):
if file_format == 'text' and codec != 'none':
print 'Unsupported combination of file_format (text) and compression codec.'
sys.exit(1)
elif file_format == 'text' and codec == 'none':
return ''
elif codec == 'none':
return '_%s' % (file_format)
elif compression_type == 'record':
return '_%s_%s_record' % (file_format, codec)
else:
return '_%s_%s' % (file_format, codec)
def read_csv_vector_file(file_name):
results = []
with open(file_name, 'rb') as csv_file:
for row in csv.reader(csv_file, delimiter=','):
results.append(row)
return results
def write_array_to_file(file_name, array):
with open(file_name, 'w') as f:
f.write('\n\n'.join(array))
def does_dir_exist_in_hdfs(path):
return subprocess.call(["hadoop", "fs", "-test", "-e", path]) == 0
def write_statements_to_file_based_on_input_vector(output_name, input_file_name,
statements):
output_create = []
output_load = []
output_load_base = []
results = read_csv_vector_file(input_file_name)
for row in results:
file_format, data_set, codec, compression_type = row[:4]
for s in statements[data_set.strip()]:
create = s.create
insert = s.insert
load_local = s.load_local
table_name = s.base_table_name +\
build_table_suffix(file_format, codec, compression_type)
output_create.append(build_create_statement(create, table_name, file_format))
# If the directory already exists in HDFS, assume that data files already exist
# and skip loading the data. Otherwise, the data is generated using either an
# INSERT INTO statement or a LOAD statement.
data_path = os.path.join(options.hive_warehouse_dir, table_name)
if does_dir_exist_in_hdfs(data_path):
print 'Path:', data_path, 'already exists in HDFS. Data loading can be skipped.'
else:
print 'Path:', data_path, 'does not exist in HDFS. Data file will be generated.'
if table_name == s.base_table_name:
output_load_base.append(build_load_statement(load_local, table_name))
else:
output_load.append(build_insert(insert, table_name, s.base_table_name,
codec, compression_type))
# Make sure we create the base tables before the remaining tables
output_load = output_load_base + output_load
write_array_to_file('create-benchmark-' + output_name + '-generated.sql', output_create)
write_array_to_file('load-benchmark-' + output_name + '-generated.sql', output_load)
def parse_benchmark_file(file_name):
template = open(file_name, 'rb')
statements = collections.defaultdict(list)
for section in template.read().split('==='):
sub_section = section.split('---')
if len(sub_section) == 5:
data_set = sub_section[0]
gen_statement = SqlGenerationStatement(*sub_section[1:5])
statements[data_set.strip()].append(gen_statement)
else:
print 'Skipping invalid subsection:', sub_section
return statements
if (options.exploration_strategy != 'core' and
options.exploration_strategy != 'pairwise' and
options.exploration_strategy != 'exhaustive'):
print 'Invalid exploration strategy:', options.exploration_strategy
sys.exit(1)
statements = parse_benchmark_file('benchmark_schema_template.sql')
write_statements_to_file_based_on_input_vector(
options.exploration_strategy,
'benchmark_%s.csv' % options.exploration_strategy,
statements)

View File

@@ -1,170 +0,0 @@
#!/usr/bin/env ruby
# Copyright (c) 2012 Cloudera, Inc. All rights reserved.
# This script generates the "CREATE TABLE" and "INSERT" statements for loading
# benchmark data and writes them to create-benchmark*-generated.sql and
# load-benchmark*-generated.sql respectively.
#
# The statements that are generated are based on statements based on an input test vector
# (read from a file) that describes the coverage desired. For example, currently
# we want to run benchmarks with different data sets, across different file types, and
# with different compression algorithms set.
# The input test vectors are generated via the 'generate_test_vectors.rb' so
# ensure that script has been run (or the test vector files already exist) before
# running this script.
#
# Note: This statement generation is assuming the following data loading workflow:
# 1) Load all the data in its orginal format (text)
# 2) Create tables for the new file formats and compression types
# 3) Run INSERT OVERWRITE TABLE SELECT * from the original table into the new tables
#
# TODO: Convert this script to python
### CONSTANTS
# Used by the HTML workload
RANKINGS_CREATE_STATEMENT =
"DROP TABLE IF EXISTS Rankings_%s;
CREATE TABLE Rankings_%s (
pageRank int,
pageURL string,
avgDuration int)
row format delimited fields terminated by '|' %s;"
# Used by the HTML workload
USER_VISITS_CREATE_STATEMENT =
"DROP TABLE IF EXISTS UserVisits_%s;
CREATE TABLE UserVisits_%s (
sourceIP string,
destURL string,
visitDate string,
adRevenue float,
userAgent string,
cCode string,
lCode string,
sKeyword string,
avgTimeOnSite int)
row format delimited fields terminated by '|' %s;"
USER_VISTS_INSERT_STATEMENT =
"FROM UserVisits_web_text_none INSERT OVERWRITE TABLE UserVisits_%s SELECT *;"
RANKINGS_INSERT_STATEMENT =
"FROM Rankings_web_text_none INSERT OVERWRITE TABLE Rankings_%s SELECT *;"
GREP_CREATE_TABLE_STATEMENT =
"DROP TABLE IF EXISTS %s;
CREATE TABLE %s (field string) partitioned by (chunk int) %s;"
GREP_INSERT_STATEMENT =
"FROM %s_text_none INSERT OVERWRITE TABLE %s PARTITION(chunk) SELECT *;"
# Compression type will be another dimension in the future.
COMPRESSION_TYPE = "SET mapred.output.compression.type=BLOCK;"
COMPRESSION_ENABLED = "SET hive.exec.compress.output=%s;"
COMPRESSION_CODEC =
"SET mapred.output.compression.codec=org.apache.hadoop.io.compress.%s;"
SET_DYNAMIC_PARTITION_STATEMENT = "SET hive.exec.dynamic.partition=true;"
SET_PARTITION_MODE_NONSTRICT_STATEMENT = "SET hive.exec.dynamic.partition.mode=nonstrict;"
###
def generate_file_format(file_format)
case file_format
when "text" then "stored as textfile"
when "rc_file" then "stored as rcfile"
when "sequence_file" then "stored as sequencefile"
else raise "Unknown file format #{file_format}"
end
end
# Source tables are used to load data initial data in. Currently with in text format
# with no compression. These are then copied to other data formats. Because of this
# we skip creating of insert and create statements for these tables.
def is_source_table(file_format, compression)
file_format == 'text' and compression == 'none'
end
def generate_create_statement(table_name, data_set, file_format)
case data_set
when "grep1GB", "grep10GB" then GREP_CREATE_TABLE_STATEMENT %
[table_name, table_name, generate_file_format(file_format)]
when "web" then
create = RANKINGS_CREATE_STATEMENT %
[table_name, table_name, generate_file_format(file_format)] + "\n\n"
create += USER_VISITS_CREATE_STATEMENT %
[table_name, table_name, generate_file_format(file_format)]
else raise "Unknown data set: #{data_set}"
end
end
def generate_compression_codec_statement(compression)
if compression == "none" then return "" end
codec = case compression
when 'default' then "DefaultCodec"
when 'gzip' then "GzipCodec"
when 'bzip2' then "BZip2Codec"
when 'snappy' then "SnappyCodec"
else raise "Unknown compression format: #{compression}"
end
"%s\n%s" % [COMPRESSION_TYPE, COMPRESSION_CODEC % codec]
end
def generate_compression_enabled_statement(compression)
compression == "none" ? COMPRESSION_ENABLED % "false" : COMPRESSION_ENABLED % "true"
end
def generate_insert_statement(data_set, table_name)
statement = SET_PARTITION_MODE_NONSTRICT_STATEMENT + "\n"
statement += SET_DYNAMIC_PARTITION_STATEMENT + "\n"
statement += case data_set
when "grep1GB", "grep10GB" then GREP_INSERT_STATEMENT % [data_set, table_name]
when "web" then
insert = USER_VISTS_INSERT_STATEMENT % [table_name] + "\n"
insert += RANKINGS_INSERT_STATEMENT % [table_name]
else raise "Unknown data set: #{data_set}"
end
end
def generate_statements(table_name, file_format, data_set, compression)
output = generate_compression_enabled_statement(compression) + "\n"
output += generate_compression_codec_statement(compression) + "\n"
output += generate_insert_statement(data_set, table_name) + "\n"
end
def build_table_name(file_format, data_set, compression)
"#{data_set}_#{file_format}_#{compression}"
end
def write_statements_to_file_based_on_input_vector(output_name, input_file_name)
output_create = ""
output_load = ""
# Expected Input Format: <file format> <data set> <compression>
File.open(input_file_name, 'r') do |file|
while line = file.gets
file_format, data_set, compression = line.split
# Don't want to generate create/insert statements for sources
# tables because they are already created.
if is_source_table(file_format, compression) then next end
table_name = build_table_name(file_format, data_set, compression)
output_create +=
generate_create_statement(table_name, data_set, file_format) + "\n\n"
output_load +=
generate_statements(table_name, file_format, data_set, compression) + "\n"
end
end
File.open("create-benchmark-#{output_name}-generated.sql", 'w') do |file|
file.puts output_create
end
File.open("load-benchmark-#{output_name}-generated.sql", 'w') do |file|
file.puts output_load
end
end
write_statements_to_file_based_on_input_vector(
"exhaustive", "benchmark_exhaustive.vector")
write_statements_to_file_based_on_input_vector("pairwise", "benchmark_pairwise.vector")
write_statements_to_file_based_on_input_vector("core", "benchmark_core.vector")

testdata/bin/generate_test_vectors.py (new executable file, 87 lines)

@@ -0,0 +1,87 @@
#!/usr/bin/env python
# Copyright (c) 2012 Cloudera, Inc. All rights reserved.
#
# This script is used to generate test "vectors" based on a dimension input file.
# A vector in this context is simply a permutation of the values in the
# dimension input file. For example, in this case the script is generating test vectors
# for the Impala / Hive benchmark suite so interesting dimensions are data set,
# file format, and compression algorithm. More can be added later.
# The output of running this script is a list of vectors. Currently two different vector
# outputs are generated - an "exhaustive" vector which contains all permutations and a
# "pairwise" vector that contains a subset of the vectors by chosing all combinations of
# pairs (the pairwise strategy). More information about pairwise can be found at
# http://www.pairwise.org.
#
# The end goal is to have a reduced set of test vectors that provides coverage but doesn't
# take as long to run as the exhaustive set, along with a set of vectors that provides full
# coverage. This is especially important for benchmarks, which work on very large data
# sets.
#
# The output files can then be read in by other scripts, tools, and tests.
# In the benchmark case the vector files are used by generate_benchmark_statements.py to
# dynamically build the schema and data for running benchmarks.
#
# We use the Python 'AllPairs' module, which can be downloaded from:
# http://pypi.python.org/pypi/AllPairs/2.0.1
#
import csv
import math
import os
import random
from itertools import product
from optparse import OptionParser
import metacomm.combinatorics.all_pairs2
all_pairs = metacomm.combinatorics.all_pairs2.all_pairs2
parser = OptionParser()
parser.add_option("--dimension_file", dest="dimension_file",
default = "benchmark_dimensions.csv",
help="The file containing the list of dimensions.")
(options, args) = parser.parse_args()
FILE_FORMAT_IDX = 0
DATA_SET_IDX = 1
COMPRESSION_IDX = 2
COMPRESSION_TYPE_IDX = 3
class VectorGenerator:
def __init__(self, input_vectors):
self.input_vectors = input_vectors
def generate_pairwise_matrix(self, filter_func = None):
if filter_func is None:
filter_func = lambda vector: True
return all_pairs(self.input_vectors, filter_func = is_valid_combination)
def generate_exhaustive_matrix(self, filter_func = None):
if filter_func is None:
filter_func = lambda vector: True
return [list(vec) for vec in product(*self.input_vectors) if filter_func(vec)]
def is_valid_combination(vector):
if len(vector) == 4:
return not ((vector[FILE_FORMAT_IDX] == 'text' and vector[COMPRESSION_IDX] != 'none') or
(vector[COMPRESSION_IDX] == 'none' and vector[COMPRESSION_TYPE_IDX] != 'none') or
(vector[FILE_FORMAT_IDX] != 'seq' and vector[COMPRESSION_TYPE_IDX] == 'record'))
return True
def read_csv_vector_file(file_name):
results = []
for row in csv.reader(open(file_name, 'rb'), delimiter=','):
results.append(row)
return results
def write_vectors_to_csv(output_csv_file, matrix):
csv_writer = csv.writer(open(output_csv_file, 'wb'),
delimiter=',',
quoting=csv.QUOTE_MINIMAL)
for vector in matrix:
csv_writer.writerow(vector)
vectors = read_csv_vector_file(options.dimension_file)
vg = VectorGenerator(vectors)
write_vectors_to_csv('benchmark_pairwise.csv',
vg.generate_pairwise_matrix(is_valid_combination))
write_vectors_to_csv('benchmark_exhaustive.csv',
vg.generate_exhaustive_matrix(is_valid_combination))

View File

@@ -1,108 +0,0 @@
#!/usr/bin/env ruby
# Copyright (c) 2012 Cloudera, Inc. All rights reserved.
#
# This script is used to generate test "vectors" based on a dimension input file.
# A vector in this context is simply a permutation of the values in the the
# dimension input file. For example, in this case the script is generating test vectors
# for the Impala / Hive benchmark suite so interesting dimensions are data set,
# file format, and compression algorithm. More can be added later.
# The output of running this script is a list of vectors. Currently two different vector
# outputs are generated - an "exhaustive" vector which contains all permutations and a
# "pairwise" vector that contains a subset of the vectors by chosing all combinations of
# pairs (the pairwise strategy). More information about pairwise can be found at
# http://www.pairwise.org.
#
# The end goal is to have a reduced set of test vectors to provide coverage but don't take
# as long to run as the exhaustive set of vectors along with a set of vectors that provide
# full coverage. This is especially important for benchmarks which work on very large data
# sets.
#
# The output files output can then be read in by other tests by other scripts,tools,tests.
# In the benchmark case the vector files are used by generate_benchmark_statements.rb to
# dynamically build the schema and data for running benchmarks.
#
# TODO: Convert this script to python
require 'rubygems'
require 'pairwise' #from the 'pairwise' ruby gem
require 'yaml'
class VectorGenerator
def initialize(input_vectors)
@input_vectors = input_vectors
end
def generate_pairwise_matrix
Pairwise.combinations(*@input_vectors)
end
def generate_exhaustive_matrix
@input_vectors[0].product(*@input_vectors[1..-1])
end
end
class BenchmarkVector
attr_accessor :compression, :data_set, :file_format
def initialize(input_vector)
# This is assuming a specific sort order. TODO: Improve this
@compression, @data_set, @file_format = input_vector
end
def to_a
[@compression, @data_set, @file_format]
end
def to_s
"#{@file_format} #{@data_set} #{@compression}"
end
end
def generate_matrix(exploration_strategy, input_vectors)
generator = VectorGenerator.new(input_vectors)
vectors = Array.new()
if exploration_strategy == 'pairwise'
vectors = generator.generate_pairwise_matrix.collect {|item| BenchmarkVector.new(item)}
elsif exploration_strategy == 'exhaustive'
vectors = generator.generate_exhaustive_matrix.collect {|item| BenchmarkVector.new(item)}
else
raise "Unsupported option";
end
apply_constraints(vectors)
end
def write_matrix_to_file(file_name, matrix)
file = File.open(file_name, 'w')
# This is assuming a specific sort order. TODO: Improve this
matrix.each {|vector|
file.puts("#{vector.file_format} #{vector.data_set} #{vector.compression}")}
file.close
end
def apply_constraints(input_vectors)
# rc_files don't currently work with large data sets
input_vectors = input_vectors.select {|vector| vector.file_format != 'rc_file'}
# text file format does not support compression
input_vectors = input_vectors.select {|vector|
!(vector.file_format == 'text' && vector.compression != 'none') }
end
if ARGV.length != 1 then raise "usage: generate_test_vectors.rb <dimension file>" end
dimension_file = ARGV[0]
input_hash = YAML.load_file(dimension_file)
# this is a hacky way to ensure fixed ordering.
input_vectors = Array.new()
input_hash.keys.sort.each {|k| input_vectors.push((input_hash[k]))}
write_matrix_to_file('benchmark_pairwise.vector',
generate_matrix('pairwise', input_vectors.to_a))
write_matrix_to_file('benchmark_exhaustive.vector',
generate_matrix('exhaustive', input_vectors.to_a))

View File

@@ -1,30 +0,0 @@
SET hive.exec.compress.output=true;
SET mapred.output.compression.type=BLOCK;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;
FROM grep1GB_text_none INSERT OVERWRITE TABLE grep1GB_sequence_file_snappy PARTITION(chunk) SELECT *;
SET hive.exec.compress.output=true;
SET mapred.output.compression.type=BLOCK;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;
FROM UserVisits_web_text_none INSERT OVERWRITE TABLE UserVisits_web_sequence_file_snappy SELECT *;
FROM Rankings_web_text_none INSERT OVERWRITE TABLE Rankings_web_sequence_file_snappy SELECT *;
SET hive.exec.compress.output=false;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;
FROM UserVisits_web_text_none INSERT OVERWRITE TABLE UserVisits_web_sequence_file_none SELECT *;
FROM Rankings_web_text_none INSERT OVERWRITE TABLE Rankings_web_sequence_file_none SELECT *;
FROM grep1GB_text_none INSERT OVERWRITE TABLE grep1GB_rc_file_none PARTITION(chunk) SELECT *;
SET hive.exec.compress.output=false;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;
FROM UserVisits_web_text_none INSERT OVERWRITE TABLE UserVisits_web_rc_file_none SELECT *;
FROM Rankings_web_text_none INSERT OVERWRITE TABLE Rankings_web_rc_file_none SELECT *;

View File

@@ -1,107 +0,0 @@
SET hive.exec.compress.output=false;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;
FROM grep1GB_text_none INSERT OVERWRITE TABLE grep1GB_sequence_file_none PARTITION(chunk) SELECT *;
SET hive.exec.compress.output=false;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;
FROM grep10GB_text_none INSERT OVERWRITE TABLE grep10GB_sequence_file_none PARTITION(chunk) SELECT *;
SET hive.exec.compress.output=false;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;
FROM UserVisits_web_text_none INSERT OVERWRITE TABLE UserVisits_web_sequence_file_none SELECT *;
FROM Rankings_web_text_none INSERT OVERWRITE TABLE Rankings_web_sequence_file_none SELECT *;
SET hive.exec.compress.output=true;
SET mapred.output.compression.type=BLOCK;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.DefaultCodec;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;
FROM grep1GB_text_none INSERT OVERWRITE TABLE grep1GB_sequence_file_default PARTITION(chunk) SELECT *;
SET hive.exec.compress.output=true;
SET mapred.output.compression.type=BLOCK;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.DefaultCodec;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;
FROM grep10GB_text_none INSERT OVERWRITE TABLE grep10GB_sequence_file_default PARTITION(chunk) SELECT *;
SET hive.exec.compress.output=true;
SET mapred.output.compression.type=BLOCK;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.DefaultCodec;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;
FROM UserVisits_web_text_none INSERT OVERWRITE TABLE UserVisits_web_sequence_file_default SELECT *;
FROM Rankings_web_text_none INSERT OVERWRITE TABLE Rankings_web_sequence_file_default SELECT *;
SET hive.exec.compress.output=true;
SET mapred.output.compression.type=BLOCK;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;
FROM grep1GB_text_none INSERT OVERWRITE TABLE grep1GB_sequence_file_gzip PARTITION(chunk) SELECT *;
SET hive.exec.compress.output=true;
SET mapred.output.compression.type=BLOCK;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;
FROM grep10GB_text_none INSERT OVERWRITE TABLE grep10GB_sequence_file_gzip PARTITION(chunk) SELECT *;
SET hive.exec.compress.output=true;
SET mapred.output.compression.type=BLOCK;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;
FROM UserVisits_web_text_none INSERT OVERWRITE TABLE UserVisits_web_sequence_file_gzip SELECT *;
FROM Rankings_web_text_none INSERT OVERWRITE TABLE Rankings_web_sequence_file_gzip SELECT *;
SET hive.exec.compress.output=true;
SET mapred.output.compression.type=BLOCK;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;
FROM grep1GB_text_none INSERT OVERWRITE TABLE grep1GB_sequence_file_bzip2 PARTITION(chunk) SELECT *;
SET hive.exec.compress.output=true;
SET mapred.output.compression.type=BLOCK;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;
FROM grep10GB_text_none INSERT OVERWRITE TABLE grep10GB_sequence_file_bzip2 PARTITION(chunk) SELECT *;
SET hive.exec.compress.output=true;
SET mapred.output.compression.type=BLOCK;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;
FROM UserVisits_web_text_none INSERT OVERWRITE TABLE UserVisits_web_sequence_file_bzip2 SELECT *;
FROM Rankings_web_text_none INSERT OVERWRITE TABLE Rankings_web_sequence_file_bzip2 SELECT *;
SET hive.exec.compress.output=true;
SET mapred.output.compression.type=BLOCK;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;
FROM grep1GB_text_none INSERT OVERWRITE TABLE grep1GB_sequence_file_snappy PARTITION(chunk) SELECT *;
SET hive.exec.compress.output=true;
SET mapred.output.compression.type=BLOCK;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;
FROM grep10GB_text_none INSERT OVERWRITE TABLE grep10GB_sequence_file_snappy PARTITION(chunk) SELECT *;
SET hive.exec.compress.output=true;
SET mapred.output.compression.type=BLOCK;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;
FROM UserVisits_web_text_none INSERT OVERWRITE TABLE UserVisits_web_sequence_file_snappy SELECT *;
FROM Rankings_web_text_none INSERT OVERWRITE TABLE Rankings_web_sequence_file_snappy SELECT *;

View File

@@ -1,35 +0,0 @@
SET hive.exec.compress.output=false;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;
FROM grep10GB_text_none INSERT OVERWRITE TABLE grep10GB_sequence_file_none PARTITION(chunk) SELECT *;
SET hive.exec.compress.output=true;
SET mapred.output.compression.type=BLOCK;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.DefaultCodec;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;
FROM UserVisits_web_text_none INSERT OVERWRITE TABLE UserVisits_web_sequence_file_default SELECT *;
FROM Rankings_web_text_none INSERT OVERWRITE TABLE Rankings_web_sequence_file_default SELECT *;
SET hive.exec.compress.output=true;
SET mapred.output.compression.type=BLOCK;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;
FROM grep1GB_text_none INSERT OVERWRITE TABLE grep1GB_sequence_file_gzip PARTITION(chunk) SELECT *;
SET hive.exec.compress.output=true;
SET mapred.output.compression.type=BLOCK;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;
FROM grep10GB_text_none INSERT OVERWRITE TABLE grep10GB_sequence_file_bzip2 PARTITION(chunk) SELECT *;
SET hive.exec.compress.output=true;
SET mapred.output.compression.type=BLOCK;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.dynamic.partition=true;
FROM grep10GB_text_none INSERT OVERWRITE TABLE grep10GB_sequence_file_snappy PARTITION(chunk) SELECT *;

View File

@@ -1,16 +0,0 @@
LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/hive_benchmark/grep1GB/part-00000' OVERWRITE INTO TABLE Grep1GB_text_none PARTITION(chunk=0);
LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/hive_benchmark/grep1GB/part-00001' OVERWRITE INTO TABLE Grep1GB_text_none PARTITION(chunk=1);
LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/hive_benchmark/grep1GB/part-00002' OVERWRITE INTO TABLE Grep1GB_text_none PARTITION(chunk=2);
LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/hive_benchmark/grep1GB/part-00003' OVERWRITE INTO TABLE Grep1GB_text_none PARTITION(chunk=3);
LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/hive_benchmark/grep1GB/part-00004' OVERWRITE INTO TABLE Grep1GB_text_none PARTITION(chunk=4);
LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/hive_benchmark/grep1GB/part-00005' OVERWRITE INTO TABLE Grep1GB_text_none PARTITION(chunk=5);
LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/hive_benchmark/grep10GB/part-00000' OVERWRITE INTO TABLE Grep10GB_text_none PARTITION(chunk=0);
LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/hive_benchmark/grep10GB/part-00001' OVERWRITE INTO TABLE Grep10GB_text_none PARTITION(chunk=1);
LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/hive_benchmark/grep10GB/part-00002' OVERWRITE INTO TABLE Grep10GB_text_none PARTITION(chunk=2);
LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/hive_benchmark/grep10GB/part-00003' OVERWRITE INTO TABLE Grep10GB_text_none PARTITION(chunk=3);
LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/hive_benchmark/grep10GB/part-00004' OVERWRITE INTO TABLE Grep10GB_text_none PARTITION(chunk=4);
LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/hive_benchmark/grep10GB/part-00005' OVERWRITE INTO TABLE Grep10GB_text_none PARTITION(chunk=5);
LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/hive_benchmark/html1GB/Rankings.dat' OVERWRITE INTO TABLE Rankings_web_text_none;
LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/hive_benchmark/html1GB/UserVisits.dat' OVERWRITE INTO TABLE UserVisits_web_text_none;