Files
impala/tests/query_test/test_join_queries.py
Csaba Ringhofer f98b697c7b IMPALA-13929: Make 'functional-query' the default workload in tests
This change adds get_workload() to ImpalaTestSuite and removes it
from all test suites that already returned 'functional-query'.
get_workload() is also removed from CustomClusterTestSuite which
used to return 'tpch'.

All other changes besides impala_test_suite.py and
custom_cluster_test_suite.py are just mass removals of
get_workload() functions.

The behavior is only changed in custom cluster tests that didn't
override get_workload(). By returning 'functional-query' instead
of 'tpch', exploration_strategy() will no longer return 'core' in
'exhaustive' test runs. See IMPALA-3947 on why workload affected
exploration_strategy. An example of an affected test is
TestCatalogHMSFailures, which was skipped in both core and exhaustive
runs before this change.

get_workload() functions that return a different workload than
'functional-query' are not changed - it is possible that some of
these also don't handle exploration_strategy() as expected, but
individually checking these tests is out of scope in this patch.

Change-Id: I9ec6c41ffb3a30e1ea2de773626d1485c69fe115
Reviewed-on: http://gerrit.cloudera.org:8080/22726
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Reviewed-by: Daniel Becker <daniel.becker@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-04-08 07:12:55 +00:00

274 lines
10 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Targeted tests for Impala joins
#
from __future__ import absolute_import, division, print_function
import pytest
from copy import deepcopy
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIf, SkipIfFS
from tests.common.test_dimensions import (
add_exec_option_dimension,
add_mandatory_exec_option,
create_exec_option_dimension,
create_single_exec_option_dimension,
create_table_format_dimension)
# Values tested for the 'enable_outer_join_to_inner_transformation' exec option.
ENABLE_OUTER_JOIN_TO_INNER_TRANSFORMATION = ['false', 'true']
def batch_size_dim(cls):
  """Return the list of batch sizes to test for the test class 'cls'.

  The result is [0, 1] when cls.exploration_strategy() is 'exhaustive' and
  [0] for any other exploration strategy.
  """
  is_exhaustive = cls.exploration_strategy() == 'exhaustive'
  return [0, 1] if is_exhaustive else [0]
def mt_dop_dim(cls):
  """Return the list of 'mt_dop' values to test for the test class 'cls'.

  The values 0 and 4 are always included; 1 is added between them only when
  cls.exploration_strategy() is 'exhaustive'.
  """
  mt_dop_values = [0, 4]
  if cls.exploration_strategy() == 'exhaustive':
    mt_dop_values.insert(1, 1)
  return mt_dop_values
class TestJoinBase(ImpalaTestSuite):
  """Common base class for the join test classes.

  Its add_test_dimensions() gives subclasses a default declaration of test
  dimensions and constraints, in both core and exhaustive exploration.
  """

  @classmethod
  def add_test_dimensions(cls):
    """Declare the default exec option dimension and the table format constraint."""
    super(TestJoinBase, cls).add_test_dimensions()

    # Exec options: the batch sizes covered depend on the exploration strategy.
    exec_option_dim = create_exec_option_dimension(batch_sizes=batch_size_dim(cls))
    cls.ImpalaTestMatrix.add_dimension(exec_option_dim)

    def is_parquet(v):
      # Only keep test vectors whose table format is parquet.
      return v.get_value('table_format').file_format in ['parquet']

    cls.ImpalaTestMatrix.add_constraint(is_parquet)
