diff --git a/api/core/virtual_environment/providers/docker_daemon_sandbox.py b/api/core/virtual_environment/providers/docker_daemon_sandbox.py index 86b6c5a9f5..d34e4aa6c0 100644 --- a/api/core/virtual_environment/providers/docker_daemon_sandbox.py +++ b/api/core/virtual_environment/providers/docker_daemon_sandbox.py @@ -1,10 +1,13 @@ +import logging import socket import tarfile +import threading from collections.abc import Mapping, Sequence from enum import IntEnum, StrEnum from functools import lru_cache from io import BytesIO from pathlib import PurePosixPath +from queue import Queue from typing import Any, cast from uuid import uuid4 @@ -40,7 +43,7 @@ class DockerStreamType(IntEnum): class DockerDemuxer: """ - Demultiplexes Docker's combined stdout/stderr stream. + Demultiplexes Docker's combined stdout/stderr stream using producer-consumer pattern. Docker exec with tty=False sends stdout and stderr over a single socket, each frame prefixed with an 8-byte header: @@ -48,58 +51,56 @@ class DockerDemuxer: - Bytes 1-3: reserved (zeros) - Bytes 4-7: payload size (big-endian uint32) - This class reads frames and routes them to separate stdout/stderr buffers. - Without demuxing, output contains binary garbage like: - b'\\x01\\x00\\x00\\x00\\x00\\x00\\x00\\x0eHello World\\n' + THREAD SAFETY: + A single background thread reads frames from the socket and dispatches payloads + to thread-safe queues. This avoids race conditions where multiple threads + calling _read_next_frame() simultaneously caused frame header/body corruption, + resulting in incomplete stdout/stderr output. """ _HEADER_SIZE = 8 def __init__(self, sock: socket.SocketIO): self._sock = sock - self._stdout_buf = bytearray() - self._stderr_buf = bytearray() - self._eof = False + self._stdout_queue: Queue[bytes | None] = Queue() + self._stderr_queue: Queue[bytes | None] = Queue() self._closed = False + self._error: BaseException | None = None - def read_stdout(self, n: int) -> bytes: - return self._read_from_buffer(self._stdout_buf, DockerStreamType.STDOUT, n) + self._demux_thread = threading.Thread( + target=self._demux_loop, + daemon=True, + name="docker-demuxer", + ) + self._demux_thread.start() - def read_stderr(self, n: int) -> bytes: - return self._read_from_buffer(self._stderr_buf, DockerStreamType.STDERR, n) + def _demux_loop(self) -> None: + try: + while not self._closed: + header = self._read_exact(self._HEADER_SIZE) + if not header or len(header) < self._HEADER_SIZE: + break - def _read_from_buffer(self, buffer: bytearray, target_type: DockerStreamType, n: int) -> bytes: - while len(buffer) < n and not self._eof: - self._read_next_frame() + frame_type = header[0] + payload_size = int.from_bytes(header[4:8], "big") - if not buffer: - raise TransportEOFError("End of demuxed stream") + if payload_size == 0: + continue - result = bytes(buffer[:n]) - del buffer[:n] - return result + payload = self._read_exact(payload_size) + if not payload: + break - def _read_next_frame(self) -> None: - header = self._read_exact(self._HEADER_SIZE) - if not header or len(header) < self._HEADER_SIZE: - self._eof = True - return + if frame_type == DockerStreamType.STDOUT: + self._stdout_queue.put(payload) + elif frame_type == DockerStreamType.STDERR: + self._stderr_queue.put(payload) - frame_type = header[0] - payload_size = int.from_bytes(header[4:8], "big") - - if payload_size == 0: - return - - payload = self._read_exact(payload_size) - if not payload: - self._eof = True - return - - if frame_type == DockerStreamType.STDOUT: - self._stdout_buf.extend(payload) - elif frame_type == DockerStreamType.STDERR: - self._stderr_buf.extend(payload) + except BaseException as e: + self._error = e + finally: + self._stdout_queue.put(None) + self._stderr_queue.put(None) def _read_exact(self, size: int) -> bytes: data = bytearray() @@ -115,18 +116,50 @@ class DockerDemuxer: return bytes(data) if data else b"" return bytes(data) + def read_stdout(self) -> bytes: + return self._read_from_queue(self._stdout_queue) + + def read_stderr(self) -> bytes: + return self._read_from_queue(self._stderr_queue) + + def _read_from_queue(self, queue: Queue[bytes | None]) -> bytes: + if self._error: + raise TransportEOFError(f"Demuxer error: {self._error}") from self._error + + chunk = queue.get() + if chunk is None: + if self._error: + raise TransportEOFError(f"Demuxer error: {str(self._error)}") + raise TransportEOFError("End of demuxed stream") + return chunk + def close(self) -> None: if not self._closed: self._closed = True - self._sock.close() + try: + self._sock.close() + except Exception: + logging.error("Failed to close Docker demuxer socket", exc_info=True) class DemuxedStdoutReader(TransportReadCloser): def __init__(self, demuxer: DockerDemuxer): self._demuxer = demuxer + self._buffer = bytearray() def read(self, n: int) -> bytes: - return self._demuxer.read_stdout(n) + if self._buffer: + data = bytes(self._buffer[:n]) + del self._buffer[:n] + if data: + return data + + chunk = self._demuxer.read_stdout() + if len(chunk) <= n: + return chunk + + self._buffer.extend(chunk[n:]) + return chunk[:n] def close(self) -> None: self._demuxer.close() @@ -135,9 +168,21 @@ class DemuxedStdoutReader(TransportReadCloser): class DemuxedStderrReader(TransportReadCloser): def __init__(self, demuxer: DockerDemuxer): self._demuxer = demuxer + self._buffer = bytearray() def read(self, n: int) -> bytes: - return self._demuxer.read_stderr(n) + if self._buffer: + data = bytes(self._buffer[:n]) + del self._buffer[:n] + if data: + return data + + chunk = self._demuxer.read_stderr() + if len(chunk) <= n: + return chunk + + self._buffer.extend(chunk[n:]) + return chunk[:n] def close(self) -> None: self._demuxer.close()