mirror of
https://github.com/langgenius/dify.git
synced 2026-05-12 03:00:44 -04:00
Merge remote-tracking branch 'refs/remotes/origin/main' into feat/evaluation
This commit is contained in:
@@ -1,9 +1,10 @@
|
||||
import copy
|
||||
import json
|
||||
import logging
|
||||
from collections.abc import Generator, Mapping, Sequence
|
||||
from datetime import datetime
|
||||
from enum import StrEnum
|
||||
from typing import TYPE_CHECKING, Any, Optional, Union, cast
|
||||
from typing import TYPE_CHECKING, Any, Optional, TypedDict, Union, cast
|
||||
from uuid import uuid4
|
||||
|
||||
import sqlalchemy as sa
|
||||
@@ -19,20 +20,21 @@ from sqlalchemy import (
|
||||
orm,
|
||||
select,
|
||||
)
|
||||
from sqlalchemy.orm import Mapped, declared_attr, mapped_column
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
from typing_extensions import deprecated
|
||||
|
||||
from core.workflow.constants import (
|
||||
from core.trigger.constants import TRIGGER_PLUGIN_NODE_TYPE
|
||||
from dify_graph.constants import (
|
||||
CONVERSATION_VARIABLE_NODE_ID,
|
||||
SYSTEM_VARIABLE_NODE_ID,
|
||||
)
|
||||
from core.workflow.entities.graph_config import NodeConfigDict, NodeConfigDictAdapter
|
||||
from core.workflow.entities.pause_reason import HumanInputRequired, PauseReason, PauseReasonType, SchedulingPause
|
||||
from core.workflow.enums import NodeType, WorkflowExecutionStatus
|
||||
from core.workflow.file.constants import maybe_file_object
|
||||
from core.workflow.file.models import File
|
||||
from core.workflow.variables import utils as variable_utils
|
||||
from core.workflow.variables.variables import FloatVariable, IntegerVariable, StringVariable
|
||||
from dify_graph.entities.graph_config import NodeConfigDict, NodeConfigDictAdapter
|
||||
from dify_graph.entities.pause_reason import HumanInputRequired, PauseReason, PauseReasonType, SchedulingPause
|
||||
from dify_graph.enums import BuiltinNodeTypes, NodeType, WorkflowExecutionStatus, WorkflowNodeExecutionMetadataKey
|
||||
from dify_graph.file.constants import maybe_file_object
|
||||
from dify_graph.file.models import File
|
||||
from dify_graph.variables import utils as variable_utils
|
||||
from dify_graph.variables.variables import FloatVariable, IntegerVariable, RAGPipelineVariable, StringVariable
|
||||
from extensions.ext_storage import Storage
|
||||
from factories.variable_factory import TypeMismatchError, build_segment_with_type
|
||||
from libs.datetime_utils import naive_utc_now
|
||||
@@ -46,18 +48,37 @@ if TYPE_CHECKING:
|
||||
|
||||
from constants import DEFAULT_FILE_NUMBER_LIMITS, HIDDEN_VALUE
|
||||
from core.helper import encrypter
|
||||
from core.workflow.variables import SecretVariable, Segment, SegmentType, VariableBase
|
||||
from dify_graph.variables import SecretVariable, Segment, SegmentType, VariableBase
|
||||
from factories import variable_factory
|
||||
from libs import helper
|
||||
|
||||
from .account import Account
|
||||
from .base import Base, DefaultFieldsMixin, TypeBase
|
||||
from .engine import db
|
||||
from .enums import CreatorUserRole, DraftVariableType, ExecutionOffLoadType
|
||||
from .enums import CreatorUserRole, DraftVariableType, ExecutionOffLoadType, WorkflowRunTriggeredFrom
|
||||
from .types import EnumText, LongText, StringUUID
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
SerializedWorkflowValue = dict[str, Any]
|
||||
SerializedWorkflowVariables = dict[str, SerializedWorkflowValue]
|
||||
|
||||
|
||||
class WorkflowContentDict(TypedDict):
|
||||
graph: Mapping[str, Any]
|
||||
features: dict[str, Any]
|
||||
environment_variables: list[dict[str, Any]]
|
||||
conversation_variables: list[dict[str, Any]]
|
||||
rag_pipeline_variables: list[dict[str, Any]]
|
||||
|
||||
|
||||
class WorkflowRunSummaryDict(TypedDict):
|
||||
id: str
|
||||
status: str
|
||||
triggered_from: str
|
||||
elapsed_time: float
|
||||
total_tokens: int
|
||||
|
||||
|
||||
class WorkflowType(StrEnum):
|
||||
"""
|
||||
@@ -142,7 +163,7 @@ class Workflow(Base): # bug
|
||||
id: Mapped[str] = mapped_column(StringUUID, default=lambda: str(uuid4()))
|
||||
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
type: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
type: Mapped[WorkflowType] = mapped_column(EnumText(WorkflowType, length=255), nullable=False)
|
||||
version: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
marked_name: Mapped[str] = mapped_column(String(255), default="", server_default="")
|
||||
marked_comment: Mapped[str] = mapped_column(String(255), default="", server_default="")
|
||||
@@ -189,7 +210,7 @@ class Workflow(Base): # bug
|
||||
workflow.id = str(uuid4())
|
||||
workflow.tenant_id = tenant_id
|
||||
workflow.app_id = app_id
|
||||
workflow.type = type
|
||||
workflow.type = WorkflowType(type)
|
||||
workflow.version = version
|
||||
workflow.graph = graph
|
||||
workflow.features = features
|
||||
@@ -234,8 +255,11 @@ class Workflow(Base): # bug
|
||||
|
||||
def get_node_config_by_id(self, node_id: str) -> NodeConfigDict:
|
||||
"""Extract a node configuration from the workflow graph by node ID.
|
||||
A node configuration is a dictionary containing the node's properties, including
|
||||
the node's id, title, and its data as a dict.
|
||||
|
||||
A node configuration includes the node id and a typed `BaseNodeData` for `data`.
|
||||
`BaseNodeData` keeps a dict-like `get`/`__getitem__` compatibility layer backed by
|
||||
model fields plus Pydantic extra storage for legacy consumers, but callers should
|
||||
prefer attribute access.
|
||||
"""
|
||||
workflow_graph = self.graph_dict
|
||||
|
||||
@@ -253,12 +277,9 @@ class Workflow(Base): # bug
|
||||
return NodeConfigDictAdapter.validate_python(node_config)
|
||||
|
||||
@staticmethod
|
||||
def get_node_type_from_node_config(node_config: Mapping[str, Any]) -> NodeType:
|
||||
def get_node_type_from_node_config(node_config: NodeConfigDict) -> NodeType:
|
||||
"""Extract type of a node from the node configuration returned by `get_node_config_by_id`."""
|
||||
node_config_data = node_config.get("data", {})
|
||||
# Get node class
|
||||
node_type = NodeType(node_config_data.get("type"))
|
||||
return node_type
|
||||
return node_config["data"].type
|
||||
|
||||
@staticmethod
|
||||
def get_enclosing_node_type_and_id(
|
||||
@@ -270,12 +291,12 @@ class Workflow(Base): # bug
|
||||
loop_id = node_config.get("loop_id")
|
||||
if loop_id is None:
|
||||
raise _InvalidGraphDefinitionError("invalid graph")
|
||||
return NodeType.LOOP, loop_id
|
||||
return BuiltinNodeTypes.LOOP, loop_id
|
||||
elif in_iteration:
|
||||
iteration_id = node_config.get("iteration_id")
|
||||
if iteration_id is None:
|
||||
raise _InvalidGraphDefinitionError("invalid graph")
|
||||
return NodeType.ITERATION, iteration_id
|
||||
return BuiltinNodeTypes.ITERATION, iteration_id
|
||||
else:
|
||||
return None
|
||||
|
||||
@@ -283,26 +304,40 @@ class Workflow(Base): # bug
|
||||
def features(self) -> str:
|
||||
"""
|
||||
Convert old features structure to new features structure.
|
||||
|
||||
This property avoids rewriting the underlying JSON when normalization
|
||||
produces no effective change, to prevent marking the row dirty on read.
|
||||
"""
|
||||
if not self._features:
|
||||
return self._features
|
||||
|
||||
features = json.loads(self._features)
|
||||
if features.get("file_upload", {}).get("image", {}).get("enabled", False):
|
||||
image_enabled = True
|
||||
image_number_limits = int(features["file_upload"]["image"].get("number_limits", DEFAULT_FILE_NUMBER_LIMITS))
|
||||
image_transfer_methods = features["file_upload"]["image"].get(
|
||||
"transfer_methods", ["remote_url", "local_file"]
|
||||
)
|
||||
features["file_upload"]["enabled"] = image_enabled
|
||||
features["file_upload"]["number_limits"] = image_number_limits
|
||||
features["file_upload"]["allowed_file_upload_methods"] = image_transfer_methods
|
||||
features["file_upload"]["allowed_file_types"] = features["file_upload"].get("allowed_file_types", ["image"])
|
||||
features["file_upload"]["allowed_file_extensions"] = features["file_upload"].get(
|
||||
"allowed_file_extensions", []
|
||||
)
|
||||
del features["file_upload"]["image"]
|
||||
self._features = json.dumps(features)
|
||||
# Parse once and deep-copy before normalization to detect in-place changes.
|
||||
original_dict = self._decode_features_payload(self._features)
|
||||
if original_dict is None:
|
||||
return self._features
|
||||
|
||||
# Fast-path: if the legacy file_upload.image.enabled shape is absent, skip
|
||||
# deep-copy and normalization entirely and return the stored JSON.
|
||||
file_upload_payload = original_dict.get("file_upload")
|
||||
if not isinstance(file_upload_payload, dict):
|
||||
return self._features
|
||||
file_upload = cast(dict[str, Any], file_upload_payload)
|
||||
|
||||
image_payload = file_upload.get("image")
|
||||
if not isinstance(image_payload, dict):
|
||||
return self._features
|
||||
image = cast(dict[str, Any], image_payload)
|
||||
if "enabled" not in image:
|
||||
return self._features
|
||||
|
||||
normalized_dict = self._normalize_features_payload(copy.deepcopy(original_dict))
|
||||
|
||||
if normalized_dict == original_dict:
|
||||
# No effective change; return stored JSON unchanged.
|
||||
return self._features
|
||||
|
||||
# Normalization changed the payload: persist the normalized JSON.
|
||||
self._features = json.dumps(normalized_dict)
|
||||
return self._features
|
||||
|
||||
@features.setter
|
||||
@@ -313,6 +348,44 @@ class Workflow(Base): # bug
|
||||
def features_dict(self) -> dict[str, Any]:
|
||||
return json.loads(self.features) if self.features else {}
|
||||
|
||||
@property
|
||||
def serialized_features(self) -> str:
|
||||
"""Return the stored features JSON without triggering compatibility rewrites."""
|
||||
return self._features
|
||||
|
||||
@property
|
||||
def normalized_features_dict(self) -> dict[str, Any]:
|
||||
"""Decode features with legacy normalization without mutating the model state."""
|
||||
if not self._features:
|
||||
return {}
|
||||
|
||||
features = self._decode_features_payload(self._features)
|
||||
return self._normalize_features_payload(features) if features is not None else {}
|
||||
|
||||
@staticmethod
|
||||
def _decode_features_payload(features: str) -> dict[str, Any] | None:
|
||||
"""Decode workflow features JSON when it contains an object payload."""
|
||||
payload = json.loads(features)
|
||||
return cast(dict[str, Any], payload) if isinstance(payload, dict) else None
|
||||
|
||||
@staticmethod
|
||||
def _normalize_features_payload(features: dict[str, Any]) -> dict[str, Any]:
|
||||
if features.get("file_upload", {}).get("image", {}).get("enabled", False):
|
||||
image_number_limits = int(features["file_upload"]["image"].get("number_limits", DEFAULT_FILE_NUMBER_LIMITS))
|
||||
image_transfer_methods = features["file_upload"]["image"].get(
|
||||
"transfer_methods", ["remote_url", "local_file"]
|
||||
)
|
||||
features["file_upload"]["enabled"] = True
|
||||
features["file_upload"]["number_limits"] = image_number_limits
|
||||
features["file_upload"]["allowed_file_upload_methods"] = image_transfer_methods
|
||||
features["file_upload"]["allowed_file_types"] = features["file_upload"].get("allowed_file_types", ["image"])
|
||||
features["file_upload"]["allowed_file_extensions"] = features["file_upload"].get(
|
||||
"allowed_file_extensions", []
|
||||
)
|
||||
del features["file_upload"]["image"]
|
||||
|
||||
return features
|
||||
|
||||
def walk_nodes(
|
||||
self, specific_node_type: NodeType | None = None
|
||||
) -> Generator[tuple[str, Mapping[str, Any]], None, None]:
|
||||
@@ -346,7 +419,7 @@ class Workflow(Base): # bug
|
||||
"selected": false,
|
||||
}
|
||||
|
||||
For specific node type, refer to `core.workflow.nodes`
|
||||
For specific node type, refer to `dify_graph.nodes`
|
||||
"""
|
||||
graph_dict = self.graph_dict
|
||||
if "nodes" not in graph_dict:
|
||||
@@ -354,9 +427,7 @@ class Workflow(Base): # bug
|
||||
|
||||
if specific_node_type:
|
||||
yield from (
|
||||
(node["id"], node["data"])
|
||||
for node in graph_dict["nodes"]
|
||||
if node["data"]["type"] == specific_node_type.value
|
||||
(node["id"], node["data"]) for node in graph_dict["nodes"] if node["data"]["type"] == specific_node_type
|
||||
)
|
||||
else:
|
||||
yield from ((node["id"], node["data"]) for node in graph_dict["nodes"])
|
||||
@@ -391,7 +462,7 @@ class Workflow(Base): # bug
|
||||
|
||||
def rag_pipeline_user_input_form(self) -> list:
|
||||
# get user_input_form from start node
|
||||
variables: list[Any] = self.rag_pipeline_variables
|
||||
variables: list[SerializedWorkflowValue] = self.rag_pipeline_variables
|
||||
|
||||
return variables
|
||||
|
||||
@@ -434,17 +505,13 @@ class Workflow(Base): # bug
|
||||
def environment_variables(
|
||||
self,
|
||||
) -> Sequence[StringVariable | IntegerVariable | FloatVariable | SecretVariable]:
|
||||
# TODO: find some way to init `self._environment_variables` when instance created.
|
||||
if self._environment_variables is None:
|
||||
self._environment_variables = "{}"
|
||||
|
||||
# Use workflow.tenant_id to avoid relying on request user in background threads
|
||||
tenant_id = self.tenant_id
|
||||
|
||||
if not tenant_id:
|
||||
return []
|
||||
|
||||
environment_variables_dict: dict[str, Any] = json.loads(self._environment_variables or "{}")
|
||||
environment_variables_dict = cast(SerializedWorkflowVariables, json.loads(self._environment_variables or "{}"))
|
||||
results = [
|
||||
variable_factory.build_environment_variable_from_mapping(v) for v in environment_variables_dict.values()
|
||||
]
|
||||
@@ -504,14 +571,39 @@ class Workflow(Base): # bug
|
||||
)
|
||||
self._environment_variables = environment_variables_json
|
||||
|
||||
def to_dict(self, *, include_secret: bool = False) -> Mapping[str, Any]:
|
||||
@staticmethod
|
||||
def normalize_environment_variable_mappings(
|
||||
mappings: Sequence[Mapping[str, Any]],
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Convert masked secret placeholders into the draft hidden sentinel.
|
||||
|
||||
Regular draft sync requests should preserve existing secrets without shipping
|
||||
plaintext values back from the client. The dedicated restore endpoint now
|
||||
copies published secrets server-side, so draft sync only needs to normalize
|
||||
the UI mask into `HIDDEN_VALUE`.
|
||||
"""
|
||||
masked_secret_value = encrypter.full_mask_token()
|
||||
normalized_mappings: list[dict[str, Any]] = []
|
||||
|
||||
for mapping in mappings:
|
||||
normalized_mapping = dict(mapping)
|
||||
if (
|
||||
normalized_mapping.get("value_type") == SegmentType.SECRET.value
|
||||
and normalized_mapping.get("value") == masked_secret_value
|
||||
):
|
||||
normalized_mapping["value"] = HIDDEN_VALUE
|
||||
normalized_mappings.append(normalized_mapping)
|
||||
|
||||
return normalized_mappings
|
||||
|
||||
def to_dict(self, *, include_secret: bool = False) -> WorkflowContentDict:
|
||||
environment_variables = list(self.environment_variables)
|
||||
environment_variables = [
|
||||
v if not isinstance(v, SecretVariable) or include_secret else v.model_copy(update={"value": ""})
|
||||
for v in environment_variables
|
||||
]
|
||||
|
||||
result = {
|
||||
result: WorkflowContentDict = {
|
||||
"graph": self.graph_dict,
|
||||
"features": self.features_dict,
|
||||
"environment_variables": [var.model_dump(mode="json") for var in environment_variables],
|
||||
@@ -522,11 +614,7 @@ class Workflow(Base): # bug
|
||||
|
||||
@property
|
||||
def conversation_variables(self) -> Sequence[VariableBase]:
|
||||
# TODO: find some way to init `self._conversation_variables` when instance created.
|
||||
if self._conversation_variables is None:
|
||||
self._conversation_variables = "{}"
|
||||
|
||||
variables_dict: dict[str, Any] = json.loads(self._conversation_variables)
|
||||
variables_dict = cast(SerializedWorkflowVariables, json.loads(self._conversation_variables or "{}"))
|
||||
results = [variable_factory.build_conversation_variable_from_mapping(v) for v in variables_dict.values()]
|
||||
return results
|
||||
|
||||
@@ -538,22 +626,29 @@ class Workflow(Base): # bug
|
||||
)
|
||||
|
||||
@property
|
||||
def rag_pipeline_variables(self) -> list[dict]:
|
||||
# TODO: find some way to init `self._conversation_variables` when instance created.
|
||||
if self._rag_pipeline_variables is None:
|
||||
self._rag_pipeline_variables = "{}"
|
||||
|
||||
variables_dict: dict[str, Any] = json.loads(self._rag_pipeline_variables)
|
||||
results = list(variables_dict.values())
|
||||
return results
|
||||
def rag_pipeline_variables(self) -> list[SerializedWorkflowValue]:
|
||||
variables_dict = cast(SerializedWorkflowVariables, json.loads(self._rag_pipeline_variables or "{}"))
|
||||
return [RAGPipelineVariable.model_validate(item).model_dump(mode="json") for item in variables_dict.values()]
|
||||
|
||||
@rag_pipeline_variables.setter
|
||||
def rag_pipeline_variables(self, values: list[dict]) -> None:
|
||||
def rag_pipeline_variables(self, values: Sequence[Mapping[str, Any] | RAGPipelineVariable]) -> None:
|
||||
self._rag_pipeline_variables = json.dumps(
|
||||
{item["variable"]: item for item in values},
|
||||
{
|
||||
rag_pipeline_variable.variable: rag_pipeline_variable.model_dump(mode="json")
|
||||
for rag_pipeline_variable in (
|
||||
item if isinstance(item, RAGPipelineVariable) else RAGPipelineVariable.model_validate(item)
|
||||
for item in values
|
||||
)
|
||||
},
|
||||
ensure_ascii=False,
|
||||
)
|
||||
|
||||
def copy_serialized_variable_storage_from(self, source_workflow: "Workflow") -> None:
|
||||
"""Copy stored variable JSON directly for same-tenant restore flows."""
|
||||
self._environment_variables = source_workflow._environment_variables
|
||||
self._conversation_variables = source_workflow._conversation_variables
|
||||
self._rag_pipeline_variables = source_workflow._rag_pipeline_variables
|
||||
|
||||
@staticmethod
|
||||
def version_from_datetime(d: datetime) -> str:
|
||||
return str(d)
|
||||
@@ -609,8 +704,8 @@ class WorkflowRun(Base):
|
||||
app_id: Mapped[str] = mapped_column(StringUUID)
|
||||
|
||||
workflow_id: Mapped[str] = mapped_column(StringUUID)
|
||||
type: Mapped[str] = mapped_column(String(255))
|
||||
triggered_from: Mapped[str] = mapped_column(String(255))
|
||||
type: Mapped[WorkflowType] = mapped_column(EnumText(WorkflowType, length=255))
|
||||
triggered_from: Mapped[WorkflowRunTriggeredFrom] = mapped_column(EnumText(WorkflowRunTriggeredFrom, length=255))
|
||||
version: Mapped[str] = mapped_column(String(255))
|
||||
graph: Mapped[str | None] = mapped_column(LongText)
|
||||
inputs: Mapped[str | None] = mapped_column(LongText)
|
||||
@@ -669,14 +764,14 @@ class WorkflowRun(Base):
|
||||
def message(self):
|
||||
from .model import Message
|
||||
|
||||
return (
|
||||
db.session.query(Message).where(Message.app_id == self.app_id, Message.workflow_run_id == self.id).first()
|
||||
return db.session.scalar(
|
||||
select(Message).where(Message.app_id == self.app_id, Message.workflow_run_id == self.id)
|
||||
)
|
||||
|
||||
@property
|
||||
@deprecated("This method is retained for historical reasons; avoid using it if possible.")
|
||||
def workflow(self):
|
||||
return db.session.query(Workflow).where(Workflow.id == self.workflow_id).first()
|
||||
return db.session.scalar(select(Workflow).where(Workflow.id == self.workflow_id))
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
@@ -788,50 +883,44 @@ class WorkflowNodeExecutionModel(Base): # This model is expected to have `offlo
|
||||
|
||||
__tablename__ = "workflow_node_executions"
|
||||
|
||||
@declared_attr.directive
|
||||
@classmethod
|
||||
def __table_args__(cls) -> Any:
|
||||
return (
|
||||
PrimaryKeyConstraint("id", name="workflow_node_execution_pkey"),
|
||||
Index(
|
||||
"workflow_node_execution_workflow_run_id_idx",
|
||||
"workflow_run_id",
|
||||
),
|
||||
Index(
|
||||
"workflow_node_execution_node_run_idx",
|
||||
"tenant_id",
|
||||
"app_id",
|
||||
"workflow_id",
|
||||
"triggered_from",
|
||||
"node_id",
|
||||
),
|
||||
Index(
|
||||
"workflow_node_execution_id_idx",
|
||||
"tenant_id",
|
||||
"app_id",
|
||||
"workflow_id",
|
||||
"triggered_from",
|
||||
"node_execution_id",
|
||||
),
|
||||
Index(
|
||||
# The first argument is the index name,
|
||||
# which we leave as `None`` to allow auto-generation by the ORM.
|
||||
None,
|
||||
cls.tenant_id,
|
||||
cls.workflow_id,
|
||||
cls.node_id,
|
||||
# MyPy may flag the following line because it doesn't recognize that
|
||||
# the `declared_attr` decorator passes the receiving class as the first
|
||||
# argument to this method, allowing us to reference class attributes.
|
||||
cls.created_at.desc(),
|
||||
),
|
||||
)
|
||||
__table_args__ = (
|
||||
PrimaryKeyConstraint("id", name="workflow_node_execution_pkey"),
|
||||
Index(
|
||||
"workflow_node_execution_workflow_run_id_idx",
|
||||
"workflow_run_id",
|
||||
),
|
||||
Index(
|
||||
"workflow_node_execution_node_run_idx",
|
||||
"tenant_id",
|
||||
"app_id",
|
||||
"workflow_id",
|
||||
"triggered_from",
|
||||
"node_id",
|
||||
),
|
||||
Index(
|
||||
"workflow_node_execution_id_idx",
|
||||
"tenant_id",
|
||||
"app_id",
|
||||
"workflow_id",
|
||||
"triggered_from",
|
||||
"node_execution_id",
|
||||
),
|
||||
Index(
|
||||
None,
|
||||
"tenant_id",
|
||||
"workflow_id",
|
||||
"node_id",
|
||||
sa.desc("created_at"),
|
||||
),
|
||||
)
|
||||
|
||||
id: Mapped[str] = mapped_column(StringUUID, default=lambda: str(uuid4()))
|
||||
tenant_id: Mapped[str] = mapped_column(StringUUID)
|
||||
app_id: Mapped[str] = mapped_column(StringUUID)
|
||||
workflow_id: Mapped[str] = mapped_column(StringUUID)
|
||||
triggered_from: Mapped[str] = mapped_column(String(255))
|
||||
triggered_from: Mapped[WorkflowNodeExecutionTriggeredFrom] = mapped_column(
|
||||
EnumText(WorkflowNodeExecutionTriggeredFrom, length=255)
|
||||
)
|
||||
workflow_run_id: Mapped[str | None] = mapped_column(StringUUID)
|
||||
index: Mapped[int] = mapped_column(sa.Integer)
|
||||
predecessor_node_id: Mapped[str | None] = mapped_column(String(255))
|
||||
@@ -847,7 +936,7 @@ class WorkflowNodeExecutionModel(Base): # This model is expected to have `offlo
|
||||
elapsed_time: Mapped[float] = mapped_column(sa.Float, server_default=sa.text("0"))
|
||||
execution_metadata: Mapped[str | None] = mapped_column(LongText)
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.current_timestamp())
|
||||
created_by_role: Mapped[str] = mapped_column(String(255))
|
||||
created_by_role: Mapped[CreatorUserRole] = mapped_column(EnumText(CreatorUserRole, length=255))
|
||||
created_by: Mapped[str] = mapped_column(StringUUID)
|
||||
finished_at: Mapped[datetime | None] = mapped_column(DateTime)
|
||||
|
||||
@@ -922,18 +1011,21 @@ class WorkflowNodeExecutionModel(Base): # This model is expected to have `offlo
|
||||
extras: dict[str, Any] = {}
|
||||
execution_metadata = self.execution_metadata_dict
|
||||
if execution_metadata:
|
||||
if self.node_type == NodeType.TOOL and "tool_info" in execution_metadata:
|
||||
if self.node_type == BuiltinNodeTypes.TOOL and "tool_info" in execution_metadata:
|
||||
tool_info: dict[str, Any] = execution_metadata["tool_info"]
|
||||
extras["icon"] = ToolManager.get_tool_icon(
|
||||
tenant_id=self.tenant_id,
|
||||
provider_type=tool_info["provider_type"],
|
||||
provider_id=tool_info["provider_id"],
|
||||
)
|
||||
elif self.node_type == NodeType.DATASOURCE and "datasource_info" in execution_metadata:
|
||||
elif self.node_type == BuiltinNodeTypes.DATASOURCE and "datasource_info" in execution_metadata:
|
||||
datasource_info = execution_metadata["datasource_info"]
|
||||
extras["icon"] = datasource_info.get("icon")
|
||||
elif self.node_type == NodeType.TRIGGER_PLUGIN and "trigger_info" in execution_metadata:
|
||||
trigger_info = execution_metadata["trigger_info"] or {}
|
||||
elif (
|
||||
self.node_type == TRIGGER_PLUGIN_NODE_TYPE
|
||||
and WorkflowNodeExecutionMetadataKey.TRIGGER_INFO in execution_metadata
|
||||
):
|
||||
trigger_info = execution_metadata[WorkflowNodeExecutionMetadataKey.TRIGGER_INFO] or {}
|
||||
provider_id = trigger_info.get("provider_id")
|
||||
if provider_id:
|
||||
extras["icon"] = TriggerManager.get_trigger_plugin_icon(
|
||||
@@ -1131,7 +1223,7 @@ class WorkflowAppLog(TypeBase):
|
||||
workflow_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
workflow_run_id: Mapped[str] = mapped_column(StringUUID)
|
||||
created_from: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
created_by_role: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
created_by_role: Mapped[CreatorUserRole] = mapped_column(EnumText(CreatorUserRole, length=255), nullable=False)
|
||||
created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
created_at: Mapped[datetime] = mapped_column(
|
||||
DateTime, nullable=False, server_default=func.current_timestamp(), init=False
|
||||
@@ -1205,7 +1297,7 @@ class WorkflowArchiveLog(TypeBase):
|
||||
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
workflow_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
workflow_run_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
created_by_role: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
created_by_role: Mapped[CreatorUserRole] = mapped_column(EnumText(CreatorUserRole, length=255), nullable=False)
|
||||
created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
|
||||
log_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
|
||||
@@ -1214,7 +1306,9 @@ class WorkflowArchiveLog(TypeBase):
|
||||
|
||||
run_version: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
run_status: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
run_triggered_from: Mapped[str] = mapped_column(String(255), nullable=False)
|
||||
run_triggered_from: Mapped[WorkflowRunTriggeredFrom] = mapped_column(
|
||||
EnumText(WorkflowRunTriggeredFrom, length=255), nullable=False
|
||||
)
|
||||
run_error: Mapped[str | None] = mapped_column(LongText, nullable=True)
|
||||
run_elapsed_time: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default=sa.text("0"))
|
||||
run_total_tokens: Mapped[int] = mapped_column(sa.BigInteger, server_default=sa.text("0"))
|
||||
@@ -1229,7 +1323,7 @@ class WorkflowArchiveLog(TypeBase):
|
||||
)
|
||||
|
||||
@property
|
||||
def workflow_run_summary(self) -> dict[str, Any]:
|
||||
def workflow_run_summary(self) -> WorkflowRunSummaryDict:
|
||||
return {
|
||||
"id": self.workflow_run_id,
|
||||
"status": self.run_status,
|
||||
@@ -1284,16 +1378,17 @@ class WorkflowDraftVariable(Base):
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def unique_app_id_node_id_name() -> list[str]:
|
||||
def unique_app_id_user_id_node_id_name() -> list[str]:
|
||||
return [
|
||||
"app_id",
|
||||
"user_id",
|
||||
"node_id",
|
||||
"name",
|
||||
]
|
||||
|
||||
__tablename__ = "workflow_draft_variables"
|
||||
__table_args__ = (
|
||||
UniqueConstraint(*unique_app_id_node_id_name()),
|
||||
UniqueConstraint(*unique_app_id_user_id_node_id_name()),
|
||||
Index("workflow_draft_variable_file_id_idx", "file_id"),
|
||||
)
|
||||
# Required for instance variable annotation.
|
||||
@@ -1319,6 +1414,11 @@ class WorkflowDraftVariable(Base):
|
||||
|
||||
# "`app_id` maps to the `id` field in the `model.App` model."
|
||||
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
|
||||
# Owner of this draft variable.
|
||||
#
|
||||
# This field is nullable during migration and will be migrated to NOT NULL
|
||||
# in a follow-up release.
|
||||
user_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True, default=None)
|
||||
|
||||
# `last_edited_at` records when the value of a given draft variable
|
||||
# is edited.
|
||||
@@ -1345,7 +1445,7 @@ class WorkflowDraftVariable(Base):
|
||||
# From `VARIABLE_PATTERN`, we may conclude that the length of a top level variable is less than
|
||||
# 80 chars.
|
||||
#
|
||||
# ref: api/core/workflow/entities/variable_pool.py:18
|
||||
# ref: api/dify_graph/entities/variable_pool.py:18
|
||||
name: Mapped[str] = mapped_column(sa.String(255), nullable=False)
|
||||
description: Mapped[str] = mapped_column(
|
||||
sa.String(255),
|
||||
@@ -1571,6 +1671,7 @@ class WorkflowDraftVariable(Base):
|
||||
cls,
|
||||
*,
|
||||
app_id: str,
|
||||
user_id: str | None,
|
||||
node_id: str,
|
||||
name: str,
|
||||
value: Segment,
|
||||
@@ -1584,6 +1685,7 @@ class WorkflowDraftVariable(Base):
|
||||
variable.updated_at = naive_utc_now()
|
||||
variable.description = description
|
||||
variable.app_id = app_id
|
||||
variable.user_id = user_id
|
||||
variable.node_id = node_id
|
||||
variable.name = name
|
||||
variable.set_value(value)
|
||||
@@ -1597,12 +1699,14 @@ class WorkflowDraftVariable(Base):
|
||||
cls,
|
||||
*,
|
||||
app_id: str,
|
||||
user_id: str | None = None,
|
||||
name: str,
|
||||
value: Segment,
|
||||
description: str = "",
|
||||
) -> "WorkflowDraftVariable":
|
||||
variable = cls._new(
|
||||
app_id=app_id,
|
||||
user_id=user_id,
|
||||
node_id=CONVERSATION_VARIABLE_NODE_ID,
|
||||
name=name,
|
||||
value=value,
|
||||
@@ -1617,6 +1721,7 @@ class WorkflowDraftVariable(Base):
|
||||
cls,
|
||||
*,
|
||||
app_id: str,
|
||||
user_id: str | None = None,
|
||||
name: str,
|
||||
value: Segment,
|
||||
node_execution_id: str,
|
||||
@@ -1624,6 +1729,7 @@ class WorkflowDraftVariable(Base):
|
||||
) -> "WorkflowDraftVariable":
|
||||
variable = cls._new(
|
||||
app_id=app_id,
|
||||
user_id=user_id,
|
||||
node_id=SYSTEM_VARIABLE_NODE_ID,
|
||||
name=name,
|
||||
node_execution_id=node_execution_id,
|
||||
@@ -1637,6 +1743,7 @@ class WorkflowDraftVariable(Base):
|
||||
cls,
|
||||
*,
|
||||
app_id: str,
|
||||
user_id: str | None = None,
|
||||
node_id: str,
|
||||
name: str,
|
||||
value: Segment,
|
||||
@@ -1647,6 +1754,7 @@ class WorkflowDraftVariable(Base):
|
||||
) -> "WorkflowDraftVariable":
|
||||
variable = cls._new(
|
||||
app_id=app_id,
|
||||
user_id=user_id,
|
||||
node_id=node_id,
|
||||
name=name,
|
||||
node_execution_id=node_execution_id,
|
||||
|
||||
Reference in New Issue
Block a user