fix: fallback phoenix parent trace when parent tracing disabled (#36290)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
Blackoutta
2026-05-18 16:56:50 +08:00
committed by GitHub
parent 9dc95eeb20
commit f4c02e4c6b
2 changed files with 258 additions and 7 deletions

View File

@@ -15,6 +15,7 @@ from openinference.semconv.trace import (
SpanAttributes,
ToolCallAttributes,
)
from opentelemetry.context import Context
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter as GrpcOTLPSpanExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as HttpOTLPSpanExporter
from opentelemetry.sdk import trace as trace_sdk
@@ -45,8 +46,8 @@ from dify_trace_arize_phoenix.config import ArizeConfig, PhoenixConfig
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from graphon.enums import WorkflowNodeExecutionStatus
from models.model import EndUser, MessageFile
from models.workflow import WorkflowNodeExecutionTriggeredFrom
from models.model import App, EndUser, MessageFile
from models.workflow import WorkflowNodeExecutionTriggeredFrom, WorkflowRun
logger = logging.getLogger(__name__)
@@ -139,6 +140,48 @@ def _resolve_published_parent_span_context(parent_node_execution_id: str) -> dic
return normalized_carrier
def _app_uses_phoenix_provider(app_tracing_config: Mapping[str, Any] | None) -> bool:
if not app_tracing_config or not app_tracing_config.get("enabled"):
return False
return app_tracing_config.get("tracing_provider") in {"arize", "phoenix"}
def _parent_workflow_can_publish_span_context(parent_workflow_run_id: str) -> bool:
parent_run = db.session.query(WorkflowRun).where(WorkflowRun.id == parent_workflow_run_id).first()
if parent_run is None:
return True
parent_app = db.session.query(App).where(App.id == parent_run.app_id).first()
if parent_app is None or not parent_app.tracing:
return False
try:
app_tracing_config = json.loads(parent_app.tracing)
except (TypeError, json.JSONDecodeError):
return False
if not isinstance(app_tracing_config, Mapping):
return False
return _app_uses_phoenix_provider(app_tracing_config)
def _resolve_workflow_parent_carrier(
parent_node_execution_id: str, parent_workflow_run_id: str | None
) -> dict[str, str] | None:
try:
return _resolve_published_parent_span_context(parent_node_execution_id)
except PendingTraceParentContextError:
if parent_workflow_run_id and not _parent_workflow_can_publish_span_context(parent_workflow_run_id):
logger.info(
"[Arize/Phoenix] Parent workflow cannot publish Phoenix span context; falling back to root span: "
"parent_workflow_run_id=%s parent_node_execution_id=%s",
parent_workflow_run_id,
parent_node_execution_id,
)
return None
raise
def setup_tracer(arize_phoenix_config: ArizeConfig | PhoenixConfig) -> tuple[trace_sdk.Tracer, SimpleSpanProcessor]:
"""Configure OpenTelemetry tracer with OTLP exporter for Arize/Phoenix."""
try:
@@ -581,9 +624,11 @@ class ArizePhoenixDataTrace(BaseTraceInstance):
workflow_session_id,
)
workflow_parent_carrier: dict[str, str] | None = None
if parent_node_execution_id:
workflow_parent_carrier = _resolve_published_parent_span_context(parent_node_execution_id)
else:
workflow_parent_carrier = _resolve_workflow_parent_carrier(parent_node_execution_id, parent_workflow_run_id)
if workflow_parent_carrier is None:
root_trace_id = _resolve_workflow_root_trace_id(trace_info)
workflow_root_span_name: str | None = trace_info.workflow_run_id
if not isinstance(workflow_root_span_name, str) or not workflow_root_span_name.strip():
@@ -1176,7 +1221,7 @@ class ArizePhoenixDataTrace(BaseTraceInstance):
if root_span_attributes:
root_span_attributes_dict.update(root_span_attributes)
root_span = self.tracer.start_span(name=span_name, attributes=root_span_attributes_dict)
root_span = self.tracer.start_span(name=span_name, attributes=root_span_attributes_dict, context=Context())
with use_span(root_span, end_on_exit=False):
self.propagator.inject(carrier=carrier)

View File