class TestJoinQueries(TestJoinBase):
  """Join tests run over the inherited dimensions plus an 'mt_dop' dimension."""

  @classmethod
  def add_test_dimensions(cls):
    """Extend the inherited dimensions with the 'mt_dop' exec option."""
    super(TestJoinQueries, cls).add_test_dimensions()
    mt_dop_values = mt_dop_dim(cls)
    add_exec_option_dimension(cls, 'mt_dop', mt_dop_values)

  def test_basic_joins(self, vector):
    """Run the 'QueryTest/joins' test file."""
    test_file = 'QueryTest/joins'
    self.run_test_case(test_file, vector)

  @SkipIfFS.hbase
  @SkipIf.skip_hbase
  def test_joins_against_hbase(self, vector):
    """Run the 'QueryTest/joins-against-hbase' test file."""
    # TODO: Look into splitting up join tests to accommodate hbase.
    # Joins with hbase tables produce drastically different results.
    test_file = 'QueryTest/joins-against-hbase'
    self.run_test_case(test_file, vector)

  def test_outer_joins(self, vector):
    """Run the 'QueryTest/outer-joins' test file."""
    test_file = 'QueryTest/outer-joins'
    self.run_test_case(test_file, vector)

  def test_empty_build_joins(self, vector):
    """Run the 'QueryTest/empty-build-joins' test file."""
    test_file = 'QueryTest/empty-build-joins'
    self.run_test_case(test_file, vector)
class TestSingleNodeJoins(TestJoinBase):
  """Nested-loop join tests declared with cluster_sizes=[1] and batch_sizes=[0]."""

  @classmethod
  def add_test_dimensions(cls):
    """Replace the inherited exec option dimension with a single-node one."""
    super(TestSingleNodeJoins, cls).add_test_dimensions()
    # Redeclare exec options with num_nodes=1, batch_size=0.
    single_node_dim = create_exec_option_dimension(
        cluster_sizes=[1], batch_sizes=[0])
    cls.ImpalaTestMatrix.add_dimension(single_node_dim)

  def test_single_node_nested_loop_joins(self, vector):
    """Run the 'QueryTest/single-node-nlj' test file.

    Covers nested-loop joins for join types that can only be executed in a
    single node (right [outer|semi|anti] and full outer joins).
    """
    test_file = 'QueryTest/single-node-nlj'
    self.run_test_case(test_file, vector)
class TestSingleNodeJoinsExhaustive(TestJoinBase):
  """Single-node join tests that are skipped unless exploration is exhaustive."""

  @classmethod
  def add_test_dimensions(cls):
    """Skip outside exhaustive exploration; otherwise declare the dimensions."""
    super(TestSingleNodeJoinsExhaustive, cls).add_test_dimensions()
    strategy = cls.exploration_strategy()
    if strategy != 'exhaustive':
      # Skip this test if not in exhaustive exploration.
      pytest.skip("Only run in exhaustive exploration.")
    # Redeclare exec options with num_nodes=1, batch_size=0.
    single_node_dim = create_exec_option_dimension(
        cluster_sizes=[1], batch_sizes=[0])
    cls.ImpalaTestMatrix.add_dimension(single_node_dim)
    add_exec_option_dimension(cls, 'mt_dop', mt_dop_dim(cls))

  def test_single_node_joins_with_limits_exhaustive(self, vector):
    """Run 'QueryTest/single-node-joins-with-limits-exhaustive' without batch_size."""
    # Work on a deep copy so the vector passed in is not modified.
    vector_copy = deepcopy(vector)
    exec_options = vector_copy.get_value('exec_option')
    # The .test file sets batch_size itself, so drop it from the exec options.
    del exec_options['batch_size']
    self.run_test_case(
        'QueryTest/single-node-joins-with-limits-exhaustive', vector_copy)

  def test_single_node_nested_loop_joins_exhaustive(self, vector):
    """Run the 'QueryTest/single-node-nlj-exhaustive' test file.

    Covers nested-loop joins for join types that can only be executed in a
    single node (right [outer|semi|anti] and full outer joins).
    """
    test_file = 'QueryTest/single-node-nlj-exhaustive'
    self.run_test_case(test_file, vector)
class TestOuterJoinToInnerTransformation(TestJoinBase):
  """Join tests run with 'enable_outer_join_to_inner_transformation' off and on."""

  @classmethod
  def add_test_dimensions(cls):
    """Add the 'enable_outer_join_to_inner_transformation' exec option dimension."""
    super(TestOuterJoinToInnerTransformation, cls).add_test_dimensions()
    option_name = 'enable_outer_join_to_inner_transformation'
    add_exec_option_dimension(
        cls, option_name, ENABLE_OUTER_JOIN_TO_INNER_TRANSFORMATION)

  def test_outer_to_inner_joins(self, vector):
    """Run the 'QueryTest/outer-to-inner-joins' test file."""
    test_file = 'QueryTest/outer-to-inner-joins'
    self.run_test_case(test_file, vector)
