from __future__ import annotations import json import logging import mimetypes import os from io import BytesIO from types import TracebackType from core.file import File, FileTransferMethod, FileType from core.sandbox.sandbox import Sandbox from core.session.cli_api import CliApiSession, CliApiSessionManager, CliContext from core.skill.entities import ToolAccessPolicy from core.skill.entities.tool_dependencies import ToolDependencies from core.tools.signature import sign_tool_file from core.tools.tool_file_manager import ToolFileManager from core.virtual_environment.__base.helpers import pipeline from ..bash.dify_cli import DifyCliConfig from ..entities import DifyCli from .bash_tool import SandboxBashTool logger = logging.getLogger(__name__) SANDBOX_READY_TIMEOUT = 60 * 10 # Default output directory for sandbox-generated files SANDBOX_OUTPUT_DIR = "output" # Maximum number of files to collect from sandbox output MAX_OUTPUT_FILES = 50 # Maximum file size to collect (10MB) MAX_OUTPUT_FILE_SIZE = 10 * 1024 * 1024 class SandboxBashSession: def __init__(self, *, sandbox: Sandbox, node_id: str, tools: ToolDependencies | None) -> None: self._sandbox = sandbox self._node_id = node_id self._tools = tools self._bash_tool: SandboxBashTool | None = None self._cli_api_session: CliApiSession | None = None self._tenant_id = sandbox.tenant_id self._user_id = sandbox.user_id self._app_id = sandbox.app_id self._assets_id = sandbox.assets_id def __enter__(self) -> SandboxBashSession: # Ensure sandbox initialization completes before any bash commands run. self._sandbox.wait_ready(timeout=SANDBOX_READY_TIMEOUT) self._cli_api_session = CliApiSessionManager().create( tenant_id=self._tenant_id, user_id=self._user_id, context=CliContext(tool_access=ToolAccessPolicy.from_dependencies(self._tools)), ) if self._tools is not None and not self._tools.is_empty(): tools_path = self._setup_node_tools_directory(self._node_id, self._tools, self._cli_api_session) else: tools_path = DifyCli.GLOBAL_TOOLS_PATH self._bash_tool = SandboxBashTool( sandbox=self._sandbox.vm, tenant_id=self._tenant_id, tools_path=tools_path, ) return self def _setup_node_tools_directory( self, node_id: str, tools: ToolDependencies, cli_api_session: CliApiSession, ) -> str: node_tools_path = f"{DifyCli.TOOLS_ROOT}/{node_id}" vm = self._sandbox.vm ( pipeline(vm) .add(["mkdir", "-p", DifyCli.GLOBAL_TOOLS_PATH], error_message="Failed to create global tools dir") .add(["mkdir", "-p", node_tools_path], error_message="Failed to create node tools dir") .execute(raise_on_error=True) ) config_json = json.dumps( DifyCliConfig.create(session=cli_api_session, tenant_id=self._tenant_id, tool_deps=tools).model_dump( mode="json" ), ensure_ascii=False, ) vm.upload_file(f"{node_tools_path}/{DifyCli.CONFIG_FILENAME}", BytesIO(config_json.encode("utf-8"))) pipeline(vm, cwd=node_tools_path).add( [DifyCli.PATH, "init"], error_message="Failed to initialize Dify CLI" ).execute(raise_on_error=True) logger.info( "Node %s tools initialized, path=%s, tool_count=%d", node_id, node_tools_path, len(tools.references) ) return node_tools_path def __exit__( self, exc_type: type[BaseException] | None, exc: BaseException | None, tb: TracebackType | None, ) -> bool: try: if self._cli_api_session is not None: CliApiSessionManager().delete(self._cli_api_session.id) logger.debug("Cleaned up SandboxSession session_id=%s", self._cli_api_session.id) self._cli_api_session = None except Exception: logger.exception("Failed to cleanup SandboxSession") return False @property def bash_tool(self) -> SandboxBashTool: if self._bash_tool is None: raise RuntimeError("SandboxSession is not initialized") return self._bash_tool def collect_output_files(self, output_dir: str = SANDBOX_OUTPUT_DIR) -> list[File]: """ Collect files from sandbox output directory and save them as ToolFiles. Scans the specified output directory in sandbox, downloads each file, saves it as a ToolFile, and returns a list of File objects. The File objects will have valid tool_file_id that can be referenced by subsequent nodes via structured output. Args: output_dir: Directory path in sandbox to scan for output files. Defaults to "output" (relative to workspace). Returns: List of File objects representing the collected files. """ vm = self._sandbox.vm collected_files: list[File] = [] try: file_states = vm.list_files(output_dir, limit=MAX_OUTPUT_FILES) except Exception as exc: # Output directory may not exist if no files were generated logger.debug("Failed to list sandbox output files in %s: %s", output_dir, exc) return collected_files tool_file_manager = ToolFileManager() for file_state in file_states: # Skip files that are too large if file_state.size > MAX_OUTPUT_FILE_SIZE: logger.warning( "Skipping sandbox output file %s: size %d exceeds limit %d", file_state.path, file_state.size, MAX_OUTPUT_FILE_SIZE, ) continue try: # file_state.path is already relative to working_path (e.g., "output/file.png") file_content = vm.download_file(file_state.path) file_binary = file_content.getvalue() # Determine mime type from extension filename = os.path.basename(file_state.path) mime_type, _ = mimetypes.guess_type(filename) if not mime_type: mime_type = "application/octet-stream" # Save as ToolFile tool_file = tool_file_manager.create_file_by_raw( user_id=self._user_id, tenant_id=self._tenant_id, conversation_id=None, file_binary=file_binary, mimetype=mime_type, filename=filename, ) # Determine file type from mime type file_type = _get_file_type_from_mime(mime_type) extension = os.path.splitext(filename)[1] if "." in filename else ".bin" url = sign_tool_file(tool_file.id, extension) # Create File object with tool_file_id as related_id file_obj = File( id=tool_file.id, # Use tool_file_id as the File id for easy reference tenant_id=self._tenant_id, type=file_type, transfer_method=FileTransferMethod.TOOL_FILE, filename=filename, extension=extension, mime_type=mime_type, size=len(file_binary), related_id=tool_file.id, url=url, storage_key=tool_file.file_key, ) collected_files.append(file_obj) logger.info( "Collected sandbox output file: %s -> tool_file_id=%s", file_state.path, tool_file.id, ) except Exception as exc: logger.warning("Failed to collect sandbox output file %s: %s", file_state.path, exc) continue logger.info( "Collected %d files from sandbox output directory %s", len(collected_files), output_dir, ) return collected_files def _get_file_type_from_mime(mime_type: str) -> FileType: """Determine FileType from mime type.""" if mime_type.startswith("image/"): return FileType.IMAGE elif mime_type.startswith("video/"): return FileType.VIDEO elif mime_type.startswith("audio/"): return FileType.AUDIO elif "text" in mime_type or "pdf" in mime_type: return FileType.DOCUMENT else: return FileType.CUSTOM