mirror of https://github.com/apache/impala.git
This change adds get_workload() to ImpalaTestSuite and removes it from all test suites that already returned 'functional-query'. get_workload() is also removed from CustomClusterTestSuite, which used to return 'tpch'. All other changes besides impala_test_suite.py and custom_cluster_test_suite.py are just mass removals of get_workload() functions.

Behavior only changes in custom cluster tests that didn't override get_workload(). By returning 'functional-query' instead of 'tpch', exploration_strategy() will no longer return 'core' in 'exhaustive' test runs. See IMPALA-3947 for why the workload affects exploration_strategy(). An example of an affected test is TestCatalogHMSFailures, which was skipped in both core and exhaustive runs before this change.

get_workload() functions that return a workload other than 'functional-query' are not changed. It is possible that some of these also don't handle exploration_strategy() as expected, but individually checking those tests is out of scope for this patch.

Change-Id: I9ec6c41ffb3a30e1ea2de773626d1485c69fe115
Reviewed-on: http://gerrit.cloudera.org:8080/22726
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Reviewed-by: Daniel Becker <daniel.becker@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
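A minimal sketch of the mechanism the commit message describes, assuming a hypothetical per-workload strategy table (the real lookup in ImpalaTestSuite is driven by the test runner's configuration; the names below are illustrative only):

def exploration_strategy(workload):
  # Hypothetical mapping: in an 'exhaustive' run only the 'functional-query'
  # workload is mapped to 'exhaustive'; unmapped workloads fall back to
  # 'core', which is why suites returning 'tpch' could be skipped in both
  # core and exhaustive runs.
  per_workload_strategy = {'functional-query': 'exhaustive'}
  return per_workload_strategy.get(workload, 'core')

assert exploration_strategy('functional-query') == 'exhaustive'
assert exploration_strategy('tpch') == 'core'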
404 lines
18 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import, division, print_function
import re
from copy import copy

from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIfNotHdfsMinicluster
from tests.common.test_dimensions import (
  add_exec_option_dimension,
  create_exec_option_dimension)

"""Run sizes (number of pages per run) in the sorter.
Values:
  0: there is no limit on the size of an in-memory run. The sorter will allocate memory
     to fit the data until it encounters some memory limit.
  2: an in-memory run can be at most 2 pages. This is the smallest possible size of a
     run: at least 1 page for fixed-length data and 1 page for var-len data.
  20: an in-memory run can be at most 20 pages.
In-memory runs that are too small can fragment var-len data, so some test cases need
different memory or spilling limits to trigger the same scenarios."""
MAX_SORT_RUN_SIZE = [0, 2, 20]
def split_result_rows(result):
  """Split result rows by tab to produce a list of lists, e.g.
  [[a1, a2], [b1, b2], [c1, c2]]."""
  return [row.split('\t') for row in result]
def transpose_results(result, map_fn=lambda x: x):
  """Given a query result (list of strings, each string represents a row), return a list
  of columns, where each column is a list of strings. Optionally, map_fn can be
  provided to be applied to every value, e.g. to convert the strings to their
  underlying types."""
  split_result = split_result_rows(result)
  column_result = []
  for col in zip(*split_result):
    # col is the transposed result, i.e. a1, b1, c1.
    # Apply map_fn to all elements.
    column_result.append([map_fn(x) for x in col])
  return column_result
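
# For example, transpose_results(['1\tb', '2\ta']) returns [['1', '2'], ['b', 'a']],
# and transpose_results(['1', '2'], int) returns [[1, 2]].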

class TestQueryFullSort(ImpalaTestSuite):
  """Test class to do functional validation of sorting when data is spilled to disk.
  All tests under this class run with num_nodes=1."""

  @classmethod
  def get_workload(cls):
    return 'tpch'

  @classmethod
  def add_test_dimensions(cls):
    super(TestQueryFullSort, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(cluster_sizes=[1]))
    add_exec_option_dimension(cls, 'max_sort_run_size', MAX_SORT_RUN_SIZE)

    if cls.exploration_strategy() == 'core':
      cls.ImpalaTestMatrix.add_constraint(lambda v:
          v.get_value('table_format').file_format == 'parquet')
  def test_multiple_buffer_pool_limits(self, vector):
    """Using the lineitem table forces the multi-phase sort with a low
    buffer_pool_limit. This test takes about a minute."""
    query = """select l_comment, l_partkey, l_orderkey, l_suppkey, l_commitdate
        from lineitem order by l_comment limit 100000"""
    exec_option = copy(vector.get_value('exec_option'))
    exec_option['disable_outermost_topn'] = 1
    table_format = vector.get_value('table_format')

    """The first run should fit in memory, the second run is a 2-phase disk sort,
    and the third run is a multi-phase sort (i.e. with an intermediate merge)."""
    for buffer_pool_limit in ['-1', '300m', '130m']:
      exec_option['buffer_pool_limit'] = buffer_pool_limit
      query_result = self.execute_query(
          query, exec_option, table_format=table_format)
      result = transpose_results(query_result.data)
      assert(result[0] == sorted(result[0]))
  def test_multiple_sort_run_bytes_limits(self, vector):
    """Using the lineitem table forces the multi-phase sort with a low
    sort_run_bytes_limit. This test takes about a minute."""
    query = """select l_comment, l_partkey, l_orderkey, l_suppkey, l_commitdate
        from lineitem order by l_comment limit 100000"""
    exec_option = copy(vector.get_value('exec_option'))
    exec_option['disable_outermost_topn'] = 1
    table_format = vector.get_value('table_format')

    """The first sort run is privileged to ignore sort_run_bytes_limit, except when the
    estimate hints that a spill is inevitable. The lower a query's sort_run_bytes_limit,
    the more sort runs are likely to be produced and spilled.
    Case 1: 0 SpilledRuns, because all rows fit within the maximum reservation.
      sort_run_bytes_limit is not enforced.
    Case 2: 4 or 5 SpilledRuns, because the sort node estimates that a spill is
      inevitable, so all runs are capped to 130m, including the first one."""
    # max_sort_run_size > 0 will spill more in Case 2.
    options = [('2g', '100m', '0'),
               ('400m', '130m', ('5' if exec_option['max_sort_run_size'] > 0 else '4'))]
    for (mem_limit, sort_run_bytes_limit, spilled_runs) in options:
      exec_option['mem_limit'] = mem_limit
      exec_option['sort_run_bytes_limit'] = sort_run_bytes_limit
      query_result = self.execute_query(
          query, exec_option, table_format=table_format)
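      # Extract the SpilledRuns counter from the runtime profile and check that it
      # matches the expected spill count for this combination of limits.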
      m = re.search(r'\s+\- SpilledRuns: .*', query_result.runtime_profile)
      assert "SpilledRuns: " + spilled_runs in m.group()
      result = transpose_results(query_result.data)
      assert(result[0] == sorted(result[0]))
  def test_multiple_mem_limits_full_output(self, vector):
    """Exercise a range of memory limits, returning the full sorted input."""
    query = """select o_orderdate, o_custkey, o_comment
        from orders
        order by o_orderdate"""
    exec_option = copy(vector.get_value('exec_option'))
    table_format = vector.get_value('table_format')
    exec_option['default_spillable_buffer_size'] = '8M'

    # Minimum memory for different parts of the plan.
    buffered_plan_root_sink_reservation_mb = 16
    sort_reservation_mb = 48
    if table_format.file_format == 'parquet':
      scan_reservation_mb = 24
    else:
      scan_reservation_mb = 8
    total_reservation_mb = sort_reservation_mb + scan_reservation_mb \
        + buffered_plan_root_sink_reservation_mb

    # The memory values below assume 8M pages.
    # Test with unlimited and minimum memory for all file formats.
    buffer_pool_limit_values = ['-1', '{0}M'.format(total_reservation_mb)]
    if self.exploration_strategy() == 'exhaustive' and \
        table_format.file_format == 'parquet':
      # Test some intermediate values for parquet on exhaustive.
      buffer_pool_limit_values += ['128M', '256M']
    for buffer_pool_limit in buffer_pool_limit_values:
      exec_option['buffer_pool_limit'] = buffer_pool_limit
      result = transpose_results(self.execute_query(
          query, exec_option, table_format=table_format).data)
      assert(result[0] == sorted(result[0]))
  def test_sort_join(self, vector):
    """With the minimum memory limit this should be a 1-phase sort."""
    query = """select o1.o_orderdate, o2.o_custkey, o1.o_comment from orders o1 join
        orders o2 on (o1.o_orderkey = o2.o_orderkey) order by o1.o_orderdate limit 100000"""

    exec_option = copy(vector.get_value('exec_option'))
    exec_option['disable_outermost_topn'] = 1
    # With max_sort_run_size=2 (2 pages per run) the var-len data is more fragmented and
    # requires a higher limit to maintain the "TotalMergesPerformed: 1" assertion.
    if exec_option['max_sort_run_size'] == 2:
      exec_option['mem_limit'] = "144m"
    else:
      exec_option['mem_limit'] = "134m"
    table_format = vector.get_value('table_format')

    query_result = self.execute_query(query, exec_option, table_format=table_format)
    assert "TotalMergesPerformed: 1" in query_result.runtime_profile
    result = transpose_results(query_result.data)
    assert(result[0] == sorted(result[0]))
  def test_sort_union(self, vector):
    query = """select o_orderdate, o_custkey, o_comment from (select * from orders union
        select * from orders union all select * from orders) as i
        order by o_orderdate limit 100000"""

    exec_option = copy(vector.get_value('exec_option'))
    exec_option['disable_outermost_topn'] = 1
    exec_option['mem_limit'] = "3000m"
    table_format = vector.get_value('table_format')

    result = transpose_results(self.execute_query(
        query, exec_option, table_format=table_format).data)
    assert(result[0] == sorted(result[0]))
  def test_pathological_input(self, vector):
    """Regression test for stack overflow and poor performance on inputs where always
    selecting the middle element as the quicksort pivot degenerates. The trick is to
    concatenate two equal-size sorted inputs: if the middle element is always selected
    as the pivot (the old method), the sorter tends to keep selecting the minimum
    element as the pivot, so almost all of the tuples end up in the right partition."""
    query = """select l_orderkey from (
        select * from lineitem limit 300000
        union all
        select * from lineitem limit 300000) t
        order by l_orderkey"""

    exec_option = copy(vector.get_value('exec_option'))
    exec_option['disable_outermost_topn'] = 1
    # Run with a single scanner thread so that the input doesn't get reordered.
    exec_option['num_scanner_threads'] = "1"
    table_format = vector.get_value('table_format')

    result = transpose_results(self.execute_query(
        query, exec_option, table_format=table_format).data)
    numeric_results = [int(val) for val in result[0]]
    assert(numeric_results == sorted(numeric_results))
  def test_spill_empty_strings(self, vector):
    """Test the corner case of a spilling sort with only empty strings. Spilling with
    var-len slots typically means the sort must reorder blocks and convert pointers, but
    this case has to be handled differently because there are no var-len blocks to point
    into."""
    query = """
      select empty_str, l_orderkey, l_partkey, l_suppkey,
        l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax
      from (select substr(l_comment, 1000, 0) empty_str, * from lineitem) t
      order by empty_str, l_orderkey, l_partkey, l_suppkey, l_linenumber
      limit 100000
    """

    exec_option = copy(vector.get_value('exec_option'))
    exec_option['disable_outermost_topn'] = 1
    exec_option['buffer_pool_limit'] = "256m"
    table_format = vector.get_value('table_format')

    result = transpose_results(self.execute_query(
        query, exec_option, table_format=table_format).data)
    assert(result[0] == sorted(result[0]))
  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
  def test_sort_reservation_usage(self, vector):
    """Tests for sorter reservation usage.
    If max_sort_run_size > 0, the larger the run size, the sooner the sorter can give up
    memory to the next node."""
    if vector.get_value('exec_option')['max_sort_run_size'] == 2:
      # Increase the buffer pool limit so that the query never spills.
      buffer_pool_limit = '27m'
    else:
      buffer_pool_limit = '14m'
    self.run_test_case('sort-reservation-usage-single-node', vector,
        test_file_vars={'$BUFFER_POOL_LIMIT': buffer_pool_limit})
class TestRandomSort(ImpalaTestSuite):
  @classmethod
  def add_test_dimensions(cls):
    super(TestRandomSort, cls).add_test_dimensions()
    add_exec_option_dimension(cls, 'max_sort_run_size', MAX_SORT_RUN_SIZE)

    if cls.exploration_strategy() == 'core':
      cls.ImpalaTestMatrix.add_constraint(lambda v:
          v.get_value('table_format').file_format == 'parquet')

  def test_order_by_random(self, vector):
    """Tests that 'order by random()' works as expected."""
    exec_option = copy(vector.get_value('exec_option'))
    # "order by random()" with different seeds should produce different orderings.
    seed_query = "select * from functional.alltypestiny order by random(%s)"
    results_seed0 = self.execute_query(seed_query % "0")
    results_seed1 = self.execute_query(seed_query % "1")
    assert results_seed0.data != results_seed1.data
    assert sorted(results_seed0.data) == sorted(results_seed1.data)

    # Include "random()" in the select list to check that it's sorted correctly.
    results = transpose_results(self.execute_query(
        "select random() as r from functional.alltypessmall order by r",
        exec_option).data, lambda x: float(x))
    assert(results[0] == sorted(results[0]))

    # Like above, but with a limit. Compare the single result column: the outer list
    # holds only one column, so it is trivially sorted.
    results = transpose_results(self.execute_query(
        "select random() as r from functional.alltypes order by r limit 100").data,
        lambda x: float(x))
    assert(results[0] == sorted(results[0]))

    # "order by random()" inside an inline view.
    query = "select r from (select random() r from functional.alltypessmall) v order by r"
    results = transpose_results(self.execute_query(query, exec_option).data,
        lambda x: float(x))
    assert(results[0] == sorted(results[0]))

  def test_analytic_order_by_random(self, vector):
    """Tests that a window function over 'order by random()' works as expected."""
    exec_option = copy(vector.get_value('exec_option'))
    # Since we use the same random seed, the results should be returned in order.
    query = """select last_value(rand(2)) over (order by rand(2)) from
        functional.alltypestiny"""
    results = transpose_results(self.execute_query(query, exec_option).data,
        lambda x: float(x))
    assert(results[0] == sorted(results[0]))
class TestPartialSort(ImpalaTestSuite):
  """Test class to do functional validation of partial sorts."""

  @classmethod
  def get_workload(cls):
    return 'tpch'

  @classmethod
  def add_test_dimensions(cls):
    super(TestPartialSort, cls).add_test_dimensions()
    add_exec_option_dimension(cls, 'max_sort_run_size', MAX_SORT_RUN_SIZE)

    if cls.exploration_strategy() == 'core':
      cls.ImpalaTestMatrix.add_constraint(lambda v:
          v.get_value('table_format').file_format == 'parquet')

  def test_partial_sort_min_reservation(self, vector, unique_database):
    """Test that the partial sort node can operate if it only gets its minimum
    memory reservation."""
    table_name = "%s.kudu_test" % unique_database
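    # Deny every buffer reservation increase requested after Open() (probability 1.0)
    # so the partial sort node is restricted to its minimum memory reservation.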
    self.client.set_configuration_option(
        "debug_action", "-1:OPEN:SET_DENY_RESERVATION_PROBABILITY@1.0")
    self.execute_query("""create table %s (col0 string primary key)
        partition by hash(col0) partitions 8 stored as kudu""" % table_name)
    exec_option = copy(vector.get_value('exec_option'))
    result = self.execute_query(
        "insert into %s select string_col from functional.alltypessmall" % table_name,
        exec_option)
    assert "PARTIAL SORT" in result.runtime_profile, result.runtime_profile
  def test_partial_sort_kudu_insert(self, vector, unique_database):
    table_name = "%s.kudu_partial_sort_test" % unique_database
    self.execute_query("""create table %s (l_linenumber INT, l_orderkey BIGINT,
        l_partkey BIGINT, l_shipdate STRING, l_quantity DECIMAL(12,2),
        l_comment STRING, PRIMARY KEY(l_linenumber, l_orderkey))
        PARTITION BY RANGE (l_linenumber)
        (
          PARTITION VALUE = 1,
          PARTITION VALUE = 2,
          PARTITION VALUE = 3,
          PARTITION VALUE = 4,
          PARTITION VALUE = 5,
          PARTITION VALUE = 6,
          PARTITION VALUE = 7
        )
        STORED AS KUDU""" % table_name)
    exec_option = copy(vector.get_value('exec_option'))
    result = self.execute_query(
        """insert into %s SELECT l_linenumber, l_orderkey, l_partkey, l_shipdate,
        l_quantity, l_comment FROM tpch.lineitem limit 300000""" % table_name,
        exec_option)
    assert "NumModifiedRows: 300000" in result.runtime_profile, result.runtime_profile
    assert "NumRowErrors: 0" in result.runtime_profile, result.runtime_profile
class TestArraySort(ImpalaTestSuite):
  """Tests where there are arrays in the sorting tuple.
  These tests run with num_nodes=1."""

  @classmethod
  def add_test_dimensions(cls):
    super(TestArraySort, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(cluster_sizes=[1]))
    add_exec_option_dimension(cls, 'max_sort_run_size', MAX_SORT_RUN_SIZE)

    # The table we use is a parquet table.
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('table_format').file_format == 'parquet')

  def test_array_sort(self, vector):
    self._run_test_sort_query(vector, None)

  def test_array_sort_with_limit(self, vector):
    self._run_test_sort_query(vector, 3000)

  def _run_test_sort_query(self, vector, limit):
    """Test sorting with spilling where an array that contains var-len data is in the
    sorting tuple. If 'limit' is set to an integer, applies that limit."""
    query = """select string_col, int_array, double_map, string_array, mixed
        from functional_parquet.arrays_big order by string_col"""

    if limit:
      assert isinstance(limit, int)
      limit_clause = " limit {}".format(limit)
      query = query + limit_clause

    exec_option = copy(vector.get_value('exec_option'))
    exec_option['disable_outermost_topn'] = 1
    exec_option['buffer_pool_limit'] = '44m'
    table_format = vector.get_value('table_format')

    query_result = self.execute_query(query, exec_option, table_format=table_format)
    # Check that spilling was successful.
    assert re.search(r'\s+\- SpilledRuns: [1-9]', query_result.runtime_profile)

    # Split result rows (strings) into columns.
    result = split_result_rows(query_result.data)
    # Sort the result rows according to the first column.
    sorted_result = sorted(result, key=lambda row: row[0])
    assert(result == sorted_result)