impala/tests/stress/queries.py
Joe McDonnell 1913ab46ed IMPALA-14501: Migrate most scripts from impala-python to impala-python3
To remove the dependency on Python 2, existing scripts need to use
python3 rather than python. These commands find those
locations (for impala-python and regular python):
git grep impala-python | grep -v impala-python3 | grep -v impala-python-common | grep -v init-impala-python
git grep bin/python | grep -v python3

This removes or switches most of these locations by various means:
1. If a Python file has a #!/bin/env impala-python (or python) but
   doesn't have a main function, it removes the hash-bang and makes
   sure that the file is not executable (see the sketch after this list).
2. Most scripts can simply switch from impala-python to impala-python3
   (or python to python3) with minimal changes.
3. The cm-api pypi package (which doesn't support Python 3) has been
   replaced by the cm-client pypi package and interfaces have changed.
   Rather than migrating the code (which hasn't been used in years), this
   deletes the old code and stops installing cm-api into the virtualenv.
   The code can be restored and revamped if there is any interest in
   interacting with CM clusters.
4. This switches tests/comparison over to impala-python3, but this code has
   bit-rotted. Some pieces can be run manually, but it can't be fully
   verified with Python 3. It shouldn't hold back the migration on its own.
5. This also replaces references to impala-python in comments / documentation /
   READMEs.
6. kazoo (used for interacting with HBase) needed to be upgraded to a
   version that supports Python 3. The newest version of kazoo requires
   upgrades of other component versions, so this uses kazoo 2.8.0 to avoid
   needing other upgrades.
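
As an illustration of the hash-bang check in item 1, here is a minimal
sketch (a hypothetical helper, not part of this change) that flags Python
files with a hash-bang but no obvious main entry point:

import os
import stat
import sys

def has_shebang_but_no_main(path):
  # A hash-bang is only meaningful as the very first bytes of the file.
  with open(path) as f:
    contents = f.read()
  # Treat a __main__ guard or a main() definition as evidence of an entry point.
  has_main = "__main__" in contents or "def main" in contents
  return contents.startswith("#!") and not has_main

for path in sys.argv[1:]:
  if has_shebang_but_no_main(path):
    # These files should lose their hash-bang and their executable bit.
    mode = os.stat(path).st_mode
    print("{0} (executable={1})".format(path, bool(mode & stat.S_IXUSR)))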

The two remaining uses of impala-python are:
 - bin/cmake_aux/create_virtualenv.sh
 - bin/impala-env-versioned-python
These will be removed separately when we drop Python 2 support
completely. In particular, these are useful for testing impala-shell
with Python 2 until we stop supporting Python 2 for impala-shell.

The docker-based tests still use /usr/bin/python, but this can
be switched over independently (and doesn't impact impala-python).

Testing:
 - Ran core job
 - Ran build + dataload on Centos 7, Redhat 8
 - Manual testing of individual scripts (except some bit-rotted areas like the
   random query generator)

Change-Id: If209b761290bc7e7c716c312ea757da3e3bca6dc
Reviewed-on: http://gerrit.cloudera.org:8080/23468
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Tested-by: Michael Smith <michael.smith@cloudera.com>
2025-10-22 16:30:17 +00:00


# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# This module implements helpers for representing queries to be executed by the
# stress test, loading them and generating them.
from __future__ import absolute_import, division, print_function
from builtins import range
import logging
import os
from textwrap import dedent
from tests.comparison.db_types import Int, TinyInt, SmallInt, BigInt
from tests.comparison.query_generator import QueryGenerator
from tests.comparison.query_profile import DefaultProfile
from tests.comparison.model_translator import SqlWriter
from tests.util.parse_util import match_memory_estimate, parse_mem_to_mb
import tests.util.test_file_parser as test_file_parser

LOG = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])


class QueryType(object):
  COMPUTE_STATS, DELETE, INSERT, SELECT, UPDATE, UPSERT = list(range(6))


class Query(object):
  """Contains a SQL statement along with expected runtime information.

  This class is used as a struct, with the fields filled out by calling classes."""

  def __init__(self):
    self.name = None
    self.sql = None
    # In order to be able to make good estimates for DML queries in the binary search,
    # we need to bring the table to a good initial state before executing the sql.
    # Running set_up_sql accomplishes this task.
    self.set_up_sql = None
    self.db_name = None
    self.result_hash = None
    self.required_mem_mb_with_spilling = None
    self.required_mem_mb_without_spilling = None
    self.solo_runtime_profile_with_spilling = None
    self.solo_runtime_profile_without_spilling = None
    self.solo_runtime_secs_with_spilling = None
    self.solo_runtime_secs_without_spilling = None
    # Query options to set before running the query.
    self.options = {}
    # Determines the order in which we will populate query runtime info. Queries with
    # the lowest population_order property will be handled first.
    self.population_order = 0
    # Type of query. Can have the following values: SELECT, COMPUTE_STATS, INSERT,
    # UPDATE, UPSERT, DELETE.
    self.query_type = QueryType.SELECT
    self._logical_query_id = None

  def __repr__(self):
    return dedent("""
        <Query
        Mem: %(required_mem_mb_with_spilling)s
        Mem no-spilling: %(required_mem_mb_without_spilling)s
        Solo Runtime: %(solo_runtime_secs_with_spilling)s
        Solo Runtime no-spilling: %(solo_runtime_secs_without_spilling)s
        DB: %(db_name)s
        Options: %(options)s
        Set up SQL: %(set_up_sql)s
        SQL: %(sql)s
        Population order: %(population_order)r>
        """.strip() % self.__dict__)

  @property
  def logical_query_id(self):
    """
    Return a meaningful unique str identifier for the query.

    Example: "tpcds_300_decimal_parquet_q21"
    """
    if self._logical_query_id is None:
      self._logical_query_id = '{0}_{1}'.format(self.db_name, self.name)
    return self._logical_query_id

  def write_runtime_info_profiles(self, directory):
    """Write profiles for spilling and non-spilling into directory (str)."""
    profiles_to_write = [
        (self.logical_query_id + "_profile_with_spilling.txt",
         self.solo_runtime_profile_with_spilling),
        (self.logical_query_id + "_profile_without_spilling.txt",
         self.solo_runtime_profile_without_spilling),
    ]
    for filename, profile in profiles_to_write:
      if profile is None:
        LOG.debug("No profile recorded for {0}".format(filename))
        continue
      with open(os.path.join(directory, filename), "w") as fh:
        fh.write(profile)


def load_tpc_queries(workload):
  """Returns a list of TPC queries. 'workload' should either be 'tpch' or 'tpcds'."""
  LOG.info("Loading %s queries", workload)
  queries = []
  for query_name, query_sql in test_file_parser.load_tpc_queries(workload,
      include_stress_queries=True).items():
    query = Query()
    query.name = query_name
    query.sql = query_sql
    queries.append(query)
  return queries


def load_queries_from_test_file(file_path, db_name=None):
  LOG.debug("Loading queries from %s", file_path)
  test_cases = test_file_parser.parse_query_test_file(file_path)
  queries = list()
  for test_case in test_cases:
    query = Query()
    query.sql = test_file_parser.remove_comments(test_case["QUERY"])
    query.db_name = db_name
    queries.append(query)
  return queries


def generate_compute_stats_queries(cursor):
  """For each table in the database that cursor is connected to, generate several
  compute stats queries. Each query will have a different value for the MT_DOP query
  option.
  """
  LOG.info("Generating Compute Stats queries")
  tables = [cursor.describe_table(t) for t in cursor.list_table_names()
            if not t.endswith("_original")]
  result = []
  mt_dop_values = [str(2**k) for k in range(5)]
  for table in tables:
    for mt_dop_value in mt_dop_values:
      compute_query = Query()
      compute_query.population_order = 1
      compute_query.query_type = QueryType.COMPUTE_STATS
      compute_query.sql = "COMPUTE STATS {0}".format(table.name)
      compute_query.options["MT_DOP"] = mt_dop_value
      compute_query.db_name = cursor.db_name
      compute_query.name = "compute_stats_{0}_mt_dop_{1}".format(
          table.name, compute_query.options["MT_DOP"])
      result.append(compute_query)
      LOG.debug("Added compute stats query: {0}".format(compute_query))
  return result


def generate_DML_queries(cursor, dml_mod_values):
  """Generate insert, upsert, update, delete DML statements.

  For each table in the database that cursor is connected to, create 4 DML queries
  (insert, upsert, update, delete) for each mod value in 'dml_mod_values'. This value
  controls which rows will be affected. The generated queries assume that for each
  table in the database, there exists a table with a '_original' suffix that is never
  modified.

  This function has some limitations:
  1. Only generates DML statements against Kudu tables, and ignores non-Kudu tables.
  2. Requires that the first column of the primary key is an integer type.
  """
  LOG.info("Generating DML queries")
  tables = [cursor.describe_table(t) for t in cursor.list_table_names()
            if not t.endswith("_original")]
  result = []
  for table in tables:
    if not table.primary_keys:
      # Skip non-Kudu tables. If a table has no primary keys, then it cannot be a Kudu
      # table.
      LOG.debug("Skipping table '{0}' because it has no primary keys.".format(table.name))
      continue
    if len(table.primary_keys) > 1:
      # TODO(IMPALA-4665): Add support for tables with multiple primary keys.
      LOG.debug("Skipping table '{0}' because it has more than "
                "1 primary key column.".format(table.name))
      continue
    primary_key = table.primary_keys[0]
    if primary_key.exact_type not in (Int, TinyInt, SmallInt, BigInt):
      # We want to be able to apply the modulo operation on the primary key. If the
      # first primary key column happens to not be an integer, we will skip generating
      # queries for this table.
      LOG.debug("Skipping table '{0}' because the first column '{1}' in the "
                "primary key is not an integer.".format(table.name, primary_key.name))
      continue

    for mod_value in dml_mod_values:
      # Insert
      insert_query = Query()
      # Populate runtime info for Insert and Upsert queries before Update and Delete
      # queries because tables remain in original state after running the Insert and
      # Upsert queries. During the binary search in runtime info population for the
      # Insert query, we first delete some rows and then reinsert them, so the table
      # remains in the original state. For the delete, the order is reversed, so the
      # table is not in the original state after running the delete (or update) query.
      # This is why population_order is smaller for Insert and Upsert queries.
      insert_query.population_order = 1
      insert_query.query_type = QueryType.INSERT
      insert_query.name = "insert_{0}".format(table.name)
      insert_query.db_name = cursor.db_name
      insert_query.sql = (
          "INSERT INTO TABLE {0} SELECT * FROM {0}_original "
          "WHERE {1} % {2} = 0").format(table.name, primary_key.name, mod_value)
      # Upsert
      upsert_query = Query()
      upsert_query.population_order = 1
      upsert_query.query_type = QueryType.UPSERT
      upsert_query.name = "upsert_{0}".format(table.name)
      upsert_query.db_name = cursor.db_name
      upsert_query.sql = (
          "UPSERT INTO TABLE {0} SELECT * "
          "FROM {0}_original WHERE {1} % {2} = 0").format(
              table.name, primary_key.name, mod_value)
      # Update
      update_query = Query()
      update_query.population_order = 2
      update_query.query_type = QueryType.UPDATE
      update_query.name = "update_{0}".format(table.name)
      update_query.db_name = cursor.db_name
      update_list = ', '.join(
          'a.{0} = b.{0}'.format(col.name)
          for col in table.cols if not col.is_primary_key)
      update_query.sql = (
          "UPDATE a SET {update_list} FROM {table_name} a JOIN {table_name}_original b "
          "ON a.{pk} = b.{pk} + 1 WHERE a.{pk} % {mod_value} = 0").format(
              table_name=table.name, pk=primary_key.name, mod_value=mod_value,
              update_list=update_list)
      # Delete
      delete_query = Query()
      delete_query.population_order = 2
      delete_query.query_type = QueryType.DELETE
      delete_query.name = "delete_{0}".format(table.name)
      delete_query.db_name = cursor.db_name
      delete_query.sql = ("DELETE FROM {0} WHERE {1} % {2} = 0").format(
          table.name, primary_key.name, mod_value)

      if table.name + "_original" in set(t.name for t in tables):
        insert_query.set_up_sql = "DELETE FROM {0} WHERE {1} % {2} = 0".format(
            table.name, primary_key.name, mod_value)
        upsert_query.set_up_sql = insert_query.set_up_sql
        update_query.set_up_sql = (
            "UPSERT INTO TABLE {0} SELECT * FROM {0}_original "
            "WHERE {1} % {2} = 0").format(table.name, primary_key.name, mod_value)
        delete_query.set_up_sql = update_query.set_up_sql

      result.append(insert_query)
      LOG.debug("Added insert query: {0}".format(insert_query))
      result.append(update_query)
      LOG.debug("Added update query: {0}".format(update_query))
      result.append(upsert_query)
      LOG.debug("Added upsert query: {0}".format(upsert_query))
      result.append(delete_query)
      LOG.debug("Added delete query: {0}".format(delete_query))

  assert len(result) > 0, "No DML queries were added."
  return result


def generate_random_queries(impala, random_db):
  """Generator function to produce random queries. 'impala' is the Impala service
  object. 'random_db' is the name of the database that queries should be
  generated for."""
  with impala.cursor(db_name=random_db) as cursor:
    tables = [cursor.describe_table(t) for t in cursor.list_table_names()]
  query_generator = QueryGenerator(DefaultProfile())
  model_translator = SqlWriter.create()
  while True:
    query_model = query_generator.generate_statement(tables)
    sql = model_translator.write_query(query_model)
    query = Query()
    query.sql = sql
    query.db_name = random_db
    yield query


def estimate_query_mem_mb_usage(query, impalad_conn):
  """Runs an explain plan then extracts and returns the estimated memory needed to run
  the query.
  """
  with impalad_conn.cursor() as cursor:
    LOG.debug("Using %s database", query.db_name)
    if query.db_name:
      cursor.execute('USE ' + query.db_name)
    if query.query_type == QueryType.COMPUTE_STATS:
      # Running "explain" on compute stats is not supported by Impala.
      return
    LOG.debug("Explaining query\n%s", query.sql)
    cursor.execute('EXPLAIN ' + query.sql)
    explain_rows = cursor.fetchall()
    explain_lines = [row[0] for row in explain_rows]
    mem_limit, units = match_memory_estimate(explain_lines)
    return parse_mem_to_mb(mem_limit, units)
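

# A minimal usage sketch (illustrative only): load the TPC-H stress queries and
# estimate the memory each one needs. 'impalad_conn' and the 'tpch_parquet'
# database name below are assumptions for the example, not something this module
# provides.
#
#   from tests.stress.queries import load_tpc_queries, estimate_query_mem_mb_usage
#
#   queries = load_tpc_queries("tpch")
#   for query in queries:
#     query.db_name = "tpch_parquet"
#     mem_mb = estimate_query_mem_mb_usage(query, impalad_conn)
#     print(query.logical_query_id, mem_mb)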