Files
dify/api/core/sandbox/bash/session.py
Stream e0082dbf18 revert: add tools for output in agent mode
feat: hide output tools and improve JSON formatting for structured output
feat: hide output tools and improve JSON formatting for structured output
fix: handle prompt template correctly to extract selectors for step run
fix: emit StreamChunkEvent correctly for sandbox agent
chore: better debug message
fix: incorrect output tool runtime selection
fix: type issues
fix: align parameter list
fix: align parameter list
fix: hide internal builtin providers from tool list
vibe: implement file structured output
vibe: implement file structured output
fix: refix parameter for tool
fix: crash
fix: crash
refactor: remove union types
fix: type check
Merge branch 'feat/structured-output-with-sandbox' into feat/support-agent-sandbox
fix: provide json as text
fix: provide json as text
fix: get AgentResult correctly
fix: provides correct prompts, tools and terminal predicates
fix: provides correct prompts, tools and terminal predicates
fix: circular import
feat: support structured output in sandbox and tool mode
2026-02-04 21:43:53 +08:00

232 lines
8.6 KiB
Python

from __future__ import annotations
import json
import logging
import mimetypes
import os
from io import BytesIO
from types import TracebackType
from core.file import File, FileTransferMethod, FileType
from core.sandbox.sandbox import Sandbox
from core.session.cli_api import CliApiSession, CliApiSessionManager, CliContext
from core.skill.entities import ToolAccessPolicy
from core.skill.entities.tool_dependencies import ToolDependencies
from core.tools.signature import sign_tool_file
from core.tools.tool_file_manager import ToolFileManager
from core.virtual_environment.__base.helpers import pipeline
from ..bash.dify_cli import DifyCliConfig
from ..entities import DifyCli
from .bash_tool import SandboxBashTool
logger = logging.getLogger(__name__)
SANDBOX_READY_TIMEOUT = 60 * 10
# Default output directory for sandbox-generated files
SANDBOX_OUTPUT_DIR = "output"
# Maximum number of files to collect from sandbox output
MAX_OUTPUT_FILES = 50
# Maximum file size to collect (10MB)
MAX_OUTPUT_FILE_SIZE = 10 * 1024 * 1024
class SandboxBashSession:
def __init__(self, *, sandbox: Sandbox, node_id: str, tools: ToolDependencies | None) -> None:
self._sandbox = sandbox
self._node_id = node_id
self._tools = tools
self._bash_tool: SandboxBashTool | None = None
self._cli_api_session: CliApiSession | None = None
self._tenant_id = sandbox.tenant_id
self._user_id = sandbox.user_id
self._app_id = sandbox.app_id
self._assets_id = sandbox.assets_id
def __enter__(self) -> SandboxBashSession:
# Ensure sandbox initialization completes before any bash commands run.
self._sandbox.wait_ready(timeout=SANDBOX_READY_TIMEOUT)
self._cli_api_session = CliApiSessionManager().create(
tenant_id=self._tenant_id,
user_id=self._user_id,
context=CliContext(tool_access=ToolAccessPolicy.from_dependencies(self._tools)),
)
if self._tools is not None and not self._tools.is_empty():
tools_path = self._setup_node_tools_directory(self._node_id, self._tools, self._cli_api_session)
else:
tools_path = DifyCli.GLOBAL_TOOLS_PATH
self._bash_tool = SandboxBashTool(
sandbox=self._sandbox.vm,
tenant_id=self._tenant_id,
tools_path=tools_path,
)
return self
def _setup_node_tools_directory(
self,
node_id: str,
tools: ToolDependencies,
cli_api_session: CliApiSession,
) -> str:
node_tools_path = f"{DifyCli.TOOLS_ROOT}/{node_id}"
vm = self._sandbox.vm
(
pipeline(vm)
.add(["mkdir", "-p", DifyCli.GLOBAL_TOOLS_PATH], error_message="Failed to create global tools dir")
.add(["mkdir", "-p", node_tools_path], error_message="Failed to create node tools dir")
.execute(raise_on_error=True)
)
config_json = json.dumps(
DifyCliConfig.create(session=cli_api_session, tenant_id=self._tenant_id, tool_deps=tools).model_dump(
mode="json"
),
ensure_ascii=False,
)
vm.upload_file(f"{node_tools_path}/{DifyCli.CONFIG_FILENAME}", BytesIO(config_json.encode("utf-8")))
pipeline(vm, cwd=node_tools_path).add(
[DifyCli.PATH, "init"], error_message="Failed to initialize Dify CLI"
).execute(raise_on_error=True)
logger.info(
"Node %s tools initialized, path=%s, tool_count=%d", node_id, node_tools_path, len(tools.references)
)
return node_tools_path
def __exit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
tb: TracebackType | None,
) -> bool:
try:
if self._cli_api_session is not None:
CliApiSessionManager().delete(self._cli_api_session.id)
logger.debug("Cleaned up SandboxSession session_id=%s", self._cli_api_session.id)
self._cli_api_session = None
except Exception:
logger.exception("Failed to cleanup SandboxSession")
return False
@property
def bash_tool(self) -> SandboxBashTool:
if self._bash_tool is None:
raise RuntimeError("SandboxSession is not initialized")
return self._bash_tool
def collect_output_files(self, output_dir: str = SANDBOX_OUTPUT_DIR) -> list[File]:
"""
Collect files from sandbox output directory and save them as ToolFiles.
Scans the specified output directory in sandbox, downloads each file,
saves it as a ToolFile, and returns a list of File objects. The File
objects will have valid tool_file_id that can be referenced by subsequent
nodes via structured output.
Args:
output_dir: Directory path in sandbox to scan for output files.
Defaults to "output" (relative to workspace).
Returns:
List of File objects representing the collected files.
"""
vm = self._sandbox.vm
collected_files: list[File] = []
try:
file_states = vm.list_files(output_dir, limit=MAX_OUTPUT_FILES)
except Exception as exc:
# Output directory may not exist if no files were generated
logger.debug("Failed to list sandbox output files in %s: %s", output_dir, exc)
return collected_files
tool_file_manager = ToolFileManager()
for file_state in file_states:
# Skip files that are too large
if file_state.size > MAX_OUTPUT_FILE_SIZE:
logger.warning(
"Skipping sandbox output file %s: size %d exceeds limit %d",
file_state.path,
file_state.size,
MAX_OUTPUT_FILE_SIZE,
)
continue
try:
# file_state.path is already relative to working_path (e.g., "output/file.png")
file_content = vm.download_file(file_state.path)
file_binary = file_content.getvalue()
# Determine mime type from extension
filename = os.path.basename(file_state.path)
mime_type, _ = mimetypes.guess_type(filename)
if not mime_type:
mime_type = "application/octet-stream"
# Save as ToolFile
tool_file = tool_file_manager.create_file_by_raw(
user_id=self._user_id,
tenant_id=self._tenant_id,
conversation_id=None,
file_binary=file_binary,
mimetype=mime_type,
filename=filename,
)
# Determine file type from mime type
file_type = _get_file_type_from_mime(mime_type)
extension = os.path.splitext(filename)[1] if "." in filename else ".bin"
url = sign_tool_file(tool_file.id, extension)
# Create File object with tool_file_id as related_id
file_obj = File(
id=tool_file.id, # Use tool_file_id as the File id for easy reference
tenant_id=self._tenant_id,
type=file_type,
transfer_method=FileTransferMethod.TOOL_FILE,
filename=filename,
extension=extension,
mime_type=mime_type,
size=len(file_binary),
related_id=tool_file.id,
url=url,
storage_key=tool_file.file_key,
)
collected_files.append(file_obj)
logger.info(
"Collected sandbox output file: %s -> tool_file_id=%s",
file_state.path,
tool_file.id,
)
except Exception as exc:
logger.warning("Failed to collect sandbox output file %s: %s", file_state.path, exc)
continue
logger.info(
"Collected %d files from sandbox output directory %s",
len(collected_files),
output_dir,
)
return collected_files
def _get_file_type_from_mime(mime_type: str) -> FileType:
"""Determine FileType from mime type."""
if mime_type.startswith("image/"):
return FileType.IMAGE
elif mime_type.startswith("video/"):
return FileType.VIDEO
elif mime_type.startswith("audio/"):
return FileType.AUDIO
elif "text" in mime_type or "pdf" in mime_type:
return FileType.DOCUMENT
else:
return FileType.CUSTOM