IMPALA-14401: Deflake/Improve OpenTelemetry Tracing Tests

Contains the following improvements to the Impala queries as
OpenTelemetry traces custom cluster tests:

1. Supporting code for asserting traces was moved to
   'tests/util/otel_trace.py'. The moved code was modified to remove
   all references to 'self'. Since this code used
   'self.assert_impalad_log_contains', it had to be modified so the
   caller provides the correct log file path to search. The
   '__find_span_log' function was updated to call a new generic file
   grep function to run the necessary log file search regex. All
   other code was moved unmodified.

2. Classes 'TestOtelTraceSelectsDMLs' and 'TestOtelTraceDDLs'
   contained a total of 11 individual tests that used the
   'unique_database' fixture. When this fixture is used in a test, it
   results in two DDLs being run before the test to drop/create the
   database and one DDL being run after the test to drop the database.
   These classes now create a test database once during 'setup_class'
   and drop it once during 'teardown_class' because creating a new
   database for each test was unnecessary. This change dropped test
   execution time from about 97 seconds to about 77 seconds.

3. Each test now has comments describing what the test is asserting.

4. The unnecessary sleep in 'test_query_exec_fail' was removed saving
   five seconds of test execution time.

5. New test 'test_dml_insert_fail' added. Previously, the situation
   where an insert DML failed was not tested. The test passed without
   any changes to backend code.

6. Test 'test_ddl_createtable_fail' is greatly simplified by using a
   debug action to fail the query instead of multiple parallel
   queries where one dropped the database the other was inserting
   into. The simplified setup eliminated test flakiness caused by
   timing differences and sped up test execution by about 5 seconds.

7. Fixed test flakiness that was caused by timing issues. Depending on
   when the close process was initiated, span events are sometimes in
   the QueryExecution span and sometimes in the Close span. Test
   assertions cannot handle these situations. All span event
   assertions for the Close span were removed. IMPALA-14334 will fix
   these assertions.

8. The function 'query_id_from_ui' which retrieves the query profile
   using the Impala debug ui now makes multiple attempts to retrieve
   the query. In slower test situations, such as ASAN, the query may
   not yet be available when the function is called initially which
   used to cause tests to fail. This test flakiness is now eliminated
   through the addition of the retries.

Testing accomplished by running tests in test_otel_trace.py both
locally and in a full Jenkins build.

Generated-by: Github Copilot (Claude Sonnet 3.7)
Change-Id: I0c3e0075df688c7ae601c6f2e5743f56d6db100e
Reviewed-on: http://gerrit.cloudera.org:8080/23385
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
jasonmfehr
2025-09-05 09:56:40 -07:00
committed by Impala Public Jenkins
parent 68ab52f2c7
commit 0fe8de0f3f
4 changed files with 650 additions and 550 deletions

View File

@@ -679,7 +679,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
return cls.cluster.impalads[0].service if cls.cluster.impalads else None
def query_id_from_ui(self, section, match_func=None, match_query=None, coord_idx=0,
not_found_ok=False):
max_attempts=30, sleep_time_s=1):
"""
Calls to the debug UI's queries page and loops over all queries in the specified
section calling the provided func for each query, Returns the string id of the
@@ -696,8 +696,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
is used instead of match_func.
coord_idx: Index of the Impalad to use as the coordinator. This is used to
determine which impalad's UI to query.
not_found_ok: If True, returns None when no matching query is found. If False,
fails an assert when no matching query is found.
max_attempts: Number of times to poll the queries page before giving up.
sleep_time_s: Number of seconds to sleep between polls of the queries page.
Returns:
String of the query id of the first matching query or None if no query matches.
@@ -712,20 +712,23 @@ class CustomClusterTestSuite(ImpalaTestSuite):
if match_query is not None:
match_query = match_query.lower().strip()
service = self.cluster.impalads[coord_idx].service
queries_json = service.get_debug_webpage_json('/queries')
attempts = 0
for query in queries_json[section]:
if (match_query is not None and query["stmt"].lower() == match_query.lower()) \
or (match_func is not None and match_func(query)):
query_id = query['query_id']
return query_id, service.read_query_profile_page(query_id)
while attempts < max_attempts:
service = self.cluster.impalads[coord_idx].service
queries_json = service.get_debug_webpage_json('/queries')
if not_found_ok:
return None, None
else:
assert False, "No matching query found in section '{}'".format(section)
for query in queries_json[section]:
if (match_query is not None and query["stmt"].lower() == match_query.lower()) \
or (match_func is not None and match_func(query)):
query_id = query['query_id']
return query_id, service.read_query_profile_page(query_id)
attempts += 1
sleep(sleep_time_s)
assert False, "No matching query found in section '{}' after {} " \
"attempts".format(section, max_attempts)
def query_profile_from_ui(self, query_id, coord_idx=0):
"""

View File

@@ -171,6 +171,24 @@ def grep_file(file, search):
return matching_lines
def grep_file_first(file, search):
  """
  Searches for a pattern in a file and returns the first match. If no match is found,
  returns None.

  file: An opened file object to search within.
  search: A string containing the regex pattern to search for.
  return: The first regex search() return object if found, otherwise None.
  """
  pattern = re.compile(search)
  # Lazily scan each line, yielding only successful matches, and take the first one.
  matches = (m for m in (pattern.search(line) for line in file) if m is not None)
  return next(matches, None)
def assert_file_in_dir_contains(dir, search):
'''Asserts that at least one file in the 'dir' contains the 'search' term.'''
results = grep_dir(dir, search)

View File

