# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import, division, print_function
import pytest
import re
import time

from time import sleep
from tests.common.errors import Timeout
from tests.common.impala_connection import FINISHED
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.test_dimensions import create_exec_option_dimension
from tests.common.test_vector import HS2, ImpalaTestDimension
from tests.util.cancel_util import cancel_query_and_validate_state
from tests.util.failpoints_util import execute_query_expect_debug_action_failure


class TestResultSpooling(ImpalaTestSuite):
  @classmethod
  def add_test_dimensions(cls):
    super(TestResultSpooling, cls).add_test_dimensions()
    # Result spooling should be independent of file format, so only test against
    # Parquet files.
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('table_format').file_format == 'parquet')

  def test_result_spooling(self, vector):
    self.run_test_case('QueryTest/result-spooling', vector)

  def test_multi_batches(self, vector):
    """Validates that reading multiple row batches works when result spooling is
    enabled."""
    vector.get_value('exec_option')['batch_size'] = 10
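    # With batch_size=10, the 1000 rows selected below are returned as roughly 100
    # small RowBatches, all of which pass through the result spooling queue.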
    self.__validate_query("select id from functional_parquet.alltypes order by id "
        "limit 1000", vector.get_value('exec_option'))

  def test_spilling(self, vector):
    """Tests that query results which don't fully fit into memory are spilled to disk.
    The test runs a query asynchronously and waits for the PeakUnpinnedBytes counter in
    the PLAN_ROOT_SINK section of the runtime profile to reach a non-zero value. Then
    it fetches all the results and validates them."""
    query = "select * from functional.alltypes order by id limit 1500"
    exec_options = vector.get_value('exec_option')

    # Set lower values for spill-to-disk configs to force the above query to spill
    # spooled results.
    exec_options['min_spillable_buffer_size'] = 8 * 1024
    exec_options['default_spillable_buffer_size'] = 8 * 1024
    exec_options['max_result_spooling_mem'] = 32 * 1024
    exec_options['max_row_size'] = 16 * 1024

    # Execute the query without result spooling and save the results for later
    # validation.
    base_result = self.execute_query(query, exec_options)
    assert base_result.success, "Failed to run {0} when result spooling is disabled" \
        .format(query)

    exec_options['spool_query_results'] = 'true'

    # Amount of time to wait for the PeakUnpinnedBytes counter in the PLAN_ROOT_SINK
    # section of the profile to reach a non-zero value.
    timeout = 30

    # Regexes to look for in the runtime profiles.
    # PeakUnpinnedBytes can show up in exec nodes as well, so we only look for the
    # PeakUnpinnedBytes metrics in the PLAN_ROOT_SINK section of the profile.
    unpinned_bytes_regex = r"PLAN_ROOT_SINK[\s\S]*?PeakUnpinnedBytes.*\([1-9][0-9]*\)"
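    # '[\s\S]' matches any character including newlines ('.' does not by default), and
    # the non-greedy '*?' keeps the match anchored to the first PeakUnpinnedBytes
    # counter that follows the PLAN_ROOT_SINK header.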
    # The PLAN_ROOT_SINK should have 'Spilled' in the 'ExecOption' info string.
    spilled_exec_option_regex = "ExecOption:.*Spilled"
    # PLAN_ROOT_SINK's reservation limit should be set at MAX_RESULT_SPOOLING_MEM =
    # 32 KB.
    plan_root_sink_reservation_limit = (r"PLAN_ROOT_SINK[\s\S]*?ReservationLimit: "
                                        r"32.00 KB")

    # Fetch the runtime profile every 0.5 seconds until either the timeout is hit, or
    # PeakUnpinnedBytes shows up in the profile.
    start_time = time.time()
    handle = self.execute_query_async(query, exec_options)
    try:
      while not re.search(unpinned_bytes_regex, self.client.get_runtime_profile(handle)) \
          and time.time() - start_time < timeout:
        time.sleep(0.5)
      profile = self.client.get_runtime_profile(handle)
      if not re.search(unpinned_bytes_regex, profile):
        raise Timeout("Query {0} did not spill spooled results within the timeout {1}"
            .format(query, timeout))
      # At this point PLAN_ROOT_SINK must have spilled, so spilled_exec_option_regex
      # should be in the profile as well.
      assert re.search(spilled_exec_option_regex, profile)
      # Check that PLAN_ROOT_SINK's reservation limit is set accordingly.
      assert re.search(plan_root_sink_reservation_limit, profile)
      result = self.client.fetch(query, handle)
      assert result.data == base_result.data
    finally:
      self.client.close_query(handle)

  @pytest.mark.xfail(run=False, reason="IMPALA-8926")
  def test_full_queue(self, vector):
    """Delegates to _test_full_queue."""
    query = "select * from functional.alltypes order by id limit 1500"
    self._test_full_queue(vector, query)

  @pytest.mark.xfail(run=False, reason="IMPALA-8926")
  def test_full_queue_large_fetch(self, vector):
    """Delegates to _test_full_queue, but specifies a fetch size equal to the number of
    rows returned by the query. This tests that clients can fetch all rows from a full
    queue."""
    num_rows = 1500
    query = "select * from functional.alltypes order by id limit {0}".format(num_rows)
    self._test_full_queue(vector, query, fetch_size=num_rows)

  def _test_full_queue(self, vector, query, fetch_size=-1):
    """Tests result spooling when there is no more space to buffer query results (the
    queue is full), and the client hasn't fetched any results. Validates that
    RowBatchSendWaitTime (amount of time Impala blocks waiting for the client to read
    buffered results and clear up space in the queue) is updated properly."""
    exec_options = vector.get_value('exec_option')

    # Set lower values for spill-to-disk and result spooling configs so that the queue
    # gets full when selecting a small number of rows.
    exec_options['min_spillable_buffer_size'] = 8 * 1024
    exec_options['default_spillable_buffer_size'] = 8 * 1024
    exec_options['max_result_spooling_mem'] = 32 * 1024
    exec_options['max_spilled_result_spooling_mem'] = 32 * 1024
    exec_options['spool_query_results'] = 'true'

    # Amount of time to wait for the query to reach a running state before throwing a
    # Timeout exception.
    timeout = 10
    # Regexes to look for in the runtime profile.
    send_wait_time_regex = "RowBatchSendWaitTime: [1-9]"
    queue_spilled_regex = r"PLAN_ROOT_SINK[\s\S]*?ExecOption: Spilled"

    # Execute the query asynchronously, wait for the queue to fill up, start fetching
    # results, and then validate that RowBatchSendWaitTime shows a non-zero value in the
    # profile. Result spooling is configured so that max_result_spooling_mem ==
    # max_spilled_result_spooling_mem. This means that once the queue spills, the queue
    # is guaranteed to be full.
    handle = self.execute_query_async(query, exec_options)
    try:
      self.client.wait_for_impala_state(handle, FINISHED, timeout)
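      # Poll the runtime profile until PLAN_ROOT_SINK reports that it has spilled.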
      self.assert_eventually(30, 1, lambda: re.search(queue_spilled_regex,
          self.client.get_runtime_profile(handle)))
      # A fetch request is necessary to unblock the producer thread and trigger an
      # update of the RowBatchSendWaitTime counter.
      assert self.client.fetch(query, handle, max_rows=fetch_size).success
      assert re.search(send_wait_time_regex, self.client.get_runtime_profile(handle))
    finally:
      self.client.close_query(handle)

  def test_slow_query(self, vector):
    """Tests result spooling when the client is blocked waiting for Impala to add more
    results to the queue. Validates that RowBatchGetWaitTime (amount of time the client
    spends waiting for Impala to buffer query results) is updated properly."""
    query = "select id from functional.alltypes"

    # Add a delay to the SCAN_NODE in the query above to simulate a "slow" query. The
    # delay should give the client enough time to issue a fetch request and block until
    # Impala produces results.
    vector.get_value('exec_option')['debug_action'] = '0:GETNEXT:DELAY'
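    # Exec node debug actions use the format '<node id>:<phase>:<action>'; node id 0
    # is the scan node in this plan.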
    vector.get_value('exec_option')['spool_query_results'] = 'true'

    # Regex to look for in the runtime profile.
    get_wait_time_regex = "RowBatchGetWaitTime: [1-9]"

    # Execute the query asynchronously, fetch results, wait for the query to finish,
    # and then validate that RowBatchGetWaitTime shows a non-zero value in the profile.
    handle = self.execute_query_async(query, vector.get_value('exec_option'))
    try:
      self.client.wait_for_admission_control(handle, 60)
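      # This fetch blocks on the delayed scan node, which is what accumulates time in
      # the RowBatchGetWaitTime counter checked below.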
      self.client.fetch(query, handle)
      self.client.wait_for_impala_state(handle, FINISHED, 10)
      assert re.search(get_wait_time_regex, self.client.get_runtime_profile(handle))
    finally:
      self.client.close_query(handle)

  def test_exec_tree_failpoint(self, vector):
    """Injects a failure during exec tree execution. The GETNEXT:DELAY is necessary to
    ensure the client issues a fetch request before the MEM_LIMIT_EXCEEDED exception is
    thrown. Unlike the tests in TestResultSpoolingFailpoints, this test injects a
    failure during the execution of the exec tree, rather than in the result spooling
    code."""
    vector.get_value('exec_option')['batch_size'] = 10
    vector.get_value('exec_option')['debug_action'] = \
        '4:GETNEXT:MEM_LIMIT_EXCEEDED|0:GETNEXT:DELAY'
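    # '|' separates two independent debug actions: node 4 fails its GetNext() call with
    # a memory limit error while node 0 is delayed.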
    vector.get_value('exec_option')['spool_query_results'] = 'true'
    query = "select 1 from functional.alltypessmall a join functional.alltypessmall b " \
        "on a.id = b.id"
    execute_query_expect_debug_action_failure(self, query, vector)

  def __validate_query(self, query, exec_options):
    """Compares the results of the given query with and without result spooling
    enabled."""
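    # Copy the options so that toggling spool_query_results below does not leak into
    # the caller's exec_options.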
    exec_options = exec_options.copy()
    result = self.execute_query(query, exec_options)
    assert result.success, "Failed to run {0} when result spooling is " \
        "disabled".format(query)
    base_data = result.data
    exec_options['spool_query_results'] = 'true'
    result = self.execute_query(query, exec_options)
    assert result.success, "Failed to run {0} when result spooling is " \
        "enabled".format(query)
    assert len(result.data) == len(base_data), "{0} returned a different number of " \
        "results when result spooling was enabled".format(query)
    assert result.data == base_data, "{0} returned different results when result " \
        "spooling was enabled".format(query)


class TestResultSpoolingFetchSize(ImpalaTestSuite):
  """Tests fetching logic when result spooling is enabled. When result spooling is
  disabled, Impala only supports fetching up to BATCH_SIZE rows at a time (since only
  one RowBatch is ever buffered). When result spooling is enabled, clients can specify
  any fetch size (up to a limit) and Impala will return exactly that number of rows
  (assuming there are that many rows left to fetch). This class validates the
  aforementioned result spooling fetch logic using different fetch and batch sizes."""

  # The different values of BATCH_SIZE 'test_fetch' will be parameterized by.
  _batch_sizes = [100, 1024, 2048]

  # The number of rows to fetch from the query handle at a time.
  _fetch_sizes = [7, 23, 321, 512, 2048, 4321, 5000, 10000]

  # The number of rows in functional_parquet.alltypes.
  _num_rows = 7300

  # The query that 'test_fetch' will run.
  _query = "select id from functional_parquet.alltypes order by id"

  @classmethod
  def add_test_dimensions(cls):
    super(TestResultSpoolingFetchSize, cls).add_test_dimensions()
    # Create a test matrix with three different dimensions: BATCH_SIZE, the number of
    # rows to fetch at a time, and whether the tests should wait for all results to be
    # spooled before fetching any rows.
    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
        batch_sizes=cls._batch_sizes))
    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('fetch_size',
        *cls._fetch_sizes))
    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('wait_for_finished',
        *[True, False]))

    # Result spooling should be independent of file format, so only testing for
    # table_format=parquet/none in order to avoid a test dimension explosion.
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('table_format').file_format == 'parquet'
        and v.get_value('table_format').compression_codec == 'none')

  @classmethod
  def setup_class(cls):
    super(TestResultSpoolingFetchSize, cls).setup_class()
    # All tests only ever run a single query, so rather than re-run this query for
    # every test, run it once and store the results.
    base_result = cls.client.execute(cls._query)
    assert base_result.success, "Failed to run {0} when result spooling is " \
        "enabled".format(cls._query)
    cls._base_data = base_result.data

  def test_fetch(self, vector):
    """Run '_query' with result spooling enabled and with the specified BATCH_SIZE. Use
    the 'fetch_size' parameter to determine how many rows to fetch from the query
    handle at a time. Fetch all results and then validate they match '_base_data'."""
    exec_options = vector.get_value('exec_option')
    exec_options['spool_query_results'] = 'true'
    fetch_size = vector.get_value('fetch_size')

    # Amount of time to wait for the query to reach a running state before throwing a
    # Timeout exception.
    timeout = 30

    results = []
    handle = self.execute_query_async(self._query, exec_options)
    try:
      # If 'wait_for_finished' is True, wait for the query to reach the FINISHED state.
      # When it reaches this state all results should be successfully spooled.
      if vector.get_value('wait_for_finished'):
        self.client.wait_for_impala_state(handle, FINISHED, timeout)
      rows_fetched = 0

      # Call 'fetch' on the query handle enough times to read all rows.
      while rows_fetched < self._num_rows:
        result_data = self.client.fetch(self._query, handle, fetch_size).data
        # Assert that each fetch request returns exactly the number of rows requested,
        # unless fewer than that many rows were left in the result set.
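        # (e.g. a fetch_size of 10000 exceeds the 7300-row result set, in which case a
        # single fetch returns all remaining rows).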
        assert len(result_data) == min(fetch_size, self._num_rows - rows_fetched)
        rows_fetched += len(result_data)
        results.extend(result_data)
    finally:
      self.client.close_query(handle)

    # Assert that the fetched results match the '_base_data'.
    assert self._num_rows == rows_fetched
    assert self._base_data == results


class TestResultSpoolingCancellation(ImpalaTestSuite):
  """Test cancellation of queries when result spooling is enabled. This class heavily
  borrows from the cancellation tests in test_cancellation.py. It uses the following
  test dimensions: 'query' and 'cancel_delay'. 'query' is a list of queries to run
  asynchronously and then cancel. 'cancel_delay' controls how long a query should run
  before being cancelled.
  """

  # Queries to execute; use the TPC-H dataset because its tables are large, so queries
  # take some time to execute.
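  # The three query shapes exercise cancellation at different stages: the first streams
  # rows as they are produced, the LIMIT query completes almost immediately, and the
  # ORDER BY query buffers the full sort result before returning any rows.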
  _cancellation_queries = ["select l_returnflag from tpch_parquet.lineitem",
                           "select * from tpch_parquet.lineitem limit 50",
                           "select * from tpch_parquet.lineitem order by l_orderkey"]

  # Time to sleep between issuing the query and cancelling it.
  _cancel_delay_in_seconds = [0, 0.01, 0.1, 1, 4]

  @classmethod
  def default_test_protocol(cls):
    return HS2

  @classmethod
  def get_workload(cls):
    return 'tpch'

  @classmethod
  def add_test_dimensions(cls):
    super(TestResultSpoolingCancellation, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('query',
        *cls._cancellation_queries))
    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('cancel_delay',
        *cls._cancel_delay_in_seconds))

    # Result spooling should be independent of file format, so only testing for
    # table_format=parquet/none in order to avoid a test dimension explosion.
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('table_format').file_format == 'parquet'
        and v.get_value('table_format').compression_codec == 'none')

  def test_cancellation(self, vector):
    vector.get_value('exec_option')['spool_query_results'] = 'true'
    cancel_query_and_validate_state(
        vector.get_value('query'),
        vector.get_value('exec_option'), vector.get_value('table_format'),
        vector.get_value('cancel_delay'))

  def test_cancel_no_fetch(self, vector):
    """Test cancelling a query before any results are fetched. Unlike
    test_cancellation, no fetch thread is ever started before the query is
    cancelled."""
    vector.get_value('exec_option')['spool_query_results'] = 'true'
    handle = None
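    # 'handle' remains None if execute_query_async raises, so the finally block below
    # only closes a handle that was actually created.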
    try:
      handle = self.execute_query_async(vector.get_value('query'),
          vector.get_value('exec_option'))
      sleep(vector.get_value('cancel_delay'))
      self.client.cancel(handle)
    finally:
      if handle:
        self.client.close_query(handle)


class TestResultSpoolingFailpoints(ImpalaTestSuite):
  """Test result spooling failure handling. Uses debug actions to inject failures at
  various points of result spooling execution (e.g. when results are actually being
  spooled)."""

  _debug_actions = [
      # Inject a failure in BufferedPlanRootSink::Open.
      'BPRS_BEFORE_OPEN:FAIL',
      # Inject a failure immediately before BufferedPlanRootSink::Send adds a batch to
      # the queue. The probability ensures that the error is thrown on a random
      # RowBatch.
      'BPRS_BEFORE_ADD_BATCH:FAIL@1.0',
      # Inject a failure in BufferedPlanRootSink::FlushFinal.
      'BPRS_BEFORE_FLUSH_FINAL:FAIL',
      # Inject a failure immediately before BufferedPlanRootSink::GetNext reads a
      # batch from the queue. The probability ensures that the error is thrown on a
      # random RowBatch.
      'BPRS_BEFORE_GET_BATCH:FAIL@1.0']

  _query = "select * from functional.alltypes"

  @classmethod
  def add_test_dimensions(cls):
    super(TestResultSpoolingFailpoints, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('debug_action',
        *cls._debug_actions))

    # Result spooling should be independent of file format, so only testing for
    # table_format=parquet/none in order to avoid a test dimension explosion.
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('table_format').file_format == 'parquet'
        and v.get_value('table_format').compression_codec == 'none')

  def test_failpoints(self, vector):
    vector.get_value('exec_option')['batch_size'] = 10
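    # A small batch size forces the query to produce many small RowBatches rather than
    # a single large one, so the per-batch failpoints are reached while batches are
    # still flowing through the spooling queue.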
    vector.get_value('exec_option')['debug_action'] = vector.get_value('debug_action')
    vector.get_value('exec_option')['spool_query_results'] = 'true'
    execute_query_expect_debug_action_failure(self, self._query, vector)


class TestResultSpoolingMaxReservation(ImpalaTestSuite):
  """These tests verify that the following query options are taken into account when
  calculating max_reservation for result spooling: MAX_ROW_SIZE,
  MAX_RESULT_SPOOLING_MEM and DEFAULT_SPILLABLE_BUFFER_SIZE."""

  # Test with and without denial of reservation increases: without denial the queries
  # run with a minimal amount of spilling, and with denial at probability 1.0 they run
  # with the absolute minimum memory requirement.
  DEBUG_ACTION_VALUES = [None, '-1:OPEN:SET_DENY_RESERVATION_PROBABILITY@1.0']

  @classmethod
  def add_test_dimensions(cls):
    super(TestResultSpoolingMaxReservation, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('debug_action',
        *cls.DEBUG_ACTION_VALUES))

    # Result spooling should be independent of file format, so only testing for
    # table_format=parquet/none in order to avoid a test dimension explosion.
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('table_format').file_format == 'parquet'
        and v.get_value('table_format').compression_codec == 'none')

  def test_high_max_row_size(self, vector):
    """Test that when MAX_ROW_SIZE is set, PLAN_ROOT_SINK can adjust its
    max_reservation even though MAX_RESULT_SPOOLING_MEM is set lower."""
    exec_options = vector.get_value('exec_option')
    exec_options['debug_action'] = vector.get_value('debug_action')
    exec_options['spool_query_results'] = 'true'
    exec_options['max_row_size'] = 10 * 1024 * 1024
    exec_options['max_result_spooling_mem'] = 2 * 1024 * 1024
    exec_options['default_spillable_buffer_size'] = 2 * 1024 * 1024

    # Select 3 wide rows, each with a size of 10MB.
    query = "select string_col from functional.widerow " \
        "join functional.tinyinttable where int_col < 3"
    result = self.execute_query(query, exec_options)
    assert result.success, "Failed to run {0} when result spooling is enabled" \
        .format(query)

    # The PLAN_ROOT_SINK should have 'Spilled' in the 'ExecOption' info string.
    spilled_exec_option_regex = "ExecOption:.*Spilled"
    assert re.search(spilled_exec_option_regex, result.runtime_profile)

    # PLAN_ROOT_SINK's reservation limit should be set at 2 * MAX_ROW_SIZE.
    plan_root_sink_reservation_limit = (r"PLAN_ROOT_SINK[\s\S]*?ReservationLimit: "
                                        r"32.00 MB")
    assert re.search(plan_root_sink_reservation_limit, result.runtime_profile)

  def test_high_default_spillable_buffer(self, vector):
    """Test that a high DEFAULT_SPILLABLE_BUFFER_SIZE wins the calculation for
    PLAN_ROOT_SINK's max_reservation."""
    exec_options = vector.get_value('exec_option')
    exec_options['debug_action'] = vector.get_value('debug_action')
    exec_options['spool_query_results'] = 'true'
    exec_options['max_row_size'] = 8 * 1024
    exec_options['max_result_spooling_mem'] = 8 * 1024
    exec_options['default_spillable_buffer_size'] = 32 * 1024
    self.__run_small_spilling_query(exec_options, "64.00 KB")

  def test_high_max_result_spooling_mem(self, vector):
    """Test that a high MAX_RESULT_SPOOLING_MEM wins the calculation for
    PLAN_ROOT_SINK's max_reservation."""
    exec_options = vector.get_value('exec_option')
    exec_options['debug_action'] = vector.get_value('debug_action')
    exec_options['spool_query_results'] = 'true'
    exec_options['max_row_size'] = 8 * 1024
    exec_options['max_result_spooling_mem'] = 70 * 1024
    exec_options['default_spillable_buffer_size'] = 8 * 1024
    self.__run_small_spilling_query(exec_options, "70.00 KB")

  def test_unbounded_result_spooling_mem(self, vector):
    """Test result spooling against unbounded MAX_RESULT_SPOOLING_MEM and
    MAX_SPILLED_RESULT_SPOOLING_MEM. In this situation, the planner should override
    MAX_RESULT_SPOOLING_MEM to its default (100MB) and BufferedPlanRootSink should
    assume MAX_SPILLED_RESULT_SPOOLING_MEM = INT64_MAX."""
    exec_options = vector.get_value('exec_option')
    exec_options['debug_action'] = vector.get_value('debug_action')
    exec_options['spool_query_results'] = 'true'
    exec_options['max_row_size'] = 8 * 1024
    exec_options['max_result_spooling_mem'] = 0
    exec_options['max_spilled_result_spooling_mem'] = 0
    exec_options['default_spillable_buffer_size'] = 8 * 1024

    query = "select * from functional.alltypes order by id limit 1500"
    result = self.execute_query(query, exec_options)
    assert result.success, "Failed to run {0} when result spooling is enabled" \
        .format(query)

    # Check that PLAN_ROOT_SINK's reservation limit matches the default
    # MAX_RESULT_SPOOLING_MEM.
    plan_root_sink_reservation_limit = r"PLAN_ROOT_SINK[\s\S]*?ReservationLimit: {0}" \
        .format('100.00 MB')
    assert re.search(plan_root_sink_reservation_limit, result.runtime_profile)

  def __run_small_spilling_query(self, exec_options, expected_limit):
    """Given exec_options, runs a simple query, verifies that it spills, and verifies
    that PLAN_ROOT_SINK's ReservationLimit matches expected_limit."""
    query = "select * from functional.alltypes order by id limit 1500"
    result = self.execute_query(query, exec_options)
    assert result.success, "Failed to run {0} when result spooling is enabled" \
        .format(query)

    # The PLAN_ROOT_SINK should have 'Spilled' in the 'ExecOption' info string.
    spilled_exec_option_regex = "ExecOption:.*Spilled"
    assert re.search(spilled_exec_option_regex, result.runtime_profile)

    # Check that PLAN_ROOT_SINK's reservation limit matches the expected limit.
    plan_root_sink_reservation_limit = r"PLAN_ROOT_SINK[\s\S]*?ReservationLimit: {0}" \
        .format(expected_limit)
    assert re.search(plan_root_sink_reservation_limit, result.runtime_profile)