From f4c02e4c6becbbd4b04e00c485c9da0201ee7a7c Mon Sep 17 00:00:00 2001 From: Blackoutta <37723456+Blackoutta@users.noreply.github.com> Date: Mon, 18 May 2026 16:56:50 +0800 Subject: [PATCH] fix: fallback phoenix parent trace when parent tracing disabled (#36290) Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- .../arize_phoenix_trace.py | 55 ++++- .../test_arize_phoenix_trace.py | 210 +++++++++++++++++- 2 files changed, 258 insertions(+), 7 deletions(-) diff --git a/api/providers/trace/trace-arize-phoenix/src/dify_trace_arize_phoenix/arize_phoenix_trace.py b/api/providers/trace/trace-arize-phoenix/src/dify_trace_arize_phoenix/arize_phoenix_trace.py index a0d150e1b6..9b35612b52 100644 --- a/api/providers/trace/trace-arize-phoenix/src/dify_trace_arize_phoenix/arize_phoenix_trace.py +++ b/api/providers/trace/trace-arize-phoenix/src/dify_trace_arize_phoenix/arize_phoenix_trace.py @@ -15,6 +15,7 @@ from openinference.semconv.trace import ( SpanAttributes, ToolCallAttributes, ) +from opentelemetry.context import Context from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter as GrpcOTLPSpanExporter from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as HttpOTLPSpanExporter from opentelemetry.sdk import trace as trace_sdk @@ -45,8 +46,8 @@ from dify_trace_arize_phoenix.config import ArizeConfig, PhoenixConfig from extensions.ext_database import db from extensions.ext_redis import redis_client from graphon.enums import WorkflowNodeExecutionStatus -from models.model import EndUser, MessageFile -from models.workflow import WorkflowNodeExecutionTriggeredFrom +from models.model import App, EndUser, MessageFile +from models.workflow import WorkflowNodeExecutionTriggeredFrom, WorkflowRun logger = logging.getLogger(__name__) @@ -139,6 +140,48 @@ def _resolve_published_parent_span_context(parent_node_execution_id: str) -> dic return normalized_carrier +def _app_uses_phoenix_provider(app_tracing_config: Mapping[str, Any] | None) -> bool: + if not app_tracing_config or not app_tracing_config.get("enabled"): + return False + return app_tracing_config.get("tracing_provider") in {"arize", "phoenix"} + + +def _parent_workflow_can_publish_span_context(parent_workflow_run_id: str) -> bool: + parent_run = db.session.query(WorkflowRun).where(WorkflowRun.id == parent_workflow_run_id).first() + if parent_run is None: + return True + + parent_app = db.session.query(App).where(App.id == parent_run.app_id).first() + if parent_app is None or not parent_app.tracing: + return False + + try: + app_tracing_config = json.loads(parent_app.tracing) + except (TypeError, json.JSONDecodeError): + return False + if not isinstance(app_tracing_config, Mapping): + return False + + return _app_uses_phoenix_provider(app_tracing_config) + + +def _resolve_workflow_parent_carrier( + parent_node_execution_id: str, parent_workflow_run_id: str | None +) -> dict[str, str] | None: + try: + return _resolve_published_parent_span_context(parent_node_execution_id) + except PendingTraceParentContextError: + if parent_workflow_run_id and not _parent_workflow_can_publish_span_context(parent_workflow_run_id): + logger.info( + "[Arize/Phoenix] Parent workflow cannot publish Phoenix span context; falling back to root span: " + "parent_workflow_run_id=%s parent_node_execution_id=%s", + parent_workflow_run_id, + parent_node_execution_id, + ) + return None + raise + + def setup_tracer(arize_phoenix_config: ArizeConfig | PhoenixConfig) -> tuple[trace_sdk.Tracer, SimpleSpanProcessor]: """Configure OpenTelemetry tracer with OTLP exporter for Arize/Phoenix.""" try: @@ -581,9 +624,11 @@ class ArizePhoenixDataTrace(BaseTraceInstance): workflow_session_id, ) + workflow_parent_carrier: dict[str, str] | None = None if parent_node_execution_id: - workflow_parent_carrier = _resolve_published_parent_span_context(parent_node_execution_id) - else: + workflow_parent_carrier = _resolve_workflow_parent_carrier(parent_node_execution_id, parent_workflow_run_id) + + if workflow_parent_carrier is None: root_trace_id = _resolve_workflow_root_trace_id(trace_info) workflow_root_span_name: str | None = trace_info.workflow_run_id if not isinstance(workflow_root_span_name, str) or not workflow_root_span_name.strip(): @@ -1176,7 +1221,7 @@ class ArizePhoenixDataTrace(BaseTraceInstance): if root_span_attributes: root_span_attributes_dict.update(root_span_attributes) - root_span = self.tracer.start_span(name=span_name, attributes=root_span_attributes_dict) + root_span = self.tracer.start_span(name=span_name, attributes=root_span_attributes_dict, context=Context()) with use_span(root_span, end_on_exit=False): self.propagator.inject(carrier=carrier) diff --git a/api/providers/trace/trace-arize-phoenix/tests/unit_tests/arize_phoenix_trace/test_arize_phoenix_trace.py b/api/providers/trace/trace-arize-phoenix/tests/unit_tests/arize_phoenix_trace/test_arize_phoenix_trace.py index dd260aeee5..9b244e3008 100644 --- a/api/providers/trace/trace-arize-phoenix/tests/unit_tests/arize_phoenix_trace/test_arize_phoenix_trace.py +++ b/api/providers/trace/trace-arize-phoenix/tests/unit_tests/arize_phoenix_trace/test_arize_phoenix_trace.py @@ -1,3 +1,5 @@ +import json +from collections.abc import Sequence from datetime import UTC, datetime, timedelta from types import SimpleNamespace from typing import Any, cast @@ -8,8 +10,10 @@ import pytest from dify_trace_arize_phoenix.arize_phoenix_trace import ( _NODE_TYPE_TO_SPAN_KIND, ArizePhoenixDataTrace, + _app_uses_phoenix_provider, _build_graph_parent_index, _get_node_span_kind, + _parent_workflow_can_publish_span_context, _phoenix_parent_span_redis_key, _resolve_node_parent, _resolve_published_parent_span_context, @@ -25,9 +29,13 @@ from dify_trace_arize_phoenix.arize_phoenix_trace import ( ) from dify_trace_arize_phoenix.config import ArizeConfig, PhoenixConfig from openinference.semconv.trace import OpenInferenceSpanKindValues, SpanAttributes -from opentelemetry.sdk.trace import Tracer +from opentelemetry.context import Context +from opentelemetry.sdk import trace as trace_sdk +from opentelemetry.sdk.trace import ReadableSpan, Tracer +from opentelemetry.sdk.trace.export import SimpleSpanProcessor, SpanExporter, SpanExportResult from opentelemetry.semconv.trace import SpanAttributes as OTELSpanAttributes -from opentelemetry.trace import StatusCode +from opentelemetry.trace import NonRecordingSpan, SpanContext, StatusCode, TraceFlags, TraceState, use_span +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator from core.ops.entities.trace_entity import ( DatasetRetrievalTraceInfo, @@ -96,6 +104,32 @@ def _get_start_span_call(start_span_mock, *, span_name: str): raise AssertionError(f"Could not find start_span call with name={span_name!r}") +class _FakeQuery: + def __init__(self, result): + self._result = result + + def filter(self, *args, **kwargs): + return self + + def where(self, *args, **kwargs): + return self + + def first(self): + return self._result + + +class _CollectingSpanExporter(SpanExporter): + def __init__(self): + self.spans: list[ReadableSpan] = [] + + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + self.spans.extend(spans) + return SpanExportResult.SUCCESS + + def shutdown(self) -> None: + return None + + def _make_node_execution(**kwargs): defaults = { "node_type": "tool", @@ -233,6 +267,45 @@ def test_wrap_span_metadata(): assert res == {"a": 1, "b": 2, "created_from": "Dify"} +def test_app_uses_phoenix_provider_only_for_enabled_arize_or_phoenix(): + assert _app_uses_phoenix_provider({"enabled": True, "tracing_provider": "phoenix"}) is True + assert _app_uses_phoenix_provider({"enabled": True, "tracing_provider": "arize"}) is True + assert _app_uses_phoenix_provider({"enabled": False, "tracing_provider": "phoenix"}) is False + assert _app_uses_phoenix_provider({"enabled": True, "tracing_provider": "langfuse"}) is False + assert _app_uses_phoenix_provider(None) is False + + +def test_parent_workflow_can_publish_span_context_keeps_unknown_parent_retryable(monkeypatch): + monkeypatch.setattr( + "dify_trace_arize_phoenix.arize_phoenix_trace.db.session.query", + lambda model: _FakeQuery(None), + ) + + assert _parent_workflow_can_publish_span_context("missing-run") is True + + +def test_parent_workflow_can_publish_span_context_checks_parent_app_tracing(monkeypatch): + parent_run = SimpleNamespace(app_id="parent-app") + parent_app = SimpleNamespace(tracing=json.dumps({"enabled": True, "tracing_provider": "phoenix"})) + + def fake_query(model): + if getattr(model, "__tablename__", None) == "workflow_runs": + return _FakeQuery(parent_run) + if getattr(model, "__tablename__", None) == "apps": + return _FakeQuery(parent_app) + raise AssertionError(f"Unexpected model query: {model}") + + monkeypatch.setattr("dify_trace_arize_phoenix.arize_phoenix_trace.db.session.query", fake_query) + + assert _parent_workflow_can_publish_span_context("parent-run") is True + + parent_app.tracing = json.dumps({"enabled": False, "tracing_provider": "phoenix"}) + assert _parent_workflow_can_publish_span_context("parent-run") is False + + parent_app.tracing = json.dumps({"enabled": True, "tracing_provider": "langfuse"}) + assert _parent_workflow_can_publish_span_context("parent-run") is False + + class TestGetNodeSpanKind: def test_all_node_types_are_mapped_correctly(self): special_mappings = { @@ -839,6 +912,10 @@ def test_workflow_trace_raises_pending_parent_error_when_parent_node_context_is_ with ( patch.object(trace_instance, "get_service_account_with_tenant", return_value=MagicMock()), + patch( + "dify_trace_arize_phoenix.arize_phoenix_trace._parent_workflow_can_publish_span_context", + return_value=True, + ), patch.object(trace_instance, "ensure_root_span") as mock_ensure_root_span, pytest.raises(PendingTraceParentContextError) as exc_info, ): @@ -851,6 +928,102 @@ def test_workflow_trace_raises_pending_parent_error_when_parent_node_context_is_ mock_ensure_root_span.assert_not_called() +@patch("dify_trace_arize_phoenix.arize_phoenix_trace.db") +@patch("dify_trace_arize_phoenix.arize_phoenix_trace.DifyCoreRepositoryFactory") +@patch("dify_trace_arize_phoenix.arize_phoenix_trace.sessionmaker") +def test_workflow_trace_falls_back_when_parent_app_tracing_cannot_publish_parent_context( + mock_sessionmaker, + mock_repo_factory, + mock_db, + trace_instance, +): + mock_db.engine = MagicMock() + info = _make_workflow_info( + message_id="message-1", + workflow_run_id="workflow-run-1", + metadata={ + "app_id": "app1", + "parent_trace_context": { + "parent_workflow_run_id": "outer-workflow-run-1", + "parent_node_execution_id": "outer-node-execution-1", + }, + }, + ) + repo = MagicMock() + repo.get_by_workflow_execution.return_value = [] + mock_repo_factory.create_workflow_node_execution_repository.return_value = repo + trace_instance._mock_redis_client.get.return_value = None + + parent_carrier = {} + parent_context = object() + + with ( + patch.object(trace_instance, "get_service_account_with_tenant", return_value=MagicMock()), + patch( + "dify_trace_arize_phoenix.arize_phoenix_trace._parent_workflow_can_publish_span_context", + return_value=False, + ), + patch.object(trace_instance, "ensure_root_span", return_value=parent_carrier) as mock_ensure_root_span, + patch.object(trace_instance.propagator, "extract", return_value=parent_context) as mock_extract, + ): + trace_instance.workflow_trace(info) + + mock_ensure_root_span.assert_called_once_with( + "outer-workflow-run-1", + root_span_name="workflow-run-1", + root_span_attributes={ + SpanAttributes.INPUT_VALUE: safe_json_dumps(info.workflow_run_inputs), + SpanAttributes.INPUT_MIME_TYPE: "application/json", + SpanAttributes.OUTPUT_VALUE: safe_json_dumps(info.workflow_run_outputs), + SpanAttributes.OUTPUT_MIME_TYPE: "application/json", + }, + ) + mock_extract.assert_called_once_with(carrier=parent_carrier) + workflow_span_call = _get_start_span_call(trace_instance.tracer.start_span, span_name="workflow_workflow-run-1") + assert workflow_span_call.kwargs["context"] is parent_context + + +@patch("dify_trace_arize_phoenix.arize_phoenix_trace.db") +@patch("dify_trace_arize_phoenix.arize_phoenix_trace.DifyCoreRepositoryFactory") +@patch("dify_trace_arize_phoenix.arize_phoenix_trace.sessionmaker") +def test_workflow_trace_still_retries_when_parent_app_can_publish_parent_context( + mock_sessionmaker, + mock_repo_factory, + mock_db, + trace_instance, +): + mock_db.engine = MagicMock() + info = _make_workflow_info( + message_id="message-1", + workflow_run_id="workflow-run-1", + metadata={ + "app_id": "app1", + "parent_trace_context": { + "parent_workflow_run_id": "outer-workflow-run-1", + "parent_node_execution_id": "outer-node-execution-1", + }, + }, + ) + repo = MagicMock() + repo.get_by_workflow_execution.return_value = [] + mock_repo_factory.create_workflow_node_execution_repository.return_value = repo + trace_instance._mock_redis_client.get.return_value = None + + with ( + patch.object(trace_instance, "get_service_account_with_tenant", return_value=MagicMock()), + patch( + "dify_trace_arize_phoenix.arize_phoenix_trace._parent_workflow_can_publish_span_context", + return_value=True, + ), + patch.object(trace_instance, "ensure_root_span") as mock_ensure_root_span, + pytest.raises(PendingTraceParentContextError) as exc_info, + ): + trace_instance.workflow_trace(info) + + assert exc_info.value.parent_node_execution_id == "outer-node-execution-1" + mock_ensure_root_span.assert_not_called() + + @patch("dify_trace_arize_phoenix.arize_phoenix_trace.db") @patch("dify_trace_arize_phoenix.arize_phoenix_trace.DifyCoreRepositoryFactory") @patch("dify_trace_arize_phoenix.arize_phoenix_trace.sessionmaker") @@ -1544,6 +1717,38 @@ def test_ensure_root_span_basic(trace_instance): assert "tid" in trace_instance.dify_trace_ids +def test_ensure_root_span_ignores_unsampled_ambient_otel_parent(): + exporter = _CollectingSpanExporter() + provider = trace_sdk.TracerProvider() + provider.add_span_processor(SimpleSpanProcessor(exporter)) + trace_instance = ArizePhoenixDataTrace.__new__(ArizePhoenixDataTrace) + trace_instance.tracer = cast(Tracer, provider.get_tracer("test-phoenix-root-span")) + trace_instance.propagator = TraceContextTextMapPropagator() + trace_instance.project = "p" + trace_instance.dify_trace_ids = set() + trace_instance.root_span_carriers = {} + trace_instance.carrier = {} + + ambient_span_context = SpanContext( + trace_id=0x11111111111111111111111111111111, + span_id=0x2222222222222222, + is_remote=True, + trace_flags=TraceFlags(0), + trace_state=TraceState(), + ) + + with use_span(NonRecordingSpan(ambient_span_context), end_on_exit=False): + carrier = trace_instance.ensure_root_span("tid") + + assert len(exporter.spans) == 1 + root_span = exporter.spans[0] + root_span_context = root_span.get_span_context() + assert root_span_context is not None + assert root_span.parent is None + assert root_span_context.trace_id != ambient_span_context.trace_id + assert carrier["traceparent"].split("-")[1] == f"{root_span_context.trace_id:032x}" + + def test_ensure_root_span_uses_custom_name_and_attributes(trace_instance): root_attributes = { SpanAttributes.INPUT_VALUE: '{"input":"value"}', @@ -1561,6 +1766,7 @@ def test_ensure_root_span_uses_custom_name_and_attributes(trace_instance): SpanAttributes.INPUT_VALUE: '{"input":"value"}', SpanAttributes.OUTPUT_VALUE: '{"output":"value"}', }, + context=Context(), )