feat(workflow): domain model for workflow node execution (#19430)

Signed-off-by: -LAN- <laipz8200@outlook.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Author: -LAN-
Date: 2025-05-17 00:56:16 +08:00
Committed by: GitHub
Parent: aeceb200ec
Commit: 4977bb21ec
31 changed files with 1108 additions and 483 deletions

View File

@@ -0,0 +1,98 @@
"""
Domain entities for workflow node execution.
This module contains the domain model for workflow node execution, which is used
by the core workflow module. These models are independent of the storage mechanism
and don't contain implementation details like tenant_id, app_id, etc.
"""
from collections.abc import Mapping
from datetime import datetime
from enum import StrEnum
from typing import Any, Optional
from pydantic import BaseModel, Field
from core.workflow.entities.node_entities import NodeRunMetadataKey
from core.workflow.nodes.enums import NodeType
class NodeExecutionStatus(StrEnum):
"""
Node Execution Status Enum.
"""
RUNNING = "running"
SUCCEEDED = "succeeded"
FAILED = "failed"
EXCEPTION = "exception"
RETRY = "retry"
class NodeExecution(BaseModel):
"""
Domain model for workflow node execution.
This model represents the core business entity of a node execution,
without implementation details like tenant_id, app_id, etc.
Note: User/context-specific fields (triggered_from, created_by, created_by_role)
have been moved to the repository implementation to keep the domain model clean.
These fields are still accepted in the constructor for backward compatibility,
but they are not stored in the model.
"""
# Core identification fields
id: str # Unique identifier for this execution record
node_execution_id: Optional[str] = None # Optional secondary ID for cross-referencing
workflow_id: str # ID of the workflow this node belongs to
workflow_run_id: Optional[str] = None # ID of the specific workflow run (null for single-step debugging)
# Execution positioning and flow
index: int # Sequence number for ordering in trace visualization
predecessor_node_id: Optional[str] = None # ID of the node that executed before this one
node_id: str # ID of the node being executed
node_type: NodeType # Type of node (e.g., start, llm, knowledge)
title: str # Display title of the node
# Execution data
inputs: Optional[Mapping[str, Any]] = None # Input variables used by this node
process_data: Optional[Mapping[str, Any]] = None # Intermediate processing data
outputs: Optional[Mapping[str, Any]] = None # Output variables produced by this node
# Execution state
status: NodeExecutionStatus = NodeExecutionStatus.RUNNING # Current execution status
error: Optional[str] = None # Error message if execution failed
elapsed_time: float = Field(default=0.0) # Time taken for execution in seconds
# Additional metadata
metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None # Execution metadata (tokens, cost, etc.)
# Timing information
created_at: datetime # When execution started
finished_at: Optional[datetime] = None # When execution completed
def update_from_mapping(
self,
inputs: Optional[Mapping[str, Any]] = None,
process_data: Optional[Mapping[str, Any]] = None,
outputs: Optional[Mapping[str, Any]] = None,
metadata: Optional[Mapping[NodeRunMetadataKey, Any]] = None,
) -> None:
"""
Update the model from mappings.
Args:
inputs: The inputs to update
process_data: The process data to update
outputs: The outputs to update
metadata: The metadata to update
"""
if inputs is not None:
self.inputs = dict(inputs)
if process_data is not None:
self.process_data = dict(process_data)
if outputs is not None:
self.outputs = dict(outputs)
if metadata is not None:
self.metadata = dict(metadata)
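
Editor's note: a minimal usage sketch of the NodeExecution model above. All field values are made up, and NodeType.LLM is assumed to be a member of the NodeType enum (the diff only imports the enum by name):

# Minimal usage sketch of NodeExecution (illustrative values throughout).
from datetime import UTC, datetime
from uuid import uuid4

from core.workflow.entities.node_execution_entities import NodeExecution, NodeExecutionStatus
from core.workflow.nodes.enums import NodeType

execution = NodeExecution(
    id=str(uuid4()),
    workflow_id="wf-1",
    workflow_run_id="run-1",
    index=1,
    node_id="node-1",
    node_type=NodeType.LLM,  # assumed member; the diff only imports NodeType
    title="Generate answer",
    created_at=datetime.now(UTC).replace(tzinfo=None),
)

# update_from_mapping copies each provided mapping into a plain dict on the model.
execution.update_from_mapping(inputs={"query": "hello"}, outputs={"text": "hi"})
execution.status = NodeExecutionStatus.SUCCEEDED
execution.finished_at = datetime.now(UTC).replace(tzinfo=None)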

View File

@@ -2,12 +2,12 @@ from collections.abc import Sequence
from dataclasses import dataclass
from typing import Literal, Optional, Protocol
from models.workflow import WorkflowNodeExecution
from core.workflow.entities.node_execution_entities import NodeExecution
@dataclass
class OrderConfig:
"""Configuration for ordering WorkflowNodeExecution instances."""
"""Configuration for ordering NodeExecution instances."""
order_by: list[str]
order_direction: Optional[Literal["asc", "desc"]] = None
@@ -15,10 +15,10 @@ class OrderConfig:
class WorkflowNodeExecutionRepository(Protocol):
"""
Repository interface for WorkflowNodeExecution.
Repository interface for NodeExecution.
This interface defines the contract for accessing and manipulating
WorkflowNodeExecution data, regardless of the underlying storage mechanism.
NodeExecution data, regardless of the underlying storage mechanism.
Note: Domain-specific concepts like multi-tenancy (tenant_id), application context (app_id),
and trigger sources (triggered_from) should be handled at the implementation level, not in
@@ -26,24 +26,28 @@ class WorkflowNodeExecutionRepository(Protocol):
application domains or deployment scenarios.
"""
def save(self, execution: WorkflowNodeExecution) -> None:
def save(self, execution: NodeExecution) -> None:
"""
Save a WorkflowNodeExecution instance.
Save or update a NodeExecution instance.
This method handles both creating new records and updating existing ones.
The implementation should determine whether to create or update based on
the execution's ID or other identifying fields.
Args:
execution: The WorkflowNodeExecution instance to save
execution: The NodeExecution instance to save or update
"""
...
def get_by_node_execution_id(self, node_execution_id: str) -> Optional[WorkflowNodeExecution]:
def get_by_node_execution_id(self, node_execution_id: str) -> Optional[NodeExecution]:
"""
Retrieve a WorkflowNodeExecution by its node_execution_id.
Retrieve a NodeExecution by its node_execution_id.
Args:
node_execution_id: The node execution ID
Returns:
The WorkflowNodeExecution instance if found, None otherwise
The NodeExecution instance if found, None otherwise
"""
...
@@ -51,9 +55,9 @@ class WorkflowNodeExecutionRepository(Protocol):
self,
workflow_run_id: str,
order_config: Optional[OrderConfig] = None,
) -> Sequence[WorkflowNodeExecution]:
) -> Sequence[NodeExecution]:
"""
Retrieve all WorkflowNodeExecution instances for a specific workflow run.
Retrieve all NodeExecution instances for a specific workflow run.
Args:
workflow_run_id: The workflow run ID
@@ -62,34 +66,25 @@ class WorkflowNodeExecutionRepository(Protocol):
order_config.order_direction: Direction to order ("asc" or "desc")
Returns:
A list of WorkflowNodeExecution instances
A list of NodeExecution instances
"""
...
def get_running_executions(self, workflow_run_id: str) -> Sequence[WorkflowNodeExecution]:
def get_running_executions(self, workflow_run_id: str) -> Sequence[NodeExecution]:
"""
Retrieve all running WorkflowNodeExecution instances for a specific workflow run.
Retrieve all running NodeExecution instances for a specific workflow run.
Args:
workflow_run_id: The workflow run ID
Returns:
A list of running WorkflowNodeExecution instances
"""
...
def update(self, execution: WorkflowNodeExecution) -> None:
"""
Update an existing WorkflowNodeExecution instance.
Args:
execution: The WorkflowNodeExecution instance to update
A list of running NodeExecution instances
"""
...
def clear(self) -> None:
"""
Clear all WorkflowNodeExecution records based on implementation-specific criteria.
Clear all NodeExecution records based on implementation-specific criteria.
This method is intended to be used for bulk deletion operations, such as removing
all records associated with a specific app_id and tenant_id in multi-tenant implementations.
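
Editor's note: for concreteness, a minimal in-memory sketch that satisfies this protocol and illustrates the save-as-upsert contract replacing the removed update() method. The class name is hypothetical, and get_by_workflow_run is an assumed name for the run-scoped query whose def line the hunk above truncates:

# Hypothetical in-memory implementation of the protocol above (sketch only).
from collections.abc import Sequence
from typing import Optional

from core.workflow.entities.node_execution_entities import NodeExecution, NodeExecutionStatus

class InMemoryNodeExecutionRepository:
    def __init__(self) -> None:
        self._records: dict[str, NodeExecution] = {}

    def save(self, execution: NodeExecution) -> None:
        # Upsert: keying on the primary id makes a repeated save an in-place update.
        self._records[execution.id] = execution

    def get_by_node_execution_id(self, node_execution_id: str) -> Optional[NodeExecution]:
        return next(
            (e for e in self._records.values() if e.node_execution_id == node_execution_id),
            None,
        )

    def get_by_workflow_run(  # name assumed; the signature is cut off in the hunk
        self, workflow_run_id: str, order_config=None
    ) -> Sequence[NodeExecution]:
        found = [e for e in self._records.values() if e.workflow_run_id == workflow_run_id]
        return sorted(found, key=lambda e: e.index)  # ignores order_config for brevity

    def get_running_executions(self, workflow_run_id: str) -> Sequence[NodeExecution]:
        return [
            e
            for e in self._records.values()
            if e.workflow_run_id == workflow_run_id and e.status == NodeExecutionStatus.RUNNING
        ]

    def clear(self) -> None:
        self._records.clear()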

View File

@@ -58,7 +58,7 @@ from core.workflow.repository.workflow_node_execution_repository import Workflow
from core.workflow.workflow_cycle_manager import WorkflowCycleManager
from extensions.ext_database import db
from models.account import Account
from models.enums import CreatedByRole
from models.enums import CreatorUserRole
from models.model import EndUser
from models.workflow import (
Workflow,
@@ -94,11 +94,11 @@ class WorkflowAppGenerateTaskPipeline:
if isinstance(user, EndUser):
self._user_id = user.id
user_session_id = user.session_id
self._created_by_role = CreatedByRole.END_USER
self._created_by_role = CreatorUserRole.END_USER
elif isinstance(user, Account):
self._user_id = user.id
user_session_id = user.id
self._created_by_role = CreatedByRole.ACCOUNT
self._created_by_role = CreatorUserRole.ACCOUNT
else:
raise ValueError(f"Invalid user type: {type(user)}")
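
Editor's note: the branch above is a mechanical rename of CreatedByRole to CreatorUserRole; condensed here into a hypothetical helper for illustration:

from models.account import Account
from models.enums import CreatorUserRole
from models.model import EndUser

def resolve_creator_role(user: Account | EndUser) -> CreatorUserRole:
    # Mirrors the pipeline branch: end users and accounts map to distinct roles.
    if isinstance(user, EndUser):
        return CreatorUserRole.END_USER
    if isinstance(user, Account):
        return CreatorUserRole.ACCOUNT
    raise ValueError(f"Invalid user type: {type(user)}")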

View File

@@ -46,26 +46,28 @@ from core.app.entities.task_entities import (
)
from core.app.task_pipeline.exc import WorkflowRunNotFoundError
from core.file import FILE_MODEL_IDENTITY, File
from core.model_runtime.utils.encoders import jsonable_encoder
from core.ops.entities.trace_entity import TraceTaskName
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
from core.tools.tool_manager import ToolManager
from core.workflow.entities.node_entities import NodeRunMetadataKey
from core.workflow.entities.node_execution_entities import (
NodeExecution,
NodeExecutionStatus,
)
from core.workflow.enums import SystemVariableKey
from core.workflow.nodes import NodeType
from core.workflow.nodes.tool.entities import ToolNodeData
from core.workflow.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core.workflow.workflow_entry import WorkflowEntry
from models.account import Account
from models.enums import CreatedByRole, WorkflowRunTriggeredFrom
from models.model import EndUser
from models.workflow import (
from models import (
Account,
CreatorUserRole,
EndUser,
Workflow,
WorkflowNodeExecution,
WorkflowNodeExecutionStatus,
WorkflowNodeExecutionTriggeredFrom,
WorkflowRun,
WorkflowRunStatus,
WorkflowRunTriggeredFrom,
)
@@ -78,7 +80,6 @@ class WorkflowCycleManager:
workflow_node_execution_repository: WorkflowNodeExecutionRepository,
) -> None:
self._workflow_run: WorkflowRun | None = None
self._workflow_node_executions: dict[str, WorkflowNodeExecution] = {}
self._application_generate_entity = application_generate_entity
self._workflow_system_variables = workflow_system_variables
self._workflow_node_execution_repository = workflow_node_execution_repository
@@ -89,7 +90,7 @@ class WorkflowCycleManager:
session: Session,
workflow_id: str,
user_id: str,
created_by_role: CreatedByRole,
created_by_role: CreatorUserRole,
) -> WorkflowRun:
workflow_stmt = select(Workflow).where(Workflow.id == workflow_id)
workflow = session.scalar(workflow_stmt)
@@ -258,21 +259,22 @@ class WorkflowCycleManager:
workflow_run.exceptions_count = exceptions_count
# Use the instance repository to find running executions for a workflow run
running_workflow_node_executions = self._workflow_node_execution_repository.get_running_executions(
running_domain_executions = self._workflow_node_execution_repository.get_running_executions(
workflow_run_id=workflow_run.id
)
# Update the cache with the retrieved executions
for execution in running_workflow_node_executions:
if execution.node_execution_id:
self._workflow_node_executions[execution.node_execution_id] = execution
# Update the domain models
now = datetime.now(UTC).replace(tzinfo=None)
for domain_execution in running_domain_executions:
if domain_execution.node_execution_id:
# Update the domain model
domain_execution.status = NodeExecutionStatus.FAILED
domain_execution.error = error
domain_execution.finished_at = now
domain_execution.elapsed_time = (now - domain_execution.created_at).total_seconds()
for workflow_node_execution in running_workflow_node_executions:
now = datetime.now(UTC).replace(tzinfo=None)
workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED.value
workflow_node_execution.error = error
workflow_node_execution.finished_at = now
workflow_node_execution.elapsed_time = (now - workflow_node_execution.created_at).total_seconds()
# Update the repository with the domain model
self._workflow_node_execution_repository.save(domain_execution)
if trace_manager:
trace_manager.add_trace_task(
@@ -286,63 +288,67 @@ class WorkflowCycleManager:
return workflow_run
def _handle_node_execution_start(
self, *, workflow_run: WorkflowRun, event: QueueNodeStartedEvent
) -> WorkflowNodeExecution:
workflow_node_execution = WorkflowNodeExecution()
workflow_node_execution.id = str(uuid4())
workflow_node_execution.tenant_id = workflow_run.tenant_id
workflow_node_execution.app_id = workflow_run.app_id
workflow_node_execution.workflow_id = workflow_run.workflow_id
workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value
workflow_node_execution.workflow_run_id = workflow_run.id
workflow_node_execution.predecessor_node_id = event.predecessor_node_id
workflow_node_execution.index = event.node_run_index
workflow_node_execution.node_execution_id = event.node_execution_id
workflow_node_execution.node_id = event.node_id
workflow_node_execution.node_type = event.node_type.value
workflow_node_execution.title = event.node_data.title
workflow_node_execution.status = WorkflowNodeExecutionStatus.RUNNING.value
workflow_node_execution.created_by_role = workflow_run.created_by_role
workflow_node_execution.created_by = workflow_run.created_by
workflow_node_execution.execution_metadata = json.dumps(
{
NodeRunMetadataKey.PARALLEL_MODE_RUN_ID: event.parallel_mode_run_id,
NodeRunMetadataKey.ITERATION_ID: event.in_iteration_id,
NodeRunMetadataKey.LOOP_ID: event.in_loop_id,
}
def _handle_node_execution_start(self, *, workflow_run: WorkflowRun, event: QueueNodeStartedEvent) -> NodeExecution:
# Create a domain model
created_at = datetime.now(UTC).replace(tzinfo=None)
metadata = {
NodeRunMetadataKey.PARALLEL_MODE_RUN_ID: event.parallel_mode_run_id,
NodeRunMetadataKey.ITERATION_ID: event.in_iteration_id,
NodeRunMetadataKey.LOOP_ID: event.in_loop_id,
}
domain_execution = NodeExecution(
id=str(uuid4()),
workflow_id=workflow_run.workflow_id,
workflow_run_id=workflow_run.id,
predecessor_node_id=event.predecessor_node_id,
index=event.node_run_index,
node_execution_id=event.node_execution_id,
node_id=event.node_id,
node_type=event.node_type,
title=event.node_data.title,
status=NodeExecutionStatus.RUNNING,
metadata=metadata,
created_at=created_at,
)
workflow_node_execution.created_at = datetime.now(UTC).replace(tzinfo=None)
# Use the instance repository to save the workflow node execution
self._workflow_node_execution_repository.save(workflow_node_execution)
# Use the instance repository to save the domain model
self._workflow_node_execution_repository.save(domain_execution)
self._workflow_node_executions[event.node_execution_id] = workflow_node_execution
return workflow_node_execution
return domain_execution
def _handle_workflow_node_execution_success(self, *, event: QueueNodeSucceededEvent) -> WorkflowNodeExecution:
workflow_node_execution = self._get_workflow_node_execution(node_execution_id=event.node_execution_id)
def _handle_workflow_node_execution_success(self, *, event: QueueNodeSucceededEvent) -> NodeExecution:
# Get the domain model from repository
domain_execution = self._workflow_node_execution_repository.get_by_node_execution_id(event.node_execution_id)
if not domain_execution:
raise ValueError(f"Domain node execution not found: {event.node_execution_id}")
# Process data
inputs = WorkflowEntry.handle_special_values(event.inputs)
process_data = WorkflowEntry.handle_special_values(event.process_data)
outputs = WorkflowEntry.handle_special_values(event.outputs)
execution_metadata_dict = dict(event.execution_metadata or {})
execution_metadata = json.dumps(jsonable_encoder(execution_metadata_dict)) if execution_metadata_dict else None
# Copy execution metadata into a plain dict (keys stay NodeRunMetadataKey)
execution_metadata_dict = {}
if event.execution_metadata:
for key, value in event.execution_metadata.items():
execution_metadata_dict[key] = value
finished_at = datetime.now(UTC).replace(tzinfo=None)
elapsed_time = (finished_at - event.start_at).total_seconds()
process_data = WorkflowEntry.handle_special_values(event.process_data)
# Update domain model
domain_execution.status = NodeExecutionStatus.SUCCEEDED
domain_execution.update_from_mapping(
inputs=inputs, process_data=process_data, outputs=outputs, metadata=execution_metadata_dict
)
domain_execution.finished_at = finished_at
domain_execution.elapsed_time = elapsed_time
workflow_node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED.value
workflow_node_execution.inputs = json.dumps(inputs) if inputs else None
workflow_node_execution.process_data = json.dumps(process_data) if process_data else None
workflow_node_execution.outputs = json.dumps(outputs) if outputs else None
workflow_node_execution.execution_metadata = execution_metadata
workflow_node_execution.finished_at = finished_at
workflow_node_execution.elapsed_time = elapsed_time
# Update the repository with the domain model
self._workflow_node_execution_repository.save(domain_execution)
# Use the instance repository to update the workflow node execution
self._workflow_node_execution_repository.update(workflow_node_execution)
return workflow_node_execution
return domain_execution
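
Editor's note: every finish-type handler in this file now follows the same repository round trip — fetch the domain model by node_execution_id, mutate it, save. A condensed, hypothetical sketch of that shape (the real handlers above differ per event type):

from datetime import UTC, datetime

from core.workflow.entities.node_execution_entities import NodeExecution, NodeExecutionStatus
from core.workflow.workflow_entry import WorkflowEntry

def finish_node_execution(repository, event, status: NodeExecutionStatus, error: str | None = None) -> NodeExecution:
    execution = repository.get_by_node_execution_id(event.node_execution_id)
    if not execution:
        raise ValueError(f"Domain node execution not found: {event.node_execution_id}")
    execution.status = status
    execution.error = error
    execution.update_from_mapping(
        inputs=WorkflowEntry.handle_special_values(event.inputs),
        process_data=WorkflowEntry.handle_special_values(event.process_data),
        outputs=WorkflowEntry.handle_special_values(event.outputs),
        metadata=dict(event.execution_metadata or {}),
    )
    execution.finished_at = datetime.now(UTC).replace(tzinfo=None)
    execution.elapsed_time = (execution.finished_at - event.start_at).total_seconds()
    repository.save(execution)  # save() upserts, so no separate update() call is needed
    return execution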
def _handle_workflow_node_execution_failed(
self,
@@ -351,43 +357,52 @@ class WorkflowCycleManager:
| QueueNodeInIterationFailedEvent
| QueueNodeInLoopFailedEvent
| QueueNodeExceptionEvent,
) -> WorkflowNodeExecution:
) -> NodeExecution:
"""
Handle a failed workflow node execution.
:param event: queue node failed event
:return: the updated NodeExecution domain model
"""
workflow_node_execution = self._get_workflow_node_execution(node_execution_id=event.node_execution_id)
# Get the domain model from repository
domain_execution = self._workflow_node_execution_repository.get_by_node_execution_id(event.node_execution_id)
if not domain_execution:
raise ValueError(f"Domain node execution not found: {event.node_execution_id}")
# Process data
inputs = WorkflowEntry.handle_special_values(event.inputs)
process_data = WorkflowEntry.handle_special_values(event.process_data)
outputs = WorkflowEntry.handle_special_values(event.outputs)
# Copy execution metadata into a plain dict (keys stay NodeRunMetadataKey)
execution_metadata_dict = {}
if event.execution_metadata:
for key, value in event.execution_metadata.items():
execution_metadata_dict[key] = value
finished_at = datetime.now(UTC).replace(tzinfo=None)
elapsed_time = (finished_at - event.start_at).total_seconds()
execution_metadata = (
json.dumps(jsonable_encoder(event.execution_metadata)) if event.execution_metadata else None
)
process_data = WorkflowEntry.handle_special_values(event.process_data)
workflow_node_execution.status = (
WorkflowNodeExecutionStatus.FAILED.value
# Update domain model
domain_execution.status = (
NodeExecutionStatus.FAILED
if not isinstance(event, QueueNodeExceptionEvent)
else WorkflowNodeExecutionStatus.EXCEPTION.value
else NodeExecutionStatus.EXCEPTION
)
workflow_node_execution.error = event.error
workflow_node_execution.inputs = json.dumps(inputs) if inputs else None
workflow_node_execution.process_data = json.dumps(process_data) if process_data else None
workflow_node_execution.outputs = json.dumps(outputs) if outputs else None
workflow_node_execution.finished_at = finished_at
workflow_node_execution.elapsed_time = elapsed_time
workflow_node_execution.execution_metadata = execution_metadata
domain_execution.error = event.error
domain_execution.update_from_mapping(
inputs=inputs, process_data=process_data, outputs=outputs, metadata=execution_metadata_dict
)
domain_execution.finished_at = finished_at
domain_execution.elapsed_time = elapsed_time
self._workflow_node_execution_repository.update(workflow_node_execution)
# Update the repository with the domain model
self._workflow_node_execution_repository.save(domain_execution)
return workflow_node_execution
return domain_execution
def _handle_workflow_node_execution_retried(
self, *, workflow_run: WorkflowRun, event: QueueNodeRetryEvent
) -> WorkflowNodeExecution:
) -> NodeExecution:
"""
Handle a retried workflow node execution.
:param workflow_run: workflow run
@@ -399,47 +414,47 @@ class WorkflowCycleManager:
elapsed_time = (finished_at - created_at).total_seconds()
inputs = WorkflowEntry.handle_special_values(event.inputs)
outputs = WorkflowEntry.handle_special_values(event.outputs)
# Convert metadata keys to strings
origin_metadata = {
NodeRunMetadataKey.ITERATION_ID: event.in_iteration_id,
NodeRunMetadataKey.PARALLEL_MODE_RUN_ID: event.parallel_mode_run_id,
NodeRunMetadataKey.LOOP_ID: event.in_loop_id,
}
merged_metadata = (
{**jsonable_encoder(event.execution_metadata), **origin_metadata}
if event.execution_metadata is not None
else origin_metadata
# Copy execution metadata into a plain, typed dict (keys stay NodeRunMetadataKey)
execution_metadata_dict: dict[NodeRunMetadataKey, str | None] = {}
if event.execution_metadata:
for key, value in event.execution_metadata.items():
execution_metadata_dict[key] = value
merged_metadata = {**execution_metadata_dict, **origin_metadata} if execution_metadata_dict else origin_metadata
# Create a domain model
domain_execution = NodeExecution(
id=str(uuid4()),
workflow_id=workflow_run.workflow_id,
workflow_run_id=workflow_run.id,
predecessor_node_id=event.predecessor_node_id,
node_execution_id=event.node_execution_id,
node_id=event.node_id,
node_type=event.node_type,
title=event.node_data.title,
status=NodeExecutionStatus.RETRY,
created_at=created_at,
finished_at=finished_at,
elapsed_time=elapsed_time,
error=event.error,
index=event.node_run_index,
)
execution_metadata = json.dumps(merged_metadata)
workflow_node_execution = WorkflowNodeExecution()
workflow_node_execution.id = str(uuid4())
workflow_node_execution.tenant_id = workflow_run.tenant_id
workflow_node_execution.app_id = workflow_run.app_id
workflow_node_execution.workflow_id = workflow_run.workflow_id
workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value
workflow_node_execution.workflow_run_id = workflow_run.id
workflow_node_execution.predecessor_node_id = event.predecessor_node_id
workflow_node_execution.node_execution_id = event.node_execution_id
workflow_node_execution.node_id = event.node_id
workflow_node_execution.node_type = event.node_type.value
workflow_node_execution.title = event.node_data.title
workflow_node_execution.status = WorkflowNodeExecutionStatus.RETRY.value
workflow_node_execution.created_by_role = workflow_run.created_by_role
workflow_node_execution.created_by = workflow_run.created_by
workflow_node_execution.created_at = created_at
workflow_node_execution.finished_at = finished_at
workflow_node_execution.elapsed_time = elapsed_time
workflow_node_execution.error = event.error
workflow_node_execution.inputs = json.dumps(inputs) if inputs else None
workflow_node_execution.outputs = json.dumps(outputs) if outputs else None
workflow_node_execution.execution_metadata = execution_metadata
workflow_node_execution.index = event.node_run_index
# Update with mappings
domain_execution.update_from_mapping(inputs=inputs, outputs=outputs, metadata=merged_metadata)
# Use the instance repository to save the workflow node execution
self._workflow_node_execution_repository.save(workflow_node_execution)
# Use the instance repository to save the domain model
self._workflow_node_execution_repository.save(domain_execution)
self._workflow_node_executions[event.node_execution_id] = workflow_node_execution
return workflow_node_execution
return domain_execution
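
Editor's note on the merged_metadata construction above: in a dict-unpacking merge, later keys win, so origin_metadata overrides any colliding keys from the event metadata. A tiny, self-contained illustration:

# Later mappings win on key collisions in a {**a, **b} merge.
a = {"x": 1, "y": 2}
b = {"y": 99}
assert {**a, **b} == {"x": 1, "y": 99}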
def _workflow_start_to_stream_response(
self,
@@ -469,7 +484,7 @@ class WorkflowCycleManager:
workflow_run: WorkflowRun,
) -> WorkflowFinishStreamResponse:
created_by = None
if workflow_run.created_by_role == CreatedByRole.ACCOUNT:
if workflow_run.created_by_role == CreatorUserRole.ACCOUNT:
stmt = select(Account).where(Account.id == workflow_run.created_by)
account = session.scalar(stmt)
if account:
@@ -478,7 +493,7 @@ class WorkflowCycleManager:
"name": account.name,
"email": account.email,
}
elif workflow_run.created_by_role == CreatedByRole.END_USER:
elif workflow_run.created_by_role == CreatorUserRole.END_USER:
stmt = select(EndUser).where(EndUser.id == workflow_run.created_by)
end_user = session.scalar(stmt)
if end_user:
@@ -515,9 +530,9 @@ class WorkflowCycleManager:
*,
event: QueueNodeStartedEvent,
task_id: str,
workflow_node_execution: WorkflowNodeExecution,
workflow_node_execution: NodeExecution,
) -> Optional[NodeStartStreamResponse]:
if workflow_node_execution.node_type in {NodeType.ITERATION.value, NodeType.LOOP.value}:
if workflow_node_execution.node_type in {NodeType.ITERATION, NodeType.LOOP}:
return None
if not workflow_node_execution.workflow_run_id:
return None
@@ -532,7 +547,7 @@ class WorkflowCycleManager:
title=workflow_node_execution.title,
index=workflow_node_execution.index,
predecessor_node_id=workflow_node_execution.predecessor_node_id,
inputs=workflow_node_execution.inputs_dict,
inputs=workflow_node_execution.inputs,
created_at=int(workflow_node_execution.created_at.timestamp()),
parallel_id=event.parallel_id,
parallel_start_node_id=event.parallel_start_node_id,
@@ -565,9 +580,9 @@ class WorkflowCycleManager:
| QueueNodeInLoopFailedEvent
| QueueNodeExceptionEvent,
task_id: str,
workflow_node_execution: WorkflowNodeExecution,
workflow_node_execution: NodeExecution,
) -> Optional[NodeFinishStreamResponse]:
if workflow_node_execution.node_type in {NodeType.ITERATION.value, NodeType.LOOP.value}:
if workflow_node_execution.node_type in {NodeType.ITERATION, NodeType.LOOP}:
return None
if not workflow_node_execution.workflow_run_id:
return None
@@ -584,16 +599,16 @@ class WorkflowCycleManager:
index=workflow_node_execution.index,
title=workflow_node_execution.title,
predecessor_node_id=workflow_node_execution.predecessor_node_id,
inputs=workflow_node_execution.inputs_dict,
process_data=workflow_node_execution.process_data_dict,
outputs=workflow_node_execution.outputs_dict,
inputs=workflow_node_execution.inputs,
process_data=workflow_node_execution.process_data,
outputs=workflow_node_execution.outputs,
status=workflow_node_execution.status,
error=workflow_node_execution.error,
elapsed_time=workflow_node_execution.elapsed_time,
execution_metadata=workflow_node_execution.execution_metadata_dict,
execution_metadata=workflow_node_execution.metadata,
created_at=int(workflow_node_execution.created_at.timestamp()),
finished_at=int(workflow_node_execution.finished_at.timestamp()),
files=self._fetch_files_from_node_outputs(workflow_node_execution.outputs_dict or {}),
files=self._fetch_files_from_node_outputs(workflow_node_execution.outputs or {}),
parallel_id=event.parallel_id,
parallel_start_node_id=event.parallel_start_node_id,
parent_parallel_id=event.parent_parallel_id,
@@ -608,9 +623,9 @@ class WorkflowCycleManager:
*,
event: QueueNodeRetryEvent,
task_id: str,
workflow_node_execution: WorkflowNodeExecution,
workflow_node_execution: NodeExecution,
) -> Optional[Union[NodeRetryStreamResponse, NodeFinishStreamResponse]]:
if workflow_node_execution.node_type in {NodeType.ITERATION.value, NodeType.LOOP.value}:
if workflow_node_execution.node_type in {NodeType.ITERATION, NodeType.LOOP}:
return None
if not workflow_node_execution.workflow_run_id:
return None
@@ -627,16 +642,16 @@ class WorkflowCycleManager:
index=workflow_node_execution.index,
title=workflow_node_execution.title,
predecessor_node_id=workflow_node_execution.predecessor_node_id,
inputs=workflow_node_execution.inputs_dict,
process_data=workflow_node_execution.process_data_dict,
outputs=workflow_node_execution.outputs_dict,
inputs=workflow_node_execution.inputs,
process_data=workflow_node_execution.process_data,
outputs=workflow_node_execution.outputs,
status=workflow_node_execution.status,
error=workflow_node_execution.error,
elapsed_time=workflow_node_execution.elapsed_time,
execution_metadata=workflow_node_execution.execution_metadata_dict,
execution_metadata=workflow_node_execution.metadata,
created_at=int(workflow_node_execution.created_at.timestamp()),
finished_at=int(workflow_node_execution.finished_at.timestamp()),
files=self._fetch_files_from_node_outputs(workflow_node_execution.outputs_dict or {}),
files=self._fetch_files_from_node_outputs(workflow_node_execution.outputs or {}),
parallel_id=event.parallel_id,
parallel_start_node_id=event.parallel_start_node_id,
parent_parallel_id=event.parent_parallel_id,
@@ -908,23 +923,6 @@ class WorkflowCycleManager:
return workflow_run
def _get_workflow_node_execution(self, node_execution_id: str) -> WorkflowNodeExecution:
# First check the cache for performance
if node_execution_id in self._workflow_node_executions:
cached_execution = self._workflow_node_executions[node_execution_id]
# No need to merge with session since expire_on_commit=False
return cached_execution
# If not in cache, use the instance repository to get by node_execution_id
execution = self._workflow_node_execution_repository.get_by_node_execution_id(node_execution_id)
if not execution:
raise ValueError(f"Workflow node execution not found: {node_execution_id}")
# Update cache
self._workflow_node_executions[node_execution_id] = execution
return execution
def _handle_agent_log(self, task_id: str, event: QueueAgentLogEvent) -> AgentLogStreamResponse:
"""
Handle agent log