Files
impala/tests/query_test/test_fetch.py
Kurt Deschler aa164e3cbc IMPALA-12176: Improve client fetch metrics
This patch makes multiple improvements to query profile and RPC metrics,
increasing observability and allowing more detailed analysis of where
client RPCs spend their time.

- A new CreateResultSetTime metric has been added to the PLAN_ROOT_SINK node
  in the query profile. This timer isolates the cost of converting fetched
  rows to the client protocol.
- Read/Write time is now tracked during client RPC execution and added to
  the rpcz JSON output. A checkbox on the /rpcz Web UI page enables
  display of the Read/Write stats.
- Read and Write time are measured via Thrift callbacks defined in
  apache::thrift::TProcessorEventHandler. Read time includes reading and
  deserializing Thrift RPC args from the transport. Write time includes
  serializing, writing, and flushing Thrift RPC args to the transport.
- Client RPC cost is tracked on a per-query basis and displayed in the
  server profile as RPCCount, RPCReadTimer, and RPCWriteTimer (see the
  sketch after this list).
- The resolution of the RPC histograms is increased from milliseconds to
  microseconds.
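
As a rough illustration (not part of this patch), the sketch below shows one way the
new per-query counters could be read back out of a runtime profile string. It assumes
the Impala test environment so that tests.util.parse_util is importable; the
client_rpc_summary helper and its profile_text argument are illustrative names, not
code added by this change.

    import re

    from tests.util.parse_util import parse_duration_string_ns

    def client_rpc_summary(profile_text):
      """Returns RPCCount plus RPCReadTimer/RPCWriteTimer (in nanoseconds) parsed
      from a runtime profile string; missing counters are reported as None."""
      summary = {}
      count = re.search(r"RPCCount: (\d+)", profile_text)
      summary["rpc_count"] = int(count.group(1)) if count else None
      for name, key in [("RPCReadTimer", "read_ns"), ("RPCWriteTimer", "write_ns")]:
        timer = re.search("{0}: (.*)".format(name), profile_text)
        summary[key] = parse_duration_string_ns(timer.group(1)) if timer else None
      return summary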

Testing:
tests added to test_fetch.py and test_web_pages.py

Change-Id: I986f3f2afac1775274895393969b270cf956b262
Reviewed-on: http://gerrit.cloudera.org:8080/19966
Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2023-06-09 16:59:03 +00:00

235 lines
11 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import, division, print_function

import re
from time import sleep

from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.test_dimensions import extend_exec_option_dimension
from tests.util.parse_util import parse_duration_string_ms, \
    parse_duration_string_ns, get_time_summary_stats_counter


class TestFetch(ImpalaTestSuite):
  """Tests that are independent of whether result spooling is enabled or not."""

  @classmethod
  def add_test_dimensions(cls):
    super(TestFetch, cls).add_test_dimensions()
    # Result fetching should be independent of file format, so only test against
    # Parquet files.
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('table_format').file_format == 'parquet')

  @classmethod
  def get_workload(cls):
    return 'functional-query'

  def test_rows_sent_counters(self, vector):
    """Validate that ClientFetchWaitTimer, NumRowsFetched, RowMaterializationRate,
    and RowMaterializationTimer are set to valid values in the ImpalaServer section
    of the runtime profile."""
    num_rows = 25
    query = "select sleep(100) from functional.alltypes limit {0}".format(num_rows)
    handle = self.execute_query_async(query, vector.get_value('exec_option'))
    try:
      # Wait until the query is 'FINISHED' and results are available for fetching.
      self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 30)
      # Sleep for 2.5 seconds so that the ClientFetchWaitTimer is >= 1s.
      sleep(2.5)
      # Fetch the results so that the fetch related counters are updated.
      assert self.client.fetch(query, handle).success
      runtime_profile = self.client.get_runtime_profile(handle)
      fetch_timer = re.search("ClientFetchWaitTimer: (.*)", runtime_profile)
      assert fetch_timer and len(fetch_timer.groups()) == 1 and \
          parse_duration_string_ms(fetch_timer.group(1)) > 1000
      assert "NumRowsFetched: {0} ({0})".format(num_rows) in runtime_profile
      assert re.search("RowMaterializationRate: [1-9]", runtime_profile)
      # The query should take at least 1s to materialize all rows since it should sleep
      # for at least 1s during materialization.
      materialization_timer = re.search("RowMaterializationTimer: (.*)", runtime_profile)
      assert materialization_timer and len(materialization_timer.groups()) == 1 and \
          parse_duration_string_ms(materialization_timer.group(1)) > 1000
      assert re.search("RPCCount: [5-9]", runtime_profile)
      rpc_read_timer = re.search("RPCReadTimer: (.*)", runtime_profile)
      assert rpc_read_timer and len(rpc_read_timer.groups()) == 1
      rpc_read_ns = parse_duration_string_ns(rpc_read_timer.group(1))
      assert 0 < rpc_read_ns and rpc_read_ns < 1000000
      rpc_write_timer = re.search("RPCWriteTimer: (.*)", runtime_profile)
      assert rpc_write_timer and len(rpc_write_timer.groups()) == 1
      rpc_write_ns = parse_duration_string_ns(rpc_write_timer.group(1))
      assert 0 < rpc_write_ns and rpc_write_ns < 10000000
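      # The sleep(100) for each of the 25 rows is evaluated while the rows are
      # converted into the client result set, so CreateResultSetTime should be
      # close to 25 * 100ms = 2.5s.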
      create_result_time = re.search("CreateResultSetTime: (.*)", runtime_profile)
      assert create_result_time and len(create_result_time.groups()) == 1
      create_result_ms = parse_duration_string_ms(create_result_time.group(1))
      assert 2400 < create_result_ms and create_result_ms < 2600
    finally:
      self.client.close_query(handle)

  def test_client_fetch_time_stats(self, vector):
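    """Validates the ClientFetchWaitTimeStats summary counter when the client fetches
    all rows: each fetch that returns data contributes a sample, while the final empty
    fetch after eos and the time spent before closing the query do not count as client
    fetch wait time."""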
    num_rows = 27
    query = "select sleep(10) from functional.alltypes limit {0}".format(num_rows)
    handle = self.execute_query_async(query, vector.get_value('exec_option'))
    try:
      # Wait until the query is 'FINISHED' and results are available for fetching.
      self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 30)
      # This loop will do 6 fetches that contain data and a final fetch with
      # no data. The last fetch is after eos has been set, so it does not count.
      rows_fetched = 0
      while True:
        result = self.client.fetch(query, handle, max_rows=5)
        assert result.success
        rows_fetched += len(result.data)
        # If no rows are returned, we are done.
        if len(result.data) == 0:
          break
        sleep(0.1)
      # After fetching all rows, sleep before closing the query. This should not
      # count as client wait time, because the query is already done.
      sleep(2.5)
    finally:
      self.client.close_query(handle)
    runtime_profile = self.client.get_runtime_profile(handle)
    summary_stats = get_time_summary_stats_counter("ClientFetchWaitTimeStats",
        runtime_profile)
    assert len(summary_stats) == 1
    assert summary_stats[0].total_num_values == 6
    # The 2.5 second sleep should not count, so the max must be less than 2.5 seconds.
    assert summary_stats[0].max_value < 2500000000
    assert summary_stats[0].min_value > 0

  def test_client_fetch_time_stats_incomplete(self, vector):
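    """Validates the ClientFetchWaitTimeStats summary counter when the client closes
    the query without fetching all rows: the time between the last fetch and the close
    still counts as client fetch wait time and is recorded as a final sample."""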
    num_rows = 27
    query = "select sleep(10) from functional.alltypes limit {0}".format(num_rows)
    handle = self.execute_query_async(query, vector.get_value('exec_option'))
    try:
      # Wait until the query is 'FINISHED' and results are available for fetching.
      self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 30)
      # This loop will do 5 fetches for a total of 25 rows. This is incomplete.
      for i in range(5):
        result = self.client.fetch(query, handle, max_rows=5)
        assert result.success
        sleep(0.1)
      # Sleep before closing the query. For an incomplete fetch, this still counts
      # towards the query time, so this does show up in the counters.
      sleep(2.5)
    finally:
      self.client.close_query(handle)
    runtime_profile = self.client.get_runtime_profile(handle)
    summary_stats = get_time_summary_stats_counter("ClientFetchWaitTimeStats",
        runtime_profile)
    assert len(summary_stats) == 1
    # There are 5 fetches and the finalization sample for a total of 6.
    assert summary_stats[0].total_num_values == 6
    # The 2.5 second sleep does count for an incomplete fetch, verify the max is higher.
    assert summary_stats[0].max_value >= 2500000000
    assert summary_stats[0].min_value > 0


class TestFetchAndSpooling(ImpalaTestSuite):
  """Tests that apply when result spooling is enabled or disabled."""

  @classmethod
  def add_test_dimensions(cls):
    super(TestFetchAndSpooling, cls).add_test_dimensions()
    # Result fetching should be independent of file format, so only test against
    # Parquet files.
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('table_format').file_format == 'parquet')
    # spool_query_results is set as true by default.
    extend_exec_option_dimension(cls, 'spool_query_results', 'false')

  @classmethod
  def get_workload(cls):
    return 'functional-query'

  def test_rows_sent_counters(self, vector):
    """Validate that RowsSent and RowsSentRate are set to valid values in
    the PLAN_ROOT_SINK section of the runtime profile."""
    num_rows = 10
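    # Inject a 1s delay into the plan root sink so that sending rows takes a
    # measurable amount of time; the debug action name depends on whether result
    # spooling is enabled.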
    if ('spool_query_results' in vector.get_value('exec_option') and
        vector.get_value('exec_option')['spool_query_results'] == 'false'):
      vector.get_value('exec_option')['debug_action'] = "BPRS_BEFORE_ADD_ROWS:SLEEP@1000"
    else:
      vector.get_value('exec_option')['debug_action'] = "BPRS_BEFORE_ADD_BATCH:SLEEP@1000"
    result = self.execute_query("select id from functional.alltypes limit {0}"
        .format(num_rows), vector.get_value('exec_option'))
    assert "RowsSent: {0} ({0})".format(num_rows) in result.runtime_profile
    rows_sent_rate = re.search(r"RowsSentRate: (\d*\.?\d*)", result.runtime_profile)
    assert rows_sent_rate
    assert float(rows_sent_rate.group(1)) > 0


class TestFetchTimeout(ImpalaTestSuite):
  """A few basic tests for FETCH_ROWS_TIMEOUT_MS that are not specific to the HS2
  protocol (in contrast to the tests in tests/hs2/test_fetch_timeout.py). These tests
  are necessary because part of the FETCH_ROWS_TIMEOUT_MS code is HS2/Beeswax specific.
  Unlike the tests in hs2/test_fetch_timeout.py, these tests do not validate that
  individual RPC calls time out; instead they set a low value for the timeout and
  assert that the query works end-to-end."""

  @classmethod
  def add_test_dimensions(cls):
    super(TestFetchTimeout, cls).add_test_dimensions()
    # Result fetching should be independent of file format, so only test against
    # Parquet files.
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('table_format').file_format == 'parquet')
    extend_exec_option_dimension(cls, 'spool_query_results', 'true')

  @classmethod
  def get_workload(cls):
    return 'functional-query'

  def test_fetch_timeout(self, vector):
    """A simple test that runs a query with a low timeout and introduces delays in
    RowBatch production. Asserts that the query succeeds and returns the expected
    number of rows."""
    num_rows = 100
    query = "select * from functional.alltypes limit {0}".format(num_rows)
    vector.get_value('exec_option')['batch_size'] = 1
    vector.get_value('exec_option')['fetch_rows_timeout_ms'] = 1
    vector.get_value('exec_option')['debug_action'] = '0:GETNEXT:DELAY'
    results = self.execute_query(query, vector.get_value('exec_option'))
    assert results.success
    assert len(results.data) == num_rows

  def test_fetch_before_finished_timeout(self, vector):
    """Tests that the FETCH_ROWS_TIMEOUT_MS timeout applies to queries that are not in
    the 'finished' state. Similar to the test tests/hs2/test_fetch_timeout.py::
    TestFetchTimeout::test_fetch_before_finished_timeout(_with_result_spooling)."""
    num_rows = 10
    query = "select * from functional.alltypes limit {0}".format(num_rows)
    vector.get_value('exec_option')['debug_action'] = 'CRS_BEFORE_COORD_STARTS:SLEEP@5000'
    vector.get_value('exec_option')['fetch_rows_timeout_ms'] = '1000'
    results = self.execute_query(query, vector.get_value('exec_option'))
    assert results.success
    assert len(results.data) == num_rows