mirror of
https://github.com/langgenius/dify.git
synced 2026-02-19 07:01:42 -05:00
feat: implement keepalive mechanism for E2B sandbox
- Added a keepalive thread to maintain the E2B sandbox timeout, preventing premature termination. - Introduced a stop event to manage the lifecycle of the keepalive thread. - Refactored the sandbox initialization to include the new keepalive functionality. - Enhanced logging to capture failures in refreshing the sandbox timeout.
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
import logging
|
||||
import posixpath
|
||||
import shlex
|
||||
import threading
|
||||
@@ -32,6 +33,9 @@ from core.virtual_environment.channel.transport import (
|
||||
TransportWriteCloser,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
"""
|
||||
import logging
|
||||
from collections.abc import Mapping
|
||||
@@ -96,6 +100,7 @@ class E2BEnvironment(VirtualEnvironment):
|
||||
|
||||
class StoreKey(StrEnum):
|
||||
SANDBOX = "sandbox"
|
||||
KEEPALIVE_STOP = "keepalive_stop"
|
||||
|
||||
@classmethod
|
||||
def get_config_schema(cls) -> list[BasicProviderConfig]:
|
||||
@@ -107,7 +112,9 @@ class E2BEnvironment(VirtualEnvironment):
|
||||
|
||||
@classmethod
|
||||
def validate(cls, options: Mapping[str, Any]) -> None:
|
||||
from e2b.exceptions import AuthenticationException # type: ignore[import-untyped]
|
||||
from e2b.exceptions import (
|
||||
AuthenticationException, # type: ignore[import-untyped]
|
||||
)
|
||||
|
||||
api_key = options.get(cls.OptionsKey.API_KEY, "")
|
||||
if not api_key:
|
||||
@@ -140,12 +147,20 @@ class E2BEnvironment(VirtualEnvironment):
|
||||
arch_part = system_parts[0]
|
||||
os_part = system_parts[1] if len(system_parts) > 1 else ""
|
||||
|
||||
stop_event = threading.Event()
|
||||
threading.Thread(
|
||||
target=self._keepalive_thread,
|
||||
args=(sandbox, stop_event),
|
||||
daemon=True,
|
||||
).start()
|
||||
|
||||
return Metadata(
|
||||
id=info.sandbox_id,
|
||||
arch=self._convert_architecture(arch_part.strip()),
|
||||
os=self._convert_operating_system(os_part.strip()),
|
||||
store={
|
||||
self.StoreKey.SANDBOX: sandbox,
|
||||
self.StoreKey.KEEPALIVE_STOP: stop_event,
|
||||
},
|
||||
)
|
||||
|
||||
@@ -153,6 +168,10 @@ class E2BEnvironment(VirtualEnvironment):
|
||||
"""
|
||||
Release the E2B virtual environment.
|
||||
"""
|
||||
stop_event: threading.Event | None = self.metadata.store.get(self.StoreKey.KEEPALIVE_STOP)
|
||||
if stop_event:
|
||||
stop_event.set()
|
||||
|
||||
if not Sandbox.kill(api_key=self.api_key, sandbox_id=self.metadata.id):
|
||||
raise Exception(f"Failed to release E2B sandbox with ID: {self.metadata.id}")
|
||||
|
||||
@@ -278,6 +297,14 @@ class E2BEnvironment(VirtualEnvironment):
|
||||
stdout_stream.close()
|
||||
stderr_stream.close()
|
||||
|
||||
def _keepalive_thread(self, sandbox: Sandbox, stop_event: threading.Event) -> None:
|
||||
while not stop_event.wait(timeout=60):
|
||||
try:
|
||||
sandbox.set_timeout(300)
|
||||
except Exception:
|
||||
logger.warning("Failed to refresh E2B sandbox timeout, sandbox may have been killed")
|
||||
break
|
||||
|
||||
@cached_property
|
||||
def api_key(self) -> str:
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user