Files
dify/api/core/sandbox/builder.py

169 lines
5.8 KiB
Python

from __future__ import annotations
import logging
import threading
from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, Any
from flask import current_app
from core.entities.provider_entities import BasicProviderConfig
from core.virtual_environment.__base.virtual_environment import VirtualEnvironment
from .entities.sandbox_type import SandboxType
from .initializer import AsyncSandboxInitializer, SandboxInitializer, SyncSandboxInitializer
from .sandbox import Sandbox
if TYPE_CHECKING:
from .storage.sandbox_storage import SandboxStorage
logger = logging.getLogger(__name__)
def _get_sandbox_class(sandbox_type: SandboxType) -> type[VirtualEnvironment]:
match sandbox_type:
case SandboxType.DOCKER:
from core.virtual_environment.providers.docker_daemon_sandbox import DockerDaemonEnvironment
return DockerDaemonEnvironment
case SandboxType.E2B:
from core.virtual_environment.providers.e2b_sandbox import E2BEnvironment
return E2BEnvironment
case SandboxType.LOCAL:
from core.virtual_environment.providers.local_without_isolation import LocalVirtualEnvironment
return LocalVirtualEnvironment
case SandboxType.SSH:
from core.virtual_environment.providers.ssh_sandbox import SSHSandboxEnvironment
return SSHSandboxEnvironment
case _:
raise ValueError(f"Unsupported sandbox type: {sandbox_type}")
class SandboxBuilder:
_tenant_id: str
_sandbox_type: SandboxType
_user_id: str | None
_app_id: str | None
_options: dict[str, Any]
_environments: dict[str, str]
_initializers: list[SandboxInitializer]
_storage: SandboxStorage | None
_assets_id: str | None
def __init__(self, tenant_id: str, sandbox_type: SandboxType) -> None:
self._tenant_id = tenant_id
self._sandbox_type = sandbox_type
self._user_id = None
self._app_id = None
self._options = {}
self._environments = {}
self._initializers = []
self._storage = None
self._assets_id = None
def user(self, user_id: str) -> SandboxBuilder:
self._user_id = user_id
return self
def app(self, app_id: str) -> SandboxBuilder:
self._app_id = app_id
return self
def options(self, options: Mapping[str, Any]) -> SandboxBuilder:
self._options = dict(options)
return self
def environments(self, environments: Mapping[str, str]) -> SandboxBuilder:
self._environments = dict(environments)
return self
def initializer(self, initializer: SandboxInitializer) -> SandboxBuilder:
self._initializers.append(initializer)
return self
def initializers(self, initializers: Sequence[SandboxInitializer]) -> SandboxBuilder:
self._initializers.extend(initializers)
return self
def storage(self, storage: SandboxStorage, assets_id: str) -> SandboxBuilder:
self._storage = storage
self._assets_id = assets_id
return self
def build(self) -> Sandbox:
if self._storage is None:
raise ValueError("storage is required, call .storage() before .build()")
if self._assets_id is None:
raise ValueError("assets_id is required, call .storage() before .build()")
if self._user_id is None:
raise ValueError("user_id is required, call .user() before .build()")
if self._app_id is None:
raise ValueError("app_id is required, call .app() before .build()")
vm_class = _get_sandbox_class(self._sandbox_type)
vm = vm_class(
tenant_id=self._tenant_id,
options=self._options,
environments=self._environments,
user_id=self._user_id,
)
sandbox = Sandbox(
vm=vm,
storage=self._storage,
tenant_id=self._tenant_id,
user_id=self._user_id,
app_id=self._app_id,
assets_id=self._assets_id,
)
# Run synchronous initializers before marking sandbox as ready.
for init in self._initializers:
if isinstance(init, SyncSandboxInitializer):
init.initialize(sandbox)
# Run sandbox setup asynchronously so workflow execution can proceed.
# Capture the Flask app before starting the thread for database access.
flask_app = current_app._get_current_object() # type: ignore
def initialize() -> None:
with flask_app.app_context():
try:
for init in self._initializers:
if not isinstance(init, AsyncSandboxInitializer):
continue
if sandbox.is_cancelled():
return
init.initialize(sandbox)
if sandbox.is_cancelled():
return
sandbox.mount()
sandbox.mark_ready()
except Exception as exc:
logger.exception(
"Failed to initialize sandbox: tenant_id=%s, app_id=%s", self._tenant_id, self._app_id
)
sandbox.mark_failed(exc)
# Background init completes or signals failure via sandbox state.
threading.Thread(target=initialize, daemon=True).start()
return sandbox
@staticmethod
def validate(vm_type: SandboxType, options: Mapping[str, Any]) -> None:
vm_class = _get_sandbox_class(vm_type)
vm_class.validate(options)
@classmethod
def draft_id(cls, user_id: str) -> str:
return user_id
class VMConfig:
@staticmethod
def get_schema(vm_type: SandboxType) -> list[BasicProviderConfig]:
return _get_sandbox_class(vm_type).get_config_schema()