impala/tests/comparison/data_generator.py
Joe McDonnell 1913ab46ed IMPALA-14501: Migrate most scripts from impala-python to impala-python3
To remove the dependency on Python 2, existing scripts need to use
python3 rather than python. These commands find those
locations (for impala-python and regular python):
git grep impala-python | grep -v impala-python3 | grep -v impala-python-common | grep -v init-impala-python
git grep bin/python | grep -v python3

This removes or switches most of these locations by various means:
1. If a Python file has a #!/bin/env impala-python (or python) hash-bang but
   doesn't have a main function, the hash-bang is removed and the file is
   made non-executable (see the sketch after this list).
2. Most scripts can simply switch from impala-python to impala-python3
   (or python to python3) with minimal changes.
3. The cm-api pypi package (which doesn't support Python 3) has been
   replaced by the cm-client pypi package and interfaces have changed.
   Rather than migrating the code (which hasn't been used in years), this
   deletes the old code and stops installing cm-api into the virtualenv.
   The code can be restored and revamped if there is any interest in
   interacting with CM clusters.
4. This switches tests/comparison over to impala-python3, but this code has
   bit-rotted. Some pieces can be run manually, but it can't be fully
   verified with Python 3. It shouldn't hold back the migration on its own.
5. This also replaces locations of impala-python in comments / documentation /
   READMEs.
6. kazoo (used for interacting with HBase) needed to be upgraded to a
   version that supports Python 3. The newest version of kazoo requires
   upgrades of other component versions, so this uses kazoo 2.8.0 to avoid
   needing other upgrades.
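
Step 1 could be scripted with a small helper along these lines (a hypothetical
sketch, not part of this patch):

import os
import stat
import sys

def strip_hashbang_if_no_main(path):
  with open(path) as f:
    lines = f.readlines()
  # Only touch files whose first line is a python/impala-python hash-bang.
  if not lines or not lines[0].startswith('#!') or 'python' not in lines[0]:
    return
  # Heuristic: files with a main entry point keep their hash-bang.
  if any('__main__' in line for line in lines):
    return
  with open(path, 'w') as f:
    f.writelines(lines[1:])
  # Clear the executable bits for user, group, and other.
  mode = os.stat(path).st_mode
  os.chmod(path, mode & ~(stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH))

for path in sys.argv[1:]:
  strip_hashbang_if_no_main(path)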

The two remaining uses of impala-python are:
 - bin/cmake_aux/create_virtualenv.sh
 - bin/impala-env-versioned-python
These will be removed separately when we drop Python 2 support
completely. In particular, they remain useful for testing impala-shell
with Python 2 until impala-shell itself stops supporting Python 2.

The docker-based tests still use /usr/bin/python, but that can be
switched over independently (and doesn't impact impala-python).

Testing:
 - Ran core job
 - Ran build + dataload on CentOS 7 and Red Hat 8
 - Manual testing of individual scripts (except some bit-rotted areas like the
   random query generator)

Change-Id: If209b761290bc7e7c716c312ea757da3e3bca6dc
Reviewed-on: http://gerrit.cloudera.org:8080/23468
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Tested-by: Michael Smith <michael.smith@cloudera.com>
2025-10-22 16:30:17 +00:00

#!/usr/bin/env impala-python3
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
'''This module provides random data generation and database population.
When this module is run directly for purposes of database population, the default is
to use a fixed seed for randomization. The result should be that the generated random
data is the same regardless of when or where the execution is done.
'''
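# Illustrative invocation (requires the Impala dev environment so that the
# 'tests' package is importable; see the argument parser below for all options):
#   impala-python3 tests/comparison/data_generator.py --table-count 5 populate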
from __future__ import absolute_import, division, print_function
from builtins import filter, range
import os
from copy import deepcopy
from logging import getLogger
from random import choice, randint, seed
from time import time
from tests.comparison.data_generator_mapred_common import (
    estimate_rows_per_reducer,
    MB_PER_REDUCER,
    serialize,
    TextTableDataGenerator)
from tests.comparison.common import Column, Table
from tests.comparison.db_types import (
    Char,
    Decimal,
    EXACT_TYPES,
    get_char_class,
    get_decimal_class,
    get_varchar_class,
    String,
    Timestamp,
    TYPES,
    VarChar)
from tests.comparison import db_connection

LOG = getLogger(__name__)


def index_tables_in_db_if_possible(cursor):
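  '''Create an index on every table in the database, if the connection supports it.'''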
  if not cursor.conn.supports_index_creation:
    return
  for table_name in cursor.list_table_names():
    LOG.info('Indexing %s on %s' % (table_name, cursor.db_type))
    cursor.index_table(table_name)


def migrate_db(src_cursor, dst_cursor, include_table_names=None):
  '''Read table metadata and data from the source database and create a replica in
  the destination database. For example, the Impala functional test database could
  be copied into Postgresql.
  '''
  for table_name in src_cursor.list_table_names():
    if include_table_names and table_name not in include_table_names:
      continue
    table = src_cursor.describe_table(table_name)
    dst_cursor.create_table(table)
    src_cursor.execute('SELECT * FROM ' + table_name)
    while True:
      rows = src_cursor.fetchmany(size=100)
      if not rows:
        break
      sql = dst_cursor.make_insert_sql_from_data(table, rows)
      dst_cursor.execute(sql)
  index_tables_in_db_if_possible(dst_cursor)


class DbPopulator(object):
  '''This class will populate a database with randomly generated data. The population
  includes table creation and data generation. Table names are hard coded as
  table_<table number>.
  '''

  def __init__(self, db_engine=db_connection.IMPALA):
    self.cluster = None
    self.db_name = None
    self.db_engine = db_engine
    self.min_col_count = None
    self.max_col_count = None
    self.min_row_count = None
    self.max_row_count = None
    self.allowed_storage_formats = None
    self.randomization_seed = None

  def populate_db(self, table_count, postgresql_conn=None):
    '''Create tables with a random number of cols.
    The given db_name must have already been created.
    '''
    self.cluster.hdfs.ensure_home_dir()
    hdfs = self.cluster.hdfs.create_client()
    table_and_generators = list()
    for table_idx in range(table_count):
      table = self._create_random_table(
          'table_%s' % (table_idx + 1),
          self.min_col_count,
          self.max_col_count,
          self.allowed_storage_formats)
      self._prepare_table_storage(table, self.db_name)
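      # Rows are always generated into a TEXTFILE table. For any other storage format,
      # a companion '<name>_text' staging table is created here and its rows are
      # copied into the real table through Hive below.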
      if table.storage_format == 'TEXTFILE':
        text_table = table
      else:
        text_table = deepcopy(table)
        text_table.name += '_text'
        text_table.storage_format = 'TEXTFILE'
        text_table.storage_location = None
        text_table.schema_location = None
        self._prepare_table_storage(text_table, self.db_name)
      table_data_generator = TextTableDataGenerator()
      table_data_generator.randomization_seed = self.randomization_seed
      table_data_generator.table = text_table
      table_data_generator.row_count = randint(self.min_row_count, self.max_row_count)
      table_and_generators.append((table, table_data_generator))
    self._run_data_generator_mr_job([g for _, g in table_and_generators], self.db_name)
    with self.cluster.hive.cursor(db_name=self.db_name) as cursor:
      for table, table_data_generator in table_and_generators:
        cursor.create_table(table)
        text_table = table_data_generator.table
        if postgresql_conn:
          with postgresql_conn.cursor() as postgresql_cursor:
            postgresql_cursor.create_table(table)
            for data_file in hdfs.list(text_table.storage_location):
              with hdfs.read(text_table.storage_location + '/' + data_file) as reader:
                postgresql_cursor.copy_expert(
                    r"COPY %s FROM STDIN WITH DELIMITER E'\x01'" % table.name, reader)
        if table.storage_format != 'TEXTFILE':
          cursor.create_table(text_table)
          cursor.execute('INSERT INTO %s SELECT * FROM %s'
              % (table.name, text_table.name))
          cursor.drop_table(text_table.name)
    if self.db_engine is db_connection.IMPALA:
      with self.cluster.impala.cursor(db_name=self.db_name) as cursor:
        cursor.invalidate_metadata()
        cursor.compute_stats()
    elif self.db_engine is db_connection.HIVE:
      with self.cluster.hive.cursor(db_name=self.db_name) as cursor:
        cursor.invalidate_metadata()
        cursor.compute_stats()
    else:
raise ValueError("db_engine must be of type %s or %s", db_connection.IMPALA,
db_connection.HIVE)
    if postgresql_conn:
      with postgresql_conn.cursor() as postgresql_cursor:
        index_tables_in_db_if_possible(postgresql_cursor)

  def _create_random_table(self,
      table_name,
      min_col_count,
      max_col_count,
      allowed_storage_formats):
    '''Create and return a Table with a random number of cols.'''
    col_count = randint(min_col_count, max_col_count)
    storage_format = choice(allowed_storage_formats)
    table = Table(table_name)
    table.storage_format = storage_format
    allowed_types = list(TYPES)
    # Avro doesn't support timestamps yet.
    if table.storage_format == 'AVRO':
      allowed_types.remove(Timestamp)
    # TODO: 'table.cols' returns a copy of all scalar cols, so 'table.cols.append()'
    # doesn't actually modify the table's columns. 'table.cols' should be changed
    # to allow access to the real columns.
    cols = table.cols
    for col_idx in range(col_count):
      col_type = choice(allowed_types)
      col_type = \
          choice(list(filter(lambda type_: issubclass(type_, col_type), EXACT_TYPES)))
      if issubclass(col_type, VarChar) and not issubclass(col_type, String):
        col_type = get_varchar_class(randint(1, VarChar.MAX))
      elif issubclass(col_type, Char) and not issubclass(col_type, String):
        col_type = get_char_class(randint(1, Char.MAX))
      elif issubclass(col_type, Decimal):
        max_digits = randint(1, Decimal.MAX_DIGITS)
        col_type = get_decimal_class(max_digits, randint(1, max_digits))
      col = Column(
          table,
          '%s_col_%s' % (col_type.__name__.lower(), col_idx + 1),
          col_type)
      cols.append(col)
    table.cols = cols
    return table

  def _prepare_table_storage(self, table, db_name):
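    '''Ensure the table has an empty HDFS storage location, recreating it if needed.'''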
    with self.cluster.hive.cursor(db_name=self.db_name) as cursor:
      cursor.ensure_storage_location(table)
    hdfs = self.cluster.hdfs.create_client()
    if hdfs.exists(table.storage_location):
      hdfs.delete(table.storage_location, recursive=True)
    hdfs.makedirs(table.storage_location, permission='777')

  def _run_data_generator_mr_job(self, table_data_generators, db_name):
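    '''Generate the table data with a Hadoop streaming MR job. Each serialized table
    data generator becomes one line of mapper input; the reducers write the rows.
    '''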
    timestamp = int(time())
    mapper_input_file = '/tmp/data_gen_%s_mr_input_%s' % (db_name, timestamp)
    hdfs = self.cluster.hdfs.create_client()
    if hdfs.exists(mapper_input_file):
      hdfs.delete(mapper_input_file)
    reducer_count = 0
    mapper_input_data = list()
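    # Size the reduce phase: each reducer should produce roughly MB_PER_REDUCER
    # megabytes of data, so give each table enough reducers for its row count.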
    for table_data_generator in table_data_generators:
      reducer_count += (table_data_generator.row_count
          // estimate_rows_per_reducer(table_data_generator, MB_PER_REDUCER)) + 1
      mapper_input_data.append(serialize(table_data_generator))
    hdfs.write(mapper_input_file, data=b'\n'.join(mapper_input_data))
    files = ['common.py', 'db_types.py', 'data_generator_mapred_common.py',
        'data_generator_mapper.py', 'data_generator_reducer.py',
        'random_val_generator.py']
    dir_path = os.path.dirname(__file__)
    files = [os.path.join(dir_path, f) for f in files]
    hdfs_output_dir = '/tmp/data_gen_%s_mr_output_%s' % (db_name, timestamp)
    if hdfs.exists(hdfs_output_dir):
      hdfs.delete(hdfs_output_dir, recursive=True)
    LOG.info('Starting MR job to generate data for %s', db_name)
    self.cluster.yarn.run_mr_job(self.cluster.yarn.find_mr_streaming_jar(), job_args=r'''
        -D mapred.reduce.tasks=%s \
        -D stream.num.map.output.key.fields=2 \
        -libjars '%s/share/hadoop/hdfs/lib/*' \
        -files %s \
        -input %s \
        -output %s \
        -mapper data_generator_mapper.py \
        -reducer data_generator_reducer.py'''.strip()
        % (reducer_count, os.environ["HADOOP_HOME"], ','.join(files), mapper_input_file,
           hdfs_output_dir))


if __name__ == '__main__':
  from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser

  from tests.comparison import cli_options

  parser = ArgumentParser(
      usage='usage: \n'
            ' %(prog)s [options] [populate]\n\n'
            ' Create and populate database(s). The Impala database will always be \n'
            ' included. Postgres is optional. The other databases are not supported.\n\n'
            ' %(prog)s [options] migrate\n\n'
            ' Migrate an Impala database to another database type. The destination \n'
            ' database will be dropped and recreated.',
      formatter_class=ArgumentDefaultsHelpFormatter)

  cli_options.add_logging_options(parser)
  cli_options.add_cluster_options(parser)
  cli_options.add_db_name_option(parser)
  cli_options.add_connection_option_groups(parser)

  group = parser.add_argument_group('Database Population Options')
  group.add_argument('--randomization-seed', default=1, type=int,
      help='The randomization will be initialized with this seed. Using the same seed '
          'will produce the same results across runs.')
  cli_options.add_storage_format_options(group)
  group.add_argument('--create-data-files', default=False, action='store_true',
      help='Create files that can be used to repopulate the databases elsewhere.')
  group.add_argument('--table-count', default=10, type=int,
      help='The number of tables to generate.')
  group.add_argument('--min-column-count', default=1, type=int,
      help='The minimum number of columns to generate per table.')
  group.add_argument('--max-column-count', default=100, type=int,
      help='The maximum number of columns to generate per table.')
  group.add_argument('--min-row-count', default=(10 ** 3), type=int,
      help='The minimum number of rows to generate per table.')
  group.add_argument('--max-row-count', default=(10 ** 6), type=int,
      help='The maximum number of rows to generate per table.')
  parser.add_argument_group(group)

  group = parser.add_argument_group('Database Migration Options')
  group.add_argument('--migrate-table-names',
      help='Table names should be separated with commas. The default is to migrate all '
          'tables.')
  parser.add_argument_group(group)

  parser.add_argument('command', nargs='*', help='The command to run, either "populate"'
      ' or "migrate".')
  args = parser.parse_args()

  if len(args.command) > 1:
    raise Exception('Only one command can be chosen. Requested commands were: %s'
        % args.command)
  command = args.command[0] if args.command else 'populate'
  if command not in ('populate', 'migrate'):
    raise Exception('Command must either be "populate" or "migrate" but was "%s"'
        % command)
  if command == 'migrate' and \
      not any((args.use_mysql, args.use_postgresql, args.use_oracle)):
    raise Exception('At least one destination database must be chosen with '
        '--use-<database type>')

  cli_options.configure_logging(args.log_level, debug_log_file=args.debug_log_file)

  seed(args.randomization_seed)

  cluster = cli_options.create_cluster(args)
  populator = DbPopulator(db_connection.HIVE if args.use_hive else db_connection.IMPALA)
  if command == 'populate':
    populator.randomization_seed = args.randomization_seed
    populator.cluster = cluster
    populator.db_name = args.db_name
    populator.min_col_count = args.min_column_count
    populator.max_col_count = args.max_column_count
    populator.min_row_count = args.min_row_count
    populator.max_row_count = args.max_row_count
    populator.allowed_storage_formats = args.storage_file_formats.split(',')

    if args.use_hive:
      with cluster.hive.connect() as conn:
        with conn.cursor() as cursor:
          cursor.ensure_empty_db(args.db_name)
    else:
      with cluster.impala.connect() as conn:
        with conn.cursor() as cursor:
          cursor.invalidate_metadata()
          cursor.ensure_empty_db(args.db_name)

    if args.use_postgresql:
      with cli_options.create_connection(args) as postgresql_conn:
        with postgresql_conn.cursor() as cursor:
          cursor.ensure_empty_db(args.db_name)
      postgresql_conn = cli_options.create_connection(args, db_name=args.db_name)
    else:
      postgresql_conn = None
    populator.populate_db(args.table_count, postgresql_conn=postgresql_conn)
  else:
    if args.migrate_table_names:
      table_names = args.migrate_table_names.split(',')
    else:
      table_names = None
    with cli_options.create_connection(args) as conn:
      with conn.cursor() as cursor:
        cursor.ensure_empty_db(args.db_name)
    with cli_options.create_connection(args, db_name=args.db_name) as conn:
      with conn.cursor() as dst:
        with cluster.impala.cursor(db_name=args.db_name) as src:
          migrate_db(src, dst, include_table_names=table_names)