diff --git a/api/core/app/apps/pipeline/pipeline_runner.py b/api/core/app/apps/pipeline/pipeline_runner.py index 1249303285..78df0639bd 100644 --- a/api/core/app/apps/pipeline/pipeline_runner.py +++ b/api/core/app/apps/pipeline/pipeline_runner.py @@ -16,7 +16,6 @@ from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeE from core.workflow.node_factory import DifyNodeFactory, get_default_root_node_id from core.workflow.system_variables import build_bootstrap_variables, build_system_variables from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool -from core.workflow.variable_prefixes import CHILD_SYNC_VARIABLE_NODE_IDS from core.workflow.workflow_entry import WorkflowEntry from dify_graph.entities.graph_init_params import GraphInitParams from dify_graph.enums import WorkflowType @@ -275,7 +274,6 @@ class PipelineRunner(WorkflowBasedAppRunner): invoke_from=invoke_from, ), call_depth=0, - child_sync_variable_node_ids=CHILD_SYNC_VARIABLE_NODE_IDS, ) node_factory = DifyNodeFactory( diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index ded8fb0035..634f2e1e58 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -40,7 +40,6 @@ from core.workflow.system_variables import ( inject_default_system_variable_mappings, ) from core.workflow.variable_pool_initializer import add_variables_to_pool -from core.workflow.variable_prefixes import CHILD_SYNC_VARIABLE_NODE_IDS from core.workflow.workflow_entry import WorkflowEntry from dify_graph.entities import GraphInitParams from dify_graph.entities.graph_config import NodeConfigDictAdapter @@ -137,7 +136,6 @@ class WorkflowBasedAppRunner: invoke_from=invoke_from, ), call_depth=0, - child_sync_variable_node_ids=CHILD_SYNC_VARIABLE_NODE_IDS, ) # Use the provided graph_runtime_state for consistent state management @@ -292,7 +290,6 @@ class WorkflowBasedAppRunner: invoke_from=InvokeFrom.DEBUGGER, ), call_depth=0, - child_sync_variable_node_ids=CHILD_SYNC_VARIABLE_NODE_IDS, ) node_factory = DifyNodeFactory( diff --git a/api/core/workflow/variable_prefixes.py b/api/core/workflow/variable_prefixes.py index 9574bc8a42..7664be0983 100644 --- a/api/core/workflow/variable_prefixes.py +++ b/api/core/workflow/variable_prefixes.py @@ -2,5 +2,3 @@ SYSTEM_VARIABLE_NODE_ID = "sys" ENVIRONMENT_VARIABLE_NODE_ID = "env" CONVERSATION_VARIABLE_NODE_ID = "conversation" RAG_PIPELINE_VARIABLE_NODE_ID = "rag" - -CHILD_SYNC_VARIABLE_NODE_IDS = frozenset((CONVERSATION_VARIABLE_NODE_ID,)) diff --git a/api/core/workflow/workflow_entry.py b/api/core/workflow/workflow_entry.py index 22c855e7bf..c59fb3422b 100644 --- a/api/core/workflow/workflow_entry.py +++ b/api/core/workflow/workflow_entry.py @@ -15,7 +15,7 @@ from core.workflow.system_variables import ( inject_default_system_variable_mappings, ) from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool -from core.workflow.variable_prefixes import CHILD_SYNC_VARIABLE_NODE_IDS, ENVIRONMENT_VARIABLE_NODE_ID +from core.workflow.variable_prefixes import ENVIRONMENT_VARIABLE_NODE_ID from dify_graph.entities import GraphInitParams from dify_graph.entities.graph_config import NodeConfigDictAdapter from dify_graph.errors import WorkflowNodeRunFailedError @@ -234,7 +234,6 @@ class WorkflowEntry: invoke_from=InvokeFrom.DEBUGGER, ), call_depth=0, - child_sync_variable_node_ids=CHILD_SYNC_VARIABLE_NODE_IDS, ) graph_runtime_state = GraphRuntimeState( variable_pool=variable_pool, @@ -385,7 +384,6 @@ class WorkflowEntry: invoke_from=InvokeFrom.DEBUGGER, ), call_depth=0, - child_sync_variable_node_ids=CHILD_SYNC_VARIABLE_NODE_IDS, ) graph_runtime_state = GraphRuntimeState( variable_pool=variable_pool, diff --git a/api/dify_graph/entities/graph_init_params.py b/api/dify_graph/entities/graph_init_params.py index 66218517c0..f785d58a52 100644 --- a/api/dify_graph/entities/graph_init_params.py +++ b/api/dify_graph/entities/graph_init_params.py @@ -3,6 +3,8 @@ from typing import Any from pydantic import BaseModel, Field +DIFY_RUN_CONTEXT_KEY = "_dify" + class GraphInitParams(BaseModel): """GraphInitParams encapsulates the configurations and contextual information @@ -20,7 +22,3 @@ class GraphInitParams(BaseModel): graph_config: Mapping[str, Any] = Field(..., description="graph config") run_context: Mapping[str, Any] = Field(..., description="runtime context") call_depth: int = Field(..., description="call depth") - child_sync_variable_node_ids: frozenset[str] = Field( - default_factory=frozenset, - description="Variable node IDs whose values must be synced back from child graph executions.", - ) diff --git a/api/dify_graph/graph_engine/event_management/event_handlers.py b/api/dify_graph/graph_engine/event_management/event_handlers.py index f0c8eada17..e7a8291373 100644 --- a/api/dify_graph/graph_engine/event_management/event_handlers.py +++ b/api/dify_graph/graph_engine/event_management/event_handlers.py @@ -94,6 +94,10 @@ class EventHandler: Args: event: The event to handle """ + if isinstance(event, NodeRunVariableUpdatedEvent): + self._dispatch(event) + return + # Events in loops or iterations are always collected if event.in_loop_id or event.in_iteration_id: self._event_collector.collect(event) diff --git a/api/dify_graph/nodes/iteration/iteration_node.py b/api/dify_graph/nodes/iteration/iteration_node.py index c1c25aa0f9..e8c16ceb99 100644 --- a/api/dify_graph/nodes/iteration/iteration_node.py +++ b/api/dify_graph/nodes/iteration/iteration_node.py @@ -1,7 +1,7 @@ import logging from collections.abc import Generator, Mapping, Sequence from concurrent.futures import Future, ThreadPoolExecutor, as_completed -from contextlib import AbstractContextManager +from contextlib import AbstractContextManager, nullcontext from datetime import UTC, datetime from typing import TYPE_CHECKING, Any, NewType, cast @@ -36,7 +36,6 @@ from dify_graph.nodes.iteration.entities import ErrorHandleMode, IterationNodeDa from dify_graph.runtime import VariablePool from dify_graph.variables import IntegerVariable, NoneSegment from dify_graph.variables.segments import ArrayAnySegment, ArraySegment -from dify_graph.variables.variables import Variable from .exc import ( InvalidIteratorValueError, @@ -203,11 +202,6 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]): graph_engine=graph_engine, ) - # Sync conversation variables after each iteration completes - self._sync_child_variable_snapshot( - self._extract_child_variable_snapshot(variable_pool=graph_engine.graph_runtime_state.variable_pool) - ) - # Accumulate usage from this iteration usage_accumulator[0] = self._merge_usage( usage_accumulator[0], graph_engine.graph_runtime_state.llm_usage @@ -235,7 +229,6 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]): float, list[GraphNodeEventBase], object | None, - dict[tuple[str, str], Variable], LLMUsage, ] ], @@ -260,7 +253,6 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]): iteration_duration, events, output_value, - child_variable_snapshot, iteration_usage, ) = result @@ -276,9 +268,6 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]): usage_accumulator[0] = self._merge_usage(usage_accumulator[0], iteration_usage) - # Sync workflow-owned variable scopes after iteration completion - self._sync_child_variable_snapshot(child_variable_snapshot) - except Exception as e: # Handle errors based on error_handle_mode match self.node_data.error_handle_mode: @@ -302,7 +291,7 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]): index: int, item: object, execution_context: AbstractContextManager[object], - ) -> tuple[float, list[GraphNodeEventBase], object | None, dict[tuple[str, str], Variable], LLMUsage]: + ) -> tuple[float, list[GraphNodeEventBase], object | None, LLMUsage]: """Execute a single iteration in parallel mode and return results.""" with execution_context: iter_start_at = datetime.now(UTC).replace(tzinfo=None) @@ -321,22 +310,21 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]): # Get the output value from the temporary outputs list output_value = outputs_temp[0] if outputs_temp else None - child_variable_snapshot = self._extract_child_variable_snapshot( - variable_pool=graph_engine.graph_runtime_state.variable_pool - ) iteration_duration = (datetime.now(UTC).replace(tzinfo=None) - iter_start_at).total_seconds() return ( iteration_duration, events, output_value, - child_variable_snapshot, graph_engine.graph_runtime_state.llm_usage, ) def _capture_execution_context(self) -> AbstractContextManager[object]: """Return the application-supplied execution context for parallel iterations.""" - return self.graph_runtime_state.execution_context + execution_context = self.graph_runtime_state.execution_context + if execution_context is not None: + return execution_context + return nullcontext() def _handle_iteration_success( self, @@ -510,26 +498,6 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]): return variable_mapping - def _extract_child_variable_snapshot(self, *, variable_pool: VariablePool) -> dict[tuple[str, str], Variable]: - snapshot: dict[tuple[str, str], Variable] = {} - for node_id in self.graph_init_params.child_sync_variable_node_ids: - variables = variable_pool.variable_dictionary.get(node_id, {}) - for name, variable in variables.items(): - snapshot[(node_id, name)] = variable.model_copy(deep=True) - return snapshot - - def _sync_child_variable_snapshot(self, snapshot: dict[tuple[str, str], Variable]) -> None: - parent_pool = self.graph_runtime_state.variable_pool - for node_id in self.graph_init_params.child_sync_variable_node_ids: - current_keys = set(parent_pool.variable_dictionary.get(node_id, {})) - snapshot_keys = {name for scope_node_id, name in snapshot if scope_node_id == node_id} - - for removed_key in current_keys - snapshot_keys: - parent_pool.remove((node_id, removed_key)) - - for selector, variable in snapshot.items(): - parent_pool.add(selector, variable) - def _append_iteration_info_to_event( self, event: GraphNodeEventBase, @@ -592,7 +560,6 @@ class IterationNode(LLMUsageTrackingMixin, Node[IterationNodeData]): graph_config=self.graph_config, run_context=self.run_context, call_depth=self.workflow_call_depth, - child_sync_variable_node_ids=self.graph_init_params.child_sync_variable_node_ids, ) # Create a deep copy of the variable pool for each iteration variable_pool_copy = self.graph_runtime_state.variable_pool.model_copy(deep=True) diff --git a/api/dify_graph/nodes/loop/loop_node.py b/api/dify_graph/nodes/loop/loop_node.py index 34bfacec14..64a0535ef4 100644 --- a/api/dify_graph/nodes/loop/loop_node.py +++ b/api/dify_graph/nodes/loop/loop_node.py @@ -416,7 +416,6 @@ class LoopNode(LLMUsageTrackingMixin, Node[LoopNodeData]): graph_config=self.graph_config, run_context=self.run_context, call_depth=self.workflow_call_depth, - child_sync_variable_node_ids=self.graph_init_params.child_sync_variable_node_ids, ) # Create a new GraphRuntimeState for this iteration diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_table_runner.py b/api/tests/unit_tests/core/workflow/graph_engine/test_table_runner.py index 2dec3580c0..aef32208d7 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_table_runner.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_table_runner.py @@ -24,7 +24,6 @@ from core.tools.utils.yaml_utils import _load_yaml_file from core.workflow.node_factory import DifyNodeFactory, get_default_root_node_id from core.workflow.system_variables import build_bootstrap_variables, build_system_variables from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool -from core.workflow.variable_prefixes import CHILD_SYNC_VARIABLE_NODE_IDS from dify_graph.entities.graph_init_params import GraphInitParams from dify_graph.graph import Graph from dify_graph.graph_engine import GraphEngine, GraphEngineConfig @@ -206,7 +205,6 @@ class WorkflowRunner: } }, call_depth=0, - child_sync_variable_node_ids=CHILD_SYNC_VARIABLE_NODE_IDS, ) system_variables = build_system_variables( diff --git a/api/tests/unit_tests/core/workflow/test_workflow_entry_helpers.py b/api/tests/unit_tests/core/workflow/test_workflow_entry_helpers.py index 5960230dd3..9945fb992f 100644 --- a/api/tests/unit_tests/core/workflow/test_workflow_entry_helpers.py +++ b/api/tests/unit_tests/core/workflow/test_workflow_entry_helpers.py @@ -7,7 +7,6 @@ import pytest from core.app.apps.exc import GenerateTaskStoppedError from core.app.entities.app_invoke_entities import InvokeFrom, UserFrom from core.workflow import workflow_entry -from core.workflow.variable_prefixes import CHILD_SYNC_VARIABLE_NODE_IDS from dify_graph.entities.graph_config import NodeConfigDictAdapter from dify_graph.enums import NodeType from dify_graph.errors import WorkflowNodeRunFailedError @@ -503,7 +502,6 @@ class TestWorkflowEntryHelpers: ), run_context={"_dify": "context"}, call_depth=0, - child_sync_variable_node_ids=CHILD_SYNC_VARIABLE_NODE_IDS, ) dify_node_factory_cls.assert_called_once_with( graph_init_params=sentinel.graph_init_params, diff --git a/api/tests/workflow_test_utils.py b/api/tests/workflow_test_utils.py index 60513d17fb..fad486f7e0 100644 --- a/api/tests/workflow_test_utils.py +++ b/api/tests/workflow_test_utils.py @@ -3,7 +3,6 @@ from typing import Any from core.app.entities.app_invoke_entities import InvokeFrom, UserFrom, build_dify_run_context from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool -from core.workflow.variable_prefixes import CHILD_SYNC_VARIABLE_NODE_IDS from dify_graph.entities.graph_init_params import GraphInitParams from dify_graph.runtime import VariablePool from dify_graph.variables.variables import Variable @@ -54,7 +53,6 @@ def build_test_graph_init_params( extra_context=extra_context, ), call_depth=call_depth, - child_sync_variable_node_ids=CHILD_SYNC_VARIABLE_NODE_IDS, )