# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # Targeted tests for Impala joins # from __future__ import absolute_import, division, print_function import pytest from copy import deepcopy from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.skip import SkipIf, SkipIfFS from tests.common.test_dimensions import ( add_exec_option_dimension, add_mandatory_exec_option, create_exec_option_dimension, create_single_exec_option_dimension, create_table_format_dimension) ENABLE_OUTER_JOIN_TO_INNER_TRANSFORMATION = ['false', 'true'] def batch_size_dim(cls): if cls.exploration_strategy() == 'exhaustive': return [0, 1] else: return [0] def mt_dop_dim(cls): if cls.exploration_strategy() == 'exhaustive': return [0, 1, 4] else: return [0, 4] class TestJoinBase(ImpalaTestSuite): """The base class for test join classes. Intended to provide subclasses with default test dimensions declaration and constraints through add_test_dimensions() both in core and exhaustive exploration.""" @classmethod def add_test_dimensions(cls): super(TestJoinBase, cls).add_test_dimensions() # Set exec options cls.ImpalaTestMatrix.add_dimension( create_exec_option_dimension(batch_sizes=batch_size_dim(cls))) cls.ImpalaTestMatrix.add_constraint(lambda v: v.get_value('table_format').file_format in ['parquet']) class TestJoinQueries(TestJoinBase): @classmethod def add_test_dimensions(cls): super(TestJoinQueries, cls).add_test_dimensions() add_exec_option_dimension(cls, 'mt_dop', mt_dop_dim(cls)) def test_basic_joins(self, vector): self.run_test_case('QueryTest/joins', vector) @SkipIfFS.hbase @SkipIf.skip_hbase def test_joins_against_hbase(self, vector): # TODO: Look into splitting up join tests to accomodate hbase. # Joins with hbase tables produce drastically different results. self.run_test_case('QueryTest/joins-against-hbase', vector) def test_outer_joins(self, vector): self.run_test_case('QueryTest/outer-joins', vector) def test_empty_build_joins(self, vector): self.run_test_case('QueryTest/empty-build-joins', vector) class TestSingleNodeJoins(TestJoinBase): @classmethod def add_test_dimensions(cls): super(TestSingleNodeJoins, cls).add_test_dimensions() # Redeclare exec options with num_nodes=1, batch_size=0. cls.ImpalaTestMatrix.add_dimension( create_exec_option_dimension(cluster_sizes=[1], batch_sizes=[0])) def test_single_node_nested_loop_joins(self, vector): # Test the execution of nested-loops joins for join types that can only be # executed in a single node (right [outer|semi|anti] and full outer joins). self.run_test_case('QueryTest/single-node-nlj', vector) class TestSingleNodeJoinsExhaustive(TestJoinBase): @classmethod def add_test_dimensions(cls): super(TestSingleNodeJoinsExhaustive, cls).add_test_dimensions() if cls.exploration_strategy() != 'exhaustive': # skip this test if not in exhaustive exploration. pytest.skip("Only run in exhaustive exploration.") # Redeclare exec options with num_nodes=1, batch_size=0. cls.ImpalaTestMatrix.add_dimension( create_exec_option_dimension(cluster_sizes=[1], batch_sizes=[0])) add_exec_option_dimension(cls, 'mt_dop', mt_dop_dim(cls)) def test_single_node_joins_with_limits_exhaustive(self, vector): new_vector = deepcopy(vector) del new_vector.get_value('exec_option')['batch_size'] # .test file sets batch_size self.run_test_case('QueryTest/single-node-joins-with-limits-exhaustive', new_vector) def test_single_node_nested_loop_joins_exhaustive(self, vector): # Test the execution of nested-loops joins for join types that can only be # executed in a single node (right [outer|semi|anti] and full outer joins). self.run_test_case('QueryTest/single-node-nlj-exhaustive', vector) class TestOuterJoinToInnerTransformation(TestJoinBase): @classmethod def add_test_dimensions(cls): super(TestOuterJoinToInnerTransformation, cls).add_test_dimensions() add_exec_option_dimension(cls, 'enable_outer_join_to_inner_transformation', ENABLE_OUTER_JOIN_TO_INNER_TRANSFORMATION) def test_outer_to_inner_joins(self, vector): self.run_test_case('QueryTest/outer-to-inner-joins', vector) class TestMissTupleJoins(TestJoinBase): @classmethod def add_test_dimensions(cls): super(TestMissTupleJoins, cls).add_test_dimensions() # Only need to run with single exec option dimension. cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension()) def test_miss_tuple_joins(self, vector, unique_database): self.run_test_case('QueryTest/miss-tuple-joins', vector, unique_database) class TestTPCHJoinQueries(TestJoinBase): # Uses the TPC-H dataset in order to have larger joins. Needed for example to test # the repartitioning codepaths. @classmethod def get_workload(cls): return 'tpch' @classmethod def add_test_dimensions(cls): super(TestTPCHJoinQueries, cls).add_test_dimensions() @classmethod def teardown_class(cls): cls.client.execute('set mem_limit = 0') super(TestTPCHJoinQueries, cls).teardown_class() def test_outer_joins(self, vector, unique_database): self.run_test_case('tpch-outer-joins', vector, test_file_vars={'$UNIQUE_DB': unique_database}) class TestSemiJoinQueries(TestJoinBase): @classmethod def add_test_dimensions(cls): super(TestSemiJoinQueries, cls).add_test_dimensions() def __load_semi_join_tables(self, db_name): # Create and load fresh test tables for semi/anti-join tests fq_tbl_name_a = '%s.SemiJoinTblA' % db_name self.client.execute('create table %s (a int, b int, c int)' % fq_tbl_name_a) self.client.execute('insert into %s values(1,1,1)' % fq_tbl_name_a) self.client.execute('insert into %s values(1,1,10)' % fq_tbl_name_a) self.client.execute('insert into %s values(1,2,10)' % fq_tbl_name_a) self.client.execute('insert into %s values(1,3,10)' % fq_tbl_name_a) self.client.execute('insert into %s values(NULL,NULL,30)' % fq_tbl_name_a) self.client.execute('insert into %s values(2,4,30)' % fq_tbl_name_a) self.client.execute('insert into %s values(2,NULL,20)' % fq_tbl_name_a) fq_tbl_name_b = '%s.SemiJoinTblB' % db_name self.client.execute('create table %s (a int, b int, c int)' % fq_tbl_name_b) self.client.execute('insert into %s values(1,1,1)' % fq_tbl_name_b) self.client.execute('insert into %s values(1,1,10)' % fq_tbl_name_b) self.client.execute('insert into %s values(1,2,5)' % fq_tbl_name_b) self.client.execute('insert into %s values(1,NULL,10)' % fq_tbl_name_b) self.client.execute('insert into %s values(2,10,NULL)' % fq_tbl_name_b) self.client.execute('insert into %s values(3,NULL,NULL)' % fq_tbl_name_b) self.client.execute('insert into %s values(3,NULL,50)' % fq_tbl_name_b) def test_semi_joins(self, vector, unique_database): self.__load_semi_join_tables(unique_database) self.run_test_case('QueryTest/semi-joins', vector, unique_database) class TestSemiJoinQueriesExhaustive(TestJoinBase): @classmethod def add_test_dimensions(cls): super(TestSemiJoinQueriesExhaustive, cls).add_test_dimensions() if cls.exploration_strategy() != 'exhaustive': # skip this test if not in exhaustive exploration. pytest.skip("Only run in exhaustive exploration.") @pytest.mark.execute_serially def test_semi_joins_exhaustive(self, vector): """Expensive and memory-intensive semi-join tests.""" self.run_test_case('QueryTest/semi-joins-exhaustive', vector) class TestSpillingHashJoin(ImpalaTestSuite): @classmethod def add_test_dimensions(cls): super(TestSpillingHashJoin, cls).add_test_dimensions() # To cut down on test execution time, only run in exhaustive. if cls.exploration_strategy() != 'exhaustive': pytest.skip("Only run in exhaustive exploration.") cls.ImpalaTestMatrix.add_constraint( lambda v: v.get_value('table_format').file_format == 'parquet') cls.ImpalaTestMatrix.add_constraint(lambda v: v.get_value('exec_option')['disable_codegen'] is False) @pytest.mark.execute_serially def test_spilling_hash_join(self, vector, unique_database): """Regression test for IMPALA-13138. It loads a few large tables and runs a complex query that spills during JOIN build that crashed Impala before IMPALA-13138.""" self.run_test_case('QueryTest/create-tables-impala-13138', vector, unique_database) for i in range(0, 5): self.run_test_case('QueryTest/query-impala-13138', vector, unique_database) class TestExprValueCache(ImpalaTestSuite): # Test that HashTableCtx::ExprValueCache memory usage stays under 256KB. # Run TPC-DS Q97 with bare minimum memory limit, MT_DOP=1, and max BATCH_SIZE. # Before IMPALA-13075, the test query will pass Planner and Admission Control, # but later failed during backend execution due to memory limit exceeded. @classmethod def get_workload(cls): return 'tpcds_partitioned' @classmethod def add_test_dimensions(cls): super(TestExprValueCache, cls).add_test_dimensions() # create_single_exec_option_dimension + batch_size=65536 cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension( cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[65536], disable_codegen_rows_threshold_options=[5000])) cls.ImpalaTestMatrix.add_dimension( create_table_format_dimension(cls.get_workload(), 'parquet/snap/block')) add_mandatory_exec_option(cls, 'runtime_filter_mode', 'OFF') add_mandatory_exec_option(cls, 'mem_limit', '149mb') add_mandatory_exec_option(cls, 'mt_dop', 1) def test_expr_value_cache_fits(self, vector): self.run_test_case('tpcds-q97', vector)