Mirror of https://github.com/apache/impala.git, synced 2025-12-19 18:12:08 -05:00
To remove the dependency on Python 2, existing scripts need to use python3
rather than python. These commands find those locations (for impala-python and
regular python):

  git grep impala-python | grep -v impala-python3 | grep -v impala-python-common | grep -v init-impala-python
  git grep bin/python | grep -v python3

This removes or switches most of these locations by various means:

1. If a Python file has a #!/bin/env impala-python (or python) hash-bang but
   doesn't have a main function, it removes the hash-bang and makes sure that
   the file is not executable (see the sketch after this message).
2. Most scripts can simply switch from impala-python to impala-python3 (or
   python to python3) with minimal changes.
3. The cm-api pypi package (which doesn't support Python 3) has been replaced
   by the cm-client pypi package, and its interfaces have changed. Rather than
   migrating the code (which hasn't been used in years), this deletes the old
   code and stops installing cm-api into the virtualenv. The code can be
   restored and revamped if there is any interest in interacting with CM
   clusters.
4. This switches tests/comparison over to impala-python3, but this code has
   bit-rotted. Some pieces can be run manually, but it can't be fully verified
   with Python 3. It shouldn't hold back the migration on its own.
5. This also replaces references to impala-python in comments, documentation,
   and READMEs.
6. kazoo (used for interacting with HBase) needed to be upgraded to a version
   that supports Python 3. The newest version of kazoo requires upgrades of
   other component versions, so this uses kazoo 2.8.0 to avoid needing other
   upgrades.

The two remaining uses of impala-python are:
- bin/cmake_aux/create_virtualenv.sh
- bin/impala-env-versioned-python

These will be removed separately when we drop Python 2 support completely. In
particular, they are useful for testing impala-shell with Python 2 until we
stop supporting Python 2 for impala-shell.

The docker-based tests still use /usr/bin/python, but this can be switched
over independently (and doesn't impact impala-python).

Testing:
- Ran core job
- Ran build + dataload on CentOS 7, Red Hat 8
- Manual testing of individual scripts (except some bit-rotted areas like the
  random query generator)

Change-Id: If209b761290bc7e7c716c312ea757da3e3bca6dc
Reviewed-on: http://gerrit.cloudera.org:8080/23468
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Tested-by: Michael Smith <michael.smith@cloudera.com>
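A minimal sketch of the check described in item 1 above (a hypothetical helper,
not the actual migration script; the name and heuristics are illustrative):

  import os
  import stat

  def strip_shebang_if_no_main(path):
    # Hypothetical helper: drop the hash-bang from a Python file that has no
    # main entry point, then clear the executable bits so the file is no
    # longer directly runnable.
    with open(path) as f:
      lines = f.readlines()
    if not lines or not lines[0].startswith("#!"):
      return  # no hash-bang to remove
    if not any("__main__" in line for line in lines):
      with open(path, "w") as f:
        f.writelines(lines[1:])
      mode = os.stat(path).st_mode
      os.chmod(path, mode & ~(stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH))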
302 lines
13 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# This module implements helpers for representing queries to be executed by the
# stress test, loading them and generating them.

from __future__ import absolute_import, division, print_function
from builtins import range
import logging
import os
from textwrap import dedent

from tests.comparison.db_types import Int, TinyInt, SmallInt, BigInt
from tests.comparison.query_generator import QueryGenerator
from tests.comparison.query_profile import DefaultProfile
from tests.comparison.model_translator import SqlWriter
from tests.util.parse_util import match_memory_estimate, parse_mem_to_mb
import tests.util.test_file_parser as test_file_parser

LOG = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])


class QueryType(object):
  COMPUTE_STATS, DELETE, INSERT, SELECT, UPDATE, UPSERT = list(range(6))


class Query(object):
  """Contains a SQL statement along with expected runtime information.

  This class is used as a struct, with the fields filled out by calling classes."""

  def __init__(self):
    self.name = None
    self.sql = None
    # In order to be able to make good estimates for DML queries in the binary search,
    # we need to bring the table to a good initial state before executing the sql.
    # Running set_up_sql accomplishes this task.
    self.set_up_sql = None
    self.db_name = None
    self.result_hash = None
    self.required_mem_mb_with_spilling = None
    self.required_mem_mb_without_spilling = None
    self.solo_runtime_profile_with_spilling = None
    self.solo_runtime_profile_without_spilling = None
    self.solo_runtime_secs_with_spilling = None
    self.solo_runtime_secs_without_spilling = None
    # Query options to set before running the query.
    self.options = {}
    # Determines the order in which we will populate query runtime info. Queries with
    # the lowest population_order property will be handled first.
    self.population_order = 0
    # Type of query. Can have the following values: SELECT, COMPUTE_STATS, INSERT,
    # UPDATE, UPSERT, DELETE.
    self.query_type = QueryType.SELECT

    self._logical_query_id = None

  def __repr__(self):
    return dedent("""
        <Query
        Mem: %(required_mem_mb_with_spilling)s
        Mem no-spilling: %(required_mem_mb_without_spilling)s
        Solo Runtime: %(solo_runtime_secs_with_spilling)s
        Solo Runtime no-spilling: %(solo_runtime_secs_without_spilling)s
        DB: %(db_name)s
        Options: %(options)s
        Set up SQL: %(set_up_sql)s
        SQL: %(sql)s
        Population order: %(population_order)r>
        """.strip() % self.__dict__)

  @property
  def logical_query_id(self):
    """
    Return a meaningful unique str identifier for the query.

    Example: "tpcds_300_decimal_parquet_q21"
    """
    if self._logical_query_id is None:
      self._logical_query_id = '{0}_{1}'.format(self.db_name, self.name)
    return self._logical_query_id

  def write_runtime_info_profiles(self, directory):
    """Write profiles for spilling and non-spilling into directory (str)."""
    profiles_to_write = [
        (self.logical_query_id + "_profile_with_spilling.txt",
         self.solo_runtime_profile_with_spilling),
        (self.logical_query_id + "_profile_without_spilling.txt",
         self.solo_runtime_profile_without_spilling),
    ]
    for filename, profile in profiles_to_write:
      if profile is None:
        LOG.debug("No profile recorded for {0}".format(filename))
        continue
      with open(os.path.join(directory, filename), "w") as fh:
        fh.write(profile)
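
# Usage sketch (hypothetical values; in practice, Query objects are filled out by
# the loaders and generators below, and runtime info is populated by the stress
# test's binary search):
#
#   query = Query()
#   query.name = "q21"
#   query.db_name = "tpcds_300_decimal_parquet"
#   query.solo_runtime_profile_with_spilling = "...profile text..."
#   query.write_runtime_info_profiles("/tmp/profiles")
#   # Writes tpcds_300_decimal_parquet_q21_profile_with_spilling.txt; the missing
#   # non-spilling profile is skipped with a debug log message.
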
def load_tpc_queries(workload):
  """Returns a list of TPC queries. 'workload' should either be 'tpch' or 'tpcds'."""
  LOG.info("Loading %s queries", workload)
  queries = []
  for query_name, query_sql in test_file_parser.load_tpc_queries(workload,
      include_stress_queries=True).items():
    query = Query()
    query.name = query_name
    query.sql = query_sql
    queries.append(query)
  return queries


def load_queries_from_test_file(file_path, db_name=None):
  """Returns a list of queries parsed from the .test file at 'file_path'. If given,
  'db_name' is assigned to each query's db_name field."""
  LOG.debug("Loading queries from %s", file_path)
  test_cases = test_file_parser.parse_query_test_file(file_path)
  queries = list()
  for test_case in test_cases:
    query = Query()
    query.sql = test_file_parser.remove_comments(test_case["QUERY"])
    query.db_name = db_name
    queries.append(query)
  return queries
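
# Usage sketch (hypothetical paths and database names):
#
#   tpch_queries = load_tpc_queries("tpch")
#   file_queries = load_queries_from_test_file(
#       "testdata/workloads/some-workload/queries/some_file.test",
#       db_name="tpch_parquet")
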
def generate_compute_stats_queries(cursor):
  """For each table in the database that cursor is connected to, generate several
  compute stats queries. Each query will have a different value for the MT_DOP query
  option.
  """
  LOG.info("Generating Compute Stats queries")
  tables = [cursor.describe_table(t) for t in cursor.list_table_names()
            if not t.endswith("_original")]
  result = []
  mt_dop_values = [str(2**k) for k in range(5)]
  for table in tables:
    for mt_dop_value in mt_dop_values:
      compute_query = Query()
      compute_query.population_order = 1
      compute_query.query_type = QueryType.COMPUTE_STATS
      compute_query.sql = "COMPUTE STATS {0}".format(table.name)
      compute_query.options["MT_DOP"] = mt_dop_value
      compute_query.db_name = cursor.db_name
      compute_query.name = "compute_stats_{0}_mt_dop_{1}".format(
          table.name, compute_query.options["MT_DOP"])
      result.append(compute_query)
      LOG.debug("Added compute stats query: {0}".format(compute_query))
  return result
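
# Example of what this generates (hypothetical database with one table "lineitem";
# MT_DOP takes the values 1, 2, 4, 8, 16 from str(2**k) for k in range(5)):
#
#   name: compute_stats_lineitem_mt_dop_1   sql: COMPUTE STATS lineitem  MT_DOP=1
#   name: compute_stats_lineitem_mt_dop_2   sql: COMPUTE STATS lineitem  MT_DOP=2
#   ...
#   name: compute_stats_lineitem_mt_dop_16  sql: COMPUTE STATS lineitem  MT_DOP=16
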
def generate_DML_queries(cursor, dml_mod_values):
  """Generate insert, upsert, update, delete DML statements.

  For each table in the database that cursor is connected to, create 4 DML queries
  (insert, upsert, update, delete) for each mod value in 'dml_mod_values'. This value
  controls which rows will be affected. The generated queries assume that for each table
  in the database, there exists a table with a '_original' suffix that is never modified.

  This function has some limitations:
  1. Only generates DML statements against Kudu tables, and ignores non-Kudu tables.
  2. Requires that the type of the first column of the primary key is an integer type.
  """
  LOG.info("Generating DML queries")
  table_names = cursor.list_table_names()
  tables = [cursor.describe_table(t) for t in table_names
            if not t.endswith("_original")]
  result = []
  for table in tables:
    if not table.primary_keys:
      # Skip non-Kudu tables. If a table has no primary keys, then it cannot be a Kudu
      # table.
      LOG.debug("Skipping table '{0}' because it has no primary keys.".format(table.name))
      continue
    if len(table.primary_keys) > 1:
      # TODO(IMPALA-4665): Add support for tables with multiple primary keys.
      LOG.debug("Skipping table '{0}' because it has more than "
                "1 primary key column.".format(table.name))
      continue
    primary_key = table.primary_keys[0]
    if primary_key.exact_type not in (Int, TinyInt, SmallInt, BigInt):
      # We want to be able to apply the modulo operation on the primary key. If the
      # first primary key column happens to not be an integer, we will skip generating
      # queries for this table.
      LOG.debug("Skipping table '{0}' because the first column '{1}' in the "
                "primary key is not an integer.".format(table.name, primary_key.name))
      continue

    for mod_value in dml_mod_values:
      # Insert
      insert_query = Query()
      # Populate runtime info for Insert and Upsert queries before Update and Delete
      # queries because tables remain in their original state after running the Insert
      # and Upsert queries. During the binary search in runtime info population for the
      # Insert query, we first delete some rows and then reinsert them, so the table
      # remains in the original state. For the delete, the order is reversed, so the
      # table is not in the original state after running the delete (or update) query.
      # This is why population_order is smaller for Insert and Upsert queries.
      insert_query.population_order = 1
      insert_query.query_type = QueryType.INSERT
      insert_query.name = "insert_{0}".format(table.name)
      insert_query.db_name = cursor.db_name
      insert_query.sql = (
          "INSERT INTO TABLE {0} SELECT * FROM {0}_original "
          "WHERE {1} % {2} = 0").format(table.name, primary_key.name, mod_value)
      # Upsert
      upsert_query = Query()
      upsert_query.population_order = 1
      upsert_query.query_type = QueryType.UPSERT
      upsert_query.name = "upsert_{0}".format(table.name)
      upsert_query.db_name = cursor.db_name
      upsert_query.sql = (
          "UPSERT INTO TABLE {0} SELECT * "
          "FROM {0}_original WHERE {1} % {2} = 0").format(
              table.name, primary_key.name, mod_value)
      # Update
      update_query = Query()
      update_query.population_order = 2
      update_query.query_type = QueryType.UPDATE
      update_query.name = "update_{0}".format(table.name)
      update_query.db_name = cursor.db_name
      update_list = ', '.join(
          'a.{0} = b.{0}'.format(col.name)
          for col in table.cols if not col.is_primary_key)
      update_query.sql = (
          "UPDATE a SET {update_list} FROM {table_name} a JOIN {table_name}_original b "
          "ON a.{pk} = b.{pk} + 1 WHERE a.{pk} % {mod_value} = 0").format(
              table_name=table.name, pk=primary_key.name, mod_value=mod_value,
              update_list=update_list)
      # Delete
      delete_query = Query()
      delete_query.population_order = 2
      delete_query.query_type = QueryType.DELETE
      delete_query.name = "delete_{0}".format(table.name)
      delete_query.db_name = cursor.db_name
      delete_query.sql = ("DELETE FROM {0} WHERE {1} % {2} = 0").format(
          table.name, primary_key.name, mod_value)

      # Only set set_up_sql if the '_original' twin table actually exists. Check the
      # full table name list; 'tables' itself excludes the '_original' tables.
      if table.name + "_original" in table_names:
        insert_query.set_up_sql = "DELETE FROM {0} WHERE {1} % {2} = 0".format(
            table.name, primary_key.name, mod_value)
        upsert_query.set_up_sql = insert_query.set_up_sql
        update_query.set_up_sql = (
            "UPSERT INTO TABLE {0} SELECT * FROM {0}_original "
            "WHERE {1} % {2} = 0").format(table.name, primary_key.name, mod_value)
        delete_query.set_up_sql = update_query.set_up_sql

      result.append(insert_query)
      LOG.debug("Added insert query: {0}".format(insert_query))
      result.append(update_query)
      LOG.debug("Added update query: {0}".format(update_query))
      result.append(upsert_query)
      LOG.debug("Added upsert query: {0}".format(upsert_query))
      result.append(delete_query)
      LOG.debug("Added delete query: {0}".format(delete_query))
  assert len(result) > 0, "No DML queries were added."
  return result
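
# Example of the generated statements (hypothetical Kudu table "store_sales" with
# integer primary key "id", non-key column "price", and mod value 11):
#
#   INSERT INTO TABLE store_sales SELECT * FROM store_sales_original WHERE id % 11 = 0
#   UPSERT INTO TABLE store_sales SELECT * FROM store_sales_original WHERE id % 11 = 0
#   UPDATE a SET a.price = b.price FROM store_sales a JOIN store_sales_original b
#       ON a.id = b.id + 1 WHERE a.id % 11 = 0
#   DELETE FROM store_sales WHERE id % 11 = 0
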
def generate_random_queries(impala, random_db):
  """Generator function to produce random queries. 'impala' is the Impala service
  object. 'random_db' is the name of the database that queries should be generated
  for."""
  with impala.cursor(db_name=random_db) as cursor:
    tables = [cursor.describe_table(t) for t in cursor.list_table_names()]
  query_generator = QueryGenerator(DefaultProfile())
  model_translator = SqlWriter.create()
  while True:
    query_model = query_generator.generate_statement(tables)
    sql = model_translator.write_query(query_model)
    query = Query()
    query.sql = sql
    query.db_name = random_db
    yield query
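
# Usage sketch (hypothetical; 'impala' is the Impala service object used elsewhere
# in the stress test):
#
#   from itertools import islice
#   for query in islice(generate_random_queries(impala, "random_query_db"), 10):
#     print(query.sql)
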
def estimate_query_mem_mb_usage(query, impalad_conn):
  """Runs an explain plan then extracts and returns the estimated memory needed to run
  the query.
  """
  with impalad_conn.cursor() as cursor:
    if query.db_name:
      LOG.debug("Using %s database", query.db_name)
      cursor.execute('USE ' + query.db_name)
    if query.query_type == QueryType.COMPUTE_STATS:
      # Running "explain" on compute stats is not supported by Impala.
      return
    LOG.debug("Explaining query\n%s", query.sql)
    cursor.execute('EXPLAIN ' + query.sql)
    explain_rows = cursor.fetchall()
    explain_lines = [row[0] for row in explain_rows]
    mem_limit, units = match_memory_estimate(explain_lines)
    return parse_mem_to_mb(mem_limit, units)
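
# Usage sketch (hypothetical; 'impalad_conn' is a DB-API style connection to an
# impalad, as used elsewhere in the stress test):
#
#   query = load_queries_from_test_file("some_file.test", db_name="tpch_parquet")[0]
#   mem_mb = estimate_query_mem_mb_usage(query, impalad_conn)
#   print("Estimated memory: {0} MB".format(mem_mb))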