@@ -16,22 +16,18 @@
# under the License.
from __future__ import absolute_import, division, print_function
from threading import Thread
from datetime import datetime
from random import choice
from string import ascii_lowercase
from time import sleep
from impala.error import HiveServer2Error
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.file_utils import wait_for_file_line_count, count_lines
from tests.common.impala_connection import ERROR, RUNNING, FINISHED, INITIALIZED, PENDING
from tests.common.test_vector import BEESWAX, HS2, ImpalaTestDimension
from tests.util.otel_trace import parse_trace_file, ATTR_VAL_TYPE_STRING, \
ATTR_VAL_TYPE_INT, ATTR_VAL_TYPE_BOOL
from tests.util.query_profile_util import parse_db_user, parse_session_id, parse_sql, \
parse_query_type, parse_query_status, parse_impala_query_state, parse_query_id, \
parse_retry_status, parse_original_query_id, parse_retried_query_id, \
parse_num_rows_fetched, parse_admission_result, parse_default_db, \
parse_num_modified_rows, parse_num_deleted_rows, parse_coordinator
from tests.common.file_utils import count_lines, wait_for_file_line_count
from tests.common.impala_connection import ERROR, FINISHED, PENDING, RUNNING
from tests.common.test_vector import BEESWAX, ImpalaTestDimension
from tests.util.otel_trace import assert_trace
from tests.util.query_profile_util import parse_query_id, parse_retry_status
from tests.util.retry import retry
OUT_DIR = "out_dir_traces"
@@ -40,431 +36,46 @@ TRACE_FLAGS = "--otel_trace_enabled=true --otel_trace_exporter=file " \
"--otel_file_flush_interval_ms=500 " \
"--otel_file_pattern={out_dir_traces}/" + TRACE_FILE
class TestOtelTrace(CustomClusterTestSuite):
class TestOtelTraceBase(CustomClusterTestSuite):
def setup_method(self, method):
super(TestOtelTrace, self).setup_method(method)
super(TestOtelTraceBase, self).setup_method(method)
self.assert_impalad_log_contains("INFO", "join Impala Service pool")
self.trace_file_path = "{}/{}".format(self.get_tmp_dir(OUT_DIR), TRACE_FILE)
self.trace_file_count = count_lines(self.trace_file_path, True)
def assert_trace(self, query_id, query_profile, cluster_id, trace_cnt=1, err_span="",
missing_spans=[], async_close=False, exact_trace_cnt=False):
# Parse common values needed in multiple asserts.
session_id = parse_session_id(query_profile)
db_user = parse_db_user(query_profile)
# Wait until all spans are written to the trace file.
wait_for_file_line_count(
file_path=self.trace_file_path,
expected_line_count=trace_cnt + self.trace_file_count,
max_attempts=60,
sleep_time_s=1,
backoff=1,
exact_match=exact_trace_cnt)
# Remove missing spans from the expected span count.
expected_span_count = 6 - len(missing_spans)
# Parse the trace json files to get the trace for the query.
trace = parse_trace_file(self.trace_file_path, query_id)
self.__assert_trace_common(trace, expected_span_count)
# Retrieve the query status which contains error messages if the query failed.
query_status = parse_query_status(query_profile)
query_status = "" if query_status == "OK" else query_status
impala_query_state = parse_retry_status(query_profile)
if impala_query_state is None:
impala_query_state = parse_impala_query_state(query_profile)
# Determine if the query was retried and if so, get the original query id.
original_query_id = parse_original_query_id(query_profile)
original_query_id = "" if original_query_id is None else original_query_id
# Determine if the query initially failed but has a successful retry under a different
# query id. If so, get the retried query id.
retried_query_id = parse_retried_query_id(query_profile)
retried_query_id = "" if retried_query_id is None else retried_query_id
# Error message should follow on all spans after the errored span
in_error = False
# Retrieve the coordinator from the query profile.
coordinator = parse_coordinator(query_profile)
# Parse the query type from the query profile.
query_type = parse_query_type(query_profile)
if query_type == "N/A":
query_type = "UNKNOWN"
# Assert root span.
root_span_id = self.__assert_rootspan_attrs(trace.root_span, query_id, session_id,
cluster_id, db_user, "default-pool", impala_query_state, query_status,
original_query_id, retried_query_id, coordinator)
# Assert Init span.
if "Init" not in missing_spans:
span_err_msg = ""
if err_span == "Init":
span_err_msg = query_status
in_error = True
self.__assert_initspan_attrs(trace.child_spans, root_span_id, query_id, session_id,
cluster_id, db_user, "default-pool", parse_default_db(query_profile),
parse_sql(query_profile).replace('\n', ' '), original_query_id, coordinator)
# Assert Submitted span.
if "Submitted" not in missing_spans:
span_err_msg = ""
if err_span == "Submitted" or in_error:
span_err_msg = query_status
in_error = True
self.__assert_submittedspan_attrs(trace.child_spans, root_span_id, query_id)
# Assert Planning span.
if "Planning" not in missing_spans:
status = INITIALIZED
span_err_msg = ""
if err_span == "Planning" or in_error:
span_err_msg = query_status
status = ERROR
in_error = True
self.__assert_planningspan_attrs(trace.child_spans, root_span_id, query_id,
query_type, span_err_msg, status)
# Assert AdmissionControl span.
if "AdmissionControl" not in missing_spans:
status = PENDING
span_err_msg = ""
if err_span == "AdmissionControl" or in_error:
span_err_msg = query_status
status = ERROR
in_error = True
self.__assert_admissioncontrol_attrs(trace.child_spans, root_span_id, query_id,
"default-pool", parse_admission_result(query_profile), span_err_msg, status)
# Assert QueryExecution span.
if "QueryExecution" not in missing_spans:
span_err_msg = ""
if err_span == "QueryExecution" or in_error:
span_err_msg = query_status
in_error = True
self.__assert_query_exec_attrs(trace.child_spans, query_profile, root_span_id,
query_id, span_err_msg, parse_impala_query_state(query_profile))
# Assert Close span.
if "Close" not in missing_spans:
span_err_msg = ""
if err_span == "Close" or in_error:
span_err_msg = query_status
in_error = True
self.__assert_close_attrs(trace.child_spans, root_span_id, query_id, span_err_msg,
parse_impala_query_state(query_profile), async_close)
def __assert_trace_common(self, trace, expected_child_spans_count):
"""
Asserts common structure/fields in resource spans and scope spans of the
OpenTelemetry trace JSON object.
"""
# Assert the number of child spans in the trace.
assert len(trace.child_spans) == expected_child_spans_count, \
"Trace '{}' expected child spans count: {}, actual: {}".format(trace.trace_id,
expected_child_spans_count, len(trace.child_spans))
# Each scope span has a scope object which contains the name and version of the
# OpenTelemetry scope. Assert the scope object sttructure and contents contained
# within the single span at the path resourceSpan[0].scopeSpans[0].scope.
assert trace.root_span.scope_name == "org.apache.impala.impalad.query", \
"Span: '{}' expected: 'org.apache.impala.impalad.query', actual: {}" \
.format(trace.root_span.span_id, trace.root_span.scope_name)
assert trace.root_span.scope_version == "1.0.0", "Span: '{}' expected scope " \
"version '1.0.0', actual: '{}'".format("Root", trace.root_span.scope_version)
# Assert the scope of each child span.
for span in trace.child_spans:
assert span.scope_name == "org.apache.impala.impalad.query", \
"Span: '{}' expected scope name: 'org.apache.impala.impalad.query', " \
"actual: {}".format(span.name, span.scope_name)
assert span.scope_version == "1.0.0", "Span: '{}' expected scope " \
"version '1.0.0', actual: '{}'".format(span.name, span.scope_version)
def __assert_scopespan_common(self, span, query_id, is_root, name, attributes_count,
status, root_span_id=None, err_msg=""):
"""
Helper function to assert common data points of a single scope span. These spans
contain the actual root and child spans. Assertions include the span object's
structure, span properties, and common span attributes.
- span: The OtelSpan object to assert.
- query_id: The query id of the span.
- is_root: Whether the span is a root span.
- name: The name of the span to assert without the query_id prefix.
- attributes_count: The expected number of attributes unique to the span. If
asserting a child span, adds 7 to this value to account for
attributes common across all child spans.
- status: The expected status of the span. Only used for child spans.
- root_span_id: The root span id of the span.
"""
# Read the span trace id and span id from the Impalad logs.
expected_span_id, expected_trace_id = self.__find_span_log(name, query_id)
# Assert span properties.
expected_name = query_id
actual_kind = span.kind
if (is_root):
assert span.parent_span_id is None, "Found parentSpanId on root span"
assert actual_kind == 2, "Span '{}' expected kind: '{}', actual: '{}'" \
.format(expected_name, 2, actual_kind)
else:
expected_name += " - {}".format(name)
assert root_span_id is not None
actual = span.parent_span_id
assert actual == root_span_id, "Span '{}' expected parentSpanId: '{}', actual: " \
"'{}'".format(expected_name, root_span_id, actual)
assert actual_kind == 1, "Span '{}' expected kind: '{}', actual: '{}'" \
.format(expected_name, 1, actual)
actual = span.name
assert actual == expected_name, "Expected span name: '{}', actual: '{}'" \
.format(expected_name, actual)
actual = span.trace_id
assert actual == expected_trace_id, "Span '{}' expected traceId: '{}', " \
"actual: '{}'".format(expected_name, expected_trace_id, actual)
actual = span.span_id
assert actual == expected_span_id, "Span '{}' expected spanId: '{}', " \
"actual: '{}'".format(expected_name, expected_span_id, actual)
# Flags must always be 1 which indicates the trace is to be sampled.
expected_flags = 1
actual = span.flags
assert actual == expected_flags, "Span '{}' expected flags: '{}', " \
"actual: '{}'".format(expected_name, expected_flags, actual)
# Assert span attributes.
expected_span_attrs_count = attributes_count if is_root else 7 + attributes_count
assert len(span.attributes) == expected_span_attrs_count, "Span '{}' attributes " \
"must contain exactly {} elements, actual: {}".format(expected_name,
expected_span_attrs_count, len(span.attributes))
if (is_root):
self.__assert_attr(expected_name, span.attributes, "ErrorMessage", err_msg)
else:
self.__assert_attr(expected_name, span.attributes, "ErrorMsg", err_msg)
self.__assert_attr(expected_name, span.attributes, "Name", expected_name)
self.__assert_attr(expected_name, span.attributes, "Running",
name == "QueryExecution", "boolValue")
self.__assert_attr(expected_name, span.attributes, "Status", status)
def __find_span_log(self, span_name, query_id):
"""
Finds the start span log entry for the given span name and query id in the Impalad
logs. This log line contains the trace id and span id for the span which are used
as the expected values when asserting the span properties in the trace file.
"""
span_regex = r'Started \'{}\' span trace_id="(.*?)" span_id="(.*?)" query_id="{}"' \
.format(span_name, query_id)
span_log = self.assert_impalad_log_contains("INFO", span_regex)
trace_id = span_log.group(1)
span_id = span_log.group(2)
return span_id, trace_id
def __assert_attr(self, span_name, attributes, expected_key, expected_value,
expected_type="stringValue"):
"""
Helper function to assert that a specific OpenTelemetry attribute exists in a span.
"""
assert expected_type in ("stringValue", "boolValue", "intValue"), "Invalid " \
"expected_type '{}', must be one of 'stringValue', 'boolValue', or 'intValue'" \
.format(expected_type)
val = attributes[expected_key]
assert val is not None, "Span '{}' attribute not found: '{}', actual attributes: {}" \
.format(span_name, expected_key, attributes)
assert val.value == expected_value, "Span '{}' attribute '{}' expected: '{}', " \
"actual: '{}'".format(span_name, expected_key, expected_value, val.value)
if expected_type == "boolValue":
expected_type = ATTR_VAL_TYPE_BOOL
elif expected_type == "intValue":
expected_type = ATTR_VAL_TYPE_INT
else:
expected_type = ATTR_VAL_TYPE_STRING
assert val.get_type() == expected_type, "Span '{}' attribute '{}' expected to be " \
"of type '{}', actual: '{}'".format(span_name, expected_key, expected_type,
val.get_type())
def __assert_span_events(self, span, expected_events=[]):
"""
Helper function to assert that a span contains the expected span events.
"""
assert len(expected_events) == len(span.events), "Span '{}' expected to have " \
"exactly {} events, actual: {}".format(span.name, len(expected_events),
len(span.events))
for event in expected_events:
assert event in span.events, "Expected '{}' event on span '{}' but " \
"no such events was found.".format(event, span.name)
def __assert_rootspan_attrs(self, span, query_id, session_id, cluster_id, user_name,
request_pool, state, err_msg, original_query_id, retried_query_id, coordinator):
"""
Helper function that asserts the common attributes in the root span.
"""
root_span_id, _ = self.__find_span_log("Root", query_id)
self.__assert_scopespan_common(span, query_id, True, "Root", 14, "", None, err_msg)
self.__assert_attr(span.name, span.attributes, "QueryId", query_id)
self.__assert_attr(span.name, span.attributes, "SessionId", session_id)
self.__assert_attr(span.name, span.attributes, "ClusterId", cluster_id)
self.__assert_attr(span.name, span.attributes, "UserName", user_name)
self.__assert_attr(span.name, span.attributes, "RequestPool", request_pool)
self.__assert_attr(span.name, span.attributes, "State", state)
self.__assert_attr(span.name, span.attributes, "OriginalQueryId", original_query_id)
self.__assert_attr(span.name, span.attributes, "RetriedQueryId", retried_query_id)
self.__assert_attr(span.name, span.attributes, "Coordinator", coordinator)
return root_span_id
def __assert_initspan_attrs(self, spans, root_span_id, query_id, session_id, cluster_id,
user_name, request_pool, default_db, query_string, original_query_id, coordinator):
"""
Helper function that asserts the common and span-specific attributes in the
init span.
"""
# Locate the init span and assert.
init_span = self.__find_span(spans, "Init", query_id)
self.__assert_scopespan_common(init_span, query_id, False, "Init", 9, INITIALIZED,
root_span_id)
self.__assert_attr(init_span.name, init_span.attributes, "QueryId", query_id)
self.__assert_attr(init_span.name, init_span.attributes, "SessionId", session_id)
self.__assert_attr(init_span.name, init_span.attributes, "ClusterId", cluster_id)
self.__assert_attr(init_span.name, init_span.attributes, "UserName", user_name)
self.__assert_attr(init_span.name, init_span.attributes, "RequestPool", request_pool)
self.__assert_attr(init_span.name, init_span.attributes, "DefaultDb", default_db)
self.__assert_attr(init_span.name, init_span.attributes, "QueryString", query_string)
self.__assert_attr(init_span.name, init_span.attributes, "OriginalQueryId",
original_query_id)
self.__assert_attr(init_span.name, init_span.attributes, "Coordinator", coordinator)
self.__assert_span_events(init_span)
def __assert_submittedspan_attrs(self, spans, root_span_id, query_id):
"""
Helper function that asserts the common attributes in the submitted span.
"""
submitted_span = self.__find_span(spans, "Submitted", query_id)
self.__assert_scopespan_common(submitted_span, query_id, False, "Submitted", 0,
INITIALIZED, root_span_id)
self.__assert_span_events(submitted_span)
def __assert_planningspan_attrs(self, spans, root_span_id, query_id, query_type,
err_msg="", status=INITIALIZED):
"""
Helper function that asserts the common and span-specific attributes in the
planning execution span.
"""
planning_span = self.__find_span(spans, "Planning", query_id)
self.__assert_scopespan_common(planning_span, query_id, False, "Planning", 1,
status, root_span_id, err_msg)
self.__assert_attr(planning_span.name, planning_span.attributes, "QueryType",
query_type)
self.__assert_span_events(planning_span)
def __assert_admissioncontrol_attrs(self, spans, root_span_id, query_id, request_pool,
adm_result, err_msg, status):
"""
Helper function that asserts the common and span-specific attributes in the
admission control span.
"""
queued = False if adm_result == "Admitted immediately" \
or adm_result == "Admitted as a trivial query" else True
adm_ctrl_span = self.__find_span(spans, "AdmissionControl", query_id)
self.__assert_scopespan_common(adm_ctrl_span, query_id, False, "AdmissionControl", 3,
status, root_span_id, err_msg)
self.__assert_attr(adm_ctrl_span.name, adm_ctrl_span.attributes, "Queued",
queued, "boolValue")
self.__assert_attr(adm_ctrl_span.name, adm_ctrl_span.attributes, "AdmissionResult",
adm_result)
self.__assert_attr(adm_ctrl_span.name, adm_ctrl_span.attributes, "RequestPool",
request_pool)
if queued:
self.__assert_span_events(adm_ctrl_span, ["Queued"])
else:
self.__assert_span_events(adm_ctrl_span)
def __assert_query_exec_attrs(self, spans, query_profile, root_span_id, query_id,
err_msg, status):
"""
Helper function that asserts the common and span-specific attributes in the
query execution span.
"""
query_exec_span = self.__find_span(spans, "QueryExecution", query_id)
self.__assert_scopespan_common(query_exec_span, query_id, False, "QueryExecution", 3,
status, root_span_id, err_msg)
self.__assert_attr(query_exec_span.name, query_exec_span.attributes,
"NumModifiedRows", parse_num_modified_rows(query_profile), "intValue")
self.__assert_attr(query_exec_span.name, query_exec_span.attributes, "NumDeletedRows",
parse_num_deleted_rows(query_profile), "intValue")
self.__assert_attr(query_exec_span.name, query_exec_span.attributes, "NumRowsFetched",
parse_num_rows_fetched(query_profile), "intValue")
# TODO: IMPALA-14334 - Assert QueryExecution span events
def __assert_close_attrs(self, spans, root_span_id, query_id, err_msg, status,
async_close):
"""
Helper function that asserts the common and span-specific attributes in the
close span.
"""
close_span = self.__find_span(spans, "Close", query_id)
self.__assert_scopespan_common(close_span, query_id, False, "Close", 0, status,
root_span_id, err_msg)
expected_events = ["QueryUnregistered"]
if async_close and "ReleasedAdmissionControlResources" in close_span.events:
expected_events.append("ReleasedAdmissionControlResources")
self.__assert_span_events(close_span, expected_events)
def __find_span(self, spans, name, query_id):
"""
Helper function to find a span by name in a list of OtelSpan objects.
"""
for s in spans:
if s.name.endswith(name):
return s
assert False, "Span '{}' not found for query '{}'".format(name, query_id)
"""Helper method to assert a trace exists in the trace file with the required inputs
for log file path, trace file path, and trace file line count (that was determined
before the test ran)."""
assert_trace(self.build_log_path("impalad", "INFO"), self.trace_file_path,
self.trace_file_count, query_id, query_profile, cluster_id, trace_cnt, err_span,
missing_spans, async_close, exact_trace_cnt)
@CustomClusterTestSuite.with_args(
impalad_args="-v=2 --cluster_id=select_dml {}".format(TRACE_FLAGS),
cluster_size=1, tmp_dir_placeholders=[OUT_DIR], disable_log_buffering=True)
class TestOtelTraceSelectsDMLs(TestOtelTrace):
class TestOtelTraceSelectsDMLs(TestOtelTraceBase):
"""Tests that exercise OpenTelemetry tracing behavior for select and dml queries."""
@classmethod
def setup_class(cls):
super(TestOtelTraceSelectsDMLs, cls).setup_class()
cls.test_db = "test_otel_trace_selects_dmls{}_{}".format(
datetime.now().strftime("%Y%m%d%H%M%S"),
"".join(choice(ascii_lowercase) for _ in range(7)))
cls.execute_query_expect_success(cls.client, "CREATE DATABASE {}".format(cls.test_db))
@classmethod
def teardown_class(cls):
cls.execute_query_expect_success(cls.client, "DROP DATABASE {} CASCADE"
.format(cls.test_db))
super(TestOtelTraceSelectsDMLs, cls).teardown_class()
def setup_method(self, method):
super(TestOtelTraceSelectsDMLs, self).setup_method(method)
self.client.clear_configuration()
@@ -492,6 +103,7 @@ class TestOtelTraceSelectsDMLs(TestOtelTrace):
cluster_id="select_dml")
def test_invalid_sql(self):
"""Asserts that queries with invalid SQL still generate the expected traces."""
query = "SELECT * FROM functional.alltypes WHERE field_does_not_exist=1"
self.execute_query_expect_failure(self.client, query)
@@ -532,7 +144,7 @@ class TestOtelTraceSelectsDMLs(TestOtelTrace):
{
"abort_on_error": 1,
"batch_size": 0,
"debug_action": "3:PREPARE:FAIL|COORD_BEFORE_EXEC_RPC:SLEEP@5000",
"debug_action": "3:PREPARE:FAIL",
"disable_codegen": 0,
"disable_codegen_rows_threshold": 0,
"exec_single_node_rows_threshold": 0,
@@ -542,7 +154,7 @@ class TestOtelTraceSelectsDMLs(TestOtelTrace):
})
query_id, profile = self.query_id_from_ui(section="completed_queries",
match_query=query)
match_query=query)
self.assert_trace(
query_id=query_id,
@@ -552,6 +164,7 @@ class TestOtelTraceSelectsDMLs(TestOtelTrace):
async_close=True)
def test_select_timeout(self):
"""Asserts queries that timeout generate the expected traces."""
query = "SELECT * FROM functional.alltypes WHERE id=sleep(5000)"
self.execute_query_expect_failure(self.client, query, {"exec_time_limit_s": "1"})
@@ -569,6 +182,7 @@ class TestOtelTraceSelectsDMLs(TestOtelTrace):
async_close=True)
def test_select_empty(self):
"""Asserts empty queries do not generate traces."""
self.execute_query_expect_failure(self.client, "")
# Run a query that will succeed to ensure all traces have been flushed.
@@ -589,6 +203,7 @@ class TestOtelTraceSelectsDMLs(TestOtelTrace):
exact_trace_cnt=True)
def test_select_union(self):
"""Asserts queries with a union generate the expected traces."""
result = self.execute_query_expect_success(self.client, "SELECT * FROM "
"functional.alltypes LIMIT 5 UNION ALL SELECT * FROM functional.alltypessmall")
@@ -599,6 +214,7 @@ class TestOtelTraceSelectsDMLs(TestOtelTrace):
exact_trace_cnt=True)
def test_dml_timeout(self):
"""Asserts insert DMLs that timeout generate the expected traces."""
query = "INSERT INTO functional.alltypes (id, string_col, year, month) VALUES " \
"(99999, 'foo', 2025, 1)"
self.execute_query_expect_failure(self.client, query,
@@ -617,9 +233,9 @@ class TestOtelTraceSelectsDMLs(TestOtelTrace):
err_span="QueryExecution",
async_close=True)
def test_dml_update(self, unique_database, unique_name):
# IMPALA-14340: Cannot update a table that has the same name as the database.
tbl = "{}.{}_tbl".format(unique_database, unique_name)
def test_dml_update(self, unique_name):
"""Asserts update DMLs generate the expected traces."""
tbl = "{}.{}".format(self.test_db, unique_name)
self.execute_query_expect_success(self.client, "CREATE TABLE {} (id int, "
"string_col string) STORED AS ICEBERG TBLPROPERTIES('format-version'='2')"
@@ -635,11 +251,11 @@ class TestOtelTraceSelectsDMLs(TestOtelTrace):
query_id=result.query_id,
query_profile=result.runtime_profile,
cluster_id="select_dml",
trace_cnt=4)
trace_cnt=3)
def test_dml_delete(self, unique_database, unique_name):
# IMPALA-14340: Cannot delete from a table that has the same name as the database.
tbl = "{}.{}_tbl".format(unique_database, unique_name)
def test_dml_delete(self, unique_name):
"""Asserts delete DMLs generate the expected traces."""
tbl = "{}.{}".format(self.test_db, unique_name)
self.execute_query_expect_success(self.client, "CREATE TABLE {} (id int, "
"string_col string) STORED AS ICEBERG TBLPROPERTIES('format-version'='2')"
@@ -655,15 +271,16 @@ class TestOtelTraceSelectsDMLs(TestOtelTrace):
query_id=result.query_id,
query_profile=result.runtime_profile,
cluster_id="select_dml",
trace_cnt=4)
trace_cnt=3)
def test_dml_delete_join(self, unique_database, unique_name):
tbl1 = "{}.{}_1".format(unique_database, unique_name)
def test_dml_delete_join(self, unique_name):
"""Asserts delete join DMLs generate the expected traces."""
tbl1 = "{}.{}_1".format(self.test_db, unique_name)
self.execute_query_expect_success(self.client, "CREATE TABLE {} STORED AS ICEBERG "
"TBLPROPERTIES('format-version'='2') AS SELECT id, bool_col, int_col, year, month "
"FROM functional.alltypes ORDER BY id limit 100".format(tbl1))
tbl2 = "{}.{}_2".format(unique_database, unique_name)
tbl2 = "{}.{}_2".format(self.test_db, unique_name)
self.execute_query_expect_success(self.client, "CREATE TABLE {} STORED AS ICEBERG "
"TBLPROPERTIES('format-version'='2') AS SELECT id, bool_col, int_col, year, month "
"FROM functional.alltypessmall ORDER BY id LIMIT 100".format(tbl2))
@@ -675,23 +292,23 @@ class TestOtelTraceSelectsDMLs(TestOtelTrace):
query_id=result.query_id,
query_profile=result.runtime_profile,
cluster_id="select_dml",
trace_cnt=5)
trace_cnt=3)
def test_ignored_queries(self, unique_database, unique_name):
def test_ignored_queries(self, unique_name):
"""Asserts queries that should not generate traces do not generate traces."""
tbl = "{}.{}".format(unique_database, unique_name)
tbl = "{}.{}".format(self.test_db, unique_name)
res_create = self.execute_query_expect_success(self.client,
"CREATE TABLE {} (a int)".format(tbl))
# These queries are not expected to have traces created for them.
ignore_queries = [
"COMMENT ON DATABASE {} IS 'test'".format(unique_database),
"COMMENT ON DATABASE {} IS 'test'".format(self.test_db),
"DESCRIBE {}".format(tbl),
"EXPLAIN SELECT * FROM {}".format(tbl),
"REFRESH FUNCTIONS functional",
"REFRESH functional.alltypes",
"SET ALL",
"SHOW TABLES IN {}".format(unique_database),
"SHOW TABLES IN {}".format(self.test_db),
"SHOW DATABASES",
"TRUNCATE TABLE {}".format(tbl),
"USE functional",
@@ -716,13 +333,11 @@ class TestOtelTraceSelectsDMLs(TestOtelTrace):
.format(tbl))
# Ensure the expected number of traces are present in the trace file.
# The expected line count is 4 because:
# 1. unique_database fixture runs drop database
# 2. unique_database fixture runs create database
# 3. test runs create table
# 4. test runs drop table
# The expected line count is 2 because:
# 1. test runs create table
# 2. test runs drop table
wait_for_file_line_count(file_path=self.trace_file_path,
expected_line_count=4 + self.trace_file_count, max_attempts=10, sleep_time_s=1,
expected_line_count=2 + self.trace_file_count, max_attempts=10, sleep_time_s=1,
backoff=1, exact_match=True)
# Assert the traces for the create/drop table query to ensure both were created.
@@ -730,65 +345,87 @@ class TestOtelTraceSelectsDMLs(TestOtelTrace):
query_id=res_create.query_id,
query_profile=res_create.runtime_profile,
cluster_id="select_dml",
trace_cnt=4,
trace_cnt=2,
missing_spans=["AdmissionControl"])
self.assert_trace(
query_id=res_drop.query_id,
query_profile=res_drop.runtime_profile,
cluster_id="select_dml",
trace_cnt=4,
trace_cnt=2,
missing_spans=["AdmissionControl"])
def test_dml_insert_success(self, unique_database, unique_name):
def test_dml_insert_success(self, unique_name):
"""Asserts successful insert DMLs generate the expected traces."""
self.execute_query_expect_success(self.client,
"CREATE TABLE {}.{} (id int, string_col string, int_col int)"
.format(unique_database, unique_name))
.format(self.test_db, unique_name))
result = self.execute_query_expect_success(self.client,
"INSERT INTO {}.{} (id, string_col, int_col) VALUES (1, 'a', 10), (2, 'b', 20), "
"(3, 'c', 30)".format(unique_database, unique_name))
"(3, 'c', 30)".format(self.test_db, unique_name))
self.assert_trace(
query_id=result.query_id,
query_profile=result.runtime_profile,
cluster_id="select_dml",
trace_cnt=4)
trace_cnt=2)
def test_dml_insert_cte_success(self, unique_database, unique_name):
def test_dml_insert_fail(self, unique_name):
"""Asserts failed insert DMLs generate the expected traces."""
self.execute_query_expect_success(self.client,
"CREATE TABLE {}.{} (id int)".format(unique_database, unique_name))
"CREATE TABLE {}.{} (id int, string_col string, int_col int)"
.format(self.test_db, unique_name))
fail_query = "INSERT INTO {}.{} (id, string_col, int_col) VALUES (1, 'a', 10), " \
"(2, 'b', 20), (3, 'c', 30)".format(self.test_db, unique_name)
self.execute_query_expect_failure(self.client, fail_query,
{"debug_action": "0:OPEN:FAIL"})
query_id, profile = self.query_id_from_ui(section="completed_queries",
match_query=fail_query)
self.assert_trace(
query_id=query_id,
query_profile=profile,
cluster_id="select_dml",
trace_cnt=2,
err_span="QueryExecution")
def test_dml_insert_cte_success(self, unique_name):
"""Asserts insert DMLs that use a CTE generate the expected traces."""
self.execute_query_expect_success(self.client,
"CREATE TABLE {}.{} (id int)".format(self.test_db, unique_name))
result = self.execute_query_expect_success(self.client,
"WITH a1 AS (SELECT * FROM functional.alltypes WHERE tinyint_col=1 limit 10) "
"INSERT INTO {}.{} SELECT id FROM a1".format(unique_database, unique_name))
"INSERT INTO {}.{} SELECT id FROM a1".format(self.test_db, unique_name))
self.assert_trace(
query_id=result.query_id,
query_profile=result.runtime_profile,
cluster_id="select_dml",
trace_cnt=4)
trace_cnt=2)
def test_dml_insert_overwrite(self, unique_database, unique_name):
def test_dml_insert_overwrite(self, unique_name):
"""Test that OpenTelemetry tracing is working by running an insert overwrite query and
checking that the trace file is created and contains expected spans with the
expected attributes."""
self.execute_query_expect_success(self.client,
"CREATE TABLE {}.{} AS SELECT * FROM functional.alltypes WHERE id < 500 ".format(
unique_database, unique_name))
self.test_db, unique_name))
result = self.execute_query_expect_success(self.client,
"INSERT OVERWRITE TABLE {}.{} SELECT * FROM functional.alltypes WHERE id > 500 "
"AND id < 1000".format(unique_database, unique_name))
"AND id < 1000".format(self.test_db, unique_name))
self.assert_trace(
query_id=result.query_id,
query_profile=result.runtime_profile,
cluster_id="select_dml",
trace_cnt=4)
trace_cnt=2)
class TestOtelTraceSelectQueued(TestOtelTrace):
class TestOtelTraceSelectQueued(TestOtelTraceBase):
"""Tests that require setting additional startup flags to assert admission control
queueing behavior. The cluster must be restarted after each test to apply the
new flags."""
@@ -798,6 +435,8 @@ class TestOtelTraceSelectQueued(TestOtelTrace):
.format(TRACE_FLAGS),
cluster_size=1, tmp_dir_placeholders=[OUT_DIR], disable_log_buffering=True)
def test_select_queued(self):
"""Asserts a query that is queued in admission control then completes successfully
generates the expected trace."""
# Launch two queries, the second will be queued until the first completes.
query = "SELECT * FROM functional.alltypes WHERE id = 1"
handle1 = self.client.execute_async("{} AND int_col = SLEEP(5000)".format(query))
@@ -861,7 +500,7 @@ class TestOtelTraceSelectQueued(TestOtelTrace):
async_close=True)
class TestOtelTraceSelectRetry(TestOtelTrace):
class TestOtelTraceSelectRetry(TestOtelTraceBase):
"""Tests that require ending an Impala daemon and thus the cluster must restart after
each test."""
@@ -871,6 +510,8 @@ class TestOtelTraceSelectRetry(TestOtelTrace):
disable_log_buffering=True,
statestored_args="-statestore_heartbeat_frequency_ms=60000")
def test_retry_select_success(self):
"""Asserts select queries that are successfully retried generate the expected
traces."""
self.cluster.impalads[1].kill()
result = self.execute_query_expect_success(self.client,
@@ -903,7 +544,8 @@ class TestOtelTraceSelectRetry(TestOtelTrace):
disable_log_buffering=True,
statestored_args="-statestore_heartbeat_frequency_ms=1000")
def test_retry_select_failed(self):
"""Asserts select queries that are retried but ultimately fail generate the expected
traces."""
with self.create_impala_client() as client:
client.set_configuration({"retry_failed_queries": "true"})
@@ -955,9 +597,24 @@ class TestOtelTraceSelectRetry(TestOtelTrace):
@CustomClusterTestSuite.with_args(
impalad_args="-v=2 --cluster_id=trace_ddl {}".format(TRACE_FLAGS),
cluster_size=2, tmp_dir_placeholders=[OUT_DIR], disable_log_buffering=True)
class TestOtelTraceDDLs(TestOtelTrace):
class TestOtelTraceDDLs(TestOtelTraceBase):
"""Tests that exercise OpenTelemetry tracing behavior on DDLs. These tests are in their
own class because they require an additional test dimension for async DDLs"""
own class because they require an additional test dimension for async DDLs."""
@classmethod
def setup_class(cls):
  """Creates one database shared by every test in this class. This replaces the
  per-test unique_database fixture, which would run drop/create/drop DDLs around
  each test."""
  super(TestOtelTraceDDLs, cls).setup_class()
  # Timestamp plus a random lowercase suffix keeps the database name unique
  # across concurrent/repeated runs.
  cls.test_db = "test_otel_trace_ddls_{}_{}".format(
      datetime.now().strftime("%Y%m%d%H%M%S"),
      "".join(choice(ascii_lowercase) for _ in range(7)))
  cls.execute_query_expect_success(cls.client, "CREATE DATABASE {}".format(cls.test_db))
@classmethod
def teardown_class(cls):
  """Drops the class-wide test database created in setup_class, including any
  tables individual tests left behind (CASCADE)."""
  cls.execute_query_expect_success(cls.client, "DROP DATABASE {} CASCADE"
      .format(cls.test_db))
  super(TestOtelTraceDDLs, cls).teardown_class()
@classmethod
def add_test_dimensions(cls):
@@ -965,6 +622,8 @@ class TestOtelTraceDDLs(TestOtelTrace):
cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('async_ddl', True, False))
def test_ddl_createdb(self, vector, unique_name):
"""Asserts a successful create database and drop database generate the expected
traces."""
try:
result = self.execute_query_expect_success(self.client,
"CREATE DATABASE {}".format(unique_name),
@@ -986,20 +645,20 @@ class TestOtelTraceDDLs(TestOtelTrace):
trace_cnt=2,
missing_spans=["AdmissionControl"])
def test_ddl_create_alter_table(self, vector, unique_database, unique_name):
def test_ddl_create_alter_table(self, vector, unique_name):
"""Tests that traces are created for a successful create table, a successful alter
table, and a failed alter table (adding a column that already exists)."""
create_result = self.execute_query_expect_success(self.client,
"CREATE TABLE {}.{} (id int, string_col string, int_col int)"
.format(unique_database, unique_name),
.format(self.test_db, unique_name),
{"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')})
alter_success_result = self.execute_query_expect_success(self.client, "ALTER TABLE "
"{}.{} ADD COLUMNS (new_col string)".format(unique_database, unique_name),
"{}.{} ADD COLUMNS (new_col string)".format(self.test_db, unique_name),
{"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')})
fail_query = "ALTER TABLE {}.{} ADD COLUMNS (new_col string)" \
.format(unique_database, unique_name)
.format(self.test_db, unique_name)
self.execute_query_expect_failure(self.client, fail_query,
{"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')})
@@ -1010,96 +669,61 @@ class TestOtelTraceDDLs(TestOtelTrace):
query_id=create_result.query_id,
query_profile=create_result.runtime_profile,
cluster_id="trace_ddl",
trace_cnt=5,
trace_cnt=3,
missing_spans=["AdmissionControl"])
self.assert_trace(
query_id=alter_success_result.query_id,
query_profile=alter_success_result.runtime_profile,
cluster_id="trace_ddl",
trace_cnt=5,
trace_cnt=3,
missing_spans=["AdmissionControl"])
self.assert_trace(
query_id=fail_query_id,
query_profile=fail_profile,
cluster_id="trace_ddl",
trace_cnt=5,
trace_cnt=3,
missing_spans=["AdmissionControl", "QueryExecution"],
err_span="Planning")
def test_ddl_createtable_fail(self, vector, unique_name):
with self.create_client_for_nth_impalad(1, HS2) as second_coord_client:
try:
# Create a database to use for this test. Cannot use the unique_database fixture
# because we want to drop the database after planning but before execution and
# that fixture drops the database without the "if exists" clause.
self.execute_query_expect_success(self.client, "CREATE DATABASE {}"
.format(unique_name))
"""Asserts a failed create table generates the expected trace."""
query = "CREATE TABLE {}.{} AS (SELECT * FROM functional.alltypes LIMIT 1)" \
.format(self.test_db, unique_name)
self.execute_query_expect_failure(self.client, query,
{"debug_action": "CLIENT_REQUEST_UPDATE_CATALOG:FAIL",
"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')})
query_id, profile = self.query_id_from_ui(section="completed_queries",
match_query=query)
with self.create_client_for_nth_impalad(0, HS2) as first_coord_client:
# In a separate thread, run the create table DDL that will fail.
fail_query = "CREATE TABLE {}.{} (id int, string_col string, int_col int)" \
.format(unique_name, unique_name)
def execute_query_fail():
self.execute_query_expect_failure(first_coord_client, fail_query,
{"debug_action": "CRS_DELAY_BEFORE_CATALOG_OP_EXEC:SLEEP@5000",
"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')})
thread = Thread(target=execute_query_fail)
thread.daemon = True
thread.start()
# Wait until the create table query is in flight.
fail_query_id = None
while fail_query_id is None:
fail_query_id, profile = self.query_id_from_ui(section="in_flight_queries",
match_query=fail_query, not_found_ok=True)
if fail_query_id is not None and len(profile.strip()) > 0 \
and parse_impala_query_state(profile) == "RUNNING":
break
sleep(0.1)
# Drop the database after planning to cause the create table to fail.
self.execute_query_expect_success(second_coord_client,
"DROP DATABASE {} CASCADE".format(unique_name),
{"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')})
# Wait until the create table query fails.
thread.join()
fail_profile_str = self.query_profile_from_ui(fail_query_id)
finally:
self.execute_query_expect_success(second_coord_client,
"DROP DATABASE IF EXISTS {} CASCADE".format(unique_name))
# Assert the errored query.
self.assert_trace(
query_id=fail_query_id,
query_profile=fail_profile_str,
query_id=query_id,
query_profile=profile,
cluster_id="trace_ddl",
trace_cnt=4,
trace_cnt=1,
missing_spans=["AdmissionControl"],
err_span="QueryExecution")
def test_ddl_createtable_cte_success(self, vector, unique_database, unique_name):
def test_ddl_createtable_cte_success(self, vector, unique_name):
"""Asserts create table queries that use a CTE generate the expected traces."""
result = self.execute_query_expect_success(self.client,
"CREATE TABLE {}.{} AS WITH a1 AS (SELECT * FROM functional.alltypes WHERE "
"tinyint_col=1 LIMIT 10) SELECT id FROM a1".format(unique_database, unique_name),
"tinyint_col=1 LIMIT 10) SELECT id FROM a1".format(self.test_db, unique_name),
{"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')})
self.assert_trace(
query_id=result.query_id,
query_profile=result.runtime_profile,
cluster_id="trace_ddl",
trace_cnt=3,
trace_cnt=1,
missing_spans=["AdmissionControl"])
def test_compute_stats(self, vector, unique_database, unique_name):
def test_compute_stats(self, vector, unique_name):
"""The compute stats queries are a special case. These statements run two separate
select queries. Locate both select queries on the UI and assert their traces."""
tbl_name = "{}.{}_alltypes".format(unique_database, unique_name)
tbl_name = "{}.{}_alltypes".format(self.test_db, unique_name)
# Setup a test table to ensure calculating stats on an existing table does not impact
# other tests.
@@ -1136,8 +760,9 @@ class TestOtelTraceDDLs(TestOtelTrace):
cluster_id="trace_ddl",
trace_cnt=4)
def test_compute_incremental_stats(self, vector, unique_database, unique_name):
tbl_name = "{}.{}_alltypes".format(unique_database, unique_name)
def test_compute_incremental_stats(self, vector, unique_name):
"""Asserts compute incremental stats queries generate the expected traces."""
tbl_name = "{}.{}_alltypes".format(self.test_db, unique_name)
# Setup a test table to ensure calculating stats on an existing table does not impact
# other tests.
@@ -1155,15 +780,8 @@ class TestOtelTraceDDLs(TestOtelTrace):
trace_cnt=2,
missing_spans=["AdmissionControl"])
# Assert the one trace matches the refresh table query.
self.assert_trace(
query_id=result.query_id,
query_profile=result.runtime_profile,
cluster_id="trace_ddl",
trace_cnt=2,
missing_spans=["AdmissionControl"])
def test_invalidate_metadata(self, vector):
"""Asserts invalidate metadata queries generate the expected traces."""
result = self.execute_query_expect_success(self.client,
"INVALIDATE METADATA functional.alltypes",
{"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')})

View File

@@ -16,14 +16,34 @@
# under the License.
from __future__ import absolute_import, division, print_function
import json
import logging
import os
import sys
from time import sleep
from tests.common.environ import IMPALA_LOCAL_BUILD_VERSION
from tests.common.file_utils import grep_file_first, wait_for_file_line_count
from tests.common.impala_connection import ERROR, INITIALIZED, PENDING
from tests.util.query_profile_util import (
parse_admission_result,
parse_coordinator,
parse_db_user,
parse_default_db,
parse_impala_query_state,
parse_num_deleted_rows,
parse_num_modified_rows,
parse_num_rows_fetched,
parse_original_query_id,
parse_query_status,
parse_query_type,
parse_retried_query_id,
parse_retry_status,
parse_session_id,
parse_sql,
)
LOG = logging.getLogger(__name__)
# Valid types of OpenTelemetry attribute values.
ATTR_VAL_TYPE_STRING = "string"
@@ -336,3 +356,444 @@ def parse_trace_file(file_path, query_id):
.format(query_id)
return query_trace
def assert_trace(log_file_path, trace_file_path, trace_file_count, query_id,
    query_profile, cluster_id, trace_cnt=1, err_span="", missing_spans=None,
    async_close=False, exact_trace_cnt=False):
  """
  Asserts the OpenTelemetry trace emitted for a single query.

  - log_file_path: Path to the Impalad log file searched for span start log entries.
  - trace_file_path: Path to the trace output file (one trace per line).
  - trace_file_count: Number of lines already present in the trace file before the
    traces under assertion were generated.
  - query_id: Id of the query whose trace is asserted.
  - query_profile: Runtime profile text of the query; source of most expected values.
  - cluster_id: Expected value of the ClusterId span attribute.
  - trace_cnt: Number of new trace lines expected in the trace file.
  - err_span: Name of the first span expected to carry the query's error message;
    every subsequent span is also expected to carry it.
  - missing_spans: Names of spans expected to be absent from the trace.
  - async_close: Passed through to the Close span assertions to allow the
    ReleasedAdmissionControlResources event.
  - exact_trace_cnt: If True, the trace file must contain exactly the expected line
    count rather than at least that many lines.
  """
  # Avoid the shared mutable default argument pitfall; None means "no missing spans".
  if missing_spans is None:
    missing_spans = []
  # Parse common values needed in multiple asserts.
  session_id = parse_session_id(query_profile)
  db_user = parse_db_user(query_profile)
  # Wait until all spans are written to the trace file.
  wait_for_file_line_count(
      file_path=trace_file_path,
      expected_line_count=trace_cnt + trace_file_count,
      max_attempts=60,
      sleep_time_s=1,
      backoff=1,
      exact_match=exact_trace_cnt)
  # Remove missing spans from the expected span count (6 spans in a full trace).
  expected_span_count = 6 - len(missing_spans)
  # Parse the trace json files to get the trace for the query.
  trace = parse_trace_file(trace_file_path, query_id)
  __assert_trace_common(trace, expected_span_count)
  # Retrieve the query status which contains error messages if the query failed.
  query_status = parse_query_status(query_profile)
  query_status = "" if query_status == "OK" else query_status
  impala_query_state = parse_retry_status(query_profile)
  if impala_query_state is None:
    impala_query_state = parse_impala_query_state(query_profile)
  # Determine if the query was retried and if so, get the original query id.
  original_query_id = parse_original_query_id(query_profile)
  original_query_id = "" if original_query_id is None else original_query_id
  # Determine if the query initially failed but has a successful retry under a
  # different query id. If so, get the retried query id.
  retried_query_id = parse_retried_query_id(query_profile)
  retried_query_id = "" if retried_query_id is None else retried_query_id
  # Error message should follow on all spans after the errored span.
  in_error = False
  # Retrieve the coordinator from the query profile.
  coordinator = parse_coordinator(query_profile)
  # Parse the query type from the query profile.
  query_type = parse_query_type(query_profile)
  if query_type == "N/A":
    query_type = "UNKNOWN"
  # Assert root span.
  root_span_id = __assert_rootspan_attrs(trace.root_span, query_id, session_id,
      cluster_id, db_user, "default-pool", impala_query_state, query_status,
      original_query_id, retried_query_id, coordinator, log_file_path)
  # Assert Init span.
  # NOTE(review): span_err_msg computed in the Init/Submitted branches is never
  # passed to the corresponding assert helpers (they take no error argument);
  # the assignments only accompany the in_error toggle — confirm intended.
  if "Init" not in missing_spans:
    span_err_msg = ""
    if err_span == "Init":
      span_err_msg = query_status
      in_error = True
    __assert_initspan_attrs(trace.child_spans, root_span_id, query_id, session_id,
        cluster_id, db_user, "default-pool", parse_default_db(query_profile),
        parse_sql(query_profile).replace('\n', ' '), original_query_id, coordinator,
        log_file_path)
  # Assert Submitted span.
  if "Submitted" not in missing_spans:
    span_err_msg = ""
    if err_span == "Submitted" or in_error:
      span_err_msg = query_status
      in_error = True
    __assert_submittedspan_attrs(trace.child_spans, root_span_id, query_id,
        log_file_path)
  # Assert Planning span.
  if "Planning" not in missing_spans:
    status = INITIALIZED
    span_err_msg = ""
    if err_span == "Planning" or in_error:
      span_err_msg = query_status
      status = ERROR
      in_error = True
    __assert_planningspan_attrs(trace.child_spans, root_span_id, query_id,
        query_type, span_err_msg, status, log_file_path)
  # Assert AdmissionControl span.
  if "AdmissionControl" not in missing_spans:
    status = PENDING
    span_err_msg = ""
    if err_span == "AdmissionControl" or in_error:
      span_err_msg = query_status
      status = ERROR
      in_error = True
    __assert_admissioncontrol_attrs(trace.child_spans, root_span_id, query_id,
        "default-pool", parse_admission_result(query_profile), span_err_msg, status,
        log_file_path)
  # Assert QueryExecution span.
  if "QueryExecution" not in missing_spans:
    span_err_msg = ""
    if err_span == "QueryExecution" or in_error:
      span_err_msg = query_status
      in_error = True
    __assert_query_exec_attrs(trace.child_spans, query_profile, root_span_id,
        query_id, span_err_msg, parse_impala_query_state(query_profile), log_file_path)
  # Assert Close span.
  if "Close" not in missing_spans:
    span_err_msg = ""
    if err_span == "Close" or in_error:
      span_err_msg = query_status
      in_error = True
    __assert_close_attrs(trace.child_spans, root_span_id, query_id, span_err_msg,
        parse_impala_query_state(query_profile), async_close, log_file_path)
def __assert_trace_common(trace, expected_child_spans_count):
  """
  Asserts common structure/fields in resource spans and scope spans of the
  OpenTelemetry trace JSON object.
  """
  expected_scope = "org.apache.impala.impalad.query"
  # The trace must contain exactly the expected number of child spans.
  actual_child_count = len(trace.child_spans)
  assert actual_child_count == expected_child_spans_count, \
      "Trace '{}' expected child spans count: {}, actual: {}".format(trace.trace_id,
      expected_child_spans_count, actual_child_count)
  # Each scope span carries a scope object with the name and version of the
  # OpenTelemetry scope (at resourceSpan[0].scopeSpans[0].scope). Assert the root
  # span's scope structure and contents first.
  assert trace.root_span.scope_name == expected_scope, \
      "Span: '{}' expected: 'org.apache.impala.impalad.query', actual: {}" \
      .format(trace.root_span.span_id, trace.root_span.scope_name)
  assert trace.root_span.scope_version == "1.0.0", "Span: '{}' expected scope " \
      "version '1.0.0', actual: '{}'".format("Root", trace.root_span.scope_version)
  # Every child span must carry the same scope name and version.
  for child in trace.child_spans:
    assert child.scope_name == expected_scope, \
        "Span: '{}' expected scope name: 'org.apache.impala.impalad.query', " \
        "actual: {}".format(child.name, child.scope_name)
    assert child.scope_version == "1.0.0", "Span: '{}' expected scope " \
        "version '1.0.0', actual: '{}'".format(child.name, child.scope_version)
def __assert_scopespan_common(span, query_id, is_root, name, attributes_count,
    status, log_file_path, root_span_id=None, err_msg=""):
  """
  Helper function to assert common data points of a single scope span. These spans
  contain the actual root and child spans. Assertions include the span object's
  structure, span properties, and common span attributes.

  - span: The OtelSpan object to assert.
  - query_id: The query id of the span.
  - is_root: Whether the span is a root span.
  - name: The name of the span to assert without the query_id prefix.
  - attributes_count: The expected number of attributes unique to the span. If
                      asserting a child span, adds 7 to this value to account for
                      attributes common across all child spans.
  - status: The expected status of the span. Only used for child spans.
  - log_file_path: Path to the Impalad log searched for the span start log entry,
                   which provides the expected trace id and span id.
  - root_span_id: The root span id of the span. Required for child spans.
  - err_msg: Expected value of the span's error message attribute.
  """
  # Read the span trace id and span id from the Impalad logs.
  expected_span_id, expected_trace_id = __find_span_log(log_file_path, name, query_id)
  # Assert span properties.
  expected_name = query_id
  actual_kind = span.kind
  if (is_root):
    assert span.parent_span_id is None, "Found parentSpanId on root span"
    assert actual_kind == 2, "Span '{}' expected kind: '{}', actual: '{}'" \
        .format(expected_name, 2, actual_kind)
  else:
    expected_name += " - {}".format(name)
    assert root_span_id is not None
    actual = span.parent_span_id
    assert actual == root_span_id, "Span '{}' expected parentSpanId: '{}', actual: " \
        "'{}'".format(expected_name, root_span_id, actual)
    # BUG FIX: the failure message previously formatted 'actual' (which holds the
    # parent span id at this point) instead of the span kind being asserted.
    assert actual_kind == 1, "Span '{}' expected kind: '{}', actual: '{}'" \
        .format(expected_name, 1, actual_kind)
  actual = span.name
  assert actual == expected_name, "Expected span name: '{}', actual: '{}'" \
      .format(expected_name, actual)
  actual = span.trace_id
  assert actual == expected_trace_id, "Span '{}' expected traceId: '{}', " \
      "actual: '{}'".format(expected_name, expected_trace_id, actual)
  actual = span.span_id
  assert actual == expected_span_id, "Span '{}' expected spanId: '{}', " \
      "actual: '{}'".format(expected_name, expected_span_id, actual)
  # Flags must always be 1 which indicates the trace is to be sampled.
  expected_flags = 1
  actual = span.flags
  assert actual == expected_flags, "Span '{}' expected flags: '{}', " \
      "actual: '{}'".format(expected_name, expected_flags, actual)
  # Assert span attributes. Child spans carry 7 common attributes on top of their
  # span-specific ones.
  expected_span_attrs_count = attributes_count if is_root else 7 + attributes_count
  assert len(span.attributes) == expected_span_attrs_count, "Span '{}' attributes " \
      "must contain exactly {} elements, actual: {}".format(expected_name,
      expected_span_attrs_count, len(span.attributes))
  if (is_root):
    __assert_attr(expected_name, span.attributes, "ErrorMessage", err_msg)
  else:
    __assert_attr(expected_name, span.attributes, "ErrorMsg", err_msg)
    __assert_attr(expected_name, span.attributes, "Name", expected_name)
    # Only the QueryExecution span reports as running.
    __assert_attr(expected_name, span.attributes, "Running",
        name == "QueryExecution", "boolValue")
    __assert_attr(expected_name, span.attributes, "Status", status)
def __find_span_log(log_file_path, span_name, query_id):
  """
  Locates the start span log entry for the given span name and query id in the
  Impalad logs and returns the (span_id, trace_id) pair recorded there. These values
  serve as the expected values when asserting span properties in the trace file.
  Retries for up to 10 seconds before giving up.
  """
  span_regex = r'Started \'{}\' span trace_id="(.*?)" span_id="(.*?)" query_id="{}"' \
      .format(span_name, query_id)
  LOG.info("Searching for span log entry for span '{}' for query '{}' in log file '{}'"
      .format(span_name, query_id, log_file_path))
  # Re-open and re-scan the log on every attempt since it may still be written to.
  for _ in range(10):
    with open(log_file_path, "r") as log_file:
      match = grep_file_first(log_file, span_regex)
    if match is not None:
      return match.group(2), match.group(1)
    sleep(1)
  raise Exception("Exceeded maximum retries to find span log entry for span '{}' "
      "and query '{}'".format(span_name, query_id))
def __assert_attr(span_name, attributes, expected_key, expected_value,
    expected_type="stringValue"):
  """
  Helper function to assert that a specific OpenTelemetry attribute exists in a span
  and has the expected value and value type.
  """
  assert expected_type in ("stringValue", "boolValue", "intValue"), "Invalid " \
      "expected_type '{}', must be one of 'stringValue', 'boolValue', or 'intValue'" \
      .format(expected_type)
  # EAFP: indexing a plain dict with a missing key raises KeyError, which would
  # previously escape before the descriptive "attribute not found" assertion below
  # could fire. Treat a missing key the same as a None value.
  try:
    val = attributes[expected_key]
  except KeyError:
    val = None
  assert val is not None, "Span '{}' attribute not found: '{}', actual attributes: {}" \
      .format(span_name, expected_key, attributes)
  assert val.value == expected_value, "Span '{}' attribute '{}' expected: '{}', " \
      "actual: '{}'".format(span_name, expected_key, expected_value, val.value)
  # Map the OTLP JSON value type name onto the module's attribute type constants.
  if expected_type == "boolValue":
    expected_type = ATTR_VAL_TYPE_BOOL
  elif expected_type == "intValue":
    expected_type = ATTR_VAL_TYPE_INT
  else:
    expected_type = ATTR_VAL_TYPE_STRING
  assert val.get_type() == expected_type, "Span '{}' attribute '{}' expected to be " \
      "of type '{}', actual: '{}'".format(span_name, expected_key, expected_type,
      val.get_type())
def __assert_span_events(span, expected_events=None):
  """
  Helper function to assert that a span contains exactly the expected span events.
  A value of None (the default) means no events are expected.
  """
  # Use None as the default to avoid the shared mutable default argument pitfall.
  if expected_events is None:
    expected_events = []
  assert len(expected_events) == len(span.events), "Span '{}' expected to have " \
      "exactly {} events, actual: {}".format(span.name, len(expected_events),
      len(span.events))
  for event in expected_events:
    assert event in span.events, "Expected '{}' event on span '{}' but " \
        "no such events was found.".format(event, span.name)
def __assert_rootspan_attrs(span, query_id, session_id, cluster_id, user_name,
    request_pool, state, err_msg, original_query_id, retried_query_id, coordinator,
    log_file_path):
  """
  Asserts the common attributes of the root span and returns the root span id read
  from the Impalad log.
  """
  root_span_id, _ = __find_span_log(log_file_path, "Root", query_id)
  __assert_scopespan_common(span, query_id, True, "Root", 14, "", log_file_path, None,
      err_msg)
  # Assert each root-span attribute against its expected value.
  expected_attrs = (
      ("QueryId", query_id),
      ("SessionId", session_id),
      ("ClusterId", cluster_id),
      ("UserName", user_name),
      ("RequestPool", request_pool),
      ("State", state),
      ("OriginalQueryId", original_query_id),
      ("RetriedQueryId", retried_query_id),
      ("Coordinator", coordinator),
  )
  for attr_key, attr_val in expected_attrs:
    __assert_attr(span.name, span.attributes, attr_key, attr_val)
  return root_span_id
def __assert_initspan_attrs(spans, root_span_id, query_id, session_id, cluster_id,
    user_name, request_pool, default_db, query_string, original_query_id, coordinator,
    log_file_path):
  """
  Asserts the common and span-specific attributes of the Init span.
  """
  init_span = __find_span(spans, "Init", query_id)
  __assert_scopespan_common(init_span, query_id, False, "Init", 9, INITIALIZED,
      log_file_path, root_span_id)
  # Assert each Init-specific attribute against its expected value.
  expected_attrs = (
      ("QueryId", query_id),
      ("SessionId", session_id),
      ("ClusterId", cluster_id),
      ("UserName", user_name),
      ("RequestPool", request_pool),
      ("DefaultDb", default_db),
      ("QueryString", query_string),
      ("OriginalQueryId", original_query_id),
      ("Coordinator", coordinator),
  )
  for attr_key, attr_val in expected_attrs:
    __assert_attr(init_span.name, init_span.attributes, attr_key, attr_val)
  # The Init span is expected to carry no events.
  __assert_span_events(init_span)
def __assert_submittedspan_attrs(spans, root_span_id, query_id, log_file_path):
  """
  Asserts the common attributes of the Submitted span.
  """
  span = __find_span(spans, "Submitted", query_id)
  # The Submitted span carries no span-specific attributes.
  __assert_scopespan_common(span, query_id, False, "Submitted", 0, INITIALIZED,
      log_file_path, root_span_id)
  # The Submitted span is expected to carry no events.
  __assert_span_events(span)
def __assert_planningspan_attrs(spans, root_span_id, query_id, query_type, err_msg,
    status, log_file_path):
  """
  Asserts the common and span-specific attributes of the Planning span.
  """
  span = __find_span(spans, "Planning", query_id)
  __assert_scopespan_common(span, query_id, False, "Planning", 1, status,
      log_file_path, root_span_id, err_msg)
  # QueryType is the only Planning-specific attribute.
  __assert_attr(span.name, span.attributes, "QueryType", query_type)
  # The Planning span is expected to carry no events.
  __assert_span_events(span)
def __assert_admissioncontrol_attrs(spans, root_span_id, query_id, request_pool,
    adm_result, err_msg, status, log_file_path):
  """
  Helper function that asserts the common and span-specific attributes in the
  admission control span.
  """
  # The query was queued unless admission control admitted it right away. Membership
  # test replaces the un-idiomatic 'False if ... else True' expression.
  queued = adm_result not in ("Admitted immediately", "Admitted as a trivial query")
  adm_ctrl_span = __find_span(spans, "AdmissionControl", query_id)
  __assert_scopespan_common(adm_ctrl_span, query_id, False, "AdmissionControl", 3,
      status, log_file_path, root_span_id, err_msg)
  __assert_attr(adm_ctrl_span.name, adm_ctrl_span.attributes, "Queued", queued,
      "boolValue")
  __assert_attr(adm_ctrl_span.name, adm_ctrl_span.attributes, "AdmissionResult",
      adm_result)
  __assert_attr(adm_ctrl_span.name, adm_ctrl_span.attributes, "RequestPool",
      request_pool)
  # A queued query must carry exactly one "Queued" event; otherwise no events.
  if queued:
    __assert_span_events(adm_ctrl_span, ["Queued"])
  else:
    __assert_span_events(adm_ctrl_span)
def __assert_query_exec_attrs(spans, query_profile, root_span_id, query_id,
    err_msg, status, log_file_path):
  """
  Asserts the common and span-specific attributes of the QueryExecution span.
  """
  exec_span = __find_span(spans, "QueryExecution", query_id)
  __assert_scopespan_common(exec_span, query_id, False, "QueryExecution", 3, status,
      log_file_path, root_span_id, err_msg)
  # Each row-count attribute must match the corresponding value parsed from the
  # query profile.
  row_count_attrs = (
      ("NumModifiedRows", parse_num_modified_rows),
      ("NumDeletedRows", parse_num_deleted_rows),
      ("NumRowsFetched", parse_num_rows_fetched),
  )
  for attr_key, profile_parser in row_count_attrs:
    __assert_attr(exec_span.name, exec_span.attributes, attr_key,
        profile_parser(query_profile), "intValue")
  # TODO: IMPALA-14334 - Assert QueryExecution span events
def __assert_close_attrs(spans, root_span_id, query_id, err_msg, status, async_close,
    log_file_path):
  """
  Asserts the common and span-specific attributes of the Close span.
  """
  span = __find_span(spans, "Close", query_id)
  # The Close span carries no span-specific attributes.
  __assert_scopespan_common(span, query_id, False, "Close", 0, status,
      log_file_path, root_span_id, err_msg)
  # Events expected on the Close span. Currently collected but not asserted,
  # pending IMPALA-14334.
  expected_events = ["QueryUnregistered"]
  if async_close and "ReleasedAdmissionControlResources" in span.events:
    expected_events.append("ReleasedAdmissionControlResources")
  # TODO: IMPALA-14334 - Assert Close span events
def __find_span(spans, name, query_id):
  """
  Returns the first span in 'spans' whose name ends with 'name'. Fails the test if
  no such span exists.
  """
  match = next((s for s in spans if s.name.endswith(name)), None)
  assert match is not None, "Span '{}' not found for query '{}'".format(name, query_id)
  return match