# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. from __future__ import absolute_import, division, print_function from builtins import range import logging import pytest import re from random import randint from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.test_dimensions import ( add_exec_option_dimension, create_exec_option_dimension, create_uncompressed_text_dimension) from tests.common.skip import SkipIfFS from tests.util.test_file_parser import QueryTestSectionReader LOG = logging.getLogger('test_exprs') EXPR_REWRITE_OPTIONS = [0, 1] class TestExprs(ImpalaTestSuite): @classmethod def add_test_dimensions(cls): super(TestExprs, cls).add_test_dimensions() # Test with and without expr rewrites to cover regular expr evaluations # as well as constant folding, in particular, timestamp literals. add_exec_option_dimension(cls, 'enable_expr_rewrites', EXPR_REWRITE_OPTIONS) if cls.exploration_strategy() == 'core': # Test with file format that supports codegen cls.ImpalaTestMatrix.add_constraint(lambda v: v.get_value('table_format').file_format == 'parquet' and v.get_value('table_format').compression_codec == 'none') def test_exprs(self, vector): # Remove 'exec_single_node_rows_threshold' option so we can set it at .test file. # Revisit this if 'exec_single_node_rows_threshold' dimension size increase. vector.unset_exec_option('exec_single_node_rows_threshold') # TODO: Enable some of these tests for Avro if possible # Don't attempt to evaluate timestamp expressions with Avro tables (which don't # support a timestamp type)" table_format = vector.get_value('table_format') if table_format.file_format == 'avro': pytest.skip() if table_format.file_format == 'hbase': pytest.xfail("A lot of queries check for NULLs, which hbase does not recognize") if table_format.file_format == 'kudu': # Can't load LikeTbl without KUDU-1570. pytest.xfail("Need support for Kudu tables with nullable PKs (KUDU-1570)") self.run_test_case('QueryTest/exprs', vector) # This will change the current database to matching table format and then execute # select current_database(). An error will be thrown if multiple values are returned. current_db = self.execute_scalar('select current_database()', vector=vector) assert current_db == QueryTestSectionReader.get_db_name(table_format) def test_special_strings(self, vector): """Test handling of expressions with "special" strings.""" self.run_test_case('QueryTest/special-strings', vector) def test_encryption_exprs(self, vector): """Test handling encryption/ decryption functionality. Some AES operation modes are not supported by all versions of the OpenSSL library, therefore the tests are divided into separate .test files based on the mode used in them. For modes that may not be supported, we run a probing query first and only run the test file if it succeeds. """ # Run queries that are expected to fail, e.g. trying invalid operation modes etc. self.run_test_case('QueryTest/encryption_exprs_errors', vector) self.run_test_case('QueryTest/encryption_exprs_aes_128_ecb', vector) self.run_test_case('QueryTest/encryption_exprs_aes_256_ecb', vector) self.run_test_case('QueryTest/encryption_exprs_aes_256_cfb', vector) aes_256_gcm_ok = self._check_aes_mode_supported("aes_256_gcm", vector) if aes_256_gcm_ok: self.run_test_case('QueryTest/encryption_exprs_aes_256_gcm', vector) self._log_whether_aes_tests_run("aes_256_gcm", aes_256_gcm_ok) aes_128_gcm_ok = self._check_aes_mode_supported("aes_128_gcm", vector) if aes_128_gcm_ok: self.run_test_case('QueryTest/encryption_exprs_aes_128_gcm', vector) self._log_whether_aes_tests_run("aes_128_gcm", aes_128_gcm_ok) aes_256_ctr_ok = self._check_aes_mode_supported("aes_256_ctr", vector) if aes_256_ctr_ok: self.run_test_case('QueryTest/encryption_exprs_aes_256_ctr', vector) self._log_whether_aes_tests_run("aes_256_ctr", aes_256_ctr_ok) def _log_whether_aes_tests_run(self, mode, running): msg = "{} {} tests because the OpenSSL version {} this mode.".format( "Running" if running else "Not running", mode, "supports" if running else "does not support") LOG.warning(msg) def _check_aes_mode_supported(self, mode, vector): """Checks whether the given AES mode is supported in the current environment (see "test_encryption_exprs()") by running a probing query.""" assert "ECB" not in mode.upper() expr = "expr" key_len_bytes = 32 if "256" in mode else 16 key = "A" * key_len_bytes # GCM doesn't support an empty IV. iv = "a" query = 'select aes_encrypt("{expr}", "{key}", "{mode}", "{iv}")'.format( expr=expr, key=key, mode=mode, iv=iv) try: res = self.execute_query_using_vector(query, vector) assert res.success return True except Exception as e: assert "not supported by OpenSSL" in str(e) return False def test_ai_generate_text_exprs(self, vector): table_format = vector.get_value('table_format') if table_format.file_format != 'parquet': pytest.skip() self.run_test_case('QueryTest/ai_generate_text_exprs', vector) # Tests very deep expression trees and expressions with many children. Impala defines # a 'safe' upper bound on the expr depth and the number of expr children in the # FE Expr.java and any changes to those limits should be reflected in this test. # The expr limits primarily guard against stack overflows or similar problems # causing crashes. Therefore, this tests succeeds if no Impalads crash. class TestExprLimits(ImpalaTestSuite): # Keep these in sync with Expr.java EXPR_CHILDREN_LIMIT = 10000 EXPR_DEPTH_LIMIT = 1000 @classmethod def add_test_dimensions(cls): super(TestExprLimits, cls).add_test_dimensions() if cls.exploration_strategy() != 'exhaustive': # Ensure the test runs with codegen enabled and disabled, even when the # exploration strategy is not exhaustive. cls.ImpalaTestMatrix.clear_dimension('exec_option') cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension( cluster_sizes=[0], disable_codegen_options=[False, True], batch_sizes=[0])) # There is no reason to run these tests using all dimensions. cls.ImpalaTestMatrix.add_dimension( create_uncompressed_text_dimension(cls.get_workload())) def test_expr_child_limit(self, vector): # IN predicate in_query = "select 1 IN(" for i in range(0, self.EXPR_CHILDREN_LIMIT - 1): in_query += str(i) if (i + 1 != self.EXPR_CHILDREN_LIMIT - 1): in_query += "," in_query += ")" self.__exec_query(in_query, vector) # CASE expr case_query = "select case " for i in range(0, self.EXPR_CHILDREN_LIMIT // 2): case_query += " when true then 1" case_query += " end" self.__exec_query(case_query, vector) def test_expr_depth_limit(self, vector): # Compound predicates and_query = "select " + self.__gen_deep_infix_expr("true", " and false") self.__exec_query(and_query, vector) or_query = "select " + self.__gen_deep_infix_expr("true", " or false") self.__exec_query(or_query, vector) # Arithmetic expr arith_query = "select " + self.__gen_deep_infix_expr("1", " + 1") self.__exec_query(arith_query, vector) func_query = "select " + self.__gen_deep_func_expr("lower(", "'abc'", ")") self.__exec_query(func_query, vector) # Casts. cast_query = "select " + self.__gen_deep_func_expr("cast(", "1", " as int)") self.__exec_query(cast_query, vector) def test_under_statement_expression_limit(self): """Generate a huge case statement that barely fits within the statement expression limit and verify that it runs.""" # IMPALA-13280: Disable codegen because it will take 2GB+ of memory # and over 30 minutes for doing codegen. if self.exploration_strategy() != 'exhaustive': pytest.skip("Only test limit of expression on exhaustive") case = self.__gen_huge_case("int_col", 32, 2, " ") query = "select {0} as huge_case from functional_parquet.alltypes".format(case) options = {'disable_codegen': True} self.execute_query_expect_success(self.client, query, options) def test_max_statement_size(self): """Generate a huge case statement that exceeds the default 16MB limit and verify that it gets rejected.""" expected_err_tmpl = ( r"Statement length of {0} bytes exceeds the maximum " r"statement length \({1} bytes\)") size_16mb = 16 * 1024 * 1024 # Case 1: a valid SQL that would parse correctly case = self.__gen_huge_case("int_col", 75, 2, " ") query = "select {0} as huge_case from functional.alltypes".format(case) err = self.execute_query_expect_failure(self.client, query) assert re.search(expected_err_tmpl.format(len(query), size_16mb), str(err)) # Case 2: a string of 'a' characters that does not parse. This will still fail # with the same message, because the check is before parsing. invalid_sql = 'a' * (size_16mb + 1) err = self.execute_query_expect_failure(self.client, invalid_sql) assert re.search(expected_err_tmpl.format(len(invalid_sql), size_16mb), str(err)) # This test can take ~2GB memory while it takes only ~10 seconds. It caused OOM # in the past, so it is safer to run it serially. @pytest.mark.execute_serially def test_statement_expression_limit(self): """Generate a huge case statement that barely fits within the 16MB limit but exceeds the statement expression limit. Verify that it fails.""" case = self.__gen_huge_case("int_col", 66, 2, " ") query = "select {0} as huge_case from functional.alltypes".format(case) assert len(query) < 16 * 1024 * 1024 expected_err_re = ( r"Exceeded the statement expression limit \({0}\)\n" r"Statement has .* expressions.").format(250000) err = self.execute_query_expect_failure(self.client, query) assert re.search(expected_err_re, str(err)) def __gen_huge_case(self, col_name, fanout, depth, indent): toks = ["case\n"] for i in range(fanout): add = randint(1, 1000000) divisor = randint(1, 10000000) mod = randint(0, divisor) # Generate a mathematical expr that can't be easily optimised out. # IMPALA-13280: The parentheses in when_expr is needed to disable constant folding # that can take over 7 minutes to complete by Planner. when_expr = "({0} + {1}) % {2} = {3}".format(col_name, add, divisor, mod) if depth == 0: then_expr = "{0}".format(i) else: then_expr = "({0})".format( self.__gen_huge_case(col_name, fanout, depth - 1, indent + " ")) toks.append(indent) toks.append("when {0} then {1}\n".format(when_expr, then_expr)) toks.append(indent) toks.append("end") return ''.join(toks) def __gen_deep_infix_expr(self, prefix, repeat_suffix): expr = prefix for i in range(self.EXPR_DEPTH_LIMIT - 1): expr += repeat_suffix return expr def __gen_deep_func_expr(self, open_func, base_arg, close_func): expr = "" for i in range(self.EXPR_DEPTH_LIMIT - 1): expr += open_func expr += base_arg for i in range(self.EXPR_DEPTH_LIMIT - 1): expr += close_func return expr def __exec_query(self, sql_str, vector): try: impala_ret = self.execute_query_using_vector(sql_str, vector) assert impala_ret.success, "Failed to execute query %s" % (sql_str) except Exception as e: # consider any exception a failure assert False, "Failed to execute query %s: %s" % (sql_str, e) class TestTimestampFunctions(ImpalaTestSuite): """Tests for UTC timestamp functions and local timestamp functions.""" @classmethod def add_test_dimensions(cls): super(TestTimestampFunctions, cls).add_test_dimensions() # Test with and without expr rewrites to cover regular expr evaluations # as well as constant folding, in particular, timestamp literals. add_exec_option_dimension(cls, 'enable_expr_rewrites', EXPR_REWRITE_OPTIONS) add_exec_option_dimension(cls, 'use_local_tz_for_unix_timestamp_conversions', [0, 1]) # No need to permute different file format. cls.ImpalaTestMatrix.add_constraint(lambda v: v.get_value('table_format').file_format == 'text' and v.get_value('table_format').compression_codec == 'none') def test_utc_functions(self, vector): """Tests for UTC timestamp functions, i.e. functions that do not depend on the behavior of the use_local_tz_for_unix_timestamp_conversions option.""" self.run_test_case('QueryTest/utc-timestamp-functions', vector) @SkipIfFS.hbase def test_timestamp_functions(self, vector): if vector.get_exec_option('use_local_tz_for_unix_timestamp_conversions') == 1: # Tests for local timestamp functions, i.e. functions that depend on the # behavior of the use_local_tz_for_unix_timestamp_conversions option. self.run_test_case('QueryTest/local-timestamp-functions', vector) # Test that scanning of different file formats is not affected by # use_local_tz_for_unix_timestamp_conversions option. self.run_test_case('QueryTest/file-formats-with-local-tz-conversion', vector) class TestConstantFoldingNoTypeLoss(ImpalaTestSuite): """"Regression tests for IMPALA-11462.""" @classmethod def add_test_dimensions(cls): super(TestConstantFoldingNoTypeLoss, cls).add_test_dimensions() # Test with and without expr rewrites to verify that constant folding does not change # the behaviour. add_exec_option_dimension(cls, 'enable_expr_rewrites', EXPR_REWRITE_OPTIONS) # We don't actually use a table so one file format is enough. cls.ImpalaTestMatrix.add_constraint(lambda v: v.get_value('table_format').file_format in ['parquet']) def test_shiftleft(self, vector): """ Tests that the return values of the 'shiftleft' functions are correct for the input types (the return type should be the same as the first argument).""" types_and_widths = [ ("TINYINT", 8), ("SMALLINT", 16), ("INT", 32), ("BIGINT", 64) ] query_template = ("select shiftleft(cast(1 as {typename}), z) c " "from (select {shift_val} z ) x") for (typename, width) in types_and_widths: shift_val = width - 2 # Valid and positive for signed types. expected_value = 1 << shift_val result = self.execute_query_using_vector( query_template.format(typename=typename, shift_val=shift_val), vector) assert result.data == [str(expected_value)] def test_addition(self, vector): query = "select typeof(cast(1 as bigint) + cast(rand() as tinyint))" result = self.execute_query_using_vector(query, vector) assert result.data == ["BIGINT"] class TestNonConstPatternILike(ImpalaTestSuite): """Tests for ILIKE and IREGEXP with non-constant patterns for IMPALA-12581. These tests verify that ILIKE and IREGEXP work correctly when the pattern is not a constant string.""" @classmethod def add_test_dimensions(cls): super(TestNonConstPatternILike, cls).add_test_dimensions() # This test does not care about the file format of test table. # Fix the table format to text. cls.ImpalaTestMatrix.add_dimension( create_uncompressed_text_dimension(cls.get_workload())) def test_non_const_pattern_ilike(self, vector, unique_database): with self.create_impala_client_from_vector(vector) as client: tbl_name = '`{0}`.`ilike_test`'.format(unique_database) self.__run_non_const_pattern_ilike(client, tbl_name) def __run_non_const_pattern_ilike(self, client, tbl_name): self.execute_query_expect_success(client, "CREATE TABLE {0} (pattern_str string)".format(tbl_name)) self.execute_query_expect_success(client, "INSERT INTO TABLE {0} VALUES('%b%'), ('.*b.*')".format(tbl_name)) ilike_result = self.execute_query_expect_success(client, "SELECT count(*) FROM {0} WHERE 'ABC' ILIKE pattern_str".format(tbl_name)) assert int(ilike_result.get_data()) == 1 iregexp_result = self.execute_query_expect_success(client, "SELECT count(*) FROM {0} WHERE 'ABC' IREGEXP pattern_str".format(tbl_name)) assert int(iregexp_result.get_data()) == 1