IMPALA-10180: Add summary stats for client fetch wait time

This adds ClientFetchWaitTimeStats to the runtime profile
to track the min/max/# of samples for ClientFetchWaitTimer.
Here is some sample output:
- ClientFetchWaitTimeStats: (Avg: 161.554ms ; Min: 101.411ms ; Max: 461.728ms ; Number of samples: 6)
- ClientFetchWaitTimer: 969.326ms

This also fixes the definition of ClientFetchWaitTimer to avoid
including time after end of fetch. When the client is closing
the query, Finalize() gets called. The Finalize() call should
only add extra client wait time if fetch has not completed.

Testing:
 - Added test cases in query_test/test_fetch.py with specific
   numbers of fetches and verification of the statistics.
 - The test cases make use of a new function for parsing
   summary stats for timers, and this also gets its own test
   case.

Change-Id: I9ca525285e03c7b51b04ac292f7b3531e6178218
Reviewed-on: http://gerrit.cloudera.org:8080/19897
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
This commit is contained in:
Joe McDonnell
2023-05-13 10:48:30 -07:00
parent ff3d0c7984
commit 21f5a7a6e5
5 changed files with 180 additions and 3 deletions

View File

@@ -102,6 +102,23 @@ def parse_duration_string_ms(duration):
return (times['h'] * 60 * 60 + times['m'] * 60 + times['s']) * 1000 + times['ms']
def parse_duration_string_ns(duration):
  """Parses a duration string of the form 1h2m3s4.5ms6.7us8.9ns into nanoseconds.

  Each component is a decimal number immediately followed by one of the units
  h, m, s, ms, us or ns. Whitespace around a unit is ignored. If the same unit
  appears more than once, the last occurrence wins. The total is returned as a float.

  Fails with an AssertionError if no component can be parsed at all, or if a
  component uses a unit other than the ones listed above.
  """
  pattern = r'(?P<value>[0-9]+\.?[0-9]*?)(?P<units>\D+)'
  matches = list(re.finditer(pattern, duration))
  assert matches, 'Failed to parse duration string %s' % duration

  times = {'h': 0, 'm': 0, 's': 0, 'ms': 0, 'us': 0, 'ns': 0}
  for match in matches:
    parsed = match.groupdict()
    # \D+ captures every non-digit character after the number, including whitespace.
    # Normalize the unit and reject anything unknown; otherwise the value would be
    # stored under a key that is never read and silently dropped from the total.
    units = parsed['units'].strip()
    assert units in times, \
        'Unknown unit %r in duration string %s' % (units, duration)
    times[units] = float(parsed['value'])

  # Hours, minutes and seconds are folded into seconds first and then scaled to ns.
  value_ns = (times['h'] * 60 * 60 + times['m'] * 60 + times['s']) * 1000000000
  value_ns += times['ms'] * 1000000 + times['us'] * 1000 + times['ns']
  return value_ns
def get_duration_us_from_str(duration_str):
"""Parses the duration string got in profile and returns the duration in us"""
match_res = re.search(r"\((\d+) us\)", duration_str)
@@ -189,3 +206,54 @@ def get_bytes_summary_stats_counter(counter_name, runtime_profile):
min_value=int(summary_stat['min']), max_value=int(summary_stat['max'])))
return summary_stats
def get_time_summary_stats_counter(counter_name, runtime_profile):
  """Extracts a list of TSummaryStatsCounters from a given runtime profile where the
  units are time. Each entry in the returned list corresponds to a single occurrence
  of the counter in the profile. If the counter is present, but it has not been
  updated, an empty TSummaryStatsCounter is returned for that entry. If the counter
  is not in the given profile, an empty list is returned. All time values are returned
  as nanoseconds. The sum of each entry is reconstructed from the profile line as
  average * number of samples. Here is an example of how this method should be used:

    # A single line in a runtime profile used for example purposes.
    runtime_profile = ("- ExampleTimer: (Avg: 100.000ms ; "
                       "Min: 100.000ms ; "
                       "Max: 100.000ms ; "
                       "Number of samples: 6)")
    summary_stats = get_time_summary_stats_counter("ExampleTimer",
                                                   runtime_profile)
    assert len(summary_stats) == 1
    assert summary_stats[0].min_value == summary_stats[0].max_value == 100000000
    assert summary_stats[0].sum == 600000000
    assert summary_stats[0].total_num_values == 6
  """
  # This requires the Thrift definitions to be generated. We limit the scope of the import
  # to allow tools like the stress test to import this file without building Impala.
  from RuntimeProfile.ttypes import TSummaryStatsCounter

  regex_summary_stat = re.compile(r"""\(
    Avg:\s(?P<avg>.*)\s;\s # Matches Avg: ? ;
    Min:\s(?P<min>.*)\s;\s # Matches Min: ? ;
    Max:\s(?P<max>.*)\s;\s # Matches Max: ? ;
    Number\sof\ssamples:\s(?P<samples>[0-9]+)\) # Matches Number of samples: ?)""",
    re.VERBOSE)

  summary_stats = []
  # counter_name is used as a regex prefix; each hit is the rest of the line on which
  # the counter name appears.
  for counter in re.findall(counter_name + ".*", runtime_profile):
    summary_stat = regex_summary_stat.search(counter)
    # We need to special-case when the counter has not been updated at all because empty
    # summary counters have a different format than updated ones.
    if not summary_stat:
      assert "0ns (Number of samples: 0)" in counter, \
          "Unexpected format for summary stats counter: %s" % counter
      summary_stats.append(TSummaryStatsCounter(sum=0, total_num_values=0, min_value=0,
                                                max_value=0))
    else:
      summary_stat = summary_stat.groupdict()
      num_samples = int(summary_stat['samples'])
      # Only the average is printed in the profile, so the sum is derived from it.
      summary_stats.append(TSummaryStatsCounter(total_num_values=num_samples,
          sum=num_samples * parse_duration_string_ns(summary_stat['avg']),
          min_value=parse_duration_string_ns(summary_stat['min']),
          max_value=parse_duration_string_ns(summary_stat['max'])))

  return summary_stats