mirror of
https://github.com/langgenius/dify.git
synced 2025-12-22 18:53:00 -05:00
* test: adding some web tests (#27792) * feat: add validation to prevent saving empty opening statement in conversation opener modal (#27843) * fix(web): improve the consistency of the inputs-form UI (#27837) * fix(web): increase z-index of PortalToFollowElemContent (#27823) * fix: installation_id is missing when in tools page (#27849) * fix: avoid passing empty uniqueIdentifier to InstallFromMarketplace (#27802) Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * test: create new test scripts and update some existing test scripts o… (#27850) * feat: change feedback to forum (#27862) * chore: translate i18n files and update type definitions (#27868) Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com> * Fix/template transformer line number (#27867) Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com> * bump vite to 6.4.1 (#27877) * Add WEAVIATE_GRPC_ENDPOINT as designed in weaviate migration guide (#27861) Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> * Fix: correct DraftWorkflowApi.post response model (#27289) Signed-off-by: Yongtao Huang <yongtaoh2022@gmail.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> * fix Version 2.0.0-beta.2: Chat annotations Api Error #25506 (#27206) Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Co-authored-by: Asuka Minato <i@asukaminato.eu.org> * fix jina reader creadential migration command (#27883) * fix agent putout the output of workflow-tool twice (#26835) (#27087) * fix jina reader transform (#27922) * fix: prevent fetch version info in enterprise edition (#27923) * fix(api): fix `VariablePool.get` adding unexpected keys to variable_dictionary (#26767) Co-authored-by: -LAN- <laipz8200@outlook.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * refactor: implement tenant self queue for rag tasks (#27559) Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: -LAN- <laipz8200@outlook.com> * fix: bump brotli to 1.2.0 resloved CVE-2025-6176 (#27950) Signed-off-by: kenwoodjw <blackxin55+@gmail.com> * docs: clarify how to obtain workflow_id for version execution (#28007) Signed-off-by: OneZero-Y <aukovyps@163.com> * fix: fix https://github.com/langgenius/dify/issues/27939 (#27985) * fix: the model list encountered two children with the same key (#27956) Co-authored-by: haokai <haokai@shuwen.com> * add onupdate=func.current_timestamp() (#28014) Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * chore(deps): bump scipy-stubs from 1.16.2.3 to 1.16.3.0 in /api (#28025) Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Fix typo in weaviate comment, improve time test precision, and add security tests for get-icon utility (#27919) Signed-off-by: NeatGuyCoding <15627489+NeatGuyCoding@users.noreply.github.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * feat: Add Audio Content Support for MCP Tools (#27979) * fix: elasticsearch_vector version (#28028) Co-authored-by: huangzhuo <huangzhuo1@xiaomi.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * fix workflow default updated_at (#28047) * feat(api): Introduce Broadcast Channel (#27835) This PR introduces a `BroadcastChannel` abstraction with broadcasting and at-most once delivery semantics, serving as the communication component between celery worker and API server. It also includes a reference implementation backed by Redis PubSub. Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> * fix * back --------- Signed-off-by: Yongtao Huang <yongtaoh2022@gmail.com> Signed-off-by: kenwoodjw <blackxin55+@gmail.com> Signed-off-by: OneZero-Y <aukovyps@163.com> Signed-off-by: dependabot[bot] <support@github.com> Signed-off-by: NeatGuyCoding <15627489+NeatGuyCoding@users.noreply.github.com> Co-authored-by: aka James4u <smart.jamesjin@gmail.com> Co-authored-by: Novice <novice12185727@gmail.com> Co-authored-by: yangzheli <43645580+yangzheli@users.noreply.github.com> Co-authored-by: Elliott <105957288+Elliott-byte@users.noreply.github.com> Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com> Co-authored-by: johnny0120 <johnny0120@users.noreply.github.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Co-authored-by: Gritty_dev <101377478+codomposer@users.noreply.github.com> Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: wangjifeng <163279492+kk-wangjifeng@users.noreply.github.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: Boris Polonsky <BorisPolonsky@users.noreply.github.com> Co-authored-by: Yongtao Huang <yongtaoh2022@gmail.com> Co-authored-by: Cursx <33718736+Cursx@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Asuka Minato <i@asukaminato.eu.org> Co-authored-by: Jyong <76649700+JohnJyong@users.noreply.github.com> Co-authored-by: red_sun <56100962+redSun64@users.noreply.github.com> Co-authored-by: NFish <douxc512@gmail.com> Co-authored-by: QuantumGhost <obelisk.reg+git@gmail.com> Co-authored-by: -LAN- <laipz8200@outlook.com> Co-authored-by: hj24 <huangjian@dify.ai> Co-authored-by: kenwoodjw <blackxin55+@gmail.com> Co-authored-by: OneZero-Y <aukovyps@163.com> Co-authored-by: wangxiaolei <fatelei@gmail.com> Co-authored-by: Kenn <kennfalcon@gmail.com> Co-authored-by: haokai <haokai@shuwen.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: NeatGuyCoding <15627489+NeatGuyCoding@users.noreply.github.com> Co-authored-by: Will <vvfriday@gmail.com> Co-authored-by: huangzhuo1949 <167434202+huangzhuo1949@users.noreply.github.com> Co-authored-by: huangzhuo <huangzhuo1@xiaomi.com>
175 lines
7.3 KiB
Python
175 lines
7.3 KiB
Python
import base64
|
|
import json
|
|
import logging
|
|
from collections.abc import Generator
|
|
from typing import Any
|
|
|
|
from core.mcp.auth_client import MCPClientWithAuthRetry
|
|
from core.mcp.error import MCPConnectionError
|
|
from core.mcp.types import AudioContent, CallToolResult, ImageContent, TextContent
|
|
from core.tools.__base.tool import Tool
|
|
from core.tools.__base.tool_runtime import ToolRuntime
|
|
from core.tools.entities.tool_entities import ToolEntity, ToolInvokeMessage, ToolProviderType
|
|
from core.tools.errors import ToolInvokeError
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MCPTool(Tool):
|
|
def __init__(
|
|
self,
|
|
entity: ToolEntity,
|
|
runtime: ToolRuntime,
|
|
tenant_id: str,
|
|
icon: str,
|
|
server_url: str,
|
|
provider_id: str,
|
|
headers: dict[str, str] | None = None,
|
|
timeout: float | None = None,
|
|
sse_read_timeout: float | None = None,
|
|
):
|
|
super().__init__(entity, runtime)
|
|
self.tenant_id = tenant_id
|
|
self.icon = icon
|
|
self.server_url = server_url
|
|
self.provider_id = provider_id
|
|
self.headers = headers or {}
|
|
self.timeout = timeout
|
|
self.sse_read_timeout = sse_read_timeout
|
|
|
|
def tool_provider_type(self) -> ToolProviderType:
|
|
return ToolProviderType.MCP
|
|
|
|
def _invoke(
|
|
self,
|
|
user_id: str,
|
|
tool_parameters: dict[str, Any],
|
|
conversation_id: str | None = None,
|
|
app_id: str | None = None,
|
|
message_id: str | None = None,
|
|
) -> Generator[ToolInvokeMessage, None, None]:
|
|
result = self.invoke_remote_mcp_tool(tool_parameters)
|
|
# handle dify tool output
|
|
for content in result.content:
|
|
if isinstance(content, TextContent):
|
|
yield from self._process_text_content(content)
|
|
elif isinstance(content, ImageContent):
|
|
yield self._process_image_content(content)
|
|
elif isinstance(content, AudioContent):
|
|
yield self._process_audio_content(content)
|
|
else:
|
|
logger.warning("Unsupported content type=%s", type(content))
|
|
|
|
# handle MCP structured output
|
|
if self.entity.output_schema and result.structuredContent:
|
|
for k, v in result.structuredContent.items():
|
|
yield self.create_variable_message(k, v)
|
|
|
|
def _process_text_content(self, content: TextContent) -> Generator[ToolInvokeMessage, None, None]:
|
|
"""Process text content and yield appropriate messages."""
|
|
# Check if content looks like JSON before attempting to parse
|
|
text = content.text.strip()
|
|
if text and text[0] in ("{", "[") and text[-1] in ("}", "]"):
|
|
try:
|
|
content_json = json.loads(text)
|
|
yield from self._process_json_content(content_json)
|
|
return
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
# If not JSON or parsing failed, treat as plain text
|
|
yield self.create_text_message(content.text)
|
|
|
|
def _process_json_content(self, content_json: Any) -> Generator[ToolInvokeMessage, None, None]:
|
|
"""Process JSON content based on its type."""
|
|
if isinstance(content_json, dict):
|
|
yield self.create_json_message(content_json)
|
|
elif isinstance(content_json, list):
|
|
yield from self._process_json_list(content_json)
|
|
else:
|
|
# For primitive types (str, int, bool, etc.), convert to string
|
|
yield self.create_text_message(str(content_json))
|
|
|
|
def _process_json_list(self, json_list: list) -> Generator[ToolInvokeMessage, None, None]:
|
|
"""Process a list of JSON items."""
|
|
if any(not isinstance(item, dict) for item in json_list):
|
|
# If the list contains any non-dict item, treat the entire list as a text message.
|
|
yield self.create_text_message(str(json_list))
|
|
return
|
|
|
|
# Otherwise, process each dictionary as a separate JSON message.
|
|
for item in json_list:
|
|
yield self.create_json_message(item)
|
|
|
|
def _process_image_content(self, content: ImageContent) -> ToolInvokeMessage:
|
|
"""Process image content and return a blob message."""
|
|
return self.create_blob_message(blob=base64.b64decode(content.data), meta={"mime_type": content.mimeType})
|
|
|
|
def _process_audio_content(self, content: AudioContent) -> ToolInvokeMessage:
|
|
"""Process audio content and return a blob message."""
|
|
return self.create_blob_message(blob=base64.b64decode(content.data), meta={"mime_type": content.mimeType})
|
|
|
|
def fork_tool_runtime(self, runtime: ToolRuntime) -> "MCPTool":
|
|
return MCPTool(
|
|
entity=self.entity,
|
|
runtime=runtime,
|
|
tenant_id=self.tenant_id,
|
|
icon=self.icon,
|
|
server_url=self.server_url,
|
|
provider_id=self.provider_id,
|
|
headers=self.headers,
|
|
timeout=self.timeout,
|
|
sse_read_timeout=self.sse_read_timeout,
|
|
)
|
|
|
|
def _handle_none_parameter(self, parameter: dict[str, Any]) -> dict[str, Any]:
|
|
"""
|
|
in mcp tool invoke, if the parameter is empty, it will be set to None
|
|
"""
|
|
return {
|
|
key: value
|
|
for key, value in parameter.items()
|
|
if value is not None and not (isinstance(value, str) and value.strip() == "")
|
|
}
|
|
|
|
def invoke_remote_mcp_tool(self, tool_parameters: dict[str, Any]) -> CallToolResult:
|
|
headers = self.headers.copy() if self.headers else {}
|
|
tool_parameters = self._handle_none_parameter(tool_parameters)
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from extensions.ext_database import db
|
|
from services.tools.mcp_tools_manage_service import MCPToolManageService
|
|
|
|
# Step 1: Load provider entity and credentials in a short-lived session
|
|
# This minimizes database connection hold time
|
|
with Session(db.engine, expire_on_commit=False) as session:
|
|
mcp_service = MCPToolManageService(session=session)
|
|
provider_entity = mcp_service.get_provider_entity(self.provider_id, self.tenant_id, by_server_id=True)
|
|
|
|
# Decrypt and prepare all credentials before closing session
|
|
server_url = provider_entity.decrypt_server_url()
|
|
headers = provider_entity.decrypt_headers()
|
|
|
|
# Try to get existing token and add to headers
|
|
if not headers:
|
|
tokens = provider_entity.retrieve_tokens()
|
|
if tokens and tokens.access_token:
|
|
headers["Authorization"] = f"{tokens.token_type.capitalize()} {tokens.access_token}"
|
|
|
|
# Step 2: Session is now closed, perform network operations without holding database connection
|
|
# MCPClientWithAuthRetry will create a new session lazily only if auth retry is needed
|
|
try:
|
|
with MCPClientWithAuthRetry(
|
|
server_url=server_url,
|
|
headers=headers,
|
|
timeout=self.timeout,
|
|
sse_read_timeout=self.sse_read_timeout,
|
|
provider_entity=provider_entity,
|
|
) as mcp_client:
|
|
return mcp_client.invoke_tool(tool_name=self.entity.identity.name, tool_args=tool_parameters)
|
|
except MCPConnectionError as e:
|
|
raise ToolInvokeError(f"Failed to connect to MCP server: {e}") from e
|
|
except Exception as e:
|
|
raise ToolInvokeError(f"Failed to invoke tool: {e}") from e
|