diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py index 04b34cfe0..76fa48ce6 100644 --- a/tests/common/custom_cluster_test_suite.py +++ b/tests/common/custom_cluster_test_suite.py @@ -679,7 +679,7 @@ class CustomClusterTestSuite(ImpalaTestSuite): return cls.cluster.impalads[0].service if cls.cluster.impalads else None def query_id_from_ui(self, section, match_func=None, match_query=None, coord_idx=0, - not_found_ok=False): + max_attempts=30, sleep_time_s=1): """ Calls to the debug UI's queries page and loops over all queries in the specified section calling the provided func for each query, Returns the string id of the @@ -696,8 +696,8 @@ class CustomClusterTestSuite(ImpalaTestSuite): is used instead of match_func. coord_idx: Index of the Impalad to use as the coordinator. This is used to determine which impalad's UI to query. - not_found_ok: If True, returns None when no matching query is found. If False, - fails an assert when no matching query is found. + max_attempts: Number of times to poll the queries page before giving up. + sleep_time_s: Number of seconds to sleep between polls of the queries page. Returns: String of the query id of the first matching query or None if no query matches. 
@@ -712,20 +712,23 @@ class CustomClusterTestSuite(ImpalaTestSuite): if match_query is not None: match_query = match_query.lower().strip() - service = self.cluster.impalads[coord_idx].service - queries_json = service.get_debug_webpage_json('/queries') + attempts = 0 - for query in queries_json[section]: - if (match_query is not None and query["stmt"].lower() == match_query.lower()) \ - or (match_func is not None and match_func(query)): - query_id = query['query_id'] - return query_id, service.read_query_profile_page(query_id) + while attempts < max_attempts: + service = self.cluster.impalads[coord_idx].service + queries_json = service.get_debug_webpage_json('/queries') - if not_found_ok: - return None, None - else: - assert False, "No matching query found in section '{}'".format(section) + for query in queries_json[section]: + if (match_query is not None and query["stmt"].lower() == match_query.lower()) \ + or (match_func is not None and match_func(query)): + query_id = query['query_id'] + return query_id, service.read_query_profile_page(query_id) + attempts += 1 + sleep(sleep_time_s) + + assert False, "No matching query found in section '{}' after {} " \ + "attempts".format(section, max_attempts) def query_profile_from_ui(self, query_id, coord_idx=0): """ diff --git a/tests/common/file_utils.py b/tests/common/file_utils.py index f0744024d..10a0cc5fc 100644 --- a/tests/common/file_utils.py +++ b/tests/common/file_utils.py @@ -171,6 +171,24 @@ def grep_file(file, search): return matching_lines +def grep_file_first(file, search): + """ + Searches for a pattern in a file and returns the first match. If no match is found, + returns None. + + file: An opened file object to search within. + search: A string containing the regex pattern to search for. + + return: The first regex search() return object if found, otherwise None. 
+ """ + matcher = re.compile(search) + for line in file: + res = matcher.search(line) + if res is not None: + return res + return None + + def assert_file_in_dir_contains(dir, search): '''Asserts that at least one file in the 'dir' contains the 'search' term.''' results = grep_dir(dir, search) diff --git a/tests/custom_cluster/test_otel_trace.py b/tests/custom_cluster/test_otel_trace.py index e01108b60..967b516cc 100644 --- a/tests/custom_cluster/test_otel_trace.py +++ b/tests/custom_cluster/test_otel_trace.py @@ -16,22 +16,18 @@ # under the License. from __future__ import absolute_import, division, print_function - -from threading import Thread +from datetime import datetime +from random import choice +from string import ascii_lowercase from time import sleep from impala.error import HiveServer2Error from tests.common.custom_cluster_test_suite import CustomClusterTestSuite -from tests.common.file_utils import wait_for_file_line_count, count_lines -from tests.common.impala_connection import ERROR, RUNNING, FINISHED, INITIALIZED, PENDING -from tests.common.test_vector import BEESWAX, HS2, ImpalaTestDimension -from tests.util.otel_trace import parse_trace_file, ATTR_VAL_TYPE_STRING, \ - ATTR_VAL_TYPE_INT, ATTR_VAL_TYPE_BOOL -from tests.util.query_profile_util import parse_db_user, parse_session_id, parse_sql, \ - parse_query_type, parse_query_status, parse_impala_query_state, parse_query_id, \ - parse_retry_status, parse_original_query_id, parse_retried_query_id, \ - parse_num_rows_fetched, parse_admission_result, parse_default_db, \ - parse_num_modified_rows, parse_num_deleted_rows, parse_coordinator +from tests.common.file_utils import count_lines, wait_for_file_line_count +from tests.common.impala_connection import ERROR, FINISHED, PENDING, RUNNING +from tests.common.test_vector import BEESWAX, ImpalaTestDimension +from tests.util.otel_trace import assert_trace +from tests.util.query_profile_util import parse_query_id, parse_retry_status from tests.util.retry 
import retry OUT_DIR = "out_dir_traces" @@ -40,431 +36,46 @@ TRACE_FLAGS = "--otel_trace_enabled=true --otel_trace_exporter=file " \ "--otel_file_flush_interval_ms=500 " \ "--otel_file_pattern={out_dir_traces}/" + TRACE_FILE -class TestOtelTrace(CustomClusterTestSuite): + +class TestOtelTraceBase(CustomClusterTestSuite): def setup_method(self, method): - super(TestOtelTrace, self).setup_method(method) + super(TestOtelTraceBase, self).setup_method(method) self.assert_impalad_log_contains("INFO", "join Impala Service pool") self.trace_file_path = "{}/{}".format(self.get_tmp_dir(OUT_DIR), TRACE_FILE) self.trace_file_count = count_lines(self.trace_file_path, True) def assert_trace(self, query_id, query_profile, cluster_id, trace_cnt=1, err_span="", missing_spans=[], async_close=False, exact_trace_cnt=False): - # Parse common values needed in multiple asserts. - session_id = parse_session_id(query_profile) - db_user = parse_db_user(query_profile) - - # Wait until all spans are written to the trace file. - wait_for_file_line_count( - file_path=self.trace_file_path, - expected_line_count=trace_cnt + self.trace_file_count, - max_attempts=60, - sleep_time_s=1, - backoff=1, - exact_match=exact_trace_cnt) - - # Remove missing spans from the expected span count. - expected_span_count = 6 - len(missing_spans) - - # Parse the trace json files to get the trace for the query. - trace = parse_trace_file(self.trace_file_path, query_id) - self.__assert_trace_common(trace, expected_span_count) - - # Retrieve the query status which contains error messages if the query failed. - query_status = parse_query_status(query_profile) - query_status = "" if query_status == "OK" else query_status - - impala_query_state = parse_retry_status(query_profile) - if impala_query_state is None: - impala_query_state = parse_impala_query_state(query_profile) - - # Determine if the query was retried and if so, get the original query id. 
- original_query_id = parse_original_query_id(query_profile) - original_query_id = "" if original_query_id is None else original_query_id - - # Determine if the query initially failed but has a successful retry under a different - # query id. If so, get the retried query id. - retried_query_id = parse_retried_query_id(query_profile) - retried_query_id = "" if retried_query_id is None else retried_query_id - - # Error message should follow on all spans after the errored span - in_error = False - - # Retrieve the coordinator from the query profile. - coordinator = parse_coordinator(query_profile) - - # Parse the query type from the query profile. - query_type = parse_query_type(query_profile) - if query_type == "N/A": - query_type = "UNKNOWN" - - # Assert root span. - root_span_id = self.__assert_rootspan_attrs(trace.root_span, query_id, session_id, - cluster_id, db_user, "default-pool", impala_query_state, query_status, - original_query_id, retried_query_id, coordinator) - - # Assert Init span. - if "Init" not in missing_spans: - span_err_msg = "" - if err_span == "Init": - span_err_msg = query_status - in_error = True - self.__assert_initspan_attrs(trace.child_spans, root_span_id, query_id, session_id, - cluster_id, db_user, "default-pool", parse_default_db(query_profile), - parse_sql(query_profile).replace('\n', ' '), original_query_id, coordinator) - - # Assert Submitted span. - if "Submitted" not in missing_spans: - span_err_msg = "" - if err_span == "Submitted" or in_error: - span_err_msg = query_status - in_error = True - self.__assert_submittedspan_attrs(trace.child_spans, root_span_id, query_id) - - # Assert Planning span. - if "Planning" not in missing_spans: - status = INITIALIZED - span_err_msg = "" - if err_span == "Planning" or in_error: - span_err_msg = query_status - status = ERROR - in_error = True - self.__assert_planningspan_attrs(trace.child_spans, root_span_id, query_id, - query_type, span_err_msg, status) - - # Assert AdmissionControl span. 
- if "AdmissionControl" not in missing_spans: - status = PENDING - span_err_msg = "" - if err_span == "AdmissionControl" or in_error: - span_err_msg = query_status - status = ERROR - in_error = True - self.__assert_admissioncontrol_attrs(trace.child_spans, root_span_id, query_id, - "default-pool", parse_admission_result(query_profile), span_err_msg, status) - - # Assert QueryExecution span. - if "QueryExecution" not in missing_spans: - span_err_msg = "" - if err_span == "QueryExecution" or in_error: - span_err_msg = query_status - in_error = True - self.__assert_query_exec_attrs(trace.child_spans, query_profile, root_span_id, - query_id, span_err_msg, parse_impala_query_state(query_profile)) - - # Assert Close span. - if "Close" not in missing_spans: - span_err_msg = "" - if err_span == "Close" or in_error: - span_err_msg = query_status - in_error = True - self.__assert_close_attrs(trace.child_spans, root_span_id, query_id, span_err_msg, - parse_impala_query_state(query_profile), async_close) - - def __assert_trace_common(self, trace, expected_child_spans_count): - """ - Asserts common structure/fields in resource spans and scope spans of the - OpenTelemetry trace JSON object. - """ - - # Assert the number of child spans in the trace. - assert len(trace.child_spans) == expected_child_spans_count, \ - "Trace '{}' expected child spans count: {}, actual: {}".format(trace.trace_id, - expected_child_spans_count, len(trace.child_spans)) - - # Each scope span has a scope object which contains the name and version of the - # OpenTelemetry scope. Assert the scope object sttructure and contents contained - # within the single span at the path resourceSpan[0].scopeSpans[0].scope. 
- assert trace.root_span.scope_name == "org.apache.impala.impalad.query", \ - "Span: '{}' expected: 'org.apache.impala.impalad.query', actual: {}" \ - .format(trace.root_span.span_id, trace.root_span.scope_name) - assert trace.root_span.scope_version == "1.0.0", "Span: '{}' expected scope " \ - "version '1.0.0', actual: '{}'".format("Root", trace.root_span.scope_version) - - # Assert the scope of each child span. - for span in trace.child_spans: - assert span.scope_name == "org.apache.impala.impalad.query", \ - "Span: '{}' expected scope name: 'org.apache.impala.impalad.query', " \ - "actual: {}".format(span.name, span.scope_name) - assert span.scope_version == "1.0.0", "Span: '{}' expected scope " \ - "version '1.0.0', actual: '{}'".format(span.name, span.scope_version) - - def __assert_scopespan_common(self, span, query_id, is_root, name, attributes_count, - status, root_span_id=None, err_msg=""): - """ - Helper function to assert common data points of a single scope span. These spans - contain the actual root and child spans. Assertions include the span object's - structure, span properties, and common span attributes. - - span: The OtelSpan object to assert. - - query_id: The query id of the span. - - is_root: Whether the span is a root span. - - name: The name of the span to assert without the query_id prefix. - - attributes_count: The expected number of attributes unique to the span. If - asserting a child span, adds 7 to this value to account for - attributes common across all child spans. - - status: The expected status of the span. Only used for child spans. - - root_span_id: The root span id of the span. - """ - - # Read the span trace id and span id from the Impalad logs. - expected_span_id, expected_trace_id = self.__find_span_log(name, query_id) - - # Assert span properties. 
- expected_name = query_id - actual_kind = span.kind - - if (is_root): - assert span.parent_span_id is None, "Found parentSpanId on root span" - assert actual_kind == 2, "Span '{}' expected kind: '{}', actual: '{}'" \ - .format(expected_name, 2, actual_kind) - else: - expected_name += " - {}".format(name) - - assert root_span_id is not None - actual = span.parent_span_id - assert actual == root_span_id, "Span '{}' expected parentSpanId: '{}', actual: " \ - "'{}'".format(expected_name, root_span_id, actual) - - assert actual_kind == 1, "Span '{}' expected kind: '{}', actual: '{}'" \ - .format(expected_name, 1, actual) - - actual = span.name - assert actual == expected_name, "Expected span name: '{}', actual: '{}'" \ - .format(expected_name, actual) - - actual = span.trace_id - assert actual == expected_trace_id, "Span '{}' expected traceId: '{}', " \ - "actual: '{}'".format(expected_name, expected_trace_id, actual) - - actual = span.span_id - assert actual == expected_span_id, "Span '{}' expected spanId: '{}', " \ - "actual: '{}'".format(expected_name, expected_span_id, actual) - - # Flags must always be 1 which indicates the trace is to be sampled. - expected_flags = 1 - actual = span.flags - assert actual == expected_flags, "Span '{}' expected flags: '{}', " \ - "actual: '{}'".format(expected_name, expected_flags, actual) - - # Assert span attributes. 
- expected_span_attrs_count = attributes_count if is_root else 7 + attributes_count - assert len(span.attributes) == expected_span_attrs_count, "Span '{}' attributes " \ - "must contain exactly {} elements, actual: {}".format(expected_name, - expected_span_attrs_count, len(span.attributes)) - - if (is_root): - self.__assert_attr(expected_name, span.attributes, "ErrorMessage", err_msg) - else: - self.__assert_attr(expected_name, span.attributes, "ErrorMsg", err_msg) - self.__assert_attr(expected_name, span.attributes, "Name", expected_name) - self.__assert_attr(expected_name, span.attributes, "Running", - name == "QueryExecution", "boolValue") - self.__assert_attr(expected_name, span.attributes, "Status", status) - - def __find_span_log(self, span_name, query_id): - """ - Finds the start span log entry for the given span name and query id in the Impalad - logs. This log line contains the trace id and span id for the span which are used - as the expected values when asserting the span properties in the trace file. - """ - span_regex = r'Started \'{}\' span trace_id="(.*?)" span_id="(.*?)" query_id="{}"' \ - .format(span_name, query_id) - span_log = self.assert_impalad_log_contains("INFO", span_regex) - trace_id = span_log.group(1) - span_id = span_log.group(2) - - return span_id, trace_id - - def __assert_attr(self, span_name, attributes, expected_key, expected_value, - expected_type="stringValue"): - """ - Helper function to assert that a specific OpenTelemetry attribute exists in a span. 
- """ - - assert expected_type in ("stringValue", "boolValue", "intValue"), "Invalid " \ - "expected_type '{}', must be one of 'stringValue', 'boolValue', or 'intValue'" \ - .format(expected_type) - - val = attributes[expected_key] - assert val is not None, "Span '{}' attribute not found: '{}', actual attributes: {}" \ - .format(span_name, expected_key, attributes) - assert val.value == expected_value, "Span '{}' attribute '{}' expected: '{}', " \ - "actual: '{}'".format(span_name, expected_key, expected_value, val.value) - - if expected_type == "boolValue": - expected_type = ATTR_VAL_TYPE_BOOL - elif expected_type == "intValue": - expected_type = ATTR_VAL_TYPE_INT - else: - expected_type = ATTR_VAL_TYPE_STRING - - assert val.get_type() == expected_type, "Span '{}' attribute '{}' expected to be " \ - "of type '{}', actual: '{}'".format(span_name, expected_key, expected_type, - val.get_type()) - - def __assert_span_events(self, span, expected_events=[]): - """ - Helper function to assert that a span contains the expected span events. - """ - assert len(expected_events) == len(span.events), "Span '{}' expected to have " \ - "exactly {} events, actual: {}".format(span.name, len(expected_events), - len(span.events)) - - for event in expected_events: - assert event in span.events, "Expected '{}' event on span '{}' but " \ - "no such events was found.".format(event, span.name) - - def __assert_rootspan_attrs(self, span, query_id, session_id, cluster_id, user_name, - request_pool, state, err_msg, original_query_id, retried_query_id, coordinator): - """ - Helper function that asserts the common attributes in the root span. 
- """ - - root_span_id, _ = self.__find_span_log("Root", query_id) - self.__assert_scopespan_common(span, query_id, True, "Root", 14, "", None, err_msg) - - self.__assert_attr(span.name, span.attributes, "QueryId", query_id) - self.__assert_attr(span.name, span.attributes, "SessionId", session_id) - self.__assert_attr(span.name, span.attributes, "ClusterId", cluster_id) - self.__assert_attr(span.name, span.attributes, "UserName", user_name) - self.__assert_attr(span.name, span.attributes, "RequestPool", request_pool) - self.__assert_attr(span.name, span.attributes, "State", state) - self.__assert_attr(span.name, span.attributes, "OriginalQueryId", original_query_id) - self.__assert_attr(span.name, span.attributes, "RetriedQueryId", retried_query_id) - self.__assert_attr(span.name, span.attributes, "Coordinator", coordinator) - - return root_span_id - - def __assert_initspan_attrs(self, spans, root_span_id, query_id, session_id, cluster_id, - user_name, request_pool, default_db, query_string, original_query_id, coordinator): - """ - Helper function that asserts the common and span-specific attributes in the - init span. - """ - - # Locate the init span and assert. 
- init_span = self.__find_span(spans, "Init", query_id) - - self.__assert_scopespan_common(init_span, query_id, False, "Init", 9, INITIALIZED, - root_span_id) - - self.__assert_attr(init_span.name, init_span.attributes, "QueryId", query_id) - self.__assert_attr(init_span.name, init_span.attributes, "SessionId", session_id) - self.__assert_attr(init_span.name, init_span.attributes, "ClusterId", cluster_id) - self.__assert_attr(init_span.name, init_span.attributes, "UserName", user_name) - self.__assert_attr(init_span.name, init_span.attributes, "RequestPool", request_pool) - self.__assert_attr(init_span.name, init_span.attributes, "DefaultDb", default_db) - self.__assert_attr(init_span.name, init_span.attributes, "QueryString", query_string) - self.__assert_attr(init_span.name, init_span.attributes, "OriginalQueryId", - original_query_id) - self.__assert_attr(init_span.name, init_span.attributes, "Coordinator", coordinator) - - self.__assert_span_events(init_span) - - def __assert_submittedspan_attrs(self, spans, root_span_id, query_id): - """ - Helper function that asserts the common attributes in the submitted span. - """ - - submitted_span = self.__find_span(spans, "Submitted", query_id) - self.__assert_scopespan_common(submitted_span, query_id, False, "Submitted", 0, - INITIALIZED, root_span_id) - - self.__assert_span_events(submitted_span) - - def __assert_planningspan_attrs(self, spans, root_span_id, query_id, query_type, - err_msg="", status=INITIALIZED): - """ - Helper function that asserts the common and span-specific attributes in the - planning execution span. 
- """ - - planning_span = self.__find_span(spans, "Planning", query_id) - self.__assert_scopespan_common(planning_span, query_id, False, "Planning", 1, - status, root_span_id, err_msg) - self.__assert_attr(planning_span.name, planning_span.attributes, "QueryType", - query_type) - - self.__assert_span_events(planning_span) - - def __assert_admissioncontrol_attrs(self, spans, root_span_id, query_id, request_pool, - adm_result, err_msg, status): - """ - Helper function that asserts the common and span-specific attributes in the - admission control span. - """ - - queued = False if adm_result == "Admitted immediately" \ - or adm_result == "Admitted as a trivial query" else True - - adm_ctrl_span = self.__find_span(spans, "AdmissionControl", query_id) - self.__assert_scopespan_common(adm_ctrl_span, query_id, False, "AdmissionControl", 3, - status, root_span_id, err_msg) - self.__assert_attr(adm_ctrl_span.name, adm_ctrl_span.attributes, "Queued", - queued, "boolValue") - self.__assert_attr(adm_ctrl_span.name, adm_ctrl_span.attributes, "AdmissionResult", - adm_result) - self.__assert_attr(adm_ctrl_span.name, adm_ctrl_span.attributes, "RequestPool", - request_pool) - - if queued: - self.__assert_span_events(adm_ctrl_span, ["Queued"]) - else: - self.__assert_span_events(adm_ctrl_span) - - def __assert_query_exec_attrs(self, spans, query_profile, root_span_id, query_id, - err_msg, status): - """ - Helper function that asserts the common and span-specific attributes in the - query execution span. 
- """ - - query_exec_span = self.__find_span(spans, "QueryExecution", query_id) - self.__assert_scopespan_common(query_exec_span, query_id, False, "QueryExecution", 3, - status, root_span_id, err_msg) - self.__assert_attr(query_exec_span.name, query_exec_span.attributes, - "NumModifiedRows", parse_num_modified_rows(query_profile), "intValue") - self.__assert_attr(query_exec_span.name, query_exec_span.attributes, "NumDeletedRows", - parse_num_deleted_rows(query_profile), "intValue") - self.__assert_attr(query_exec_span.name, query_exec_span.attributes, "NumRowsFetched", - parse_num_rows_fetched(query_profile), "intValue") - - # TODO: IMPALA-14334 - Assert QueryExecution span events - - def __assert_close_attrs(self, spans, root_span_id, query_id, err_msg, status, - async_close): - """ - Helper function that asserts the common and span-specific attributes in the - close span. - """ - - close_span = self.__find_span(spans, "Close", query_id) - self.__assert_scopespan_common(close_span, query_id, False, "Close", 0, status, - root_span_id, err_msg) - - expected_events = ["QueryUnregistered"] - if async_close and "ReleasedAdmissionControlResources" in close_span.events: - expected_events.append("ReleasedAdmissionControlResources") - self.__assert_span_events(close_span, expected_events) - - def __find_span(self, spans, name, query_id): - """ - Helper function to find a span by name in a list of OtelSpan objects. 
- """ - - for s in spans: - if s.name.endswith(name): - return s - - assert False, "Span '{}' not found for query '{}'".format(name, query_id) + """Helper method to assert a trace exists in the trace file with the required inputs + for log file path, trace file path, and trace file line count (that was determined + before the test ran).""" + assert_trace(self.build_log_path("impalad", "INFO"), self.trace_file_path, + self.trace_file_count, query_id, query_profile, cluster_id, trace_cnt, err_span, + missing_spans, async_close, exact_trace_cnt) @CustomClusterTestSuite.with_args( impalad_args="-v=2 --cluster_id=select_dml {}".format(TRACE_FLAGS), cluster_size=1, tmp_dir_placeholders=[OUT_DIR], disable_log_buffering=True) -class TestOtelTraceSelectsDMLs(TestOtelTrace): +class TestOtelTraceSelectsDMLs(TestOtelTraceBase): """Tests that exercise OpenTelemetry tracing behavior for select and dml queries.""" + @classmethod + def setup_class(cls): + super(TestOtelTraceSelectsDMLs, cls).setup_class() + + cls.test_db = "test_otel_trace_selects_dmls{}_{}".format( + datetime.now().strftime("%Y%m%d%H%M%S"), + "".join(choice(ascii_lowercase) for _ in range(7))) + cls.execute_query_expect_success(cls.client, "CREATE DATABASE {}".format(cls.test_db)) + + @classmethod + def teardown_class(cls): + cls.execute_query_expect_success(cls.client, "DROP DATABASE {} CASCADE" + .format(cls.test_db)) + super(TestOtelTraceSelectsDMLs, cls).teardown_class() + def setup_method(self, method): super(TestOtelTraceSelectsDMLs, self).setup_method(method) self.client.clear_configuration() @@ -492,6 +103,7 @@ class TestOtelTraceSelectsDMLs(TestOtelTrace): cluster_id="select_dml") def test_invalid_sql(self): + """Asserts that queries with invalid SQL still generate the expected traces.""" query = "SELECT * FROM functional.alltypes WHERE field_does_not_exist=1" self.execute_query_expect_failure(self.client, query) @@ -532,7 +144,7 @@ class TestOtelTraceSelectsDMLs(TestOtelTrace): { "abort_on_error": 1, 
"batch_size": 0, - "debug_action": "3:PREPARE:FAIL|COORD_BEFORE_EXEC_RPC:SLEEP@5000", + "debug_action": "3:PREPARE:FAIL", "disable_codegen": 0, "disable_codegen_rows_threshold": 0, "exec_single_node_rows_threshold": 0, @@ -542,7 +154,7 @@ class TestOtelTraceSelectsDMLs(TestOtelTrace): }) query_id, profile = self.query_id_from_ui(section="completed_queries", - match_query=query) + match_query=query) self.assert_trace( query_id=query_id, @@ -552,6 +164,7 @@ class TestOtelTraceSelectsDMLs(TestOtelTrace): async_close=True) def test_select_timeout(self): + """Asserts queries that timeout generate the expected traces.""" query = "SELECT * FROM functional.alltypes WHERE id=sleep(5000)" self.execute_query_expect_failure(self.client, query, {"exec_time_limit_s": "1"}) @@ -569,6 +182,7 @@ class TestOtelTraceSelectsDMLs(TestOtelTrace): async_close=True) def test_select_empty(self): + """Asserts empty queries do not generate traces.""" self.execute_query_expect_failure(self.client, "") # Run a query that will succeed to ensure all traces have been flushed. @@ -589,6 +203,7 @@ class TestOtelTraceSelectsDMLs(TestOtelTrace): exact_trace_cnt=True) def test_select_union(self): + """Asserts queries with a union generate the expected traces.""" result = self.execute_query_expect_success(self.client, "SELECT * FROM " "functional.alltypes LIMIT 5 UNION ALL SELECT * FROM functional.alltypessmall") @@ -599,6 +214,7 @@ class TestOtelTraceSelectsDMLs(TestOtelTrace): exact_trace_cnt=True) def test_dml_timeout(self): + """Asserts insert DMLs that timeout generate the expected traces.""" query = "INSERT INTO functional.alltypes (id, string_col, year, month) VALUES " \ "(99999, 'foo', 2025, 1)" self.execute_query_expect_failure(self.client, query, @@ -617,9 +233,9 @@ class TestOtelTraceSelectsDMLs(TestOtelTrace): err_span="QueryExecution", async_close=True) - def test_dml_update(self, unique_database, unique_name): - # IMPALA-14340: Cannot update a table that has the same name as the database. 
- tbl = "{}.{}_tbl".format(unique_database, unique_name) + def test_dml_update(self, unique_name): + """Asserts update DMLs generate the expected traces.""" + tbl = "{}.{}".format(self.test_db, unique_name) self.execute_query_expect_success(self.client, "CREATE TABLE {} (id int, " "string_col string) STORED AS ICEBERG TBLPROPERTIES('format-version'='2')" @@ -635,11 +251,11 @@ class TestOtelTraceSelectsDMLs(TestOtelTrace): query_id=result.query_id, query_profile=result.runtime_profile, cluster_id="select_dml", - trace_cnt=4) + trace_cnt=3) - def test_dml_delete(self, unique_database, unique_name): - # IMPALA-14340: Cannot delete from a table that has the same name as the database. - tbl = "{}.{}_tbl".format(unique_database, unique_name) + def test_dml_delete(self, unique_name): + """Asserts delete DMLs generate the expected traces.""" + tbl = "{}.{}".format(self.test_db, unique_name) self.execute_query_expect_success(self.client, "CREATE TABLE {} (id int, " "string_col string) STORED AS ICEBERG TBLPROPERTIES('format-version'='2')" @@ -655,15 +271,16 @@ class TestOtelTraceSelectsDMLs(TestOtelTrace): query_id=result.query_id, query_profile=result.runtime_profile, cluster_id="select_dml", - trace_cnt=4) + trace_cnt=3) - def test_dml_delete_join(self, unique_database, unique_name): - tbl1 = "{}.{}_1".format(unique_database, unique_name) + def test_dml_delete_join(self, unique_name): + """Asserts delete join DMLs generate the expected traces.""" + tbl1 = "{}.{}_1".format(self.test_db, unique_name) self.execute_query_expect_success(self.client, "CREATE TABLE {} STORED AS ICEBERG " "TBLPROPERTIES('format-version'='2') AS SELECT id, bool_col, int_col, year, month " "FROM functional.alltypes ORDER BY id limit 100".format(tbl1)) - tbl2 = "{}.{}_2".format(unique_database, unique_name) + tbl2 = "{}.{}_2".format(self.test_db, unique_name) self.execute_query_expect_success(self.client, "CREATE TABLE {} STORED AS ICEBERG " "TBLPROPERTIES('format-version'='2') AS SELECT id, 
bool_col, int_col, year, month " "FROM functional.alltypessmall ORDER BY id LIMIT 100".format(tbl2)) @@ -675,23 +292,23 @@ class TestOtelTraceSelectsDMLs(TestOtelTrace): query_id=result.query_id, query_profile=result.runtime_profile, cluster_id="select_dml", - trace_cnt=5) + trace_cnt=3) - def test_ignored_queries(self, unique_database, unique_name): + def test_ignored_queries(self, unique_name): """Asserts queries that should not generate traces do not generate traces.""" - tbl = "{}.{}".format(unique_database, unique_name) + tbl = "{}.{}".format(self.test_db, unique_name) res_create = self.execute_query_expect_success(self.client, "CREATE TABLE {} (a int)".format(tbl)) # These queries are not expected to have traces created for them. ignore_queries = [ - "COMMENT ON DATABASE {} IS 'test'".format(unique_database), + "COMMENT ON DATABASE {} IS 'test'".format(self.test_db), "DESCRIBE {}".format(tbl), "EXPLAIN SELECT * FROM {}".format(tbl), "REFRESH FUNCTIONS functional", "REFRESH functional.alltypes", "SET ALL", - "SHOW TABLES IN {}".format(unique_database), + "SHOW TABLES IN {}".format(self.test_db), "SHOW DATABASES", "TRUNCATE TABLE {}".format(tbl), "USE functional", @@ -716,13 +333,11 @@ class TestOtelTraceSelectsDMLs(TestOtelTrace): .format(tbl)) # Ensure the expected number of traces are present in the trace file. - # The expected line count is 4 because: - # 1. unique_database fixture runs drop database - # 2. unique_database fixture runs create database - # 3. test runs create table - # 4. test runs drop table + # The expected line count is 2 because: + # 1. test runs create table + # 2. test runs drop table wait_for_file_line_count(file_path=self.trace_file_path, - expected_line_count=4 + self.trace_file_count, max_attempts=10, sleep_time_s=1, + expected_line_count=2 + self.trace_file_count, max_attempts=10, sleep_time_s=1, backoff=1, exact_match=True) # Assert the traces for the create/drop table query to ensure both were created. 
@@ -730,65 +345,87 @@ class TestOtelTraceSelectsDMLs(TestOtelTrace): query_id=res_create.query_id, query_profile=res_create.runtime_profile, cluster_id="select_dml", - trace_cnt=4, + trace_cnt=2, missing_spans=["AdmissionControl"]) self.assert_trace( query_id=res_drop.query_id, query_profile=res_drop.runtime_profile, cluster_id="select_dml", - trace_cnt=4, + trace_cnt=2, missing_spans=["AdmissionControl"]) - def test_dml_insert_success(self, unique_database, unique_name): + def test_dml_insert_success(self, unique_name): + """Asserts successful insert DMLs generate the expected traces.""" self.execute_query_expect_success(self.client, "CREATE TABLE {}.{} (id int, string_col string, int_col int)" - .format(unique_database, unique_name)) + .format(self.test_db, unique_name)) result = self.execute_query_expect_success(self.client, "INSERT INTO {}.{} (id, string_col, int_col) VALUES (1, 'a', 10), (2, 'b', 20), " - "(3, 'c', 30)".format(unique_database, unique_name)) + "(3, 'c', 30)".format(self.test_db, unique_name)) self.assert_trace( query_id=result.query_id, query_profile=result.runtime_profile, cluster_id="select_dml", - trace_cnt=4) + trace_cnt=2) - def test_dml_insert_cte_success(self, unique_database, unique_name): + def test_dml_insert_fail(self, unique_name): + """Asserts failed insert DMLs generate the expected traces.""" self.execute_query_expect_success(self.client, - "CREATE TABLE {}.{} (id int)".format(unique_database, unique_name)) + "CREATE TABLE {}.{} (id int, string_col string, int_col int)" + .format(self.test_db, unique_name)) + + fail_query = "INSERT INTO {}.{} (id, string_col, int_col) VALUES (1, 'a', 10), " \ + "(2, 'b', 20), (3, 'c', 30)".format(self.test_db, unique_name) + self.execute_query_expect_failure(self.client, fail_query, + {"debug_action": "0:OPEN:FAIL"}) + query_id, profile = self.query_id_from_ui(section="completed_queries", + match_query=fail_query) + + self.assert_trace( + query_id=query_id, + query_profile=profile, + 
cluster_id="select_dml", + trace_cnt=2, + err_span="QueryExecution") + + def test_dml_insert_cte_success(self, unique_name): + """Asserts insert DMLs that use a CTE generate the expected traces.""" + self.execute_query_expect_success(self.client, + "CREATE TABLE {}.{} (id int)".format(self.test_db, unique_name)) result = self.execute_query_expect_success(self.client, "WITH a1 AS (SELECT * FROM functional.alltypes WHERE tinyint_col=1 limit 10) " - "INSERT INTO {}.{} SELECT id FROM a1".format(unique_database, unique_name)) + "INSERT INTO {}.{} SELECT id FROM a1".format(self.test_db, unique_name)) self.assert_trace( query_id=result.query_id, query_profile=result.runtime_profile, cluster_id="select_dml", - trace_cnt=4) + trace_cnt=2) - def test_dml_insert_overwrite(self, unique_database, unique_name): + def test_dml_insert_overwrite(self, unique_name): """Test that OpenTelemetry tracing is working by running an insert overwrite query and checking that the trace file is created and contains expected spans with the expected attributes.""" self.execute_query_expect_success(self.client, "CREATE TABLE {}.{} AS SELECT * FROM functional.alltypes WHERE id < 500 ".format( - unique_database, unique_name)) + self.test_db, unique_name)) result = self.execute_query_expect_success(self.client, "INSERT OVERWRITE TABLE {}.{} SELECT * FROM functional.alltypes WHERE id > 500 " - "AND id < 1000".format(unique_database, unique_name)) + "AND id < 1000".format(self.test_db, unique_name)) self.assert_trace( query_id=result.query_id, query_profile=result.runtime_profile, cluster_id="select_dml", - trace_cnt=4) + trace_cnt=2) -class TestOtelTraceSelectQueued(TestOtelTrace): +class TestOtelTraceSelectQueued(TestOtelTraceBase): """Tests that require setting additional startup flags to assert admission control queueing behavior. 
The cluster must be restarted after each test to apply the new flags.""" @@ -798,6 +435,8 @@ class TestOtelTraceSelectQueued(TestOtelTrace): .format(TRACE_FLAGS), cluster_size=1, tmp_dir_placeholders=[OUT_DIR], disable_log_buffering=True) def test_select_queued(self): + """Asserts a query that is queued in admission control then completes successfully + generates the expected trace.""" # Launch two queries, the second will be queued until the first completes. query = "SELECT * FROM functional.alltypes WHERE id = 1" handle1 = self.client.execute_async("{} AND int_col = SLEEP(5000)".format(query)) @@ -861,7 +500,7 @@ class TestOtelTraceSelectQueued(TestOtelTrace): async_close=True) -class TestOtelTraceSelectRetry(TestOtelTrace): +class TestOtelTraceSelectRetry(TestOtelTraceBase): """Tests the require ending an Impala daemon and thus the cluster must restart after each test.""" @@ -871,6 +510,8 @@ class TestOtelTraceSelectRetry(TestOtelTrace): disable_log_buffering=True, statestored_args="-statestore_heartbeat_frequency_ms=60000") def test_retry_select_success(self): + """Asserts select queries that are successfully retried generate the expected + traces.""" self.cluster.impalads[1].kill() result = self.execute_query_expect_success(self.client, @@ -903,7 +544,8 @@ class TestOtelTraceSelectRetry(TestOtelTrace): disable_log_buffering=True, statestored_args="-statestore_heartbeat_frequency_ms=1000") def test_retry_select_failed(self): - + """Asserts select queries that are retried but ultimately fail generate the expected + traces.""" with self.create_impala_client() as client: client.set_configuration({"retry_failed_queries": "true"}) @@ -955,9 +597,24 @@ class TestOtelTraceSelectRetry(TestOtelTrace): @CustomClusterTestSuite.with_args( impalad_args="-v=2 --cluster_id=trace_ddl {}".format(TRACE_FLAGS), cluster_size=2, tmp_dir_placeholders=[OUT_DIR], disable_log_buffering=True) -class TestOtelTraceDDLs(TestOtelTrace): +class TestOtelTraceDDLs(TestOtelTraceBase): """Tests 
that exercise OpenTelemetry tracing behavior on DDLs. These tests are in their - own class because they require an additional test dimension for async DDLs""" + own class because they require an additional test dimension for async DDLs.""" + + @classmethod + def setup_class(cls): + super(TestOtelTraceDDLs, cls).setup_class() + + cls.test_db = "test_otel_trace_ddls_{}_{}".format( + datetime.now().strftime("%Y%m%d%H%M%S"), + "".join(choice(ascii_lowercase) for _ in range(7))) + cls.execute_query_expect_success(cls.client, "CREATE DATABASE {}".format(cls.test_db)) + + @classmethod + def teardown_class(cls): + cls.execute_query_expect_success(cls.client, "DROP DATABASE {} CASCADE" + .format(cls.test_db)) + super(TestOtelTraceDDLs, cls).teardown_class() @classmethod def add_test_dimensions(cls): @@ -965,6 +622,8 @@ class TestOtelTraceDDLs(TestOtelTrace): cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('async_ddl', True, False)) def test_ddl_createdb(self, vector, unique_name): + """Asserts a successful create database and drop database generate the expected + traces.""" try: result = self.execute_query_expect_success(self.client, "CREATE DATABASE {}".format(unique_name), @@ -986,20 +645,20 @@ class TestOtelTraceDDLs(TestOtelTrace): trace_cnt=2, missing_spans=["AdmissionControl"]) - def test_ddl_create_alter_table(self, vector, unique_database, unique_name): + def test_ddl_create_alter_table(self, vector, unique_name): """Tests that traces are created for a successful create table, a successful alter table, and a failed alter table (adding a column that already exists).""" create_result = self.execute_query_expect_success(self.client, "CREATE TABLE {}.{} (id int, string_col string, int_col int)" - .format(unique_database, unique_name), + .format(self.test_db, unique_name), {"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')}) alter_success_result = self.execute_query_expect_success(self.client, "ALTER TABLE " - "{}.{} ADD COLUMNS (new_col 
string)".format(unique_database, unique_name), + "{}.{} ADD COLUMNS (new_col string)".format(self.test_db, unique_name), {"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')}) fail_query = "ALTER TABLE {}.{} ADD COLUMNS (new_col string)" \ - .format(unique_database, unique_name) + .format(self.test_db, unique_name) self.execute_query_expect_failure(self.client, fail_query, {"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')}) @@ -1010,96 +669,61 @@ class TestOtelTraceDDLs(TestOtelTrace): query_id=create_result.query_id, query_profile=create_result.runtime_profile, cluster_id="trace_ddl", - trace_cnt=5, + trace_cnt=3, missing_spans=["AdmissionControl"]) self.assert_trace( query_id=alter_success_result.query_id, query_profile=alter_success_result.runtime_profile, cluster_id="trace_ddl", - trace_cnt=5, + trace_cnt=3, missing_spans=["AdmissionControl"]) self.assert_trace( query_id=fail_query_id, query_profile=fail_profile, cluster_id="trace_ddl", - trace_cnt=5, + trace_cnt=3, missing_spans=["AdmissionControl", "QueryExecution"], err_span="Planning") def test_ddl_createtable_fail(self, vector, unique_name): - with self.create_client_for_nth_impalad(1, HS2) as second_coord_client: - try: - # Create a database to use for this test. Cannot use the unique_database fixture - # because we want to drop the database after planning but before execution and - # that fixture drops the database without the "if exists" clause. 
- self.execute_query_expect_success(self.client, "CREATE DATABASE {}" - .format(unique_name)) + """Asserts a failed create table generates the expected trace.""" + query = "CREATE TABLE {}.{} AS (SELECT * FROM functional.alltypes LIMIT 1)" \ + .format(self.test_db, unique_name) + self.execute_query_expect_failure(self.client, query, + {"debug_action": "CLIENT_REQUEST_UPDATE_CATALOG:FAIL", + "ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')}) + query_id, profile = self.query_id_from_ui(section="completed_queries", + match_query=query) - with self.create_client_for_nth_impalad(0, HS2) as first_coord_client: - # In a separate thread, run the create table DDL that will fail. - fail_query = "CREATE TABLE {}.{} (id int, string_col string, int_col int)" \ - .format(unique_name, unique_name) - - def execute_query_fail(): - self.execute_query_expect_failure(first_coord_client, fail_query, - {"debug_action": "CRS_DELAY_BEFORE_CATALOG_OP_EXEC:SLEEP@5000", - "ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')}) - - thread = Thread(target=execute_query_fail) - thread.daemon = True - thread.start() - - # Wait until the create table query is in flight. - fail_query_id = None - while fail_query_id is None: - fail_query_id, profile = self.query_id_from_ui(section="in_flight_queries", - match_query=fail_query, not_found_ok=True) - if fail_query_id is not None and len(profile.strip()) > 0 \ - and parse_impala_query_state(profile) == "RUNNING": - break - sleep(0.1) - - # Drop the database after planning to cause the create table to fail. - self.execute_query_expect_success(second_coord_client, - "DROP DATABASE {} CASCADE".format(unique_name), - {"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')}) - - # Wait until the create table query fails. 
- thread.join() - fail_profile_str = self.query_profile_from_ui(fail_query_id) - finally: - self.execute_query_expect_success(second_coord_client, - "DROP DATABASE IF EXISTS {} CASCADE".format(unique_name)) - - # Assert the errored query. self.assert_trace( - query_id=fail_query_id, - query_profile=fail_profile_str, + query_id=query_id, + query_profile=profile, cluster_id="trace_ddl", - trace_cnt=4, + trace_cnt=1, missing_spans=["AdmissionControl"], err_span="QueryExecution") - def test_ddl_createtable_cte_success(self, vector, unique_database, unique_name): + def test_ddl_createtable_cte_success(self, vector, unique_name): + """Asserts create table queries that use a CTE generate the expected traces.""" result = self.execute_query_expect_success(self.client, "CREATE TABLE {}.{} AS WITH a1 AS (SELECT * FROM functional.alltypes WHERE " - "tinyint_col=1 LIMIT 10) SELECT id FROM a1".format(unique_database, unique_name), + "tinyint_col=1 LIMIT 10) SELECT id FROM a1".format(self.test_db, unique_name), {"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')}) self.assert_trace( query_id=result.query_id, query_profile=result.runtime_profile, cluster_id="trace_ddl", - trace_cnt=3, + trace_cnt=1, missing_spans=["AdmissionControl"]) - def test_compute_stats(self, vector, unique_database, unique_name): + def test_compute_stats(self, vector, unique_name): """The compute stats queries are a special case. These statements run two separate select queries. Locate both select queries on the UI and assert their traces.""" - tbl_name = "{}.{}_alltypes".format(unique_database, unique_name) + tbl_name = "{}.{}_alltypes".format(self.test_db, unique_name) # Setup a test table to ensure calculating stats on an existing table does not impact # other tests. 
@@ -1136,8 +760,9 @@ class TestOtelTraceDDLs(TestOtelTrace): cluster_id="trace_ddl", trace_cnt=4) - def test_compute_incremental_stats(self, vector, unique_database, unique_name): - tbl_name = "{}.{}_alltypes".format(unique_database, unique_name) + def test_compute_incremental_stats(self, vector, unique_name): + """Asserts compute incremental stats queries generate the expected traces.""" + tbl_name = "{}.{}_alltypes".format(self.test_db, unique_name) # Setup a test table to ensure calculating stats on an existing table does not impact # other tests. @@ -1155,15 +780,8 @@ class TestOtelTraceDDLs(TestOtelTrace): trace_cnt=2, missing_spans=["AdmissionControl"]) - # Assert the one trace matches the refresh table query. - self.assert_trace( - query_id=result.query_id, - query_profile=result.runtime_profile, - cluster_id="trace_ddl", - trace_cnt=2, - missing_spans=["AdmissionControl"]) - def test_invalidate_metadata(self, vector): + """Asserts invalidate metadata queries generate the expected traces.""" result = self.execute_query_expect_success(self.client, "INVALIDATE METADATA functional.alltypes", {"ENABLE_ASYNC_DDL_EXECUTION": vector.get_value('async_ddl')}) diff --git a/tests/util/otel_trace.py b/tests/util/otel_trace.py index b763c9a47..833d667c3 100644 --- a/tests/util/otel_trace.py +++ b/tests/util/otel_trace.py @@ -16,14 +16,34 @@ # under the License. 
from __future__ import absolute_import, division, print_function - import json +import logging import os import sys - from time import sleep from tests.common.environ import IMPALA_LOCAL_BUILD_VERSION +from tests.common.file_utils import grep_file_first, wait_for_file_line_count +from tests.common.impala_connection import ERROR, INITIALIZED, PENDING +from tests.util.query_profile_util import ( + parse_admission_result, + parse_coordinator, + parse_db_user, + parse_default_db, + parse_impala_query_state, + parse_num_deleted_rows, + parse_num_modified_rows, + parse_num_rows_fetched, + parse_original_query_id, + parse_query_status, + parse_query_type, + parse_retried_query_id, + parse_retry_status, + parse_session_id, + parse_sql, +) + +LOG = logging.getLogger(__name__) # Valid types of OpenTelemetry attribute values. ATTR_VAL_TYPE_STRING = "string" @@ -336,3 +356,444 @@ def parse_trace_file(file_path, query_id): .format(query_id) return query_trace + + +def assert_trace(log_file_path, trace_file_path, trace_file_count, query_id, + query_profile, cluster_id, trace_cnt=1, err_span="", missing_spans=[], + async_close=False, exact_trace_cnt=False): + # Parse common values needed in multiple asserts. + session_id = parse_session_id(query_profile) + db_user = parse_db_user(query_profile) + + # Wait until all spans are written to the trace file. + wait_for_file_line_count( + file_path=trace_file_path, + expected_line_count=trace_cnt + trace_file_count, + max_attempts=60, + sleep_time_s=1, + backoff=1, + exact_match=exact_trace_cnt) + + # Remove missing spans from the expected span count. + expected_span_count = 6 - len(missing_spans) + + # Parse the trace json files to get the trace for the query. + trace = parse_trace_file(trace_file_path, query_id) + __assert_trace_common(trace, expected_span_count) + + # Retrieve the query status which contains error messages if the query failed. 
+ query_status = parse_query_status(query_profile) + query_status = "" if query_status == "OK" else query_status + + impala_query_state = parse_retry_status(query_profile) + if impala_query_state is None: + impala_query_state = parse_impala_query_state(query_profile) + + # Determine if the query was retried and if so, get the original query id. + original_query_id = parse_original_query_id(query_profile) + original_query_id = "" if original_query_id is None else original_query_id + + # Determine if the query initially failed but has a successful retry under a different + # query id. If so, get the retried query id. + retried_query_id = parse_retried_query_id(query_profile) + retried_query_id = "" if retried_query_id is None else retried_query_id + + # Error message should follow on all spans after the errored span + in_error = False + + # Retrieve the coordinator from the query profile. + coordinator = parse_coordinator(query_profile) + + # Parse the query type from the query profile. + query_type = parse_query_type(query_profile) + if query_type == "N/A": + query_type = "UNKNOWN" + + # Assert root span. + root_span_id = __assert_rootspan_attrs(trace.root_span, query_id, session_id, + cluster_id, db_user, "default-pool", impala_query_state, query_status, + original_query_id, retried_query_id, coordinator, log_file_path) + + # Assert Init span. + if "Init" not in missing_spans: + span_err_msg = "" + if err_span == "Init": + span_err_msg = query_status + in_error = True + __assert_initspan_attrs(trace.child_spans, root_span_id, query_id, session_id, + cluster_id, db_user, "default-pool", parse_default_db(query_profile), + parse_sql(query_profile).replace('\n', ' '), original_query_id, coordinator, + log_file_path) + + # Assert Submitted span. 
+ if "Submitted" not in missing_spans: + span_err_msg = "" + if err_span == "Submitted" or in_error: + span_err_msg = query_status + in_error = True + __assert_submittedspan_attrs(trace.child_spans, root_span_id, query_id, log_file_path) + + # Assert Planning span. + if "Planning" not in missing_spans: + status = INITIALIZED + span_err_msg = "" + if err_span == "Planning" or in_error: + span_err_msg = query_status + status = ERROR + in_error = True + __assert_planningspan_attrs(trace.child_spans, root_span_id, query_id, + query_type, span_err_msg, status, log_file_path) + + # Assert AdmissionControl span. + if "AdmissionControl" not in missing_spans: + status = PENDING + span_err_msg = "" + if err_span == "AdmissionControl" or in_error: + span_err_msg = query_status + status = ERROR + in_error = True + __assert_admissioncontrol_attrs(trace.child_spans, root_span_id, query_id, + "default-pool", parse_admission_result(query_profile), span_err_msg, status, + log_file_path) + + # Assert QueryExecution span. + if "QueryExecution" not in missing_spans: + span_err_msg = "" + if err_span == "QueryExecution" or in_error: + span_err_msg = query_status + in_error = True + __assert_query_exec_attrs(trace.child_spans, query_profile, root_span_id, + query_id, span_err_msg, parse_impala_query_state(query_profile), log_file_path) + + # Assert Close span. + if "Close" not in missing_spans: + span_err_msg = "" + if err_span == "Close" or in_error: + span_err_msg = query_status + in_error = True + __assert_close_attrs(trace.child_spans, root_span_id, query_id, span_err_msg, + parse_impala_query_state(query_profile), async_close, log_file_path) + + +def __assert_trace_common(trace, expected_child_spans_count): + """ + Asserts common structure/fields in resource spans and scope spans of the + OpenTelemetry trace JSON object. + """ + + # Assert the number of child spans in the trace. 
+ assert len(trace.child_spans) == expected_child_spans_count, \ + "Trace '{}' expected child spans count: {}, actual: {}".format(trace.trace_id, + expected_child_spans_count, len(trace.child_spans)) + + # Each scope span has a scope object which contains the name and version of the + # OpenTelemetry scope. Assert the scope object structure and contents contained + # within the single span at the path resourceSpan[0].scopeSpans[0].scope. + assert trace.root_span.scope_name == "org.apache.impala.impalad.query", \ + "Span: '{}' expected: 'org.apache.impala.impalad.query', actual: {}" \ + .format(trace.root_span.span_id, trace.root_span.scope_name) + assert trace.root_span.scope_version == "1.0.0", "Span: '{}' expected scope " \ + "version '1.0.0', actual: '{}'".format("Root", trace.root_span.scope_version) + + # Assert the scope of each child span. + for span in trace.child_spans: + assert span.scope_name == "org.apache.impala.impalad.query", \ + "Span: '{}' expected scope name: 'org.apache.impala.impalad.query', " \ + "actual: {}".format(span.name, span.scope_name) + assert span.scope_version == "1.0.0", "Span: '{}' expected scope " \ + "version '1.0.0', actual: '{}'".format(span.name, span.scope_version) + + +def __assert_scopespan_common(span, query_id, is_root, name, attributes_count, + status, log_file_path, root_span_id=None, err_msg=""): + """ + Helper function to assert common data points of a single scope span. These spans + contain the actual root and child spans. Assertions include the span object's + structure, span properties, and common span attributes. + - span: The OtelSpan object to assert. + - query_id: The query id of the span. + - is_root: Whether the span is a root span. + - name: The name of the span to assert without the query_id prefix. + - attributes_count: The expected number of attributes unique to the span. If + asserting a child span, adds 7 to this value to account for + attributes common across all child spans. 
+ - status: The expected status of the span. Only used for child spans. + - root_span_id: The root span id of the span. + """ + + # Read the span trace id and span id from the Impalad logs. + expected_span_id, expected_trace_id = __find_span_log(log_file_path, name, query_id) + + # Assert span properties. + expected_name = query_id + actual_kind = span.kind + + if (is_root): + assert span.parent_span_id is None, "Found parentSpanId on root span" + assert actual_kind == 2, "Span '{}' expected kind: '{}', actual: '{}'" \ + .format(expected_name, 2, actual_kind) + else: + expected_name += " - {}".format(name) + + assert root_span_id is not None + actual = span.parent_span_id + assert actual == root_span_id, "Span '{}' expected parentSpanId: '{}', actual: " \ + "'{}'".format(expected_name, root_span_id, actual) + + assert actual_kind == 1, "Span '{}' expected kind: '{}', actual: '{}'" \ + .format(expected_name, 1, actual) + + actual = span.name + assert actual == expected_name, "Expected span name: '{}', actual: '{}'" \ + .format(expected_name, actual) + + actual = span.trace_id + assert actual == expected_trace_id, "Span '{}' expected traceId: '{}', " \ + "actual: '{}'".format(expected_name, expected_trace_id, actual) + + actual = span.span_id + assert actual == expected_span_id, "Span '{}' expected spanId: '{}', " \ + "actual: '{}'".format(expected_name, expected_span_id, actual) + + # Flags must always be 1 which indicates the trace is to be sampled. + expected_flags = 1 + actual = span.flags + assert actual == expected_flags, "Span '{}' expected flags: '{}', " \ + "actual: '{}'".format(expected_name, expected_flags, actual) + + # Assert span attributes. 
+ expected_span_attrs_count = attributes_count if is_root else 7 + attributes_count + assert len(span.attributes) == expected_span_attrs_count, "Span '{}' attributes " \ + "must contain exactly {} elements, actual: {}".format(expected_name, + expected_span_attrs_count, len(span.attributes)) + + if (is_root): + __assert_attr(expected_name, span.attributes, "ErrorMessage", err_msg) + else: + __assert_attr(expected_name, span.attributes, "ErrorMsg", err_msg) + __assert_attr(expected_name, span.attributes, "Name", expected_name) + __assert_attr(expected_name, span.attributes, "Running", + name == "QueryExecution", "boolValue") + __assert_attr(expected_name, span.attributes, "Status", status) + + +def __find_span_log(log_file_path, span_name, query_id): + """ + Finds the start span log entry for the given span name and query id in the Impalad + logs. This log line contains the trace id and span id for the span which are used + as the expected values when asserting the span properties in the trace file. + """ + span_regex = r'Started \'{}\' span trace_id="(.*?)" span_id="(.*?)" query_id="{}"' \ + .format(span_name, query_id) + + max_retries = 10 + retry_count = 0 + + LOG.info("Searching for span log entry for span '{}' for query '{}' in log file '{}'" + .format(span_name, query_id, log_file_path)) + while retry_count < max_retries: + with open(log_file_path, "r") as f: + span_log = grep_file_first(f, span_regex) + if span_log is not None: + return span_log.group(2), span_log.group(1) + + retry_count += 1 + sleep(1) + + raise Exception("Exceeded maximum retries to find span log entry for span '{}' " + "and query '{}'".format(span_name, query_id)) + + +def __assert_attr(span_name, attributes, expected_key, expected_value, + expected_type="stringValue"): + """ + Helper function to assert that a specific OpenTelemetry attribute exists in a span. 
+ """ + + assert expected_type in ("stringValue", "boolValue", "intValue"), "Invalid " \ + "expected_type '{}', must be one of 'stringValue', 'boolValue', or 'intValue'" \ + .format(expected_type) + + val = attributes[expected_key] + assert val is not None, "Span '{}' attribute not found: '{}', actual attributes: {}" \ + .format(span_name, expected_key, attributes) + assert val.value == expected_value, "Span '{}' attribute '{}' expected: '{}', " \ + "actual: '{}'".format(span_name, expected_key, expected_value, val.value) + + if expected_type == "boolValue": + expected_type = ATTR_VAL_TYPE_BOOL + elif expected_type == "intValue": + expected_type = ATTR_VAL_TYPE_INT + else: + expected_type = ATTR_VAL_TYPE_STRING + + assert val.get_type() == expected_type, "Span '{}' attribute '{}' expected to be " \ + "of type '{}', actual: '{}'".format(span_name, expected_key, expected_type, + val.get_type()) + + +def __assert_span_events(span, expected_events=[]): + """ + Helper function to assert that a span contains the expected span events. + """ + assert len(expected_events) == len(span.events), "Span '{}' expected to have " \ + "exactly {} events, actual: {}".format(span.name, len(expected_events), + len(span.events)) + + for event in expected_events: + assert event in span.events, "Expected '{}' event on span '{}' but " \ + "no such events was found.".format(event, span.name) + + +def __assert_rootspan_attrs(span, query_id, session_id, cluster_id, user_name, + request_pool, state, err_msg, original_query_id, retried_query_id, coordinator, + log_file_path): + """ + Helper function that asserts the common attributes in the root span. 
+ """ + + root_span_id, _ = __find_span_log(log_file_path, "Root", query_id) + __assert_scopespan_common(span, query_id, True, "Root", 14, "", log_file_path, None, + err_msg) + + __assert_attr(span.name, span.attributes, "QueryId", query_id) + __assert_attr(span.name, span.attributes, "SessionId", session_id) + __assert_attr(span.name, span.attributes, "ClusterId", cluster_id) + __assert_attr(span.name, span.attributes, "UserName", user_name) + __assert_attr(span.name, span.attributes, "RequestPool", request_pool) + __assert_attr(span.name, span.attributes, "State", state) + __assert_attr(span.name, span.attributes, "OriginalQueryId", original_query_id) + __assert_attr(span.name, span.attributes, "RetriedQueryId", retried_query_id) + __assert_attr(span.name, span.attributes, "Coordinator", coordinator) + + return root_span_id + + +def __assert_initspan_attrs(spans, root_span_id, query_id, session_id, cluster_id, + user_name, request_pool, default_db, query_string, original_query_id, coordinator, + log_file_path): + """ + Helper function that asserts the common and span-specific attributes in the + init span. + """ + + # Locate the init span and assert. 
+ init_span = __find_span(spans, "Init", query_id) + + __assert_scopespan_common(init_span, query_id, False, "Init", 9, INITIALIZED, + log_file_path, root_span_id) + + __assert_attr(init_span.name, init_span.attributes, "QueryId", query_id) + __assert_attr(init_span.name, init_span.attributes, "SessionId", session_id) + __assert_attr(init_span.name, init_span.attributes, "ClusterId", cluster_id) + __assert_attr(init_span.name, init_span.attributes, "UserName", user_name) + __assert_attr(init_span.name, init_span.attributes, "RequestPool", request_pool) + __assert_attr(init_span.name, init_span.attributes, "DefaultDb", default_db) + __assert_attr(init_span.name, init_span.attributes, "QueryString", query_string) + __assert_attr(init_span.name, init_span.attributes, "OriginalQueryId", + original_query_id) + __assert_attr(init_span.name, init_span.attributes, "Coordinator", coordinator) + + __assert_span_events(init_span) + + +def __assert_submittedspan_attrs(spans, root_span_id, query_id, log_file_path): + """ + Helper function that asserts the common attributes in the submitted span. + """ + + submitted_span = __find_span(spans, "Submitted", query_id) + __assert_scopespan_common(submitted_span, query_id, False, "Submitted", 0, INITIALIZED, + log_file_path, root_span_id) + + __assert_span_events(submitted_span) + + +def __assert_planningspan_attrs(spans, root_span_id, query_id, query_type, err_msg, + status, log_file_path): + """ + Helper function that asserts the common and span-specific attributes in the + planning execution span. 
+ """ + + planning_span = __find_span(spans, "Planning", query_id) + __assert_scopespan_common(planning_span, query_id, False, "Planning", 1, status, + log_file_path, root_span_id, err_msg) + __assert_attr(planning_span.name, planning_span.attributes, "QueryType", query_type) + + __assert_span_events(planning_span) + + +def __assert_admissioncontrol_attrs(spans, root_span_id, query_id, request_pool, + adm_result, err_msg, status, log_file_path): + """ + Helper function that asserts the common and span-specific attributes in the + admission control span. + """ + + queued = False if adm_result == "Admitted immediately" \ + or adm_result == "Admitted as a trivial query" else True + + adm_ctrl_span = __find_span(spans, "AdmissionControl", query_id) + __assert_scopespan_common(adm_ctrl_span, query_id, False, "AdmissionControl", 3, status, + log_file_path, root_span_id, err_msg) + __assert_attr(adm_ctrl_span.name, adm_ctrl_span.attributes, "Queued", queued, + "boolValue") + __assert_attr(adm_ctrl_span.name, adm_ctrl_span.attributes, "AdmissionResult", + adm_result) + __assert_attr(adm_ctrl_span.name, adm_ctrl_span.attributes, "RequestPool", request_pool) + + if queued: + __assert_span_events(adm_ctrl_span, ["Queued"]) + else: + __assert_span_events(adm_ctrl_span) + + +def __assert_query_exec_attrs(spans, query_profile, root_span_id, query_id, + err_msg, status, log_file_path): + """ + Helper function that asserts the common and span-specific attributes in the + query execution span. 
+ """ + + query_exec_span = __find_span(spans, "QueryExecution", query_id) + __assert_scopespan_common(query_exec_span, query_id, False, "QueryExecution", 3, status, + log_file_path, root_span_id, err_msg) + __assert_attr(query_exec_span.name, query_exec_span.attributes, "NumModifiedRows", + parse_num_modified_rows(query_profile), "intValue") + __assert_attr(query_exec_span.name, query_exec_span.attributes, "NumDeletedRows", + parse_num_deleted_rows(query_profile), "intValue") + __assert_attr(query_exec_span.name, query_exec_span.attributes, "NumRowsFetched", + parse_num_rows_fetched(query_profile), "intValue") + + # TODO: IMPALA-14334 - Assert QueryExecution span events + + +def __assert_close_attrs(spans, root_span_id, query_id, err_msg, status, async_close, + log_file_path): + """ + Helper function that asserts the common and span-specific attributes in the + close span. + """ + + close_span = __find_span(spans, "Close", query_id) + __assert_scopespan_common(close_span, query_id, False, "Close", 0, status, + log_file_path, root_span_id, err_msg) + + expected_events = ["QueryUnregistered"] + if async_close and "ReleasedAdmissionControlResources" in close_span.events: + expected_events.append("ReleasedAdmissionControlResources") + + # TODO: IMPALA-14334 - Assert Close span events + + +def __find_span(spans, name, query_id): + """ + Helper function to find a span by name in a list of OtelSpan objects. + """ + + for s in spans: + if s.name.endswith(name): + return s + + assert False, "Span '{}' not found for query '{}'".format(name, query_id)