impala/bin/load-data.py
ishaan 565d15579c Add the ability to use a workload as the unit of execution in the Impala benchmark runner.
At the moment, a query is the default unit of execution and parallelism in the Impala
performance suite. With this change, we now have the ability to treat a workload as the
unit of execution. A workload is defined as a unique combination of the dataset, scale
factor, a subset (or all) of the queries in the dataset, and a table format (file format,
compression codec and compression scheme).

It introduces two new command line options in bin/run-workload.py (a usage sketch
follows the list):
  * --execution_scope
    The default scope is 'query', which maintains the previous semantics. The
    new scope is 'workload', which makes a workload the unit of execution.
  * --shuffle_query_exec_order
    Shuffles the order in which queries are executed (only applicable when the
    execution_scope is 'workload'); defaults to False.
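
A minimal, hypothetical sketch of driving the runner with the new options from
Python; only --execution_scope and --shuffle_query_exec_order come from this
change, while the --workloads flag and its 'tpch' value are illustrative
assumptions:

  import subprocess

  # Run the benchmark with a whole workload as the unit of execution.
  subprocess.check_call([
      "bin/run-workload.py",
      "--workloads=tpch",            # assumed workload-selection flag and value
      "--execution_scope=workload",  # new: treat the workload as one unit of execution
      "--shuffle_query_exec_order",  # new: randomize query order within the workload
  ])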

Change-Id: I790d75f0896210cda8eb999015b0be04246e4c45
Reviewed-on: http://gerrit.ent.cloudera.com:8080/503
Reviewed-by: Ishaan Joshi <ishaan@cloudera.com>
Tested-by: Ishaan Joshi <ishaan@cloudera.com>
2014-01-08 10:53:07 -08:00

208 lines
9.2 KiB
Python
Executable File

#!/usr/bin/env python
# Copyright (c) 2012 Cloudera, Inc. All rights reserved.
#
# This script is used to load the proper datasets for the specified workloads. It loads
# all data via Hive except for Parquet data, which needs to be loaded via Impala.
# Most DDL commands are executed by Impala.
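#
# Example invocation (a sketch; the workload name is illustrative, the flags are the
# ones defined below):
#   ./bin/load-data.py --workloads=tpch --exploration_strategy=core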
import collections
import os
import re
import subprocess
import sys
import tempfile
import time
from itertools import product
from optparse import OptionParser

parser = OptionParser()
parser.add_option("-e", "--exploration_strategy", dest="exploration_strategy",
                  default="core",
                  help="The exploration strategy for schema gen: 'core', "\
                       "'pairwise', or 'exhaustive'")
parser.add_option("--hive_warehouse_dir", dest="hive_warehouse_dir",
                  default="/test-warehouse",
                  help="The HDFS path to the base Hive test warehouse directory")
parser.add_option("-w", "--workloads", dest="workloads",
                  help="Comma-separated list of workloads to load data for. If 'all' is "\
                       "specified then data for all workloads is loaded.")
parser.add_option("-s", "--scale_factor", dest="scale_factor", default="",
                  help="An optional scale factor to generate the schema for")
parser.add_option("-f", "--force_reload", dest="force_reload", action="store_true",
                  default=False, help='Skips HDFS exists check and reloads all tables')
parser.add_option("--compute_stats", dest="compute_stats", action="store_true",
                  default=False, help="Execute COMPUTE STATISTICS statements on the "\
                                      "tables that are loaded")
parser.add_option("--impalad", dest="impala_shell_args", default="localhost:21000",
                  help="Impala daemon to connect to")
parser.add_option("--table_names", dest="table_names", default=None,
help="Only load the specified tables - specified as a comma-seperated "\
"list of base table names")
parser.add_option("--table_formats", dest="table_formats", default=None,
help="Override the test vectors and load using the specified table "\
"formats. Ex. --table_formats=seq/snap/block,text/none")
parser.add_option("--hdfs_namenode", dest="hdfs_namenode", default="localhost:20500",
help="HDFS name node for Avro schema URLs, default localhost:20500")
parser.add_option("--workload_dir", dest="workload_dir",
default=os.environ['IMPALA_WORKLOAD_DIR'],
help="Directory that contains Impala workloads")
parser.add_option("--dataset_dir", dest="dataset_dir",
default=os.environ['IMPALA_DATASET_DIR'],
help="Directory that contains Impala datasets")
options, args = parser.parse_args()
WORKLOAD_DIR = options.workload_dir
DATASET_DIR = options.dataset_dir
TESTDATA_BIN_DIR = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin')
AVRO_SCHEMA_DIR = "avro_schemas"
GENERATE_SCHEMA_CMD = "generate-schema-statements.py --exploration_strategy=%s "\
                      "--workload=%s --scale_factor=%s --verbose"
HIVE_CMD = os.path.join(os.environ['HIVE_HOME'], 'bin/hive')
HIVE_ARGS = "-hiveconf hive.root.logger=WARN,console -v"
IMPALA_SHELL_CMD = os.path.join(os.environ['IMPALA_HOME'], 'bin/impala-shell.sh')
HADOOP_CMD = os.path.join(os.environ['HADOOP_HOME'], 'bin/hadoop')

def available_workloads(workload_dir):
  """Returns the names of the workload subdirectories under workload_dir."""
  return [subdir for subdir in os.listdir(workload_dir)
          if os.path.isdir(os.path.join(workload_dir, subdir))]

def validate_workloads(all_workloads, workloads):
  """Exits with an error if any requested workload is not in all_workloads."""
  for workload in workloads:
    if workload not in all_workloads:
      print 'Workload \'%s\' not found in workload directory' % workload
      print 'Available workloads: ' + ', '.join(all_workloads)
      sys.exit(1)

def exec_cmd(cmd, error_msg, expect_success=True):
  """Runs cmd in a shell; prints error_msg and exits if it fails and success is expected."""
  ret_val = subprocess.call(cmd, shell=True)
  if expect_success and ret_val != 0:
    print error_msg
    sys.exit(ret_val)
  return ret_val

def exec_hive_query_from_file(file_name):
  hive_cmd = "%s %s -f %s" % (HIVE_CMD, HIVE_ARGS, file_name)
  print 'Executing Hive Command: ' + hive_cmd
  exec_cmd(hive_cmd, 'Error executing file from Hive: ' + file_name)

def exec_hbase_query_from_file(file_name):
  hbase_cmd = "hbase shell %s" % file_name
  exec_cmd(hbase_cmd, 'Error executing hbase create commands')

def exec_impala_query_from_file(file_name):
  impala_refresh_cmd = "%s --impalad=%s -q 'invalidate metadata'" %\
      (IMPALA_SHELL_CMD, options.impala_shell_args)
  impala_cmd = "%s --impalad=%s -f %s" %\
      (IMPALA_SHELL_CMD, options.impala_shell_args, file_name)
  # Refresh catalog before and after
  exec_cmd(impala_refresh_cmd, 'Error executing refresh from Impala.')
  print 'Executing Impala Command: ' + impala_cmd
  exec_cmd(impala_cmd, 'Error executing file from Impala: ' + file_name)
  exec_cmd(impala_refresh_cmd, 'Error executing refresh from Impala.')

def exec_bash_script(file_name):
  bash_cmd = "bash %s" % file_name
  print 'Executing Bash Command: ' + bash_cmd
  exec_cmd(bash_cmd, 'Error executing bash script: ' + file_name)

def generate_schema_statements(workload):
  """Runs generate-schema-statements.py for the workload with the selected options."""
  generate_cmd = GENERATE_SCHEMA_CMD % (options.exploration_strategy, workload,
                                        options.scale_factor)
  if options.table_names:
    generate_cmd += " --table_names=%s" % options.table_names
  if options.force_reload:
    generate_cmd += " --force_reload"
  if options.table_formats:
    generate_cmd += " --table_formats=%s" % options.table_formats
  if options.hive_warehouse_dir is not None:
    generate_cmd += " --hive_warehouse_dir=%s" % options.hive_warehouse_dir
  if options.hdfs_namenode is not None:
    generate_cmd += " --hdfs_namenode=%s" % options.hdfs_namenode
  print 'Executing Generate Schema Command: ' + generate_cmd
  schema_cmd = os.path.join(TESTDATA_BIN_DIR, generate_cmd)
  error_msg = 'Error generating schema statements for workload: ' + workload
  exec_cmd(schema_cmd, error_msg)

def get_dataset_for_workload(workload):
  """Returns the dataset named in the workload's dimension file, exiting on failure."""
  dimension_file_name = os.path.join(WORKLOAD_DIR, workload,
                                     '%s_dimensions.csv' % workload)
  if not os.path.isfile(dimension_file_name):
    print 'Dimension file not found: ' + dimension_file_name
    sys.exit(1)
  with open(dimension_file_name, 'rb') as input_file:
    match = re.search('dataset:\s*([\w\-\.]+)', input_file.read())
    if match:
      return match.group(1)
    else:
      print 'Dimension file does not contain dataset for workload \'%s\'' % (workload)
      sys.exit(1)

def copy_avro_schemas_to_hdfs(schemas_dir):
  """Recursively copies all of schemas_dir to the test warehouse."""
  if not os.path.exists(schemas_dir):
    print 'Avro schema dir (%s) does not exist. Skipping copy to HDFS.' % schemas_dir
    return
  exec_hadoop_fs_cmd("-mkdir -p " + options.hive_warehouse_dir)
  exec_hadoop_fs_cmd("-put -f %s %s/" % (schemas_dir, options.hive_warehouse_dir))

def exec_hadoop_fs_cmd(args, expect_success=True):
  cmd = "%s fs %s" % (HADOOP_CMD, args)
  print "Executing Hadoop command: " + cmd
  exec_cmd(cmd, "Error executing Hadoop command, exiting",
           expect_success=expect_success)

if __name__ == "__main__":
all_workloads = available_workloads(WORKLOAD_DIR)
workloads = []
if options.workloads is None:
print "At least one workload name must be specified."
parser.print_help()
sys.exit(1)
elif options.workloads == 'all':
print 'Loading data for all workloads.'
workloads = all_workloads
else:
workloads = options.workloads.split(",")
validate_workloads(all_workloads, workloads)
print 'Starting data load for the following workloads: ' + ', '.join(workloads)
loading_time_map = collections.defaultdict(float)
for workload in workloads:
start_time = time.time()
dataset = get_dataset_for_workload(workload)
dataset_dir = os.path.join(DATASET_DIR, dataset)
os.chdir(dataset_dir)
generate_schema_statements(workload)
copy_avro_schemas_to_hdfs(AVRO_SCHEMA_DIR)
generated_hbase_file = 'load-%s-%s-hbase.create' % (workload,
options.exploration_strategy)
if os.path.exists(generated_hbase_file):
exec_hbase_query_from_file(os.path.join(dataset_dir, generated_hbase_file))
generated_impala_file = \
'load-%s-%s-impala-generated.sql' % (workload, options.exploration_strategy)
if os.path.exists(generated_impala_file):
exec_impala_query_from_file(os.path.join(dataset_dir, generated_impala_file))
generated_hive_file =\
'load-%s-%s-hive-generated.sql' % (workload, options.exploration_strategy)
if os.path.exists(generated_hive_file):
exec_hive_query_from_file(os.path.join(dataset_dir, generated_hive_file))
generated_impala_file = \
'load-%s-%s-impala-load-generated.sql' % (workload, options.exploration_strategy)
if os.path.exists(generated_impala_file):
exec_impala_query_from_file(os.path.join(dataset_dir, generated_impala_file))
loading_time_map[workload] = time.time() - start_time
total_time = 0.0
for workload, load_time in loading_time_map.iteritems():
total_time += load_time
print 'Data loading for workload \'%s\' completed in: %.2fs'\
% (workload, load_time)
print 'Total load time: %.2fs\n' % total_time