mirror of
https://github.com/apache/impala.git
synced 2025-12-25 02:03:09 -05:00
IMPALA-8818: Replace deque with spillable queue in BufferedPRS
Replaces DequeRowBatchQueue with SpillableRowBatchQueue in BufferedPlanRootSink. A few changes to BufferedPlanRootSink were necessary for it to work with the spillable queue, however, all the synchronization logic is the same. SpillableRowBatchQueue is a wrapper around a BufferedTupleStream and a ReservationManager. It takes in a TBackendResourceProfile that specifies the max / min memory reservation the BufferedTupleStream can use to buffer rows. The 'max_unpinned_bytes' parameter limits the max number of bytes that can be unpinned in the BufferedTupleStream. The limit is a 'soft' limit because calls to AddBatch may push the amount of unpinned memory over the limit. The queue is non-blocking and not thread safe. It provides AddBatch and GetBatch methods. Calls to AddBatch spill if the BufferedTupleStream does not have enough reservation to fit the entire RowBatch. Adds two new query options: 'MAX_PINNED_RESULT_SPOOLING_MEMORY' and 'MAX_UNPINNED_RESULT_SPOOLING_MEMORY', which bound the amount of pinned and unpinned memory that a query can use for spooling, respectively. MAX_PINNED_RESULT_SPOOLING_MEMORY must be <= MAX_UNPINNED_RESULT_SPOOLING_MEMORY in order to allow all the pinned data in the BufferedTupleStream to be unpinned. This is enforced in a new method in QueryOptions called 'ValidateQueryOptions'. Planner Changes: PlanRootSink.java now computes a full ResourceProfile if result spooling is enabled. The min mem reservation is bounded by the size of the read and write pages used by the BufferedTupleStream. The max mem reservation is bounded by 'MAX_PINNED_RESULT_SPOOLING_MEMORY'. The mem estimate is computed by estimating the size of the result set using stats. BufferedTupleStream Re-Factoring: For the most part, using a BufferedTupleStream outside an ExecNode works properly. However, some changes were necessary: * The message for the MAX_ROW_SIZE error is ExecNode specific. In order to fix this, this patch introduces the concept of an ExecNode 'label' which is a more generic version of an ExecNode 'id'. * The definition of TBackendResourceProfile lived in PlanNodes.thrift, it was moved to its own file so it can be used by DataSinks.thrift. * Modified BufferedTupleStream so it internally tracks how many bytes are unpinned (necessary for 'MAX_UNPINNED_RESULT_SPOOLING_MEMORY'). Metrics: * Added a few of the metrics mentioned in IMPALA-8825 to BufferedPlanRootSink. Specifically, added timers to track how much time is spent waiting in the BufferedPlanRootSink 'Send' and 'GetNext' methods. * The BufferedTupleStream in the SpillableRowBatchQueue exposes several BufferPool metrics such as number of reserved and unpinned bytes. Bug Fixes: * Fixed a bug in BufferedPlanRootSink where the MemPool used by the expression evaluators was not being cleared incrementally. * Fixed a bug where the inactive timer was not being properly updated in BufferedPlanRootSink. * Fixed a bug where RowBatch memory was not freed if BufferedPlanRootSink::GetNext terminated early because it could not handle requests where num_results < BATCH_SIZE. Testing: * Added new tests to test_result_spooling.py. * Updated errors thrown in spilling-large-rows.test. * Ran exhaustive tests. Change-Id: I10f9e72374cdf9501c0e5e2c5b39c13688ae65a9 Reviewed-on: http://gerrit.cloudera.org:8080/14039 Reviewed-by: Sahil Takiar <stakiar@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
committed by
Impala Public Jenkins
parent
af0e04f33b
commit
d037ac8304
@@ -15,16 +15,21 @@
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import re
|
||||
import time
|
||||
import threading
|
||||
|
||||
from time import sleep
|
||||
from tests.common.errors import Timeout
|
||||
from tests.common.impala_test_suite import ImpalaTestSuite
|
||||
from tests.common.test_vector import ImpalaTestDimension
|
||||
from tests.util.cancel_util import cancel_query_and_validate_state
|
||||
|
||||
# Queries to execute, use the TPC-H dataset because tables are large so queries take some
|
||||
# time to execute.
|
||||
CANCELLATION_QUERIES = ['select l_returnflag from tpch_parquet.lineitem',
|
||||
'select * from tpch_parquet.lineitem limit 50',
|
||||
'select * from tpch_parquet.lineitem order by l_orderkey']
|
||||
CANCELLATION_QUERIES = ["select l_returnflag from tpch_parquet.lineitem",
|
||||
"select * from tpch_parquet.lineitem limit 50",
|
||||
"select * from tpch_parquet.lineitem order by l_orderkey"]
|
||||
|
||||
# Time to sleep between issuing query and canceling.
|
||||
CANCEL_DELAY_IN_SECONDS = [0, 0.01, 0.1, 1, 4]
|
||||
@@ -34,6 +39,8 @@ class TestResultSpooling(ImpalaTestSuite):
|
||||
@classmethod
|
||||
def add_test_dimensions(cls):
|
||||
super(TestResultSpooling, cls).add_test_dimensions()
|
||||
# Result spooling should be independent of file format, so only test against
|
||||
# Parquet files.
|
||||
cls.ImpalaTestMatrix.add_constraint(lambda v:
|
||||
v.get_value('table_format').file_format == 'parquet')
|
||||
|
||||
@@ -48,10 +55,126 @@ class TestResultSpooling(ImpalaTestSuite):
|
||||
"""Validates that reading multiple row batches works when result spooling is
|
||||
enabled."""
|
||||
vector.get_value('exec_option')['batch_size'] = 10
|
||||
self.validate_query("select id from functional_parquet.alltypes order by id "
|
||||
"limit 1000", vector.get_value('exec_option'))
|
||||
self.__validate_query("select id from functional_parquet.alltypes order by id "
|
||||
"limit 1000", vector.get_value('exec_option'))
|
||||
|
||||
def validate_query(self, query, exec_options):
|
||||
def test_spilling(self, vector):
|
||||
"""Tests that query results which don't fully fit into memory are spilled to disk.
|
||||
The test runs a query asynchronously and wait for the PeakUnpinnedBytes counter in
|
||||
the PLAN_ROOT_SINK section of the runtime profile to reach a non-zero value. Then
|
||||
it fetches all the results and validates them."""
|
||||
query = "select * from functional.alltypes order by id limit 1500"
|
||||
exec_options = vector.get_value('exec_option')
|
||||
|
||||
# Set lower values for spill-to-disk configs to force the above query to spill
|
||||
# spooled results.
|
||||
exec_options['min_spillable_buffer_size'] = 8 * 1024
|
||||
exec_options['default_spillable_buffer_size'] = 8 * 1024
|
||||
exec_options['max_result_spooling_mem'] = 32 * 1024
|
||||
|
||||
# Execute the query without result spooling and save the results for later validation
|
||||
base_result = self.execute_query(query, exec_options)
|
||||
assert base_result.success, "Failed to run {0} when result spooling is disabled" \
|
||||
.format(query)
|
||||
|
||||
exec_options['spool_query_results'] = 'true'
|
||||
|
||||
# Amount of time to wait for the PeakUnpinnedBytes counter in the PLAN_ROOT_SINK
|
||||
# section of the profile to reach a non-zero value.
|
||||
timeout = 10
|
||||
|
||||
# Regexes to look for in the runtime profiles.
|
||||
# PeakUnpinnedBytes can show up in exec nodes as well, so we only look for the
|
||||
# PeakUnpinnedBytes metrics in the PLAN_ROOT_SINK section of the profile.
|
||||
unpinned_bytes_regex = "PLAN_ROOT_SINK[\s\S]*?PeakUnpinnedBytes.*\([1-9][0-9]*\)"
|
||||
# The PLAN_ROOT_SINK should have 'Spilled' in the 'ExecOption' info string.
|
||||
spilled_exec_option_regex = "ExecOption:.*Spilled"
|
||||
|
||||
# Fetch the runtime profile every 0.5 seconds until either the timeout is hit, or
|
||||
# PeakUnpinnedBytes shows up in the profile.
|
||||
start_time = time.time()
|
||||
handle = self.execute_query_async(query, exec_options)
|
||||
try:
|
||||
while re.search(unpinned_bytes_regex, self.client.get_runtime_profile(handle)) \
|
||||
is None and time.time() - start_time < timeout:
|
||||
time.sleep(0.5)
|
||||
profile = self.client.get_runtime_profile(handle)
|
||||
if re.search(unpinned_bytes_regex, profile) is None:
|
||||
raise Timeout("Query {0} did not spill spooled results within the timeout {1}"
|
||||
.format(query, timeout))
|
||||
# At this point PLAN_ROOT_SINK must have spilled, so spilled_exec_option_regex
|
||||
# should be in the profile as well.
|
||||
assert re.search(spilled_exec_option_regex, profile)
|
||||
result = self.client.fetch(query, handle)
|
||||
assert result.data == base_result.data
|
||||
finally:
|
||||
self.client.close_query(handle)
|
||||
|
||||
def test_full_queue(self, vector):
|
||||
"""Tests result spooling when there is no more space to buffer query results (the
|
||||
queue is full), and the client hasn't fetched any results. Validates that
|
||||
RowBatchSendWaitTime (amount of time Impala blocks waiting for the client to read
|
||||
buffered results and clear up space in the queue) is updated properly."""
|
||||
query = "select * from functional.alltypes order by id limit 1500"
|
||||
exec_options = vector.get_value('exec_option')
|
||||
|
||||
# Set lower values for spill-to-disk and result spooling configs so that the queue
|
||||
# gets full when selecting a small number of rows.
|
||||
exec_options['min_spillable_buffer_size'] = 8 * 1024
|
||||
exec_options['default_spillable_buffer_size'] = 8 * 1024
|
||||
exec_options['max_result_spooling_mem'] = 32 * 1024
|
||||
exec_options['max_spilled_result_spooling_mem'] = 32 * 1024
|
||||
exec_options['spool_query_results'] = 'true'
|
||||
|
||||
# Amount of time to wait for the query to reach a running state before through a
|
||||
# Timeout exception.
|
||||
timeout = 10
|
||||
# Regex to look for in the runtime profile.
|
||||
send_wait_time_regex = "RowBatchSendWaitTime: [1-9]"
|
||||
|
||||
# Execute the query asynchronously, wait a bit for the result spooling queue to fill
|
||||
# up, start fetching results, and then validate that RowBatchSendWaitTime shows a
|
||||
# non-zero value in the profile.
|
||||
handle = self.execute_query_async(query, exec_options)
|
||||
try:
|
||||
self.wait_for_any_state(handle, [self.client.QUERY_STATES['RUNNING'],
|
||||
self.client.QUERY_STATES['FINISHED']], timeout)
|
||||
time.sleep(5)
|
||||
self.client.fetch(query, handle)
|
||||
assert re.search(send_wait_time_regex, self.client.get_runtime_profile(handle)) \
|
||||
is not None
|
||||
finally:
|
||||
self.client.close_query(handle)
|
||||
|
||||
def test_slow_query(self, vector):
|
||||
"""Tests results spooling when the client is blocked waiting for Impala to add more
|
||||
results to the queue. Validates that RowBatchGetWaitTime (amount of time the client
|
||||
spends waiting for Impala to buffer query results) is updated properly."""
|
||||
query = "select id from functional.alltypes order by id limit 10"
|
||||
|
||||
# Add a delay to the EXCHANGE_NODE in the query above to simulate a "slow" query. The
|
||||
# delay should give the client enough time to issue a fetch request and block until
|
||||
# Impala produces results.
|
||||
vector.get_value('exec_option')['debug_action'] = '2:GETNEXT:DELAY'
|
||||
vector.get_value('exec_option')['spool_query_results'] = 'true'
|
||||
|
||||
# Regex to look for in the runtime profile.
|
||||
get_wait_time_regex = "RowBatchGetWaitTime: [1-9]"
|
||||
|
||||
# Execute the query, start a thread to fetch results, wait for the query to finish,
|
||||
# and then validate that RowBatchGetWaitTime shows a non-zero value in the profile.
|
||||
handle = self.execute_query_async(query, vector.get_value('exec_option'))
|
||||
try:
|
||||
thread = threading.Thread(target=lambda:
|
||||
self.create_impala_client().fetch(query, handle))
|
||||
thread.start()
|
||||
self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 10)
|
||||
assert re.search(get_wait_time_regex, self.client.get_runtime_profile(handle)) \
|
||||
is not None
|
||||
finally:
|
||||
self.client.close_query(handle)
|
||||
|
||||
def __validate_query(self, query, exec_options):
|
||||
"""Compares the results of the given query with and without result spooling
|
||||
enabled."""
|
||||
exec_options = exec_options.copy()
|
||||
@@ -114,6 +237,6 @@ class TestResultSpoolingCancellation(ImpalaTestSuite):
|
||||
sleep(vector.get_value('cancel_delay'))
|
||||
cancel_result = self.client.cancel(handle)
|
||||
assert cancel_result.status_code == 0,\
|
||||
'Unexpected status code from cancel request: {0}'.format(cancel_result)
|
||||
"Unexpected status code from cancel request: {0}".format(cancel_result)
|
||||
finally:
|
||||
if handle: self.client.close_query(handle)
|
||||
|
||||
Reference in New Issue
Block a user