Files
impala/tests/query_test/test_exprs.py
Riza Suminto 0ed4e869de IMPALA-13930: ImpylaHS2Connection should only open cursor as needed
Before this patch, ImpylaHS2Connection unconditionally opened a
cursor (and HS2 session) as it connected, followed by running a "SET
ALL" query to populate the default query options.

This patch changes the behavior of ImpylaHS2Connection to open the
default cursor only when querying is needed for the first time. This
helps preserve assertions for a test that is sensitive about client
connection, like IMPALA-13925. Default query options are now parsed from
newly instantiated TQueryOptions object rather than issuing a "SET ALL"
query or making BeeswaxService.get_default_configuration() RPC.

Fix test_query_profile_contains_query_compilation_metadata_cached_event
slightly by setting the 'sync_ddl' option because the test is flaky
without it.

Tweak test_max_hs2_sessions_per_user to run queries so that sessions
will open.

Deduplicate test cases between utc-timestamp-functions.test and
local-timestamp-functions.test. Rename TestUtcTimestampFunctions to
TestTimestampFunctions, and expand it to also tests
local-timestamp-functions.test and
file-formats-with-local-tz-conversion.test. The table_format is now
contrained to 'test/none' because it is unnecessary to permute other
table_format.

Deprecate 'use_local_tz_for_unix_timestamp_conversions' in favor of
query option with the same name. Filed IMPALA-13953 to update the
documentation of 'use_local_tz_for_unix_timestamp_conversions'
flag/option.

Testing:
Run and pass a few pytests such as:
test_admission_controller.py
test_observability.py
test_runtime_filters.py
test_session_expiration.py.
test_set.py

Change-Id: I9d5e3e5c11ad386b7202431201d1a4cff46cbff5
Reviewed-on: http://gerrit.cloudera.org:8080/22731
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-04-11 04:37:14 +00:00

