mirror of
https://github.com/langgenius/dify.git
synced 2026-05-31 19:00:22 -04:00
@@ -1,6 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Generator, Mapping, Sequence
|
||||
from collections.abc import Callable, Generator, Mapping, Sequence
|
||||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING, Any, cast
|
||||
|
||||
@@ -15,13 +15,14 @@ from core.model_manager import ModelInstance
|
||||
from core.plugin.impl.exc import PluginDaemonClientSideError, PluginInvokeError
|
||||
from core.plugin.impl.plugin import PluginInstaller
|
||||
from core.prompt.utils.prompt_message_util import PromptMessageUtil
|
||||
from core.repositories.human_input_repository import FormCreateParams, HumanInputFormRepositoryImpl
|
||||
from core.tools.entities.tool_entities import ToolProviderType as CoreToolProviderType
|
||||
from core.tools.errors import ToolInvokeError
|
||||
from core.tools.signature import sign_upload_file
|
||||
from core.tools.tool_engine import ToolEngine
|
||||
from core.tools.tool_file_manager import ToolFileManager
|
||||
from core.tools.tool_manager import ToolManager
|
||||
from core.tools.utils.message_transformer import ToolFileMessageTransformer
|
||||
from core.workflow.file_reference import build_file_reference
|
||||
from dify_graph.file import FileTransferMethod, FileType
|
||||
from dify_graph.model_runtime.entities import LLMMode
|
||||
from dify_graph.model_runtime.entities.llm_entities import (
|
||||
@@ -34,7 +35,7 @@ from dify_graph.model_runtime.entities.llm_entities import (
|
||||
from dify_graph.model_runtime.entities.message_entities import PromptMessage, PromptMessageTool
|
||||
from dify_graph.model_runtime.entities.model_entities import AIModelEntity
|
||||
from dify_graph.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
|
||||
from dify_graph.nodes.human_input.entities import DeliveryChannelConfig, apply_debug_email_recipient
|
||||
from dify_graph.nodes.human_input.entities import HumanInputNodeData
|
||||
from dify_graph.nodes.llm.runtime_protocols import (
|
||||
PreparedLLMProtocol,
|
||||
PromptMessageSerializerProtocol,
|
||||
@@ -57,6 +58,17 @@ from models.dataset import SegmentAttachmentBinding
|
||||
from models.model import UploadFile
|
||||
from services.tools.builtin_tools_manage_service import BuiltinToolManageService
|
||||
|
||||
from .human_input_compat import (
|
||||
BoundRecipient,
|
||||
DeliveryChannelConfig,
|
||||
DeliveryMethodType,
|
||||
EmailDeliveryMethod,
|
||||
EmailRecipients,
|
||||
is_human_input_webapp_enabled,
|
||||
parse_human_input_delivery_methods,
|
||||
)
|
||||
from .system_variables import SystemVariableKey, get_system_text
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.tools.__base.tool import Tool
|
||||
from core.tools.entities.tool_entities import ToolInvokeMessage as CoreToolInvokeMessage
|
||||
@@ -77,6 +89,31 @@ def resolve_dify_run_context(run_context: Mapping[str, Any] | DifyRunContext) ->
|
||||
return DifyRunContext.model_validate(raw_ctx)
|
||||
|
||||
|
||||
def apply_dify_debug_email_recipient(
|
||||
method: DeliveryChannelConfig,
|
||||
*,
|
||||
enabled: bool,
|
||||
actor_id: str | None,
|
||||
) -> DeliveryChannelConfig:
|
||||
"""Apply the Dify debugger-specific email recipient override outside `dify_graph`."""
|
||||
if not enabled:
|
||||
return method
|
||||
if not isinstance(method, EmailDeliveryMethod):
|
||||
return method
|
||||
if not method.config.debug_mode:
|
||||
return method
|
||||
|
||||
if actor_id is None:
|
||||
debug_recipients = EmailRecipients(include_bound_group=False, items=[])
|
||||
else:
|
||||
debug_recipients = EmailRecipients(
|
||||
include_bound_group=False,
|
||||
items=[BoundRecipient(reference_id=actor_id)],
|
||||
)
|
||||
debug_config = method.config.with_recipients(debug_recipients)
|
||||
return method.model_copy(update={"config": debug_config})
|
||||
|
||||
|
||||
class DifyFileReferenceFactory(FileReferenceFactoryProtocol):
|
||||
def __init__(self, run_context: Mapping[str, Any] | DifyRunContext) -> None:
|
||||
self._run_context = resolve_dify_run_context(run_context)
|
||||
@@ -200,11 +237,8 @@ class DifyRetrieverAttachmentLoader(RetrieverAttachmentLoaderProtocol):
|
||||
"type": FileType.IMAGE,
|
||||
"transfer_method": FileTransferMethod.LOCAL_FILE,
|
||||
"remote_url": upload_file.source_url,
|
||||
"related_id": upload_file.id,
|
||||
"upload_file_id": upload_file.id,
|
||||
"reference": build_file_reference(record_id=str(upload_file.id), storage_key=upload_file.key),
|
||||
"size": upload_file.size,
|
||||
"storage_key": upload_file.key,
|
||||
"url": sign_upload_file(upload_file.id, upload_file.extension),
|
||||
}
|
||||
)
|
||||
for _, upload_file in attachments_with_bindings
|
||||
@@ -212,18 +246,28 @@ class DifyRetrieverAttachmentLoader(RetrieverAttachmentLoaderProtocol):
|
||||
|
||||
|
||||
class DifyToolFileManager(ToolFileManagerProtocol):
|
||||
def __init__(self, run_context: Mapping[str, Any] | DifyRunContext) -> None:
|
||||
"""Workflow adapter that resolves conversation scope outside `dify_graph`."""
|
||||
|
||||
_conversation_id_getter: Callable[[], str | None] | None
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
run_context: Mapping[str, Any] | DifyRunContext,
|
||||
*,
|
||||
conversation_id_getter: Callable[[], str | None] | None = None,
|
||||
) -> None:
|
||||
self._run_context = resolve_dify_run_context(run_context)
|
||||
self._manager = ToolFileManager()
|
||||
self._conversation_id_getter = conversation_id_getter
|
||||
|
||||
def create_file_by_raw(
|
||||
self,
|
||||
*,
|
||||
conversation_id: str | None,
|
||||
file_binary: bytes,
|
||||
mimetype: str,
|
||||
filename: str | None = None,
|
||||
) -> Any:
|
||||
conversation_id = self._conversation_id_getter() if self._conversation_id_getter is not None else None
|
||||
return self._manager.create_file_by_raw(
|
||||
user_id=self._run_context.user_id,
|
||||
tenant_id=self._run_context.tenant_id,
|
||||
@@ -246,6 +290,18 @@ class _WorkflowToolRuntimeSpec:
|
||||
credential_id: str | None = None
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class _WorkflowToolRuntimeBinding:
|
||||
"""Workflow-private runtime state stored inside the opaque graph handle.
|
||||
|
||||
The binding keeps conversation scope in `core.workflow` while `dify_graph`
|
||||
continues to treat the handle as an opaque token.
|
||||
"""
|
||||
|
||||
tool: Tool
|
||||
conversation_id: str | None = None
|
||||
|
||||
|
||||
class DifyToolNodeRuntime(ToolNodeRuntimeProtocol):
|
||||
def __init__(self, run_context: Mapping[str, Any] | DifyRunContext) -> None:
|
||||
self._run_context = resolve_dify_run_context(run_context)
|
||||
@@ -279,7 +335,10 @@ class DifyToolNodeRuntime(ToolNodeRuntimeProtocol):
|
||||
except Exception as exc:
|
||||
raise ToolRuntimeResolutionError(str(exc)) from exc
|
||||
|
||||
return ToolRuntimeHandle(raw=tool_runtime)
|
||||
conversation_id = (
|
||||
None if variable_pool is None else get_system_text(variable_pool, SystemVariableKey.CONVERSATION_ID)
|
||||
)
|
||||
return ToolRuntimeHandle(raw=_WorkflowToolRuntimeBinding(tool=tool_runtime, conversation_id=conversation_id))
|
||||
|
||||
def get_runtime_parameters(
|
||||
self,
|
||||
@@ -298,10 +357,10 @@ class DifyToolNodeRuntime(ToolNodeRuntimeProtocol):
|
||||
tool_runtime: ToolRuntimeHandle,
|
||||
tool_parameters: Mapping[str, Any],
|
||||
workflow_call_depth: int,
|
||||
conversation_id: str | None,
|
||||
provider_name: str,
|
||||
) -> Generator[ToolRuntimeMessage, None, None]:
|
||||
tool = self._tool_from_handle(tool_runtime)
|
||||
runtime_binding = self._binding_from_handle(tool_runtime)
|
||||
tool = cast("Tool", runtime_binding.tool)
|
||||
callback = DifyWorkflowCallbackHandler()
|
||||
|
||||
try:
|
||||
@@ -312,7 +371,7 @@ class DifyToolNodeRuntime(ToolNodeRuntimeProtocol):
|
||||
workflow_tool_callback=callback,
|
||||
workflow_call_depth=workflow_call_depth,
|
||||
app_id=self._run_context.app_id,
|
||||
conversation_id=conversation_id,
|
||||
conversation_id=runtime_binding.conversation_id,
|
||||
)
|
||||
except Exception as exc:
|
||||
raise self._map_invocation_exception(exc, provider_name=provider_name) from exc
|
||||
@@ -321,7 +380,7 @@ class DifyToolNodeRuntime(ToolNodeRuntimeProtocol):
|
||||
messages=messages,
|
||||
user_id=self._run_context.user_id,
|
||||
tenant_id=self._run_context.tenant_id,
|
||||
conversation_id=None,
|
||||
conversation_id=runtime_binding.conversation_id,
|
||||
)
|
||||
|
||||
return self._adapt_messages(transformed_messages, provider_name=provider_name)
|
||||
@@ -331,7 +390,7 @@ class DifyToolNodeRuntime(ToolNodeRuntimeProtocol):
|
||||
*,
|
||||
tool_runtime: ToolRuntimeHandle,
|
||||
) -> LLMUsage:
|
||||
latest = getattr(tool_runtime.raw, "latest_usage", None)
|
||||
latest = getattr(self._binding_from_handle(tool_runtime).tool, "latest_usage", None)
|
||||
if isinstance(latest, LLMUsage):
|
||||
return latest
|
||||
if isinstance(latest, dict):
|
||||
@@ -373,7 +432,13 @@ class DifyToolNodeRuntime(ToolNodeRuntimeProtocol):
|
||||
|
||||
@staticmethod
|
||||
def _tool_from_handle(tool_runtime: ToolRuntimeHandle) -> Tool:
|
||||
return cast("Tool", tool_runtime.raw)
|
||||
return cast("Tool", DifyToolNodeRuntime._binding_from_handle(tool_runtime).tool)
|
||||
|
||||
@staticmethod
|
||||
def _binding_from_handle(tool_runtime: ToolRuntimeHandle) -> _WorkflowToolRuntimeBinding:
|
||||
if isinstance(tool_runtime.raw, _WorkflowToolRuntimeBinding):
|
||||
return tool_runtime.raw
|
||||
return _WorkflowToolRuntimeBinding(tool=cast("Tool", tool_runtime.raw))
|
||||
|
||||
@staticmethod
|
||||
def _build_tool_runtime_spec(node_data: ToolNodeData) -> _WorkflowToolRuntimeSpec:
|
||||
@@ -491,42 +556,85 @@ class DifyToolNodeRuntime(ToolNodeRuntimeProtocol):
|
||||
|
||||
|
||||
class DifyHumanInputNodeRuntime(HumanInputNodeRuntimeProtocol):
|
||||
def __init__(self, run_context: Mapping[str, Any] | DifyRunContext) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
run_context: Mapping[str, Any] | DifyRunContext,
|
||||
*,
|
||||
workflow_execution_id_getter: Callable[[], str | None] | None = None,
|
||||
) -> None:
|
||||
self._run_context = resolve_dify_run_context(run_context)
|
||||
self._workflow_execution_id_getter = workflow_execution_id_getter
|
||||
|
||||
def invoke_source(self) -> str:
|
||||
def _invoke_source(self) -> str:
|
||||
invoke_from = self._run_context.invoke_from
|
||||
if isinstance(invoke_from, str):
|
||||
return invoke_from
|
||||
return str(getattr(invoke_from, "value", invoke_from))
|
||||
|
||||
def apply_delivery_runtime(
|
||||
self,
|
||||
*,
|
||||
methods: Sequence[DeliveryChannelConfig],
|
||||
) -> Sequence[DeliveryChannelConfig]:
|
||||
def _resolve_delivery_methods(self, *, node_data: HumanInputNodeData) -> Sequence[DeliveryChannelConfig]:
|
||||
invoke_source = self._invoke_source()
|
||||
methods = [method for method in parse_human_input_delivery_methods(node_data) if method.enabled]
|
||||
if invoke_source in {"debugger", "explore"}:
|
||||
methods = [method for method in methods if method.type != DeliveryMethodType.WEBAPP]
|
||||
return [
|
||||
apply_debug_email_recipient(
|
||||
apply_dify_debug_email_recipient(
|
||||
method,
|
||||
enabled=self.invoke_source() == "debugger",
|
||||
user_id=self._run_context.user_id,
|
||||
enabled=invoke_source == "debugger",
|
||||
actor_id=self._run_context.user_id,
|
||||
)
|
||||
for method in methods
|
||||
]
|
||||
|
||||
def console_actor_id(self) -> str | None:
|
||||
return self._run_context.user_id
|
||||
def _display_in_ui(self, *, node_data: HumanInputNodeData) -> bool:
|
||||
if self._invoke_source() == "debugger":
|
||||
return True
|
||||
return is_human_input_webapp_enabled(node_data)
|
||||
|
||||
def _build_form_repository(self) -> HumanInputFormRepositoryImpl:
|
||||
invoke_source = self._invoke_source()
|
||||
return HumanInputFormRepositoryImpl(
|
||||
tenant_id=self._run_context.tenant_id,
|
||||
app_id=self._run_context.app_id,
|
||||
workflow_execution_id=self._workflow_execution_id_getter() if self._workflow_execution_id_getter else None,
|
||||
invoke_source=invoke_source,
|
||||
submission_actor_id=self._run_context.user_id if invoke_source in {"debugger", "explore"} else None,
|
||||
)
|
||||
|
||||
def get_form(self, *, node_id: str):
|
||||
repo = self._build_form_repository()
|
||||
return repo.get_form(node_id)
|
||||
|
||||
def create_form(
|
||||
self,
|
||||
*,
|
||||
node_id: str,
|
||||
node_data: HumanInputNodeData,
|
||||
rendered_content: str,
|
||||
resolved_default_values: Mapping[str, Any],
|
||||
):
|
||||
repo = self._build_form_repository()
|
||||
params = FormCreateParams(
|
||||
workflow_execution_id=self._workflow_execution_id_getter() if self._workflow_execution_id_getter else None,
|
||||
node_id=node_id,
|
||||
form_config=node_data,
|
||||
rendered_content=rendered_content,
|
||||
delivery_methods=self._resolve_delivery_methods(node_data=node_data),
|
||||
display_in_ui=self._display_in_ui(node_data=node_data),
|
||||
resolved_default_values=resolved_default_values,
|
||||
)
|
||||
return repo.create_form(params)
|
||||
|
||||
|
||||
def build_dify_llm_file_saver(
|
||||
*,
|
||||
run_context: Mapping[str, Any] | DifyRunContext,
|
||||
http_client: HttpClientProtocol,
|
||||
conversation_id_getter: Callable[[], str | None] | None = None,
|
||||
) -> LLMFileSaver:
|
||||
from dify_graph.nodes.llm.file_saver import FileSaverImpl
|
||||
|
||||
return FileSaverImpl(
|
||||
tool_file_manager=DifyToolFileManager(run_context),
|
||||
tool_file_manager=DifyToolFileManager(run_context, conversation_id_getter=conversation_id_getter),
|
||||
file_reference_factory=DifyFileReferenceFactory(run_context),
|
||||
http_client=http_client,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user