@@ -1,3 +1,5 @@
import json
from collections.abc import Sequence
from datetime import UTC, datetime, timedelta
from types import SimpleNamespace
from typing import Any, cast
@@ -8,8 +10,10 @@ import pytest
from dify_trace_arize_phoenix.arize_phoenix_trace import (
_NODE_TYPE_TO_SPAN_KIND,
ArizePhoenixDataTrace,
_app_uses_phoenix_provider,
_build_graph_parent_index,
_get_node_span_kind,
_parent_workflow_can_publish_span_context,
_phoenix_parent_span_redis_key,
_resolve_node_parent,
_resolve_published_parent_span_context,
@@ -25,9 +29,13 @@ from dify_trace_arize_phoenix.arize_phoenix_trace import (
)
from dify_trace_arize_phoenix.config import ArizeConfig, PhoenixConfig
from openinference.semconv.trace import OpenInferenceSpanKindValues, SpanAttributes
from opentelemetry.sdk.trace import Tracer
from opentelemetry.context import Context
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.sdk.trace import ReadableSpan, Tracer
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, SpanExporter, SpanExportResult
from opentelemetry.semconv.trace import SpanAttributes as OTELSpanAttributes
from opentelemetry.trace import StatusCode
from opentelemetry.trace import NonRecordingSpan, SpanContext, StatusCode, TraceFlags, TraceState, use_span
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from core.ops.entities.trace_entity import (
DatasetRetrievalTraceInfo,
@@ -96,6 +104,32 @@ def _get_start_span_call(start_span_mock, *, span_name: str):
raise AssertionError(f"Could not find start_span call with name={span_name!r}")
class _FakeQuery:
def __init__(self, result):
self._result = result
def filter(self, *args, **kwargs):
return self
def where(self, *args, **kwargs):
return self
def first(self):
return self._result
class _CollectingSpanExporter(SpanExporter):
def __init__(self):
self.spans: list[ReadableSpan] = []
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
self.spans.extend(spans)
return SpanExportResult.SUCCESS
def shutdown(self) -> None:
return None
def _make_node_execution(**kwargs):
defaults = {
"node_type": "tool",
@@ -233,6 +267,45 @@ def test_wrap_span_metadata():
assert res == {"a": 1, "b": 2, "created_from": "Dify"}
def test_app_uses_phoenix_provider_only_for_enabled_arize_or_phoenix():
assert _app_uses_phoenix_provider({"enabled": True, "tracing_provider": "phoenix"}) is True
assert _app_uses_phoenix_provider({"enabled": True, "tracing_provider": "arize"}) is True
assert _app_uses_phoenix_provider({"enabled": False, "tracing_provider": "phoenix"}) is False
assert _app_uses_phoenix_provider({"enabled": True, "tracing_provider": "langfuse"}) is False
assert _app_uses_phoenix_provider(None) is False
def test_parent_workflow_can_publish_span_context_keeps_unknown_parent_retryable(monkeypatch):
monkeypatch.setattr(
"dify_trace_arize_phoenix.arize_phoenix_trace.db.session.query",
lambda model: _FakeQuery(None),
)
assert _parent_workflow_can_publish_span_context("missing-run") is True
def test_parent_workflow_can_publish_span_context_checks_parent_app_tracing(monkeypatch):
parent_run = SimpleNamespace(app_id="parent-app")
parent_app = SimpleNamespace(tracing=json.dumps({"enabled": True, "tracing_provider": "phoenix"}))
def fake_query(model):
if getattr(model, "__tablename__", None) == "workflow_runs":
return _FakeQuery(parent_run)
if getattr(model, "__tablename__", None) == "apps":
return _FakeQuery(parent_app)
raise AssertionError(f"Unexpected model query: {model}")
monkeypatch.setattr("dify_trace_arize_phoenix.arize_phoenix_trace.db.session.query", fake_query)
assert _parent_workflow_can_publish_span_context("parent-run") is True
parent_app.tracing = json.dumps({"enabled": False, "tracing_provider": "phoenix"})
assert _parent_workflow_can_publish_span_context("parent-run") is False
parent_app.tracing = json.dumps({"enabled": True, "tracing_provider": "langfuse"})
assert _parent_workflow_can_publish_span_context("parent-run") is False
class TestGetNodeSpanKind:
def test_all_node_types_are_mapped_correctly(self):
special_mappings = {
@@ -839,6 +912,10 @@ def test_workflow_trace_raises_pending_parent_error_when_parent_node_context_is_
with (
patch.object(trace_instance, "get_service_account_with_tenant", return_value=MagicMock()),
patch(
"dify_trace_arize_phoenix.arize_phoenix_trace._parent_workflow_can_publish_span_context",
return_value=True,
),
patch.object(trace_instance, "ensure_root_span") as mock_ensure_root_span,
pytest.raises(PendingTraceParentContextError) as exc_info,
):
@@ -851,6 +928,102 @@ def test_workflow_trace_raises_pending_parent_error_when_parent_node_context_is_
mock_ensure_root_span.assert_not_called()
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.db")
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.DifyCoreRepositoryFactory")
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.sessionmaker")
def test_workflow_trace_falls_back_when_parent_app_tracing_cannot_publish_parent_context(
mock_sessionmaker,
mock_repo_factory,
mock_db,
trace_instance,
):
mock_db.engine = MagicMock()
info = _make_workflow_info(
message_id="message-1",
workflow_run_id="workflow-run-1",
metadata={
"app_id": "app1",
"parent_trace_context": {
"parent_workflow_run_id": "outer-workflow-run-1",
"parent_node_execution_id": "outer-node-execution-1",
},
},
)
repo = MagicMock()
repo.get_by_workflow_execution.return_value = []
mock_repo_factory.create_workflow_node_execution_repository.return_value = repo
trace_instance._mock_redis_client.get.return_value = None
parent_carrier = {}
parent_context = object()
with (
patch.object(trace_instance, "get_service_account_with_tenant", return_value=MagicMock()),
patch(
"dify_trace_arize_phoenix.arize_phoenix_trace._parent_workflow_can_publish_span_context",
return_value=False,
),
patch.object(trace_instance, "ensure_root_span", return_value=parent_carrier) as mock_ensure_root_span,
patch.object(trace_instance.propagator, "extract", return_value=parent_context) as mock_extract,
):
trace_instance.workflow_trace(info)
mock_ensure_root_span.assert_called_once_with(
"outer-workflow-run-1",
root_span_name="workflow-run-1",
root_span_attributes={
SpanAttributes.INPUT_VALUE: safe_json_dumps(info.workflow_run_inputs),
SpanAttributes.INPUT_MIME_TYPE: "application/json",
SpanAttributes.OUTPUT_VALUE: safe_json_dumps(info.workflow_run_outputs),
SpanAttributes.OUTPUT_MIME_TYPE: "application/json",
},
)
mock_extract.assert_called_once_with(carrier=parent_carrier)
workflow_span_call = _get_start_span_call(trace_instance.tracer.start_span, span_name="workflow_workflow-run-1")
assert workflow_span_call.kwargs["context"] is parent_context
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.db")
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.DifyCoreRepositoryFactory")
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.sessionmaker")
def test_workflow_trace_still_retries_when_parent_app_can_publish_parent_context(
mock_sessionmaker,
mock_repo_factory,
mock_db,
trace_instance,
):
mock_db.engine = MagicMock()
info = _make_workflow_info(
message_id="message-1",
workflow_run_id="workflow-run-1",
metadata={
"app_id": "app1",
"parent_trace_context": {
"parent_workflow_run_id": "outer-workflow-run-1",
"parent_node_execution_id": "outer-node-execution-1",
},
},
)
repo = MagicMock()
repo.get_by_workflow_execution.return_value = []
mock_repo_factory.create_workflow_node_execution_repository.return_value = repo
trace_instance._mock_redis_client.get.return_value = None
with (
patch.object(trace_instance, "get_service_account_with_tenant", return_value=MagicMock()),
patch(
"dify_trace_arize_phoenix.arize_phoenix_trace._parent_workflow_can_publish_span_context",
return_value=True,
),
patch.object(trace_instance, "ensure_root_span") as mock_ensure_root_span,
pytest.raises(PendingTraceParentContextError) as exc_info,
):
trace_instance.workflow_trace(info)
assert exc_info.value.parent_node_execution_id == "outer-node-execution-1"
mock_ensure_root_span.assert_not_called()
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.db")
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.DifyCoreRepositoryFactory")
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.sessionmaker")
@@ -1544,6 +1717,38 @@ def test_ensure_root_span_basic(trace_instance):
assert "tid" in trace_instance.dify_trace_ids
def test_ensure_root_span_ignores_unsampled_ambient_otel_parent():
exporter = _CollectingSpanExporter()
provider = trace_sdk.TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(exporter))
trace_instance = ArizePhoenixDataTrace.__new__(ArizePhoenixDataTrace)
trace_instance.tracer = cast(Tracer, provider.get_tracer("test-phoenix-root-span"))
trace_instance.propagator = TraceContextTextMapPropagator()
trace_instance.project = "p"
trace_instance.dify_trace_ids = set()
trace_instance.root_span_carriers = {}
trace_instance.carrier = {}
ambient_span_context = SpanContext(
trace_id=0x11111111111111111111111111111111,
span_id=0x2222222222222222,
is_remote=True,
trace_flags=TraceFlags(0),
trace_state=TraceState(),
)
with use_span(NonRecordingSpan(ambient_span_context), end_on_exit=False):
carrier = trace_instance.ensure_root_span("tid")
assert len(exporter.spans) == 1
root_span = exporter.spans[0]
root_span_context = root_span.get_span_context()
assert root_span_context is not None
assert root_span.parent is None
assert root_span_context.trace_id != ambient_span_context.trace_id
assert carrier["traceparent"].split("-")[1] == f"{root_span_context.trace_id:032x}"
def test_ensure_root_span_uses_custom_name_and_attributes(trace_instance):
root_attributes = {
SpanAttributes.INPUT_VALUE: '{"input":"value"}',
@@ -1561,6 +1766,7 @@ def test_ensure_root_span_uses_custom_name_and_attributes(trace_instance):
SpanAttributes.INPUT_VALUE: '{"input":"value"}',
SpanAttributes.OUTPUT_VALUE: '{"output":"value"}',
},
context=Context(),
)