Changes:
* Added hive cli options back in (removed in commit "Stress test: Various changes")
* Modifications so that if --use-hive is specified, a Hive connection is actually
  created
* A few minor bug fixes so that the RQG can be run locally
* Modified MiniCluster to use HADOOP_CONF_DIR and HIVE_CONF_DIR rather than a
  hard-coded file under IMPALA_HOME
* Fixed fe/src/test/resources/hive-default.xml so that it is a valid XML file; it
  was missing a few element terminators, which caused an exception in cluster.py

Testing:
* Hive integration tested locally by invoking the data generator via the command:

    ./data-generator.py \
      --db-name=functional \
      --use-hive \
      --min-row-count=50 \
      --max-row-count=100 \
      --storage-file-formats textfile \
      --use-postgresql \
      --postgresql-user stakiar

  and the discrepancy checker via the command:

    ./discrepancy-checker.py \
      --db-name=functional \
      --use-hive \
      --use-postgresql \
      --postgresql-user stakiar \
      --test-db-type HIVE \
      --timeout 300 \
      --query-count 50 \
      --profile hive

* The output of the above two commands is essentially the same as the Impala
  output; however, about 20% of the queries fail when the discrepancy checker is
  run
* Regression testing done by running Leopard in a local VM running Ubuntu 14.04,
  and by running the discrepancy checker against Impala from inside an Impala
  Docker container

Change-Id: Ifb1199b50a5b65c21de7876fb70cc03bda1a9b46
Reviewed-on: http://gerrit.cloudera.org:8080/4011
Reviewed-by: Taras Bobrovytsky <tbobrovytsky@cloudera.com>
Tested-by: Taras Bobrovytsky <tbobrovytsky@cloudera.com>

#!/usr/bin/env impala-python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

'''This module provides random data generation and database population.

When this module is run directly for purposes of database population, the default is
to use a fixed seed for randomization. The result should be that the generated random
data is the same regardless of when or where the execution is done.

'''
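
# Reproducibility note (illustrative): because population seeds Python's global
# 'random' module with a fixed value (see --randomization-seed below, default 1),
# two invocations such as
#
#   ./data-generator.py --db-name=functional --randomization-seed=1
#
# are expected to generate identical schemas and rows, while changing the seed
# changes the generated data. The flag matches the argument parser at the bottom
# of this file; other required options depend on your cluster setup.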

import os
from copy import deepcopy
from logging import getLogger
from random import choice, randint, seed
from time import time

from data_generator_mapred_common import (
    estimate_rows_per_reducer,
    MB_PER_REDUCER,
    serialize,
    TextTableDataGenerator)
from common import Column, Table
from db_types import (
    Char,
    Decimal,
    EXACT_TYPES,
    get_char_class,
    get_decimal_class,
    get_varchar_class,
    String,
    Timestamp,
    TYPES,
    VarChar)
from tests.comparison import db_connection

LOG = getLogger(__name__)


def index_tables_in_db_if_possible(cursor):
  '''Create an index on each table in the current database, if the connection's
     database type supports index creation (no-op otherwise).
  '''
  if not cursor.conn.supports_index_creation:
    return
  for table_name in cursor.list_table_names():
    LOG.info('Indexing %s on %s' % (table_name, cursor.db_type))
    cursor.index_table(table_name)


def migrate_db(src_cursor, dst_cursor, include_table_names=None):
  '''Read table metadata and data from the source database and create a replica in
     the destination database. For example, the Impala functional test database could
     be copied into Postgresql.
  '''
  for table_name in src_cursor.list_table_names():
    if include_table_names and table_name not in include_table_names:
      continue
    table = src_cursor.describe_table(table_name)
    dst_cursor.create_table(table)
    src_cursor.execute('SELECT * FROM ' + table_name)
    while True:
      rows = src_cursor.fetchmany(size=100)
      if not rows:
        break
      sql = dst_cursor.make_insert_sql_from_data(table, rows)
      dst_cursor.execute(sql)
  index_tables_in_db_if_possible(dst_cursor)
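

# A minimal usage sketch for migrate_db() (assumed wiring; it mirrors the
# "migrate" command in __main__ at the bottom of this file, and the table name
# is illustrative):
#
#   with cluster.impala.cursor(db_name='functional') as src:
#     with postgresql_conn.cursor() as dst:
#       migrate_db(src, dst, include_table_names=['table_1'])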


class DbPopulator(object):
  '''This class will populate a database with randomly generated data. The population
     includes table creation and data generation. Table names are hard coded as
     table_<table number>.

  '''

  def __init__(self, db_engine=db_connection.IMPALA):
    self.cluster = None
    self.db_name = None
    self.db_engine = db_engine

    self.min_col_count = None
    self.max_col_count = None
    self.min_row_count = None
    self.max_row_count = None
    self.allowed_storage_formats = None
    self.randomization_seed = None

  def populate_db(self, table_count, postgresql_conn=None):
    '''Create tables with a random number of cols.

       The given db_name must have already been created.
    '''
    self.cluster.hdfs.ensure_home_dir()
    hdfs = self.cluster.hdfs.create_client()

    # Data is always generated as delimited text. For non-text storage formats, a
    # temporary '<table>_text' staging table is created; its rows are converted to
    # the target format via INSERT ... SELECT further below, then it is dropped.
    table_and_generators = list()
    for table_idx in xrange(table_count):
      table = self._create_random_table(
          'table_%s' % (table_idx + 1),
          self.min_col_count,
          self.max_col_count,
          self.allowed_storage_formats)
      self._prepare_table_storage(table, self.db_name)
      if table.storage_format == 'TEXTFILE':
        text_table = table
      else:
        text_table = deepcopy(table)
        text_table.name += '_text'
        text_table.storage_format = 'TEXTFILE'
        text_table.storage_location = None
        text_table.schema_location = None
        self._prepare_table_storage(text_table, self.db_name)
      table_data_generator = TextTableDataGenerator()
      table_data_generator.randomization_seed = self.randomization_seed
      table_data_generator.table = text_table
      table_data_generator.row_count = randint(self.min_row_count, self.max_row_count)
      table_and_generators.append((table, table_data_generator))

    self._run_data_generator_mr_job([g for _, g in table_and_generators], self.db_name)

    with self.cluster.hive.cursor(db_name=self.db_name) as cursor:
      for table, table_data_generator in table_and_generators:
        cursor.create_table(table)
        text_table = table_data_generator.table
        if postgresql_conn:
          # Postgresql is loaded directly from the generated text files in HDFS.
          with postgresql_conn.cursor() as postgresql_cursor:
            postgresql_cursor.create_table(table)
            for data_file in hdfs.list(text_table.storage_location):
              with hdfs.read(text_table.storage_location + '/' + data_file) as reader:
                postgresql_cursor.copy_expert(
                    r"COPY %s FROM STDIN WITH DELIMITER E'\x01'" % table.name, reader)
        if table.storage_format != 'TEXTFILE':
          cursor.create_table(text_table)
          cursor.execute('INSERT INTO %s SELECT * FROM %s'
              % (table.name, text_table.name))
          cursor.drop_table(text_table.name)
    if self.db_engine is db_connection.IMPALA:
      with self.cluster.impala.cursor(db_name=self.db_name) as cursor:
        cursor.invalidate_metadata()
        cursor.compute_stats()
    elif self.db_engine is db_connection.HIVE:
      with self.cluster.hive.cursor(db_name=self.db_name) as cursor:
        cursor.invalidate_metadata()
        cursor.compute_stats()
    else:
      raise ValueError("db_engine must be of type %s or %s"
          % (db_connection.IMPALA, db_connection.HIVE))
    if postgresql_conn:
      with postgresql_conn.cursor() as postgresql_cursor:
        index_tables_in_db_if_possible(postgresql_cursor)
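
  # Sketch of the per-table conversion performed above for a non-text table
  # named 'table_1' (SQL and the PARQUET format shown for illustration only;
  # the actual DDL is produced by the cursor's create_table()):
  #
  #   CREATE TABLE table_1 (...) STORED AS PARQUET;
  #   CREATE TABLE table_1_text (...) STORED AS TEXTFILE;  -- filled by the MR job
  #   INSERT INTO table_1 SELECT * FROM table_1_text;
  #   DROP TABLE table_1_text;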

  def _create_random_table(self,
      table_name,
      min_col_count,
      max_col_count,
      allowed_storage_formats):
    '''Create and return a Table with a random number of cols.'''
    col_count = randint(min_col_count, max_col_count)
    storage_format = choice(allowed_storage_formats)
    table = Table(table_name)
    table.storage_format = storage_format
    allowed_types = list(TYPES)
    # Avro doesn't support timestamps yet.
    if table.storage_format == 'AVRO':
      allowed_types.remove(Timestamp)
    # TODO: 'table.cols' returns a copy of all scalar cols, so 'table.cols.append()'
    #       doesn't actually modify the table's columns. 'table.cols' should be
    #       changed to allow access to the real columns.
    cols = table.cols
    for col_idx in xrange(col_count):
      # Pick an abstract type, then narrow it to a concrete "exact" type; char,
      # varchar, and decimal types additionally get random length/precision/scale.
      col_type = choice(allowed_types)
      col_type = choice(filter(lambda type_: issubclass(type_, col_type), EXACT_TYPES))
      if issubclass(col_type, VarChar) and not issubclass(col_type, String):
        col_type = get_varchar_class(randint(1, VarChar.MAX))
      elif issubclass(col_type, Char) and not issubclass(col_type, String):
        col_type = get_char_class(randint(1, Char.MAX))
      elif issubclass(col_type, Decimal):
        max_digits = randint(1, Decimal.MAX_DIGITS)
        col_type = get_decimal_class(max_digits, randint(1, max_digits))
      col = Column(
          table,
          '%s_col_%s' % (col_type.__name__.lower(), col_idx + 1),
          col_type)
      cols.append(col)
    table.cols = cols
    return table
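
  # For reference (illustrative): column names follow the pattern
  # '<exact type>_col_<1-based index>', e.g. 'int_col_1' or 'string_col_2'; the
  # prefix is the lowercased __name__ of the chosen EXACT_TYPES subclass.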

  def _prepare_table_storage(self, table, db_name):
    with self.cluster.hive.cursor(db_name=self.db_name) as cursor:
      cursor.ensure_storage_location(table)
    hdfs = self.cluster.hdfs.create_client()
    if hdfs.exists(table.storage_location):
      hdfs.delete(table.storage_location, recursive=True)
    hdfs.makedirs(table.storage_location, permission='777')

  def _run_data_generator_mr_job(self, table_data_generators, db_name):
    timestamp = int(time())
    mapper_input_file = '/tmp/data_gen_%s_mr_input_%s' % (db_name, timestamp)
    hdfs = self.cluster.hdfs.create_client()
    if hdfs.exists(mapper_input_file):
      hdfs.delete(mapper_input_file)
    reducer_count = 0
    mapper_input_data = list()
    for table_data_generator in table_data_generators:
      # Size the job so each reducer generates roughly MB_PER_REDUCER of data,
      # with at least one reducer per table.
      reducer_count += (table_data_generator.row_count /
          estimate_rows_per_reducer(table_data_generator, MB_PER_REDUCER)) + 1
      mapper_input_data.append(serialize(table_data_generator))
    hdfs.write(mapper_input_file, data='\n'.join(mapper_input_data))

    files = ['common.py', 'db_types.py', 'data_generator_mapred_common.py',
        'data_generator_mapper.py', 'data_generator_reducer.py',
        'random_val_generator.py']
    dir_path = os.path.dirname(__file__)
    files = [os.path.join(dir_path, f) for f in files]

    hdfs_output_dir = '/tmp/data_gen_%s_mr_output_%s' % (db_name, timestamp)
    if hdfs.exists(hdfs_output_dir):
      hdfs.delete(hdfs_output_dir, recursive=True)

    LOG.info('Starting MR job to generate data for %s', db_name)
    self.cluster.yarn.run_mr_job(self.cluster.yarn.find_mr_streaming_jar(), job_args=r'''
        -D mapred.reduce.tasks=%s \
        -D stream.num.map.output.key.fields=2 \
        -files %s \
        -input %s \
        -output %s \
        -mapper data_generator_mapper.py \
        -reducer data_generator_reducer.py'''.strip()
        % (reducer_count, ','.join(files), mapper_input_file, hdfs_output_dir))
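

# Programmatic usage sketch (mirrors the "populate" path in __main__ below; the
# cluster object comes from cli_options.create_cluster() and the values shown,
# e.g. the 'PARQUET' format, are illustrative):
#
#   populator = DbPopulator(db_connection.IMPALA)
#   populator.cluster = cluster
#   populator.db_name = 'functional'
#   populator.min_col_count, populator.max_col_count = 1, 100
#   populator.min_row_count, populator.max_row_count = 10 ** 3, 10 ** 6
#   populator.allowed_storage_formats = ['TEXTFILE', 'PARQUET']
#   populator.randomization_seed = 1
#   populator.populate_db(10)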


if __name__ == '__main__':
  from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser

  import cli_options

  parser = ArgumentParser(
      usage='usage: \n'
          '  %(prog)s [options] [populate]\n\n'
          '     Create and populate database(s). The Impala database will always be\n'
          '     included. Postgres is optional. The other databases are not supported.\n\n'
          '  %(prog)s [options] migrate\n\n'
          '     Migrate an Impala database to another database type. The destination\n'
          '     database will be dropped and recreated.',
      formatter_class=ArgumentDefaultsHelpFormatter)
  cli_options.add_logging_options(parser)
  cli_options.add_cluster_options(parser)
  cli_options.add_db_name_option(parser)
  cli_options.add_connection_option_groups(parser)

  group = parser.add_argument_group('Database Population Options')
  group.add_argument('--randomization-seed', default=1, type=int,
      help='The randomization will be initialized with this seed. Using the same seed '
          'will produce the same results across runs.')
  cli_options.add_storage_format_options(group)
  group.add_argument('--create-data-files', default=False, action='store_true',
      help='Create files that can be used to repopulate the databases elsewhere.')
  group.add_argument('--table-count', default=10, type=int,
      help='The number of tables to generate.')
  group.add_argument('--min-column-count', default=1, type=int,
      help='The minimum number of columns to generate per table.')
  group.add_argument('--max-column-count', default=100, type=int,
      help='The maximum number of columns to generate per table.')
  group.add_argument('--min-row-count', default=(10 ** 3), type=int,
      help='The minimum number of rows to generate per table.')
  group.add_argument('--max-row-count', default=(10 ** 6), type=int,
      help='The maximum number of rows to generate per table.')
  parser.add_argument_group(group)

  group = parser.add_argument_group('Database Migration Options')
  group.add_argument('--migrate-table-names',
      help='Table names should be separated with commas. The default is to migrate all '
          'tables.')
  parser.add_argument_group(group)
  parser.add_argument('command', nargs='*',
      help='The command to run, either "populate" or "migrate".')
  args = parser.parse_args()
  if len(args.command) > 1:
    raise Exception('Only one command can be chosen. Requested commands were: %s'
        % args.command)
  command = args.command[0] if args.command else 'populate'
  if command not in ('populate', 'migrate'):
    raise Exception('Command must either be "populate" or "migrate" but was "%s"'
        % command)
  if command == 'migrate' \
      and not any((args.use_mysql, args.use_postgresql, args.use_oracle)):
    raise Exception('At least one destination database must be chosen with '
        '--use-<database type>')

  cli_options.configure_logging(args.log_level, debug_log_file=args.debug_log_file)

  seed(args.randomization_seed)

  cluster = cli_options.create_cluster(args)

  populator = DbPopulator(db_connection.HIVE if args.use_hive else db_connection.IMPALA)
  if command == 'populate':
    populator.randomization_seed = args.randomization_seed
    populator.cluster = cluster
    populator.db_name = args.db_name
    populator.min_col_count = args.min_column_count
    populator.max_col_count = args.max_column_count
    populator.min_row_count = args.min_row_count
    populator.max_row_count = args.max_row_count
    populator.allowed_storage_formats = args.storage_file_formats.split(',')

    if args.use_hive:
      with cluster.hive.connect() as conn:
        with conn.cursor() as cursor:
          cursor.ensure_empty_db(args.db_name)
    else:
      with cluster.impala.connect() as conn:
        with conn.cursor() as cursor:
          cursor.invalidate_metadata()
          cursor.ensure_empty_db(args.db_name)

    if args.use_postgresql:
      with cli_options.create_connection(args) as postgresql_conn:
        with postgresql_conn.cursor() as cursor:
          cursor.ensure_empty_db(args.db_name)
      postgresql_conn = cli_options.create_connection(args, db_name=args.db_name)
    else:
      postgresql_conn = None
    populator.populate_db(args.table_count, postgresql_conn=postgresql_conn)
  else:
    if args.migrate_table_names:
      table_names = args.migrate_table_names.split(',')
    else:
      table_names = None
    with cli_options.create_connection(args) as conn:
      with conn.cursor() as cursor:
        cursor.ensure_empty_db(args.db_name)
    with cli_options.create_connection(args, db_name=args.db_name) as conn:
      with conn.cursor() as dst:
        with cluster.impala.cursor(db_name=args.db_name) as src:
          migrate_db(src, dst, include_table_names=table_names)
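
# Example invocations (illustrative; additional flags may be required for your
# environment, see the tested commands in the commit message at the top of this
# page):
#
#   ./data-generator.py --db-name=functional --use-postgresql populate
#   ./data-generator.py --db-name=functional --use-postgresql migrate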