391 lines
17 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
from builtins import range
import logging
import pytest
import re
from random import randint
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.test_dimensions import (
add_exec_option_dimension,
create_exec_option_dimension,
create_uncompressed_text_dimension)
from tests.common.skip import SkipIfFS
from tests.util.test_file_parser import QueryTestSectionReader
LOG = logging.getLogger('test_exprs')
EXPR_REWRITE_OPTIONS = [0, 1]
class TestExprs(ImpalaTestSuite):
@classmethod
def add_test_dimensions(cls):
super(TestExprs, cls).add_test_dimensions()
# Test with and without expr rewrites to cover regular expr evaluations
# as well as constant folding, in particular, timestamp literals.
add_exec_option_dimension(cls, 'enable_expr_rewrites', EXPR_REWRITE_OPTIONS)
if cls.exploration_strategy() == 'core':
# Test with file format that supports codegen
cls.ImpalaTestMatrix.add_constraint(lambda v:
v.get_value('table_format').file_format == 'parquet'
and v.get_value('table_format').compression_codec == 'none')
def test_exprs(self, vector):
# Remove 'exec_single_node_rows_threshold' option so we can set it at .test file.
# Revisit this if 'exec_single_node_rows_threshold' dimension size increase.
vector.unset_exec_option('exec_single_node_rows_threshold')
# TODO: Enable some of these tests for Avro if possible
# Don't attempt to evaluate timestamp expressions with Avro tables (which don't
# support a timestamp type)"
table_format = vector.get_value('table_format')
if table_format.file_format == 'avro':
pytest.skip()
if table_format.file_format == 'hbase':
pytest.xfail("A lot of queries check for NULLs, which hbase does not recognize")
if table_format.file_format == 'kudu':
# Can't load LikeTbl without KUDU-1570.
pytest.xfail("Need support for Kudu tables with nullable PKs (KUDU-1570)")
self.run_test_case('QueryTest/exprs', vector)
# This will change the current database to matching table format and then execute
# select current_database(). An error will be thrown if multiple values are returned.
current_db = self.execute_scalar('select current_database()', vector=vector)
assert current_db == QueryTestSectionReader.get_db_name(table_format)
def test_special_strings(self, vector):
"""Test handling of expressions with "special" strings."""
self.run_test_case('QueryTest/special-strings', vector)
def test_encryption_exprs(self, vector):
"""Test handling encryption/ decryption functionality.
Some AES operation modes are not supported by all versions of the OpenSSL
library, therefore the tests are divided into separate .test files based on
the mode used in them. For modes that may not be supported, we run a
probing query first and only run the test file if it succeeds.
"""
# Run queries that are expected to fail, e.g. trying invalid operation modes etc.
self.run_test_case('QueryTest/encryption_exprs_errors', vector)
self.run_test_case('QueryTest/encryption_exprs_aes_128_ecb', vector)
self.run_test_case('QueryTest/encryption_exprs_aes_256_ecb', vector)
self.run_test_case('QueryTest/encryption_exprs_aes_256_cfb', vector)
aes_256_gcm_ok = self._check_aes_mode_supported("aes_256_gcm", vector)
if aes_256_gcm_ok:
self.run_test_case('QueryTest/encryption_exprs_aes_256_gcm', vector)
self._log_whether_aes_tests_run("aes_256_gcm", aes_256_gcm_ok)
aes_128_gcm_ok = self._check_aes_mode_supported("aes_128_gcm", vector)
if aes_128_gcm_ok:
self.run_test_case('QueryTest/encryption_exprs_aes_128_gcm', vector)
self._log_whether_aes_tests_run("aes_128_gcm", aes_128_gcm_ok)
aes_256_ctr_ok = self._check_aes_mode_supported("aes_256_ctr", vector)
if aes_256_ctr_ok:
self.run_test_case('QueryTest/encryption_exprs_aes_256_ctr', vector)
self._log_whether_aes_tests_run("aes_256_ctr", aes_256_ctr_ok)
def _log_whether_aes_tests_run(self, mode, running):
msg = "{} {} tests because the OpenSSL version {} this mode.".format(
"Running" if running else "Not running",
mode,
"supports" if running else "does not support")
LOG.warning(msg)
def _check_aes_mode_supported(self, mode, vector):
"""Checks whether the given AES mode is supported in the current
environment (see "test_encryption_exprs()") by running a probing query."""
assert "ECB" not in mode.upper()
expr = "expr"
key_len_bytes = 32 if "256" in mode else 16
key = "A" * key_len_bytes
# GCM doesn't support an empty IV.
iv = "a"
query = 'select aes_encrypt("{expr}", "{key}", "{mode}", "{iv}")'.format(
expr=expr, key=key, mode=mode, iv=iv)
try:
res = self.execute_query_using_vector(query, vector)
assert res.success
return True
except Exception as e:
assert "not supported by OpenSSL" in str(e)
return False
def test_ai_generate_text_exprs(self, vector):
table_format = vector.get_value('table_format')
if table_format.file_format != 'parquet':
pytest.skip()
self.run_test_case('QueryTest/ai_generate_text_exprs', vector)
# Tests very deep expression trees and expressions with many children. Impala defines
# a 'safe' upper bound on the expr depth and the number of expr children in the
# FE Expr.java and any changes to those limits should be reflected in this test.
# The expr limits primarily guard against stack overflows or similar problems
# causing crashes. Therefore, this tests succeeds if no Impalads crash.
class TestExprLimits(ImpalaTestSuite):
# Keep these in sync with Expr.java
EXPR_CHILDREN_LIMIT = 10000
EXPR_DEPTH_LIMIT = 1000
@classmethod
def add_test_dimensions(cls):
super(TestExprLimits, cls).add_test_dimensions()
if cls.exploration_strategy() != 'exhaustive':
# Ensure the test runs with codegen enabled and disabled, even when the
# exploration strategy is not exhaustive.
cls.ImpalaTestMatrix.clear_dimension('exec_option')
cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
cluster_sizes=[0], disable_codegen_options=[False, True], batch_sizes=[0]))
# There is no reason to run these tests using all dimensions.
cls.ImpalaTestMatrix.add_dimension(
create_uncompressed_text_dimension(cls.get_workload()))
def test_expr_child_limit(self, vector):
# IN predicate
in_query = "select 1 IN("
for i in range(0, self.EXPR_CHILDREN_LIMIT - 1):
in_query += str(i)
if (i + 1 != self.EXPR_CHILDREN_LIMIT - 1):
in_query += ","
in_query += ")"
self.__exec_query(in_query, vector)
# CASE expr
case_query = "select case "
for i in range(0, self.EXPR_CHILDREN_LIMIT // 2):
case_query += " when true then 1"
case_query += " end"
self.__exec_query(case_query, vector)
def test_expr_depth_limit(self, vector):
# Compound predicates
and_query = "select " + self.__gen_deep_infix_expr("true", " and false")
self.__exec_query(and_query, vector)
or_query = "select " + self.__gen_deep_infix_expr("true", " or false")
self.__exec_query(or_query, vector)
# Arithmetic expr
arith_query = "select " + self.__gen_deep_infix_expr("1", " + 1")
self.__exec_query(arith_query, vector)
func_query = "select " + self.__gen_deep_func_expr("lower(", "'abc'", ")")
self.__exec_query(func_query, vector)
# Casts.
cast_query = "select " + self.__gen_deep_func_expr("cast(", "1", " as int)")
self.__exec_query(cast_query, vector)
def test_under_statement_expression_limit(self):
"""Generate a huge case statement that barely fits within the statement expression
limit and verify that it runs."""
# IMPALA-13280: Disable codegen because it will take 2GB+ of memory
# and over 30 minutes for doing codegen.
if self.exploration_strategy() != 'exhaustive':
pytest.skip("Only test limit of expression on exhaustive")
case = self.__gen_huge_case("int_col", 32, 2, " ")
query = "select {0} as huge_case from functional_parquet.alltypes".format(case)
options = {'disable_codegen': True}
self.execute_query_expect_success(self.client, query, options)
def test_max_statement_size(self):
"""Generate a huge case statement that exceeds the default 16MB limit and verify
that it gets rejected."""
expected_err_tmpl = (
r"Statement length of {0} bytes exceeds the maximum "
r"statement length \({1} bytes\)")
size_16mb = 16 * 1024 * 1024
# Case 1: a valid SQL that would parse correctly
case = self.__gen_huge_case("int_col", 75, 2, " ")
query = "select {0} as huge_case from functional.alltypes".format(case)
err = self.execute_query_expect_failure(self.client, query)
assert re.search(expected_err_tmpl.format(len(query), size_16mb), str(err))
# Case 2: a string of 'a' characters that does not parse. This will still fail
# with the same message, because the check is before parsing.
invalid_sql = 'a' * (size_16mb + 1)
err = self.execute_query_expect_failure(self.client, invalid_sql)
assert re.search(expected_err_tmpl.format(len(invalid_sql), size_16mb), str(err))
# This test can take ~2GB memory while it takes only ~10 seconds. It caused OOM
# in the past, so it is safer to run it serially.
@pytest.mark.execute_serially
def test_statement_expression_limit(self):
"""Generate a huge case statement that barely fits within the 16MB limit but exceeds
the statement expression limit. Verify that it fails."""
case = self.__gen_huge_case("int_col", 66, 2, " ")
query = "select {0} as huge_case from functional.alltypes".format(case)
assert len(query) < 16 * 1024 * 1024
expected_err_re = (
r"Exceeded the statement expression limit \({0}\)\n"
r"Statement has .* expressions.").format(250000)
err = self.execute_query_expect_failure(self.client, query)
assert re.search(expected_err_re, str(err))
def __gen_huge_case(self, col_name, fanout, depth, indent):
toks = ["case\n"]
for i in range(fanout):
add = randint(1, 1000000)
divisor = randint(1, 10000000)
mod = randint(0, divisor)
# Generate a mathematical expr that can't be easily optimised out.
# IMPALA-13280: The parentheses in when_expr is needed to disable constant folding
# that can take over 7 minutes to complete by Planner.
when_expr = "({0} + {1}) % {2} = {3}".format(col_name, add, divisor, mod)
if depth == 0:
then_expr = "{0}".format(i)
else:
then_expr = "({0})".format(
self.__gen_huge_case(col_name, fanout, depth - 1, indent + " "))
toks.append(indent)
toks.append("when {0} then {1}\n".format(when_expr, then_expr))
toks.append(indent)
toks.append("end")
return ''.join(toks)
def __gen_deep_infix_expr(self, prefix, repeat_suffix):
expr = prefix
for i in range(self.EXPR_DEPTH_LIMIT - 1):
expr += repeat_suffix
return expr
def __gen_deep_func_expr(self, open_func, base_arg, close_func):
expr = ""
for i in range(self.EXPR_DEPTH_LIMIT - 1):
expr += open_func
expr += base_arg
for i in range(self.EXPR_DEPTH_LIMIT - 1):
expr += close_func
return expr
def __exec_query(self, sql_str, vector):
try:
impala_ret = self.execute_query_using_vector(sql_str, vector)
assert impala_ret.success, "Failed to execute query %s" % (sql_str)
except Exception as e: # consider any exception a failure
assert False, "Failed to execute query %s: %s" % (sql_str, e)
class TestTimestampFunctions(ImpalaTestSuite):
"""Tests for UTC timestamp functions and local timestamp functions."""
@classmethod
def add_test_dimensions(cls):
super(TestTimestampFunctions, cls).add_test_dimensions()
# Test with and without expr rewrites to cover regular expr evaluations
# as well as constant folding, in particular, timestamp literals.
add_exec_option_dimension(cls, 'enable_expr_rewrites', EXPR_REWRITE_OPTIONS)
add_exec_option_dimension(cls, 'use_local_tz_for_unix_timestamp_conversions',
[0, 1])
# No need to permute different file format.
cls.ImpalaTestMatrix.add_constraint(lambda v:
v.get_value('table_format').file_format == 'text'
and v.get_value('table_format').compression_codec == 'none')
def test_utc_functions(self, vector):
"""Tests for UTC timestamp functions, i.e. functions that do not depend on the
behavior of the use_local_tz_for_unix_timestamp_conversions option."""
self.run_test_case('QueryTest/utc-timestamp-functions', vector)
@SkipIfFS.hbase
def test_timestamp_functions(self, vector):
if vector.get_exec_option('use_local_tz_for_unix_timestamp_conversions') == 1:
# Tests for local timestamp functions, i.e. functions that depend on the
# behavior of the use_local_tz_for_unix_timestamp_conversions option.
self.run_test_case('QueryTest/local-timestamp-functions', vector)
# Test that scanning of different file formats is not affected by
# use_local_tz_for_unix_timestamp_conversions option.
self.run_test_case('QueryTest/file-formats-with-local-tz-conversion', vector)
class TestConstantFoldingNoTypeLoss(ImpalaTestSuite):
""""Regression tests for IMPALA-11462."""
@classmethod
def add_test_dimensions(cls):
super(TestConstantFoldingNoTypeLoss, cls).add_test_dimensions()
# Test with and without expr rewrites to verify that constant folding does not change
# the behaviour.
add_exec_option_dimension(cls, 'enable_expr_rewrites', EXPR_REWRITE_OPTIONS)
# We don't actually use a table so one file format is enough.
cls.ImpalaTestMatrix.add_constraint(lambda v:
v.get_value('table_format').file_format in ['parquet'])
def test_shiftleft(self, vector):
""" Tests that the return values of the 'shiftleft' functions are correct for the
input types (the return type should be the same as the first argument)."""
types_and_widths = [
("TINYINT", 8),
("SMALLINT", 16),
("INT", 32),
("BIGINT", 64)
]
query_template = ("select shiftleft(cast(1 as {typename}), z) c "
"from (select {shift_val} z ) x")
for (typename, width) in types_and_widths:
shift_val = width - 2 # Valid and positive for signed types.
expected_value = 1 << shift_val
result = self.execute_query_using_vector(
query_template.format(typename=typename, shift_val=shift_val), vector)
assert result.data == [str(expected_value)]
def test_addition(self, vector):
query = "select typeof(cast(1 as bigint) + cast(rand() as tinyint))"
result = self.execute_query_using_vector(query, vector)
assert result.data == ["BIGINT"]
class TestNonConstPatternILike(ImpalaTestSuite):
"""Tests for ILIKE and IREGEXP with non-constant patterns for IMPALA-12581.
These tests verify that ILIKE and IREGEXP work correctly when the pattern
is not a constant string."""
@classmethod
def add_test_dimensions(cls):
super(TestNonConstPatternILike, cls).add_test_dimensions()
# This test does not care about the file format of test table.
# Fix the table format to text.
cls.ImpalaTestMatrix.add_dimension(
create_uncompressed_text_dimension(cls.get_workload()))
def test_non_const_pattern_ilike(self, vector, unique_database):
with self.create_impala_client_from_vector(vector) as client:
tbl_name = '`{0}`.`ilike_test`'.format(unique_database)
self.__run_non_const_pattern_ilike(client, tbl_name)
def __run_non_const_pattern_ilike(self, client, tbl_name):
self.execute_query_expect_success(client,
"CREATE TABLE {0} (pattern_str string)".format(tbl_name))
self.execute_query_expect_success(client,
"INSERT INTO TABLE {0} VALUES('%b%'), ('.*b.*')".format(tbl_name))
ilike_result = self.execute_query_expect_success(client,
"SELECT count(*) FROM {0} WHERE 'ABC' ILIKE pattern_str".format(tbl_name))
assert int(ilike_result.get_data()) == 1
iregexp_result = self.execute_query_expect_success(client,
"SELECT count(*) FROM {0} WHERE 'ABC' IREGEXP pattern_str".format(tbl_name))
assert int(iregexp_result.get_data()) == 1