class TestMissTupleJoins(TestJoinBase):
  """Runs the 'QueryTest/miss-tuple-joins' test file with one exec option set."""

  @classmethod
  def add_test_dimensions(cls):
    """Redeclare the exec option dimension as a single exec option dimension."""
    super(TestMissTupleJoins, cls).add_test_dimensions()
    # Only need to run with single exec option dimension.
    single_dim = create_single_exec_option_dimension()
    cls.ImpalaTestMatrix.add_dimension(single_dim)

  def test_miss_tuple_joins(self, vector, unique_database):
    """Run the 'QueryTest/miss-tuple-joins' test file against 'unique_database'."""
    test_file = 'QueryTest/miss-tuple-joins'
    self.run_test_case(test_file, vector, unique_database)
class TestTPCHJoinQueries(TestJoinBase):
  """Join tests over the TPC-H dataset.

  Uses the TPC-H dataset in order to have larger joins. Needed for example to
  test the repartitioning codepaths.
  """

  @classmethod
  def get_workload(cls):
    """Return the workload name used by this suite: 'tpch'."""
    workload = 'tpch'
    return workload

  @classmethod
  def add_test_dimensions(cls):
    """Use the dimensions declared by the parent class unchanged."""
    super(TestTPCHJoinQueries, cls).add_test_dimensions()

  @classmethod
  def teardown_class(cls):
    """Issue 'set mem_limit = 0' on the class client, then run parent teardown."""
    reset_mem_limit_stmt = 'set mem_limit = 0'
    cls.client.execute(reset_mem_limit_stmt)
    super(TestTPCHJoinQueries, cls).teardown_class()

  def test_outer_joins(self, vector, unique_database):
    """Run 'tpch-outer-joins', passing 'unique_database' as $UNIQUE_DB."""
    file_vars = {'$UNIQUE_DB': unique_database}
    self.run_test_case('tpch-outer-joins', vector, test_file_vars=file_vars)
class TestSemiJoinQueries(TestJoinBase):
  """Semi/anti-join tests that run against freshly created and loaded tables."""

  @classmethod
  def add_test_dimensions(cls):
    """Use the dimensions declared by the parent class unchanged."""
    super(TestSemiJoinQueries, cls).add_test_dimensions()

  def __load_semi_join_tables(self, db_name):
    """Create and load fresh test tables for semi/anti-join tests in 'db_name'.

    Each table has columns (a int, b int, c int). Rows are inserted one
    statement at a time, in the order listed below.
    """
    table_rows = (
        ('SemiJoinTblA', (
            '1,1,1',
            '1,1,10',
            '1,2,10',
            '1,3,10',
            'NULL,NULL,30',
            '2,4,30',
            '2,NULL,20',
        )),
        ('SemiJoinTblB', (
            '1,1,1',
            '1,1,10',
            '1,2,5',
            '1,NULL,10',
            '2,10,NULL',
            '3,NULL,NULL',
            '3,NULL,50',
        )),
    )
    for tbl_name, rows in table_rows:
      fq_tbl_name = '%s.%s' % (db_name, tbl_name)
      self.client.execute('create table %s (a int, b int, c int)' % fq_tbl_name)
      for row_values in rows:
        self.client.execute(
            'insert into %s values(%s)' % (fq_tbl_name, row_values))

  def test_semi_joins(self, vector, unique_database):
    """Load the test tables, then run the 'QueryTest/semi-joins' test file."""
    self.__load_semi_join_tables(unique_database)
    test_file = 'QueryTest/semi-joins'
    self.run_test_case(test_file, vector, unique_database)
