mirror of
https://github.com/langgenius/dify.git
synced 2026-02-11 10:01:30 -05:00
169 lines
5.8 KiB
Python
169 lines
5.8 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
import threading
|
|
from collections.abc import Mapping, Sequence
|
|
from typing import TYPE_CHECKING, Any
|
|
|
|
from flask import current_app
|
|
|
|
from core.entities.provider_entities import BasicProviderConfig
|
|
from core.virtual_environment.__base.virtual_environment import VirtualEnvironment
|
|
|
|
from .entities.sandbox_type import SandboxType
|
|
from .initializer import AsyncSandboxInitializer, SandboxInitializer, SyncSandboxInitializer
|
|
from .sandbox import Sandbox
|
|
|
|
if TYPE_CHECKING:
|
|
from .storage.sandbox_storage import SandboxStorage
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _get_sandbox_class(sandbox_type: SandboxType) -> type[VirtualEnvironment]:
|
|
match sandbox_type:
|
|
case SandboxType.DOCKER:
|
|
from core.virtual_environment.providers.docker_daemon_sandbox import DockerDaemonEnvironment
|
|
|
|
return DockerDaemonEnvironment
|
|
case SandboxType.E2B:
|
|
from core.virtual_environment.providers.e2b_sandbox import E2BEnvironment
|
|
|
|
return E2BEnvironment
|
|
case SandboxType.LOCAL:
|
|
from core.virtual_environment.providers.local_without_isolation import LocalVirtualEnvironment
|
|
|
|
return LocalVirtualEnvironment
|
|
case SandboxType.SSH:
|
|
from core.virtual_environment.providers.ssh_sandbox import SSHSandboxEnvironment
|
|
|
|
return SSHSandboxEnvironment
|
|
case _:
|
|
raise ValueError(f"Unsupported sandbox type: {sandbox_type}")
|
|
|
|
|
|
class SandboxBuilder:
|
|
_tenant_id: str
|
|
_sandbox_type: SandboxType
|
|
_user_id: str | None
|
|
_app_id: str | None
|
|
_options: dict[str, Any]
|
|
_environments: dict[str, str]
|
|
_initializers: list[SandboxInitializer]
|
|
_storage: SandboxStorage | None
|
|
_assets_id: str | None
|
|
|
|
def __init__(self, tenant_id: str, sandbox_type: SandboxType) -> None:
|
|
self._tenant_id = tenant_id
|
|
self._sandbox_type = sandbox_type
|
|
self._user_id = None
|
|
self._app_id = None
|
|
self._options = {}
|
|
self._environments = {}
|
|
self._initializers = []
|
|
self._storage = None
|
|
self._assets_id = None
|
|
|
|
def user(self, user_id: str) -> SandboxBuilder:
|
|
self._user_id = user_id
|
|
return self
|
|
|
|
def app(self, app_id: str) -> SandboxBuilder:
|
|
self._app_id = app_id
|
|
return self
|
|
|
|
def options(self, options: Mapping[str, Any]) -> SandboxBuilder:
|
|
self._options = dict(options)
|
|
return self
|
|
|
|
def environments(self, environments: Mapping[str, str]) -> SandboxBuilder:
|
|
self._environments = dict(environments)
|
|
return self
|
|
|
|
def initializer(self, initializer: SandboxInitializer) -> SandboxBuilder:
|
|
self._initializers.append(initializer)
|
|
return self
|
|
|
|
def initializers(self, initializers: Sequence[SandboxInitializer]) -> SandboxBuilder:
|
|
self._initializers.extend(initializers)
|
|
return self
|
|
|
|
def storage(self, storage: SandboxStorage, assets_id: str) -> SandboxBuilder:
|
|
self._storage = storage
|
|
self._assets_id = assets_id
|
|
return self
|
|
|
|
def build(self) -> Sandbox:
|
|
if self._storage is None:
|
|
raise ValueError("storage is required, call .storage() before .build()")
|
|
if self._assets_id is None:
|
|
raise ValueError("assets_id is required, call .storage() before .build()")
|
|
if self._user_id is None:
|
|
raise ValueError("user_id is required, call .user() before .build()")
|
|
if self._app_id is None:
|
|
raise ValueError("app_id is required, call .app() before .build()")
|
|
|
|
vm_class = _get_sandbox_class(self._sandbox_type)
|
|
vm = vm_class(
|
|
tenant_id=self._tenant_id,
|
|
options=self._options,
|
|
environments=self._environments,
|
|
user_id=self._user_id,
|
|
)
|
|
sandbox = Sandbox(
|
|
vm=vm,
|
|
storage=self._storage,
|
|
tenant_id=self._tenant_id,
|
|
user_id=self._user_id,
|
|
app_id=self._app_id,
|
|
assets_id=self._assets_id,
|
|
)
|
|
|
|
# Run synchronous initializers before marking sandbox as ready.
|
|
for init in self._initializers:
|
|
if isinstance(init, SyncSandboxInitializer):
|
|
init.initialize(sandbox)
|
|
|
|
# Run sandbox setup asynchronously so workflow execution can proceed.
|
|
# Capture the Flask app before starting the thread for database access.
|
|
flask_app = current_app._get_current_object() # type: ignore
|
|
|
|
def initialize() -> None:
|
|
with flask_app.app_context():
|
|
try:
|
|
for init in self._initializers:
|
|
if not isinstance(init, AsyncSandboxInitializer):
|
|
continue
|
|
|
|
if sandbox.is_cancelled():
|
|
return
|
|
init.initialize(sandbox)
|
|
if sandbox.is_cancelled():
|
|
return
|
|
sandbox.mount()
|
|
sandbox.mark_ready()
|
|
except Exception as exc:
|
|
logger.exception(
|
|
"Failed to initialize sandbox: tenant_id=%s, app_id=%s", self._tenant_id, self._app_id
|
|
)
|
|
sandbox.mark_failed(exc)
|
|
|
|
# Background init completes or signals failure via sandbox state.
|
|
threading.Thread(target=initialize, daemon=True).start()
|
|
return sandbox
|
|
|
|
@staticmethod
|
|
def validate(vm_type: SandboxType, options: Mapping[str, Any]) -> None:
|
|
vm_class = _get_sandbox_class(vm_type)
|
|
vm_class.validate(options)
|
|
|
|
@classmethod
|
|
def draft_id(cls, user_id: str) -> str:
|
|
return user_id
|
|
|
|
|
|
class VMConfig:
|
|
@staticmethod
|
|
def get_schema(vm_type: SandboxType) -> list[BasicProviderConfig]:
|
|
return _get_sandbox_class(vm_type).get_config_schema()
|