#!/usr/bin/env impala-python3
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# This script is used to load the proper datasets for the specified workloads. It loads
# all data via Hive except for Parquet and Kudu data, which need to be loaded via Impala.
# Most DDL commands are executed by Impala.
from __future__ import absolute_import, division, print_function
import collections
import getpass
import logging
import os
import re
import sqlparse
import subprocess
import sys
import time
import traceback

from optparse import OptionParser
from tests.common.impala_connection import ImpylaHS2Connection
from tests.common.test_vector import HS2
from multiprocessing.pool import ThreadPool

LOG = logging.getLogger('load-data.py')

parser = OptionParser()
parser.add_option("-e", "--exploration_strategy", dest="exploration_strategy",
                  default="core",
                  help="The exploration strategy for schema gen: 'core', "\
                  "'pairwise', or 'exhaustive'")
parser.add_option("--hive_warehouse_dir", dest="hive_warehouse_dir",
                  default="/test-warehouse",
                  help="The HDFS path to the base Hive test warehouse directory")
parser.add_option("-w", "--workloads", dest="workloads",
                  help="Comma-separated list of workloads to load data for. If 'all' is "\
                  "specified then data for all workloads is loaded.")
parser.add_option("-s", "--scale_factor", dest="scale_factor", default="",
                  help="An optional scale factor to generate the schema for")
parser.add_option("-f", "--force_reload", dest="force_reload", action="store_true",
                  default=False,
                  help='Skips HDFS exists check and reloads all tables')
parser.add_option("--impalad", dest="impalad", default="localhost",
                  help="Impala daemon to connect to")
parser.add_option("--hive_hs2_hostport", dest="hive_hs2_hostport",
                  default="localhost:11050",
                  help="HS2 host:port to issue Hive queries against using beeline")
parser.add_option("--table_names", dest="table_names", default=None,
                  help="Only load the specified tables - specified as a comma-separated "\
                  "list of base table names")
parser.add_option("--table_formats", dest="table_formats", default=None,
                  help="Override the test vectors and load using the specified table "\
                  "formats. Ex. --table_formats=seq/snap/block,text/none")
parser.add_option("--hdfs_namenode", dest="hdfs_namenode", default="localhost:20500",
                  help="HDFS name node for Avro schema URLs, default localhost:20500")
parser.add_option("--workload_dir", dest="workload_dir",
                  default=os.environ['IMPALA_WORKLOAD_DIR'],
                  help="Directory that contains Impala workloads")
parser.add_option("--dataset_dir", dest="dataset_dir",
                  default=os.environ['IMPALA_DATASET_DIR'],
                  help="Directory that contains Impala datasets")
parser.add_option("--use_kerberos", action="store_true", default=False,
                  help="Load data on a kerberized cluster.")
parser.add_option("--principal", default=None, dest="principal",
                  help="Kerberos service principal, required if --use_kerberos is set")
parser.add_option("--num_processes", type="int", dest="num_processes",
                  default=os.environ['IMPALA_BUILD_THREADS'],
                  help="Number of parallel processes to use.")
options, args = parser.parse_args()
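
# Example invocation (illustrative only; the workload name, table formats, and
# process count below are placeholders):
#
#   load-data.py -w tpch -e core --table_formats=text/none,parquet/none \
#       --num_processes=8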

SQL_OUTPUT_DIR = os.environ['IMPALA_DATA_LOADING_SQL_DIR']
WORKLOAD_DIR = options.workload_dir
DATASET_DIR = options.dataset_dir
TESTDATA_BIN_DIR = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin')
AVRO_SCHEMA_DIR = "avro_schemas"
GENERATE_SCHEMA_CMD = "generate-schema-statements.py --exploration_strategy=%s "\
                      "--workload=%s --scale_factor=%s --verbose"

# Load data using Hive's beeline because the Hive shell has regressed (HIVE-5515).
# The Hive shell is stateful, meaning that certain series of actions lead to problems.
# Examples of problems due to the statefulness of the Hive shell:
# - Creating an HBase table changes the replication factor to 1 for subsequent LOADs.
# - INSERTs into an HBase table fail if they are the first stmt executed in a session.
# However, beeline itself also has bugs. For example, inserting a NULL literal into
# a string-typed column leads to an NPE. We work around these problems by using LOAD from
# a datafile instead of doing INSERTs.
HIVE_CMD = os.path.join(os.environ['HIVE_HOME'], 'bin/beeline')

hive_auth = "auth=none"
if options.use_kerberos:
  if not options.principal:
    print("--principal is required when --use_kerberos is specified")
    exit(1)
  hive_auth = "principal=" + options.principal

HIVE_ARGS = '-n %s -u "jdbc:hive2://%s/default;%s" --verbose=true'\
            % (getpass.getuser(), options.hive_hs2_hostport, hive_auth)
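
# For illustration only: on a non-kerberized setup with the defaults above,
# exec_hive_query_from_file_beeline() below ends up running a command of roughly
# this shape (the user name and SQL file name are placeholders):
#
#   $HIVE_HOME/bin/beeline -n <user> \
#       -u "jdbc:hive2://localhost:11050/default;auth=none" --verbose=true \
#       -f load-tpch-core-hive-generated-text-none-none.sql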

HADOOP_CMD = os.path.join(os.environ['HADOOP_HOME'], 'bin/hadoop')

HS2_HOST_PORT = "{}:{}".format(options.impalad, 21050)


def available_workloads(workload_dir):
  return [subdir for subdir in os.listdir(workload_dir)
          if os.path.isdir(os.path.join(workload_dir, subdir))]


def validate_workloads(all_workloads, workloads):
  for workload in workloads:
    if workload not in all_workloads:
      LOG.error('Workload \'%s\' not found in workload directory' % workload)
      LOG.error('Available workloads: ' + ', '.join(all_workloads))
      sys.exit(1)


def exec_cmd(cmd, error_msg=None, exit_on_error=True, out_file=None):
  """Run the given command in the shell returning whether the command succeeded.
  If 'error_msg' is set, log the error message on failure.
  If 'exit_on_error' is True, exit the program on failure.
  If 'out_file' is specified, log all output to that file."""
  success = True
  if out_file:
    with open(out_file, 'w') as f:
      ret_val = subprocess.call(cmd, shell=True, stderr=f, stdout=f)
  else:
    ret_val = subprocess.call(cmd, shell=True)
  if ret_val != 0:
    if error_msg:
      LOG.info(error_msg)
    if exit_on_error:
      sys.exit(ret_val)
    success = False
  return success


def exec_hive_query_from_file_beeline(file_name):
  if not os.path.exists(file_name):
    LOG.info("Error: File {0} not found".format(file_name))
    return False

  LOG.info("Beginning execution of hive SQL: {0}".format(file_name))
  output_file = file_name + ".log"
  hive_cmd = "{0} {1} -f {2}".format(HIVE_CMD, HIVE_ARGS, file_name)
  is_success = exec_cmd(hive_cmd, exit_on_error=False, out_file=output_file)
  if is_success:
    LOG.info("Finished execution of hive SQL: {0}".format(file_name))
  else:
    LOG.info("Error executing hive SQL: {0} See: {1}".format(file_name, output_file))
  return is_success


def exec_hbase_query_from_file(file_name, step_name):
  if not os.path.exists(file_name):
    return
  LOG.info('Begin step "%s".' % step_name)
  start_time = time.time()
  hbase_cmd = "hbase shell %s" % file_name
  LOG.info('Executing HBase Command: %s' % hbase_cmd)
  exec_cmd(hbase_cmd, error_msg='Error executing hbase create commands')
  total_time = time.time() - start_time
  LOG.info('End step "%s". Total time: %.2fs\n' % (step_name, total_time))


# KERBEROS TODO: fails when kerberized and impalad principal isn't "impala"
def exec_impala_query_from_file(file_name):
  """Execute each query in an Impala query file individually"""
  if not os.path.exists(file_name):
    LOG.info("Error: File {0} not found".format(file_name))
    return False

  LOG.info("Beginning execution of impala SQL on {0}: {1}".format(
      options.impalad, file_name))
  is_success = True
  impala_client = ImpylaHS2Connection(HS2_HOST_PORT, use_kerberos=options.use_kerberos)
  output_file = file_name + ".log"
  query = None
  with open(output_file, 'w') as out_file:
    try:
      impala_client.connect()
      with open(file_name, 'r+') as query_file:
        queries = sqlparse.split(query_file.read())
        for query in queries:
          query = sqlparse.format(query.rstrip(';'), strip_comments=True)
          if query.strip() != "":
            result = impala_client.execute(query)
            out_file.write("{0}\n{1}\n".format(query, result))
    except Exception as e:
      if query:
        out_file.write("ERROR: {0}\n".format(query))
      else:
        out_file.write("Encountered errors before parsing any queries.\n")
      traceback.print_exc(file=out_file)
      is_success = False

  if is_success:
    LOG.info("Finished execution of impala SQL: {0}".format(file_name))
  else:
    LOG.info("Error executing impala SQL: {0} See: {1}".format(file_name, output_file))
  return is_success


def run_dataset_preload(dataset):
  """Execute a preload script if present in dataset directory, e.g. to generate data
  before loading."""
  dataset_preload_script = os.path.join(DATASET_DIR, dataset, "preload")
  if os.path.exists(dataset_preload_script):
    LOG.info("Running preload script for " + dataset)
    if options.scale_factor != "" and int(options.scale_factor) > 1:
      dataset_preload_script += " " + str(options.scale_factor)
    exec_cmd(dataset_preload_script,
             error_msg="Error executing preload script for " + dataset,
             exit_on_error=True)


def generate_schema_statements(workload):
  generate_cmd = GENERATE_SCHEMA_CMD % (options.exploration_strategy, workload,
                                        options.scale_factor)
  if options.table_names:
    generate_cmd += " --table_names=%s" % options.table_names
  if options.force_reload:
    generate_cmd += " --force_reload"
  if options.table_formats:
    generate_cmd += " --table_formats=%s" % options.table_formats
  if options.hive_warehouse_dir is not None:
    generate_cmd += " --hive_warehouse_dir=%s" % options.hive_warehouse_dir
  if options.hdfs_namenode is not None:
    generate_cmd += " --hdfs_namenode=%s" % options.hdfs_namenode
  generate_cmd += " --backend=%s" % HS2_HOST_PORT
  LOG.info('Executing Generate Schema Command: ' + generate_cmd)
  schema_cmd = os.path.join(TESTDATA_BIN_DIR, generate_cmd)
  error_msg = 'Error generating schema statements for workload: ' + workload
  exec_cmd(schema_cmd, error_msg=error_msg)
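

# For reference, a fully expanded command built by generate_schema_statements() looks
# roughly like the following (the workload and option values are illustrative and
# reflect the defaults above):
#
#   generate-schema-statements.py --exploration_strategy=core --workload=tpch \
#       --scale_factor= --verbose --hive_warehouse_dir=/test-warehouse \
#       --hdfs_namenode=localhost:20500 --backend=localhost:21050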


def get_dataset_for_workload(workload):
  dimension_file_name = os.path.join(WORKLOAD_DIR, workload,
                                     '%s_dimensions.csv' % workload)
  if not os.path.isfile(dimension_file_name):
    LOG.error('Dimension file not found: ' + dimension_file_name)
    sys.exit(1)
  with open(dimension_file_name, 'r') as input_file:
    match = re.search(r'dataset:\s*([\w\-\.]+)', input_file.read())
    if match:
      return match.group(1)
    else:
      LOG.error('Dimension file does not contain dataset for workload \'%s\'' %
                (workload))
      sys.exit(1)


def copy_avro_schemas_to_hdfs(schemas_dir):
  """Recursively copies all of schemas_dir to the test warehouse."""
  if not os.path.exists(schemas_dir):
    LOG.info('Avro schema dir (%s) does not exist. Skipping copy to HDFS.' % schemas_dir)
    return

  exec_hadoop_fs_cmd("-mkdir -p " + options.hive_warehouse_dir)
  exec_hadoop_fs_cmd("-put -f %s %s/" % (schemas_dir, options.hive_warehouse_dir))


def exec_hadoop_fs_cmd(args, exit_on_error=True):
  cmd = "%s fs %s" % (HADOOP_CMD, args)
  LOG.info("Executing Hadoop command: " + cmd)
  exec_cmd(cmd, error_msg="Error executing Hadoop command, exiting",
           exit_on_error=exit_on_error)


def exec_query_files_parallel(thread_pool, query_files, execution_type, step_name):
  """Executes the provided query files in parallel on the specified execution engine,
  using the given thread pool. Aborts immediately if any execution encounters an
  error."""
  assert(execution_type == 'impala' or execution_type == 'hive')
  if len(query_files) == 0:
    return
  if execution_type == 'impala':
    execution_function = exec_impala_query_from_file
  elif execution_type == 'hive':
    execution_function = exec_hive_query_from_file_beeline

  LOG.info('Begin step "%s".' % step_name)
  start_time = time.time()
  for result in thread_pool.imap_unordered(execution_function, query_files):
    if not result:
      thread_pool.terminate()
      sys.exit(1)
  total_time = time.time() - start_time
  LOG.info('End step "%s". Total time: %.2fs\n' % (step_name, total_time))


def impala_exec_query_files_parallel(thread_pool, query_files, step_name):
  exec_query_files_parallel(thread_pool, query_files, 'impala', step_name)


def hive_exec_query_files_parallel(thread_pool, query_files, step_name):
  exec_query_files_parallel(thread_pool, query_files, 'hive', step_name)


def main():
  logging.basicConfig(format='%(asctime)s %(message)s', datefmt='%H:%M:%S')
  LOG.setLevel(logging.DEBUG)
  # Having the actual command line at the top of each data-load-* log can help
  # when debugging dataload issues.
  LOG.debug(' '.join(sys.argv))
  all_workloads = available_workloads(WORKLOAD_DIR)
  workloads = []
  if options.workloads is None:
    LOG.error("At least one workload name must be specified.")
    parser.print_help()
    sys.exit(1)
  elif options.workloads == 'all':
    LOG.info('Loading data for all workloads.')
    workloads = all_workloads
  else:
    workloads = options.workloads.split(",")
    validate_workloads(all_workloads, workloads)

  LOG.info('Starting data load for the following workloads: ' + ', '.join(workloads))
  LOG.info('Running with {0} threads'.format(options.num_processes))
  # Note: The processes run in whatever directory the caller is in, so all paths
  # passed to the pool need to be absolute paths. This will allow the pool
  # to be used for different workloads (and thus different directories)
  # simultaneously.
  thread_pool = ThreadPool(processes=options.num_processes)
  loading_time_map = collections.defaultdict(float)
  for workload in workloads:
    start_time = time.time()
    dataset = get_dataset_for_workload(workload)
    run_dataset_preload(dataset)

    # This script is tightly coupled with testdata/bin/generate-schema-statements.py.
    # Specifically, this script expects the following:
    # 1. generate-schema-statements.py generates files and puts them in the
    #    directory ${IMPALA_DATA_LOADING_SQL_DIR}/${workload}
    #    (e.g. ${IMPALA_HOME}/logs/data_loading/sql/tpch)
    # 2. generate-schema-statements.py populates the subdirectory
    #    avro_schemas/${workload} with JSON files specifying the Avro schema for the
    #    tables being loaded.
    # 3. generate-schema-statements.py uses a particular naming scheme to distinguish
    #    between SQL files of different load phases.
    #
    #    Using the following variables:
    #    workload_exploration = ${workload}-${exploration_strategy} and
    #    file_format_suffix = ${file_format}-${codec}-${compression_type}
    #
    #    A. Impala table creation scripts run in Impala to create tables, partitions,
    #       and views. There is one for each file format. They take the form:
    #       create-${workload_exploration}-impala-generated-${file_format_suffix}.sql
    #
    #    B. Hive creation/load scripts run in Hive to load data into tables and create
    #       tables or views that Impala does not support. There is one for each
    #       file format. They take the form:
    #       load-${workload_exploration}-hive-generated-${file_format_suffix}.sql
    #
    #    C. HBase creation script runs through the hbase commandline to create
    #       HBase tables. (Only generated if loading HBase table.) It takes the form:
    #       load-${workload_exploration}-hbase-generated.create
    #
    #    D. HBase postload script runs through the hbase commandline to flush
    #       HBase tables. (Only generated if loading HBase table.) It takes the form:
    #       post-load-${workload_exploration}-hbase-generated.sql
    #
    #    E. Impala load scripts run in Impala to load data. Only Parquet and Kudu
    #       are loaded through Impala. There is one for each of those formats loaded.
    #       They take the form:
    #       load-${workload_exploration}-impala-generated-${file_format_suffix}.sql
    #
    #    F. Invalidation script runs through Impala to invalidate/refresh metadata
    #       for tables. It takes the form:
    #       invalidate-${workload_exploration}-impala-generated.sql
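    #
    # As a concrete example (illustrative only; the actual set of files depends on the
    # workload, exploration strategy, and table formats being loaded), a 'tpch' load
    # with the 'core' strategy could produce names such as:
    #   A. create-tpch-core-impala-generated-text-none-none.sql
    #   B. load-tpch-core-hive-generated-text-none-none.sql
    #   C. load-tpch-core-hbase-generated.create
    #   D. post-load-tpch-core-hbase-generated.sql
    #   E. load-tpch-core-impala-generated-parquet-none-none.sql
    #   F. invalidate-tpch-core-impala-generated.sql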

    generate_schema_statements(workload)

    # Determine the directory from #1
    sql_dir = os.path.join(SQL_OUTPUT_DIR, dataset)
    assert os.path.isdir(sql_dir), ("Could not find the generated SQL files for "
        "loading dataset '%s'.\nExpected to find the SQL files in: %s"
        % (dataset, sql_dir))

    # Copy the avro schemas (see #2) into HDFS
    avro_schemas_path = os.path.join(sql_dir, AVRO_SCHEMA_DIR)
    copy_avro_schemas_to_hdfs(avro_schemas_path)

    # List all of the files in the sql directory to sort out the various types of
    # files (see #3).
    dataset_dir_contents = [os.path.join(sql_dir, f) for f in os.listdir(sql_dir)]
    workload_exploration = "%s-%s" % (workload, options.exploration_strategy)

    # Remove the AVRO_SCHEMA_DIR from the list of files
    if os.path.exists(avro_schemas_path):
      dataset_dir_contents.remove(avro_schemas_path)

    # Match for Impala create files (3.A)
    impala_create_match = 'create-%s-impala-generated' % workload_exploration
    # Match for Hive create/load files (3.B)
    hive_load_match = 'load-%s-hive-generated' % workload_exploration
    # Match for HBase creation script (3.C)
    hbase_create_match = 'load-%s-hbase-generated.create' % workload_exploration
    # Match for HBase post-load script (3.D)
    hbase_postload_match = 'post-load-%s-hbase-generated.sql' % workload_exploration
    # Match for Impala load scripts (3.E)
    impala_load_match = 'load-%s-impala-generated' % workload_exploration
    # Match for Impala invalidate script (3.F)
    invalidate_match = 'invalidate-%s-impala-generated' % workload_exploration

    impala_create_files = []
    hive_load_text_files = []
    hive_load_orc_files = []
    hive_load_nontext_files = []
    hbase_create_files = []
    hbase_postload_files = []
    impala_load_files = []
    invalidate_files = []
    for filename in dataset_dir_contents:
      if impala_create_match in filename:
        impala_create_files.append(filename)
      elif hive_load_match in filename:
        if 'text-none-none' in filename:
          hive_load_text_files.append(filename)
        elif 'orc-def-block' in filename:
          hive_load_orc_files.append(filename)
        else:
          hive_load_nontext_files.append(filename)
      elif hbase_create_match in filename:
        hbase_create_files.append(filename)
      elif hbase_postload_match in filename:
        hbase_postload_files.append(filename)
      elif impala_load_match in filename:
        impala_load_files.append(filename)
      elif invalidate_match in filename:
        invalidate_files.append(filename)
      else:
        assert False, "Unexpected input file {0}".format(filename)

    # Simple helper function to dump a header followed by the filenames
    def log_file_list(header, file_list):
      if (len(file_list) == 0): return
      LOG.debug(header)
      list(map(LOG.debug, list(map(os.path.basename, file_list))))
      LOG.debug("\n")

    log_file_list("Impala Create Files:", impala_create_files)
    log_file_list("Hive Load Text Files:", hive_load_text_files)
    log_file_list("Hive Load Orc Files:", hive_load_orc_files)
    log_file_list("Hive Load Non-Text Files:", hive_load_nontext_files)
    log_file_list("HBase Create Files:", hbase_create_files)
    log_file_list("HBase Post-Load Files:", hbase_postload_files)
    log_file_list("Impala Load Files:", impala_load_files)
    log_file_list("Impala Invalidate Files:", invalidate_files)

    # Execute the data loading scripts.
    # Creating tables in Impala has no dependencies, so we execute them first.
    # HBase table inserts are done via Hive, so the HBase tables need to be created
    # before running the Hive scripts. Some of the Impala inserts depend on Hive
    # tables, so they're done at the end. Finally, the HBase tables that have been
    # filled with data need to be flushed.

    impala_exec_query_files_parallel(thread_pool, impala_create_files, "Impala Create")

    # There should be at most one hbase creation script
    assert(len(hbase_create_files) <= 1)
    for hbase_create in hbase_create_files:
      exec_hbase_query_from_file(hbase_create, "HBase Create")

    # If this is loading text tables plus multiple other formats, the text tables
    # need to be loaded first
    assert(len(hive_load_text_files) <= 1)
    hive_exec_query_files_parallel(thread_pool, hive_load_text_files, "Hive Load Text")

    # IMPALA-9923: Run ORC serially, separately from other non-text formats. This hacks
    # around flakiness seen when loading this in parallel (see IMPALA-12630 comments for
    # broken tests). This should be removed as soon as possible.
    assert(len(hive_load_orc_files) <= 1)
    hive_exec_query_files_parallel(thread_pool, hive_load_orc_files, "Hive Load ORC")

    # Load all non-text formats (goes parallel)
    hive_exec_query_files_parallel(thread_pool, hive_load_nontext_files,
                                   "Hive Load Non-Text")

    assert(len(hbase_postload_files) <= 1)
    for hbase_postload in hbase_postload_files:
      exec_hbase_query_from_file(hbase_postload, "HBase Post-Load")

    # Invalidate so that Impala sees the loads done by Hive before loading Parquet/Kudu.
    # Note: This only invalidates tables for this workload.
    assert(len(invalidate_files) <= 1)
    if impala_load_files:
      impala_exec_query_files_parallel(thread_pool, invalidate_files,
                                       "Impala Invalidate 1")
      impala_exec_query_files_parallel(thread_pool, impala_load_files, "Impala Load")
    # Final invalidate for this workload
    impala_exec_query_files_parallel(thread_pool, invalidate_files,
                                     "Impala Invalidate 2")
    loading_time_map[workload] = time.time() - start_time

  total_time = 0.0
  thread_pool.close()
  thread_pool.join()
  for workload, load_time in loading_time_map.items():
    total_time += load_time
    LOG.info('Data loading for workload \'%s\' completed in: %.2fs'\
        % (workload, load_time))
  LOG.info('Total load time: %.2fs\n' % total_time)


if __name__ == "__main__":
  main()