Mirror of https://github.com/apache/impala.git
Result spooling has been relatively stable since it was introduced, and it has
several benefits described in IMPALA-8656. This patch enables the result spooling
query option (SPOOL_QUERY_RESULTS) by default.

Furthermore, some tests need to be adjusted to account for result spooling being
on by default. The following are the adjustment categories and the tests that fall
under each category.

Change in assertions:
  PlannerTest#testAcidTableScans
  PlannerTest#testBloomFilterAssignment
  PlannerTest#testConstantFolding
  PlannerTest#testFkPkJoinDetection
  PlannerTest#testFkPkJoinDetectionWithHDFSNumRowsEstDisabled
  PlannerTest#testKuduSelectivity
  PlannerTest#testMaxRowSize
  PlannerTest#testMinMaxRuntimeFilters
  PlannerTest#testMinMaxRuntimeFiltersWithHDFSNumRowsEstDisabled
  PlannerTest#testMtDopValidation
  PlannerTest#testParquetFiltering
  PlannerTest#testParquetFilteringDisabled
  PlannerTest#testPartitionPruning
  PlannerTest#testPreaggBytesLimit
  PlannerTest#testResourceRequirements
  PlannerTest#testRuntimeFilterQueryOptions
  PlannerTest#testSortExprMaterialization
  PlannerTest#testSpillableBufferSizing
  PlannerTest#testTableSample
  PlannerTest#testTpch
  PlannerTest#testKuduTpch
  PlannerTest#testTpchNested
  PlannerTest#testUnion
  TpcdsPlannerTest
  custom_cluster/test_admission_controller.py::TestAdmissionController::test_dedicated_coordinator_planner_estimates
  custom_cluster/test_admission_controller.py::TestAdmissionController::test_memory_rejection
  custom_cluster/test_admission_controller.py::TestAdmissionController::test_pool_mem_limit_configs
  metadata/test_explain.py::TestExplain::test_explain_level2
  metadata/test_explain.py::TestExplain::test_explain_level3
  metadata/test_stats_extrapolation.py::TestStatsExtrapolation::test_stats_extrapolation

Increase BUFFER_POOL_LIMIT:
  query_test/test_queries.py::TestQueries::test_analytic_fns
  query_test/test_runtime_filters.py::TestRuntimeRowFilters::test_row_filter_reservation
  query_test/test_sort.py::TestQueryFullSort::test_multiple_mem_limits_full_output
  query_test/test_spilling.py::TestSpillingBroadcastJoins::test_spilling_broadcast_joins
  query_test/test_spilling.py::TestSpillingDebugActionDimensions::test_spilling_aggs
  query_test/test_spilling.py::TestSpillingDebugActionDimensions::test_spilling_regression_exhaustive
  query_test/test_udfs.py::TestUdfExecution::test_mem_limits

Increase MEM_LIMIT:
  query_test/test_mem_usage_scaling.py::TestExchangeMemUsage::test_exchange_mem_usage_scaling
  query_test/test_mem_usage_scaling.py::TestScanMemLimit::test_hdfs_scanner_thread_mem_scaling

Increase MAX_ROW_SIZE:
  custom_cluster/test_parquet_max_page_header.py::TestParquetMaxPageHeader::test_large_page_header_config
  query_test/test_insert.py::TestInsertQueries::test_insert_large_string
  query_test/test_query_mem_limit.py::TestQueryMemLimit::test_mem_limit
  query_test/test_scanners.py::TestTextSplitDelimiters::test_text_split_across_buffers_delimiter
  query_test/test_scanners.py::TestWideRow::test_wide_row

Disable result spooling to maintain assertion:
  custom_cluster/test_admission_controller.py::TestAdmissionController::test_set_request_pool
  custom_cluster/test_admission_controller.py::TestAdmissionController::test_timeout_reason_host_memory
  custom_cluster/test_admission_controller.py::TestAdmissionController::test_timeout_reason_pool_memory
  custom_cluster/test_admission_controller.py::TestAdmissionController::test_queue_reasons_memory
  custom_cluster/test_admission_controller.py::TestAdmissionController::test_pool_config_change_while_queued
  custom_cluster/test_query_retries.py::TestQueryRetries::test_retry_fetched_rows
  custom_cluster/test_query_retries.py::TestQueryRetries::test_retry_finished_query
  custom_cluster/test_scratch_disk.py::TestScratchDir::test_no_dirs
  custom_cluster/test_scratch_disk.py::TestScratchDir::test_non_existing_dirs
  custom_cluster/test_scratch_disk.py::TestScratchDir::test_non_writable_dirs
  query_test/test_insert.py::TestInsertQueries::test_insert_large_string (the last query only)
  query_test/test_kudu.py::TestKuduMemLimits::test_low_mem_limit_low_selectivity_scan
  query_test/test_mem_usage_scaling.py::TestScanMemLimit::test_kudu_scan_mem_usage
  query_test/test_queries.py::TestQueriesParquetTables::test_very_large_strings
  query_test/test_query_mem_limit.py::TestCodegenMemLimit::test_codegen_mem_limit
  shell/test_shell_client.py::TestShellClient::test_fetch_size

Testing:
- Pass exhaustive tests.

Change-Id: I9e360c1428676d8f3fab5d95efee18aca085eba4
Reviewed-on: http://gerrit.cloudera.org:8080/16755
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
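The memory-limit and spooling categories above are handled per test by overriding query
options rather than changing the queries themselves. The sketch below shows the general
shape of such an override in Impala's Python end-to-end test framework; the option
values are placeholders chosen for illustration, not the ones used by this patch.

# Illustrative sketch only: option values are placeholders, not taken from the patch.
def test_something_affected_by_spooling(self, vector):
  exec_option = dict(vector.get_value('exec_option'))
  # "Disable result spooling to maintain assertion": turn the option back off.
  exec_option['spool_query_results'] = '0'
  # "Increase BUFFER_POOL_LIMIT" / "MEM_LIMIT" / "MAX_ROW_SIZE": give the query enough
  # memory to cover the extra BufferedPlanRootSink reservation that spooling needs.
  exec_option['buffer_pool_limit'] = '64m'
  exec_option['mem_limit'] = '1g'
  exec_option['max_row_size'] = '1m'
  self.execute_query("select 1", exec_option)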
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import re
from copy import copy, deepcopy

from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIfNotHdfsMinicluster


def transpose_results(result, map_fn=lambda x: x):
  """Given a query result (list of strings, each string represents a row), return a list
  of columns, where each column is a list of strings. Optionally, map_fn can be provided
  to be applied to every value, eg. to convert the strings to their underlying types."""
  split_result = [row.split('\t') for row in result]
  return [map(map_fn, list(l)) for l in zip(*split_result)]
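
# Example: transpose_results(["1\ta", "2\tb"]) returns [['1', '2'], ['a', 'b']];
# a map_fn such as int or float is applied to every value, so it is only suitable
# when all selected columns can be converted by it.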


class TestQueryFullSort(ImpalaTestSuite):
  """Test class to do functional validation of sorting when data is spilled to disk."""

  @classmethod
  def get_workload(self):
    return 'tpch'

  @classmethod
  def add_test_dimensions(cls):
    super(TestQueryFullSort, cls).add_test_dimensions()

    if cls.exploration_strategy() == 'core':
      cls.ImpalaTestMatrix.add_constraint(lambda v:
          v.get_value('table_format').file_format == 'parquet')

  def test_multiple_buffer_pool_limits(self, vector):
    """Using lineitem table forces the multi-phase sort with low buffer_pool_limit.
    This test takes about a minute."""
    query = """select l_comment, l_partkey, l_orderkey, l_suppkey, l_commitdate
        from lineitem order by l_comment limit 100000"""
    exec_option = copy(vector.get_value('exec_option'))
    exec_option['disable_outermost_topn'] = 1
    exec_option['num_nodes'] = 1
    table_format = vector.get_value('table_format')

    """The first run should fit in memory, the second run is a 2-phase disk sort,
    and the third run is a multi-phase sort (i.e. with an intermediate merge)."""
    for buffer_pool_limit in ['-1', '300m', '130m']:
      exec_option['buffer_pool_limit'] = buffer_pool_limit
      query_result = self.execute_query(
          query, exec_option, table_format=table_format)
      result = transpose_results(query_result.data)
      assert(result[0] == sorted(result[0]))

  def test_multiple_sort_run_bytes_limits(self, vector):
    """Using lineitem table forces the multi-phase sort with low sort_run_bytes_limit.
    This test takes about a minute."""
    query = """select l_comment, l_partkey, l_orderkey, l_suppkey, l_commitdate
        from lineitem order by l_comment limit 100000"""
    exec_option = copy(vector.get_value('exec_option'))
    exec_option['disable_outermost_topn'] = 1
    exec_option['num_nodes'] = 1
    table_format = vector.get_value('table_format')

    """The first sort run is allowed to ignore sort_run_bytes_limit, except when the
    estimate hints that a spill is inevitable. The lower a query's sort_run_bytes_limit
    is, the more sort runs are likely to be produced and spilled.
    Case 1: 0 SpilledRuns, because all rows fit within the maximum reservation.
            sort_run_bytes_limit is not enforced.
    Case 2: 4 SpilledRuns, because the sort node estimates that a spill is inevitable,
            so all runs are capped to 130m, including the first one."""
    options = [('2g', '100m', '0'), ('400m', '130m', '4')]
    for (mem_limit, sort_run_bytes_limit, spilled_runs) in options:
      exec_option['mem_limit'] = mem_limit
      exec_option['sort_run_bytes_limit'] = sort_run_bytes_limit
      query_result = self.execute_query(
          query, exec_option, table_format=table_format)
      m = re.search(r'\s+\- SpilledRuns: .*', query_result.runtime_profile)
      assert "SpilledRuns: " + spilled_runs in m.group()
      result = transpose_results(query_result.data)
      assert(result[0] == sorted(result[0]))

  def test_multiple_mem_limits_full_output(self, vector):
    """Exercise a range of memory limits, returning the full sorted input."""
    query = """select o_orderdate, o_custkey, o_comment
        from orders
        order by o_orderdate"""
    exec_option = copy(vector.get_value('exec_option'))
    table_format = vector.get_value('table_format')
    exec_option['default_spillable_buffer_size'] = '8M'

    # Minimum memory for different parts of the plan.
    buffered_plan_root_sink_reservation_mb = 16
    sort_reservation_mb = 48
    if table_format.file_format == 'parquet':
      scan_reservation_mb = 24
    else:
      scan_reservation_mb = 8
    total_reservation_mb = sort_reservation_mb + scan_reservation_mb \
        + buffered_plan_root_sink_reservation_mb

    # The memory values below assume 8M pages.
    # Test with unlimited and minimum memory for all file formats.
    buffer_pool_limit_values = ['-1', '{0}M'.format(total_reservation_mb)]
    if self.exploration_strategy() == 'exhaustive' and \
        table_format.file_format == 'parquet':
      # Test some intermediate values for parquet on exhaustive.
      buffer_pool_limit_values += ['128M', '256M']
    for buffer_pool_limit in buffer_pool_limit_values:
      exec_option['buffer_pool_limit'] = buffer_pool_limit
      result = transpose_results(self.execute_query(
          query, exec_option, table_format=table_format).data)
      assert(result[0] == sorted(result[0]))

  def test_sort_join(self, vector):
    """With minimum memory limit this should be a 1-phase sort"""
    query = """select o1.o_orderdate, o2.o_custkey, o1.o_comment from orders o1 join
        orders o2 on (o1.o_orderkey = o2.o_orderkey) order by o1.o_orderdate limit 100000"""

    exec_option = copy(vector.get_value('exec_option'))
    exec_option['disable_outermost_topn'] = 1
    exec_option['mem_limit'] = "134m"
    exec_option['num_nodes'] = 1
    table_format = vector.get_value('table_format')

    query_result = self.execute_query(query, exec_option, table_format=table_format)
    assert "TotalMergesPerformed: 1" in query_result.runtime_profile
    result = transpose_results(query_result.data)
    assert(result[0] == sorted(result[0]))

  def test_sort_union(self, vector):
    query = """select o_orderdate, o_custkey, o_comment from (select * from orders union
        select * from orders union all select * from orders) as i
        order by o_orderdate limit 100000"""

    exec_option = copy(vector.get_value('exec_option'))
    exec_option['disable_outermost_topn'] = 1
    exec_option['mem_limit'] = "3000m"
    table_format = vector.get_value('table_format')

    result = transpose_results(self.execute_query(
        query, exec_option, table_format=table_format).data)
    assert(result[0] == sorted(result[0]))

  def test_pathological_input(self, vector):
    """Regression test for stack overflow and poor performance on inputs where always
    selecting the middle element as the quicksort pivot performed badly. The trick is to
    concatenate two equal-size sorted inputs. If the middle element is always selected
    as the pivot (the old method), the sorter tends to get stuck selecting the minimum
    element as the pivot, which results in almost all of the tuples ending up in the
    right partition.
    """
    query = """select l_orderkey from (
        select * from lineitem limit 300000
        union all
        select * from lineitem limit 300000) t
        order by l_orderkey"""

    exec_option = copy(vector.get_value('exec_option'))
    exec_option['disable_outermost_topn'] = 1
    # Run with a single scanner thread so that the input doesn't get reordered.
    exec_option['num_nodes'] = "1"
    exec_option['num_scanner_threads'] = "1"
    table_format = vector.get_value('table_format')

    result = transpose_results(self.execute_query(
        query, exec_option, table_format=table_format).data)
    numeric_results = [int(val) for val in result[0]]
    assert(numeric_results == sorted(numeric_results))

  def test_spill_empty_strings(self, vector):
    """Test corner case of spilling sort with only empty strings. Spilling with var len
    slots typically means the sort must reorder blocks and convert pointers, but this case
    has to be handled differently because there are no var len blocks to point into."""

    query = """
    select empty_str, l_orderkey, l_partkey, l_suppkey,
    l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax
    from (select substr(l_comment, 1000, 0) empty_str, * from lineitem) t
    order by empty_str, l_orderkey, l_partkey, l_suppkey, l_linenumber
    limit 100000
    """

    exec_option = copy(vector.get_value('exec_option'))
    exec_option['disable_outermost_topn'] = 1
    exec_option['buffer_pool_limit'] = "256m"
    exec_option['num_nodes'] = "1"
    table_format = vector.get_value('table_format')

    result = transpose_results(self.execute_query(
        query, exec_option, table_format=table_format).data)
    assert(result[0] == sorted(result[0]))

  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
  def test_sort_reservation_usage(self, vector):
    """Tests for sorter reservation usage."""
    new_vector = deepcopy(vector)
    # Run with num_nodes=1 to make execution more deterministic.
    new_vector.get_value('exec_option')['num_nodes'] = 1
    self.run_test_case('sort-reservation-usage-single-node', new_vector)


class TestRandomSort(ImpalaTestSuite):
  @classmethod
  def get_workload(self):
    return 'functional'

  def test_order_by_random(self):
    """Tests that 'order by random()' works as expected."""
    # "order by random()" with different seeds should produce different orderings.
    seed_query = "select * from functional.alltypestiny order by random(%s)"
    results_seed0 = self.execute_query(seed_query % "0")
    results_seed1 = self.execute_query(seed_query % "1")
    assert results_seed0.data != results_seed1.data
    assert sorted(results_seed0.data) == sorted(results_seed1.data)

    # Include "random()" in the select list to check that it's sorted correctly.
    results = transpose_results(self.execute_query(
        "select random() as r from functional.alltypessmall order by r").data,
        lambda x: float(x))
    assert(results[0] == sorted(results[0]))

    # Like above, but with a limit.
    results = transpose_results(self.execute_query(
        "select random() as r from functional.alltypes order by r limit 100").data,
        lambda x: float(x))
    assert(results == sorted(results))

    # "order by random()" inside an inline view.
    query = "select r from (select random() r from functional.alltypessmall) v order by r"
    results = transpose_results(self.execute_query(query).data, lambda x: float(x))
    assert(results == sorted(results))

  def test_analytic_order_by_random(self):
    """Tests that a window function over 'order by random()' works as expected."""
    # Since we use the same random seed, the results should be returned in order.
    query = """select last_value(rand(2)) over (order by rand(2)) from
        functional.alltypestiny"""
    results = transpose_results(self.execute_query(query).data, lambda x: float(x))
    assert(results == sorted(results))


class TestPartialSort(ImpalaTestSuite):
  """Test class to do functional validation of partial sorts."""

  def test_partial_sort_min_reservation(self, unique_database):
    """Test that the partial sort node can operate if it only gets its minimum
    memory reservation."""
    table_name = "%s.kudu_test" % unique_database
    self.client.set_configuration_option(
        "debug_action", "-1:OPEN:SET_DENY_RESERVATION_PROBABILITY@1.0")
    self.execute_query("""create table %s (col0 string primary key)
        partition by hash(col0) partitions 8 stored as kudu""" % table_name)
    result = self.execute_query(
        "insert into %s select string_col from functional.alltypessmall" % table_name)
    assert "PARTIAL SORT" in result.runtime_profile, result.runtime_profile