class TestSemiJoinQueriesExhaustive(TestJoinBase):
  """Semi-join tests that are skipped unless exploration is exhaustive."""

  @classmethod
  def add_test_dimensions(cls):
    """Declare the parent dimensions, then skip outside exhaustive exploration."""
    super(TestSemiJoinQueriesExhaustive, cls).add_test_dimensions()
    strategy = cls.exploration_strategy()
    if strategy != 'exhaustive':
      # Skip this test if not in exhaustive exploration.
      pytest.skip("Only run in exhaustive exploration.")

  @pytest.mark.execute_serially
  def test_semi_joins_exhaustive(self, vector):
    """Expensive and memory-intensive semi-join tests."""
    test_file = 'QueryTest/semi-joins-exhaustive'
    self.run_test_case(test_file, vector)
class TestSpillingHashJoin(ImpalaTestSuite):
  """Regression suite for IMPALA-13138; skipped unless exploration is exhaustive."""

  @classmethod
  def add_test_dimensions(cls):
    """Skip outside exhaustive exploration; otherwise constrain the test matrix."""
    super(TestSpillingHashJoin, cls).add_test_dimensions()
    # To cut down on test execution time, only run in exhaustive.
    strategy = cls.exploration_strategy()
    if strategy != 'exhaustive':
      pytest.skip("Only run in exhaustive exploration.")

    def is_parquet(v):
      # Keep only test vectors whose table format is parquet.
      return v.get_value('table_format').file_format == 'parquet'

    def codegen_enabled(v):
      # Keep only test vectors whose 'disable_codegen' exec option is False.
      return v.get_value('exec_option')['disable_codegen'] is False

    cls.ImpalaTestMatrix.add_constraint(is_parquet)
    cls.ImpalaTestMatrix.add_constraint(codegen_enabled)

  @pytest.mark.execute_serially
  def test_spilling_hash_join(self, vector, unique_database):
    """Regression test for IMPALA-13138. It loads a few large tables and runs a complex
    query that spills during JOIN build that crashed Impala before IMPALA-13138."""
    setup_file = 'QueryTest/create-tables-impala-13138'
    query_file = 'QueryTest/query-impala-13138'
    self.run_test_case(setup_file, vector, unique_database)
    # The query test file is run five times in a row.
    for _ in range(5):
      self.run_test_case(query_file, vector, unique_database)
class TestExprValueCache(ImpalaTestSuite):
  """Test that HashTableCtx::ExprValueCache memory usage stays under 256KB.

  Run TPC-DS Q97 with bare minimum memory limit, MT_DOP=1, and max BATCH_SIZE.
  Before IMPALA-13075, the test query will pass Planner and Admission Control,
  but later failed during backend execution due to memory limit exceeded.
  """

  @classmethod
  def get_workload(cls):
    """Return the workload name used by this suite: 'tpcds_partitioned'."""
    workload = 'tpcds_partitioned'
    return workload

  @classmethod
  def add_test_dimensions(cls):
    """Declare the exec option and table format dimensions and mandatory options."""
    super(TestExprValueCache, cls).add_test_dimensions()
    # create_single_exec_option_dimension + batch_size=65536
    exec_option_dim = create_exec_option_dimension(
        cluster_sizes=[0],
        disable_codegen_options=[False],
        batch_sizes=[65536],
        disable_codegen_rows_threshold_options=[5000])
    cls.ImpalaTestMatrix.add_dimension(exec_option_dim)
    table_format_dim = create_table_format_dimension(
        cls.get_workload(), 'parquet/snap/block')
    cls.ImpalaTestMatrix.add_dimension(table_format_dim)
    # Exec options applied to every test vector, added in this order.
    mandatory_options = (
        ('runtime_filter_mode', 'OFF'),
        ('mem_limit', '149mb'),
        ('mt_dop', 1),
    )
    for option_name, option_value in mandatory_options:
      add_mandatory_exec_option(cls, option_name, option_value)

  def test_expr_value_cache_fits(self, vector):
    """Run the 'tpcds-q97' test file."""
    test_file = 'tpcds-q97'
    self.run_test_case(test_file, vector)