Parallelize data loading through Impala to speed up the overall data load.

Currently, we execute all the queries involved in data loading serially. This change
creates a separate .sql file for each combination of file format, compression codec, and
compression type, and executes all of those files in parallel. Additionally, we now store
all the generated .sql files (independent of workload) under
/tmp/data-load-files/<dataset_name>. Note that only data loaded through Impala is
parallelized; data loaded through Hive and HBase is still loaded serially.
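
In outline, the parallel load follows the pattern sketched below. This is an illustrative
sketch only, not the code from the diff: run_sql_file and load_in_parallel are hypothetical
names, and the real change executes the statements through ImpalaBeeswaxClient with
sqlparse splitting each file.

import sys
import threading
try:
    from queue import Queue   # Python 3
except ImportError:
    from Queue import Queue   # Python 2, as used by the scripts in this change

def run_sql_file(sql_file, results):
    # Hypothetical worker: execute every statement in sql_file against Impala
    # and report success or failure on the shared queue.
    try:
        # ... split sql_file into statements and run each one ...
        results.put(True)
    except Exception:
        results.put(False)

def load_in_parallel(sql_files):
    # One daemon thread per generated .sql file.
    results = Queue()
    workers = []
    for sql_file in sql_files:
        t = threading.Thread(target=run_sql_file, args=(sql_file, results))
        t.daemon = True
        workers.append(t)
        t.start()
    # Drain the queue until every worker has reported, exiting on the first
    # reported failure.
    for _ in workers:
        if not results.get():
            sys.exit(1)
    for t in workers:
        t.join()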

On our build machines, loading all the data from a snapshot took on the order of
15 minutes.

Change-Id: If8a862c43f0e75b506ca05d83eacdc05621cbbf8
Reviewed-on: http://gerrit.ent.cloudera.com:8080/804
Reviewed-by: Ishaan Joshi <ishaan@cloudera.com>
Tested-by: Ishaan Joshi <ishaan@cloudera.com>
Tested-by: jenkins
Author: ishaan
Authored: 2013-10-22 13:56:39 -07:00
Committed-by: Henry Robinson
Commit: fcdcf1a9d8 (parent 8350bd7d65)
2 changed files with 119 additions and 49 deletions


@@ -7,12 +7,16 @@
import collections
import os
import re
import sqlparse
import subprocess
import sys
import tempfile
import time
from itertools import product
from optparse import OptionParser
from Queue import Queue
from tests.beeswax.impala_beeswax import *
from threading import Thread
parser = OptionParser()
parser.add_option("-e", "--exploration_strategy", dest="exploration_strategy",
@@ -32,7 +36,7 @@ parser.add_option("-f", "--force_reload", dest="force_reload", action="store_tru
parser.add_option("--compute_stats", dest="compute_stats", action="store_true",
default=False, help="Execute COMPUTE STATISTICS statements on the "\
"tables that are loaded")
parser.add_option("--impalad", dest="impala_shell_args", default="localhost:21000",
parser.add_option("--impalad", dest="impalad", default="localhost:21000",
help="Impala daemon to connect to")
parser.add_option("--table_names", dest="table_names", default=None,
help="Only load the specified tables - specified as a comma-seperated "\
@@ -48,8 +52,11 @@ parser.add_option("--workload_dir", dest="workload_dir",
parser.add_option("--dataset_dir", dest="dataset_dir",
default=os.environ['IMPALA_DATASET_DIR'],
help="Directory that contains Impala datasets")
parser.add_option("--use_kerberos", action="store_true", default=False,
help="Load data on a kerberized cluster.")
options, args = parser.parse_args()
DATA_LOAD_DIR = '/tmp/data-load-files'
WORKLOAD_DIR = options.workload_dir
DATASET_DIR = options.dataset_dir
TESTDATA_BIN_DIR = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin')
@@ -59,7 +66,6 @@ GENERATE_SCHEMA_CMD = "generate-schema-statements.py --exploration_strategy=%s "
"--workload=%s --scale_factor=%s --verbose"
HIVE_CMD = os.path.join(os.environ['HIVE_HOME'], 'bin/hive')
HIVE_ARGS = "-hiveconf hive.root.logger=WARN,console -v"
IMPALA_SHELL_CMD = os.path.join(os.environ['IMPALA_HOME'], 'bin/impala-shell.sh')
HADOOP_CMD = os.path.join(os.environ['HADOOP_HOME'], 'bin/hadoop')
def available_workloads(workload_dir):
@@ -74,31 +80,44 @@ def validate_workloads(all_workloads, workloads):
sys.exit(1)
def exec_cmd(cmd, error_msg, expect_success=True):
ret_val = subprocess.call(cmd, shell=True)
if expect_success and ret_val != 0:
print error_msg
sys.exit(ret_val)
ret_val = -1
try:
ret_val = subprocess.call(cmd, shell=True)
except Exception as e:
error_msg = "%s: %s" % (error_msg, str(e))
finally:
if expect_success and ret_val != 0:
print error_msg
return ret_val
def exec_hive_query_from_file(file_name):
if not os.path.exists(file_name): return
hive_cmd = "%s %s -f %s" % (HIVE_CMD, HIVE_ARGS, file_name)
print 'Executing Hive Command: ' + hive_cmd
print 'Executing Hive Command: %s' % hive_cmd
exec_cmd(hive_cmd, 'Error executing file from Hive: ' + file_name)
def exec_hbase_query_from_file(file_name):
if not os.path.exists(file_name): return
hbase_cmd = "hbase shell %s" % file_name
print 'Executing HBase Command: %s' % hbase_cmd
exec_cmd(hbase_cmd, 'Error executing hbase create commands')
def exec_impala_query_from_file(file_name):
impala_refresh_cmd = "%s --impalad=%s -q 'invalidate metadata'" %\
(IMPALA_SHELL_CMD, options.impala_shell_args)
impala_cmd = "%s --impalad=%s -f %s" %\
(IMPALA_SHELL_CMD, options.impala_shell_args, file_name)
# Refresh catalog before and after
exec_cmd(impala_refresh_cmd, 'Error executing refresh from Impala.')
print 'Executing Impala Command: ' + impala_cmd
exec_cmd(impala_cmd, 'Error executing file from Impala: ' + file_name)
exec_cmd(impala_refresh_cmd, 'Error executing refresh from Impala.')
def exec_impala_query_from_file(file_name, result_queue):
"""Execute each query in an Impala query file individually"""
is_success = True
try:
impala_client = ImpalaBeeswaxClient(options.impalad,
use_kerberos=options.use_kerberos)
impala_client.connect()
with open(file_name, 'r+') as query_file:
queries = sqlparse.split(query_file.read())
for query in queries:
query = sqlparse.format(query.rstrip(';'), strip_comments=True)
result = impala_client.execute(query)
except Exception as e:
print "Data Loading from Impala failed with error: %s" % str(e)
is_success = False
result_queue.put(is_success)
def exec_bash_script(file_name):
bash_cmd = "bash %s" % file_name
@@ -152,6 +171,32 @@ def exec_hadoop_fs_cmd(args, expect_success=True):
exec_cmd(cmd, "Error executing Hadoop command, exiting",
expect_success=expect_success)
def exec_impala_query_from_file_parallel(query_files):
if not query_files: return
# Refresh Catalog
print "Invalidating metadata"
impala_client = ImpalaBeeswaxClient(options.impalad, use_kerberos=options.use_kerberos)
impala_client.connect()
impala_client.execute('invalidate metadata')
result_queue = Queue()
threads = []
for query_file in query_files:
thread = Thread(target=exec_impala_query_from_file, args=[query_file, result_queue])
thread.daemon = True
threads.append(thread)
thread.start()
# Keep looping until the number of results retrieved is the same as the number of
# threads spawned, or until a data loading query fails. result_queue.get() will
# block until a result is available in the queue.
num_fetched_results = 0
while num_fetched_results < len(threads):
success = result_queue.get()
num_fetched_results += 1
if not success: sys.exit(1)
# There is a small window where a thread may still be alive even if all the threads have
# finished putting their results in the queue.
for thread in threads: thread.join()
if __name__ == "__main__":
all_workloads = available_workloads(WORKLOAD_DIR)
workloads = []
@@ -166,37 +211,36 @@ if __name__ == "__main__":
workloads = options.workloads.split(",")
validate_workloads(all_workloads, workloads)
print 'Starting data load for the following workloads: ' + ', '.join(workloads)
loading_time_map = collections.defaultdict(float)
for workload in workloads:
start_time = time.time()
dataset = get_dataset_for_workload(workload)
dataset_dir = os.path.join(DATASET_DIR, dataset)
os.chdir(dataset_dir)
generate_schema_statements(workload)
assert os.path.isdir(os.path.join(DATA_LOAD_DIR, dataset)), ("Data loading files "
"do not exist for (%s)" % dataset)
os.chdir(os.path.join(DATA_LOAD_DIR, dataset))
copy_avro_schemas_to_hdfs(AVRO_SCHEMA_DIR)
dataset_dir_contents = os.listdir(os.getcwd())
load_file_substr = "%s-%s" % (workload, options.exploration_strategy)
# Data loading with Impala is done in parallel; each file format and compression
# combination has a separate query file.
create_filename = '%s-impala-generated' % load_file_substr
load_filename = '%s-impala-load-generated' % load_file_substr
impala_create_files = [f for f in dataset_dir_contents if create_filename in f]
impala_load_files = [f for f in dataset_dir_contents if load_filename in f]
generated_hbase_file = 'load-%s-%s-hbase.create' % (workload,
options.exploration_strategy)
if os.path.exists(generated_hbase_file):
exec_hbase_query_from_file(os.path.join(dataset_dir, generated_hbase_file))
generated_impala_file = \
'load-%s-%s-impala-generated.sql' % (workload, options.exploration_strategy)
if os.path.exists(generated_impala_file):
exec_impala_query_from_file(os.path.join(dataset_dir, generated_impala_file))
generated_hive_file =\
'load-%s-%s-hive-generated.sql' % (workload, options.exploration_strategy)
if os.path.exists(generated_hive_file):
exec_hive_query_from_file(os.path.join(dataset_dir, generated_hive_file))
generated_impala_file = \
'load-%s-%s-impala-load-generated.sql' % (workload, options.exploration_strategy)
if os.path.exists(generated_impala_file):
exec_impala_query_from_file(os.path.join(dataset_dir, generated_impala_file))
# Execute the data loading scripts.
# Creating tables in Impala has no dependencies, so we execute them first.
# HBase table inserts are done via hive, so the hbase tables need to be created before
# running the hive script. Finally, some of the Impala inserts depend on hive tables,
# so they're done at the end.
exec_impala_query_from_file_parallel(impala_create_files)
exec_hbase_query_from_file('load-%s-hbase-generated.create' % load_file_substr)
exec_hive_query_from_file('load-%s-hive-generated.sql' % load_file_substr)
exec_impala_query_from_file_parallel(impala_load_files)
loading_time_map[workload] = time.time() - start_time
total_time = 0.0


@@ -26,10 +26,12 @@
# or LOAD directly if the file already exists in HDFS.
import collections
import csv
import glob
import math
import json
import os
import random
import shutil
import subprocess
import sys
import tempfile
@@ -70,8 +72,9 @@ if options.workload is None:
parser.print_help()
sys.exit(1)
WORKLOAD_DIR = os.environ['IMPALA_HOME'] + '/testdata/workloads'
DATASET_DIR = os.environ['IMPALA_HOME'] + '/testdata/datasets'
DATA_LOAD_DIR = '/tmp/data-load-files'
WORKLOAD_DIR = os.path.join(os.environ['IMPALA_HOME'], 'testdata', 'workloads')
DATASET_DIR = os.path.join(os.environ['IMPALA_HOME'], 'testdata', 'datasets')
AVRO_SCHEMA_DIR = "avro_schemas"
IMPALA_SUPPORTED_INSERT_FORMATS = ['parquet', 'hbase', 'text']
@@ -357,20 +360,20 @@ class Statements(object):
self.load_base = list()
def write_to_file(self, filename):
# Make sure we create the base tables first. It is important that we always write
# to the output file, even if there are no statements to generate. This makes sure
# the output file is empty and the user doesn't unexpectedly load some stale data.
# If there is no content to write, skip
if self.__is_empty(): return
output = self.create + self.load_base + self.load
with open(filename, 'w') as f:
f.write('\n\n'.join(output))
def __is_empty(self):
return not (self.create or self.load or self.load_base)
def generate_statements(output_name, test_vectors, sections,
schema_include_constraints, schema_exclude_constraints):
# TODO: This method has become very unwieldy. It has to be refactored sooner
# rather than later.
# Parquet statements to be executed separately by Impala
impala_output = Statements()
impala_load = Statements()
hive_output = Statements()
hbase_output = Statements()
@@ -379,6 +382,8 @@ def generate_statements(output_name, test_vectors, sections,
table_names = [name.lower() for name in options.table_names.split(',')]
existing_tables = get_hdfs_subdirs_with_data(options.hive_warehouse_dir)
for row in test_vectors:
impala_output = Statements()
impala_load = Statements()
file_format, data_set, codec, compression_type =\
[row.file_format, row.dataset, row.compression_codec, row.compression_type]
table_format = '%s/%s/%s' % (file_format, codec, compression_type)
@@ -526,11 +531,15 @@ def generate_statements(output_name, test_vectors, sections,
else:
print 'Empty insert for table %s. Skipping insert generation' % table_name
impala_output.write_to_file('load-' + output_name + '-impala-generated.sql')
impala_load.write_to_file('load-' + output_name + '-impala-load-generated.sql')
impala_output.write_to_file("load-%s-impala-generated-%s-%s-%s.sql" %
(output_name, file_format, codec, compression_type))
impala_load.write_to_file("load-%s-impala-load-generated-%s-%s-%s.sql" %
(output_name, file_format, codec, compression_type))
hive_output.write_to_file('load-' + output_name + '-hive-generated.sql')
hbase_output.create.append("exit")
hbase_output.write_to_file('load-' + output_name + '-hbase.create')
hbase_output.write_to_file('load-' + output_name + '-hbase-generated.create')
def parse_schema_template_file(file_name):
VALID_SECTION_NAMES = ['DATASET', 'BASE_TABLE_NAME', 'COLUMNS', 'PARTITION_COLUMNS',
@@ -555,6 +564,23 @@ if __name__ == "__main__":
target_dataset = test_vectors[0].dataset
print 'Target Dataset: ' + target_dataset
dataset_load_dir = os.path.join(DATA_LOAD_DIR, target_dataset)
# If the directory containing the sql files does not exist, create it. Else nuke all the
# files corresponding to the current workload.
try:
os.makedirs(dataset_load_dir)
except OSError:
# Directory already exists, remove it.
shutil.rmtree(dataset_load_dir)
# Recreate the workload dir
os.makedirs(dataset_load_dir)
finally:
# Make sure that the directory was created and is empty.
assert os.path.isdir(dataset_load_dir)
assert len(os.listdir(dataset_load_dir)) == 0
# Make the dataset dir the current working directory
os.chdir(dataset_load_dir)
schema_template_file = os.path.join(DATASET_DIR, target_dataset,
'%s_schema_template.sql' % target_dataset)