mirror of
https://github.com/apache/impala.git
synced 2025-12-30 03:01:44 -05:00
This patch lays the groundwork for loading data and running end-to-end
tests on a remote CDH cluster. The requirements for the cluster to run
the tests are:
- Managed by Cloudera Manager (CM)
- GPL Extras need to be installed
- KMS and KeyTrustee installed and available as a service
- SERDEPROPERTIES in the Hive DB modified to accept wide tables
- Hive warehouse dir points to /test-warehouse
The actual data loading is done via a new script, remote_data_load.py,
which takes the CM host as an argument. It can be run from a client
machine that is not a node of the cluster, but it needs to have the
Impala repo checked out and Impala built. This insures that all of the
necessary data load scripts are available, as well as setting up the
environment properly (client binaries like beeline and the hbase shell
are available, python libraries like cm_api are installed, necessary
environment variables are defined, etc.)
It should be noted that running remote_data_load.py will overwrite
any local XML config files with the configurations downloaded from
the remote cluster.
Usage: remote_data_load.py [options] <cm_host address>
Options:
-h, --help show this help message and exit
--snapshot-file=SNAPSHOT_FILE
Path to the test-warehouse archive
--cm-user=CM_USER Cloudera Manager admin user
--cm-pass=CM_PASS Cloudera Manager admin user password
--gateway=GATEWAY Gateway host to upload the data from. If not
set, uses the CM host as gateway.
--ssh-user=SSH_USER System user on the remote machine with
passwordless SSH configured.
--no-load Do not try to load the snapshot
--exploration-strategy=EXPLORATION_STRATEGY
--test Run end-to-end tests against cluster
Testing:
This patch is being submitted with the understanding that there are
still clean up issues that need to be addressed in the remote data
load script, for which JIRA's have been filed.
However, since many of the existing build scripts also had to be
modified, it is more important to make sure that no regressions were
inadvertently introduced into the existing data load process. Loading
data to a local mini-cluster was checked repeatedly while this patch
was being developed, as well as running it against the Jenkins job
that provides the test-warehouse snapshot used by the many other
Impala CI builds that run daily.
Change-Id: I1f443a1728a1d28168090c6f54e82dec2cb073e9
Reviewed-on: http://gerrit.cloudera.org:8080/4769
Reviewed-by: Taras Bobrovytsky <tbobrovytsky@cloudera.com>
Tested-by: Internal Jenkins
338 lines
15 KiB
Python
Executable File
338 lines
15 KiB
Python
Executable File
#!/usr/bin/env impala-python
|
|
#
|
|
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
# This script is used to load the proper datasets for the specified workloads. It loads
|
|
# all data via Hive except for parquet data which needs to be loaded via Impala.
|
|
# Most ddl commands are executed by Impala.
|
|
import collections
|
|
import getpass
|
|
import logging
|
|
import os
|
|
import re
|
|
import sqlparse
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
import traceback
|
|
|
|
from itertools import product
|
|
from optparse import OptionParser
|
|
from Queue import Queue
|
|
from tests.beeswax.impala_beeswax import *
|
|
from threading import Thread
|
|
|
|
logging.basicConfig()
|
|
LOG = logging.getLogger('load-data.py')
|
|
LOG.setLevel(logging.DEBUG)
|
|
|
|
parser = OptionParser()
|
|
parser.add_option("-e", "--exploration_strategy", dest="exploration_strategy",
|
|
default="core",
|
|
help="The exploration strategy for schema gen: 'core', "\
|
|
"'pairwise', or 'exhaustive'")
|
|
parser.add_option("--hive_warehouse_dir", dest="hive_warehouse_dir",
|
|
default="/test-warehouse",
|
|
help="The HDFS path to the base Hive test warehouse directory")
|
|
parser.add_option("-w", "--workloads", dest="workloads",
|
|
help="Comma-separated list of workloads to load data for. If 'all' is "\
|
|
"specified then data for all workloads is loaded.")
|
|
parser.add_option("-s", "--scale_factor", dest="scale_factor", default="",
|
|
help="An optional scale factor to generate the schema for")
|
|
parser.add_option("-f", "--force_reload", dest="force_reload", action="store_true",
|
|
default=False, help='Skips HDFS exists check and reloads all tables')
|
|
parser.add_option("--impalad", dest="impalad", default="localhost:21000",
|
|
help="Impala daemon to connect to")
|
|
parser.add_option("--hive_hs2_hostport", dest="hive_hs2_hostport",
|
|
default="localhost:11050",
|
|
help="HS2 host:Port to issue Hive queries against using beeline")
|
|
parser.add_option("--table_names", dest="table_names", default=None,
|
|
help="Only load the specified tables - specified as a comma-seperated "\
|
|
"list of base table names")
|
|
parser.add_option("--table_formats", dest="table_formats", default=None,
|
|
help="Override the test vectors and load using the specified table "\
|
|
"formats. Ex. --table_formats=seq/snap/block,text/none")
|
|
parser.add_option("--hdfs_namenode", dest="hdfs_namenode", default="localhost:20500",
|
|
help="HDFS name node for Avro schema URLs, default localhost:20500")
|
|
parser.add_option("--workload_dir", dest="workload_dir",
|
|
default=os.environ['IMPALA_WORKLOAD_DIR'],
|
|
help="Directory that contains Impala workloads")
|
|
parser.add_option("--dataset_dir", dest="dataset_dir",
|
|
default=os.environ['IMPALA_DATASET_DIR'],
|
|
help="Directory that contains Impala datasets")
|
|
parser.add_option("--use_kerberos", action="store_true", default=False,
|
|
help="Load data on a kerberized cluster.")
|
|
parser.add_option("--principal", default=None, dest="principal",
|
|
help="Kerberos service principal, required if --use_kerberos is set")
|
|
|
|
options, args = parser.parse_args()
|
|
|
|
SQL_OUTPUT_DIR = os.environ['IMPALA_DATA_LOADING_SQL_DIR']
|
|
WORKLOAD_DIR = options.workload_dir
|
|
DATASET_DIR = options.dataset_dir
|
|
TESTDATA_BIN_DIR = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin')
|
|
AVRO_SCHEMA_DIR = "avro_schemas"
|
|
|
|
GENERATE_SCHEMA_CMD = "generate-schema-statements.py --exploration_strategy=%s "\
|
|
"--workload=%s --scale_factor=%s --verbose"
|
|
# Load data using Hive's beeline because the Hive shell has regressed (HIVE-5515).
|
|
# The Hive shell is stateful, meaning that certain series of actions lead to problems.
|
|
# Examples of problems due to the statefullness of the Hive shell:
|
|
# - Creating an HBase table changes the replication factor to 1 for subsequent LOADs.
|
|
# - INSERTs into an HBase table fail if they are the first stmt executed in a session.
|
|
# However, beeline itself also has bugs. For example, inserting a NULL literal into
|
|
# a string-typed column leads to an NPE. We work around these problems by using LOAD from
|
|
# a datafile instead of doing INSERTs.
|
|
HIVE_CMD = os.path.join(os.environ['HIVE_HOME'], 'bin/beeline')
|
|
|
|
hive_auth = "auth=none"
|
|
if options.use_kerberos:
|
|
if not options.principal:
|
|
print "--principal is required when --use_kerberos is specified"
|
|
exit(1)
|
|
hive_auth = "principal=" + options.principal
|
|
|
|
HIVE_ARGS = '-n %s -u "jdbc:hive2://%s/default;%s" --verbose=true'\
|
|
% (getpass.getuser(), options.hive_hs2_hostport, hive_auth)
|
|
|
|
HADOOP_CMD = os.path.join(os.environ['HADOOP_HOME'], 'bin/hadoop')
|
|
|
|
def available_workloads(workload_dir):
|
|
return [subdir for subdir in os.listdir(workload_dir)
|
|
if os.path.isdir(os.path.join(workload_dir, subdir))]
|
|
|
|
def validate_workloads(all_workloads, workloads):
|
|
for workload in workloads:
|
|
if workload not in all_workloads:
|
|
print 'Workload \'%s\' not found in workload directory' % workload
|
|
print 'Available workloads: ' + ', '.join(all_workloads)
|
|
sys.exit(1)
|
|
|
|
def exec_cmd(cmd, error_msg, exit_on_error=True):
|
|
ret_val = -1
|
|
try:
|
|
ret_val = subprocess.call(cmd, shell=True)
|
|
except Exception as e:
|
|
error_msg = "%s: %s" % (error_msg, str(e))
|
|
finally:
|
|
if ret_val != 0:
|
|
print error_msg
|
|
if exit_on_error: sys.exit(ret_val)
|
|
return ret_val
|
|
|
|
def exec_hive_query_from_file(file_name):
|
|
if not os.path.exists(file_name): return
|
|
hive_cmd = "%s %s -f %s" % (HIVE_CMD, HIVE_ARGS, file_name)
|
|
print 'Executing Hive Command: %s' % hive_cmd
|
|
exec_cmd(hive_cmd, 'Error executing file from Hive: ' + file_name)
|
|
|
|
def exec_hbase_query_from_file(file_name):
|
|
if not os.path.exists(file_name): return
|
|
hbase_cmd = "hbase shell %s" % file_name
|
|
print 'Executing HBase Command: %s' % hbase_cmd
|
|
exec_cmd(hbase_cmd, 'Error executing hbase create commands')
|
|
|
|
# KERBEROS TODO: fails when kerberized and impalad principal isn't "impala"
|
|
def exec_impala_query_from_file(file_name):
|
|
"""Execute each query in an Impala query file individually"""
|
|
is_success = True
|
|
impala_client = ImpalaBeeswaxClient(options.impalad, use_kerberos=options.use_kerberos)
|
|
try:
|
|
impala_client.connect()
|
|
with open(file_name, 'r+') as query_file:
|
|
queries = sqlparse.split(query_file.read())
|
|
for query in queries:
|
|
query = sqlparse.format(query.rstrip(';'), strip_comments=True)
|
|
print '(%s):\n%s\n' % (file_name, query.strip())
|
|
if query.strip() != "":
|
|
result = impala_client.execute(query)
|
|
except Exception as e:
|
|
print "Data Loading from Impala failed with error: %s" % str(e)
|
|
traceback.print_exc()
|
|
is_success = False
|
|
finally:
|
|
impala_client.close_connection()
|
|
return is_success
|
|
|
|
def exec_bash_script(file_name):
|
|
bash_cmd = "bash %s" % file_name
|
|
print 'Executing Bash Command: ' + bash_cmd
|
|
exec_cmd(bash_cmd, 'Error bash script: ' + file_name)
|
|
|
|
def run_dataset_preload(dataset):
|
|
"""Execute a preload script if present in dataset directory. E.g. to generate data
|
|
before loading"""
|
|
dataset_preload_script = os.path.join(DATASET_DIR, dataset, "preload")
|
|
if os.path.exists(dataset_preload_script):
|
|
print("Running preload script for " + dataset)
|
|
exec_cmd(dataset_preload_script, "Error executing preload script for " + dataset,
|
|
exit_on_error=True)
|
|
|
|
def generate_schema_statements(workload):
|
|
generate_cmd = GENERATE_SCHEMA_CMD % (options.exploration_strategy, workload,
|
|
options.scale_factor)
|
|
if options.table_names:
|
|
generate_cmd += " --table_names=%s" % options.table_names
|
|
if options.force_reload:
|
|
generate_cmd += " --force_reload"
|
|
if options.table_formats:
|
|
generate_cmd += " --table_formats=%s" % options.table_formats
|
|
if options.hive_warehouse_dir is not None:
|
|
generate_cmd += " --hive_warehouse_dir=%s" % options.hive_warehouse_dir
|
|
if options.hdfs_namenode is not None:
|
|
generate_cmd += " --hdfs_namenode=%s" % options.hdfs_namenode
|
|
generate_cmd += " --backend=%s" % options.impalad
|
|
print 'Executing Generate Schema Command: ' + generate_cmd
|
|
schema_cmd = os.path.join(TESTDATA_BIN_DIR, generate_cmd)
|
|
error_msg = 'Error generating schema statements for workload: ' + workload
|
|
exec_cmd(schema_cmd, error_msg)
|
|
|
|
def get_dataset_for_workload(workload):
|
|
dimension_file_name = os.path.join(WORKLOAD_DIR, workload,
|
|
'%s_dimensions.csv' % workload)
|
|
if not os.path.isfile(dimension_file_name):
|
|
print 'Dimension file not found: ' + dimension_file_name
|
|
sys.exit(1)
|
|
with open(dimension_file_name, 'rb') as input_file:
|
|
match = re.search('dataset:\s*([\w\-\.]+)', input_file.read())
|
|
if match:
|
|
return match.group(1)
|
|
else:
|
|
print 'Dimension file does not contain dataset for workload \'%s\'' % (workload)
|
|
sys.exit(1)
|
|
|
|
def copy_avro_schemas_to_hdfs(schemas_dir):
|
|
"""Recursively copies all of schemas_dir to the test warehouse."""
|
|
if not os.path.exists(schemas_dir):
|
|
print 'Avro schema dir (%s) does not exist. Skipping copy to HDFS.' % schemas_dir
|
|
return
|
|
|
|
exec_hadoop_fs_cmd("-mkdir -p " + options.hive_warehouse_dir)
|
|
exec_hadoop_fs_cmd("-put -f %s %s/" % (schemas_dir, options.hive_warehouse_dir))
|
|
|
|
def exec_hadoop_fs_cmd(args, exit_on_error=True):
|
|
cmd = "%s fs %s" % (HADOOP_CMD, args)
|
|
print "Executing Hadoop command: " + cmd
|
|
exec_cmd(cmd, "Error executing Hadoop command, exiting",
|
|
exit_on_error=exit_on_error)
|
|
|
|
def exec_impala_query_from_file_parallel(query_files):
|
|
# Get the name of the query file that loads the base tables, if it exists.
|
|
# TODO: Find a better way to detect the file that loads the base tables.
|
|
create_base_table_file = next((q for q in query_files if 'text' in q), None)
|
|
if create_base_table_file:
|
|
is_success = exec_impala_query_from_file(create_base_table_file)
|
|
query_files.remove(create_base_table_file)
|
|
# If loading the base tables failed, exit with a non zero error code.
|
|
if not is_success: sys.exit(1)
|
|
if not query_files: return
|
|
threads = []
|
|
result_queue = Queue()
|
|
for query_file in query_files:
|
|
thread = Thread(target=lambda x: result_queue.put(exec_impala_query_from_file(x)),
|
|
args=[query_file])
|
|
thread.daemon = True
|
|
threads.append(thread)
|
|
thread.start()
|
|
# Keep looping until the number of results retrieved is the same as the number of
|
|
# threads spawned, or until a data loading query fails. result_queue.get() will
|
|
# block until a result is available in the queue.
|
|
num_fetched_results = 0
|
|
while num_fetched_results < len(threads):
|
|
success = result_queue.get()
|
|
num_fetched_results += 1
|
|
if not success: sys.exit(1)
|
|
# There is a small window where a thread may still be alive even if all the threads have
|
|
# finished putting their results in the queue.
|
|
for thread in threads: thread.join()
|
|
|
|
def invalidate_impala_metadata():
|
|
print "Invalidating Metadata"
|
|
impala_client = ImpalaBeeswaxClient(options.impalad, use_kerberos=options.use_kerberos)
|
|
impala_client.connect()
|
|
try:
|
|
impala_client.execute('invalidate metadata')
|
|
finally:
|
|
impala_client.close_connection()
|
|
|
|
if __name__ == "__main__":
|
|
# Having the actual command line at the top of each data-load-* log can help
|
|
# when debugging dataload issues.
|
|
#
|
|
LOG.debug(' '.join(sys.argv))
|
|
|
|
all_workloads = available_workloads(WORKLOAD_DIR)
|
|
workloads = []
|
|
if options.workloads is None:
|
|
print "At least one workload name must be specified."
|
|
parser.print_help()
|
|
sys.exit(1)
|
|
elif options.workloads == 'all':
|
|
print 'Loading data for all workloads.'
|
|
workloads = all_workloads
|
|
else:
|
|
workloads = options.workloads.split(",")
|
|
validate_workloads(all_workloads, workloads)
|
|
|
|
print 'Starting data load for the following workloads: ' + ', '.join(workloads)
|
|
|
|
loading_time_map = collections.defaultdict(float)
|
|
for workload in workloads:
|
|
start_time = time.time()
|
|
dataset = get_dataset_for_workload(workload)
|
|
run_dataset_preload(dataset)
|
|
generate_schema_statements(workload)
|
|
sql_dir = os.path.join(SQL_OUTPUT_DIR, dataset)
|
|
assert os.path.isdir(sql_dir),\
|
|
("Could not find the generated SQL files for loading dataset '%s'.\
|
|
\nExpected to find the SQL files in: %s" % (dataset, sql_dir))
|
|
os.chdir(os.path.join(SQL_OUTPUT_DIR, dataset))
|
|
copy_avro_schemas_to_hdfs(AVRO_SCHEMA_DIR)
|
|
dataset_dir_contents = os.listdir(os.getcwd())
|
|
load_file_substr = "%s-%s" % (workload, options.exploration_strategy)
|
|
# Data loading with Impala is done in parallel, each file format has a separate query
|
|
# file.
|
|
create_filename = '%s-impala-generated' % load_file_substr
|
|
load_filename = '%s-impala-load-generated' % load_file_substr
|
|
impala_create_files = [f for f in dataset_dir_contents if create_filename in f]
|
|
impala_load_files = [f for f in dataset_dir_contents if load_filename in f]
|
|
|
|
# Execute the data loading scripts.
|
|
# Creating tables in Impala has no dependencies, so we execute them first.
|
|
# HBase table inserts are done via hive, so the hbase tables need to be created before
|
|
# running the hive script. Some of the Impala inserts depend on hive tables,
|
|
# so they're done at the end. Finally, the Hbase Tables that have been filled with data
|
|
# need to be flushed.
|
|
exec_impala_query_from_file_parallel(impala_create_files)
|
|
exec_hbase_query_from_file('load-%s-hbase-generated.create' % load_file_substr)
|
|
exec_hive_query_from_file('load-%s-hive-generated.sql' % load_file_substr)
|
|
exec_hbase_query_from_file('post-load-%s-hbase-generated.sql' % load_file_substr)
|
|
|
|
if impala_load_files: invalidate_impala_metadata()
|
|
exec_impala_query_from_file_parallel(impala_load_files)
|
|
loading_time_map[workload] = time.time() - start_time
|
|
|
|
invalidate_impala_metadata()
|
|
total_time = 0.0
|
|
for workload, load_time in loading_time_map.iteritems():
|
|
total_time += load_time
|
|
print 'Data loading for workload \'%s\' completed in: %.2fs'\
|
|
% (workload, load_time)
|
|
print 'Total load time: %.2fs\n' % total_time
|