mirror of
https://github.com/langgenius/dify.git
synced 2026-02-13 07:01:23 -05:00
feat(docker-demuxer): implement producer-consumer pattern for stream demultiplexing
- Introduced threading to handle Docker's stdout/stderr streams, improving thread safety and preventing race conditions. - Replaced buffer-based reading with queue-based reading for stdout and stderr. - Updated read methods to handle errors and end-of-stream conditions more gracefully. - Enhanced documentation to reflect changes in the demuxing process.
This commit is contained in:
@@ -1,10 +1,13 @@
|
||||
import logging
|
||||
import socket
|
||||
import tarfile
|
||||
import threading
|
||||
from collections.abc import Mapping, Sequence
|
||||
from enum import IntEnum, StrEnum
|
||||
from functools import lru_cache
|
||||
from io import BytesIO
|
||||
from pathlib import PurePosixPath
|
||||
from queue import Queue
|
||||
from typing import Any, cast
|
||||
from uuid import uuid4
|
||||
|
||||
@@ -40,7 +43,7 @@ class DockerStreamType(IntEnum):
|
||||
|
||||
class DockerDemuxer:
|
||||
"""
|
||||
Demultiplexes Docker's combined stdout/stderr stream.
|
||||
Demultiplexes Docker's combined stdout/stderr stream using producer-consumer pattern.
|
||||
|
||||
Docker exec with tty=False sends stdout and stderr over a single socket,
|
||||
each frame prefixed with an 8-byte header:
|
||||
@@ -48,58 +51,56 @@ class DockerDemuxer:
|
||||
- Bytes 1-3: reserved (zeros)
|
||||
- Bytes 4-7: payload size (big-endian uint32)
|
||||
|
||||
This class reads frames and routes them to separate stdout/stderr buffers.
|
||||
Without demuxing, output contains binary garbage like:
|
||||
b'\\x01\\x00\\x00\\x00\\x00\\x00\\x00\\x0eHello World\\n'
|
||||
THREAD SAFETY:
|
||||
A single background thread reads frames from the socket and dispatches payloads
|
||||
to thread-safe queues. This avoids race conditions where multiple threads
|
||||
calling _read_next_frame() simultaneously caused frame header/body corruption,
|
||||
resulting in incomplete stdout/stderr output.
|
||||
"""
|
||||
|
||||
_HEADER_SIZE = 8
|
||||
|
||||
def __init__(self, sock: socket.SocketIO):
|
||||
self._sock = sock
|
||||
self._stdout_buf = bytearray()
|
||||
self._stderr_buf = bytearray()
|
||||
self._eof = False
|
||||
self._stdout_queue: Queue[bytes | None] = Queue()
|
||||
self._stderr_queue: Queue[bytes | None] = Queue()
|
||||
self._closed = False
|
||||
self._error: BaseException | None = None
|
||||
|
||||
def read_stdout(self, n: int) -> bytes:
|
||||
return self._read_from_buffer(self._stdout_buf, DockerStreamType.STDOUT, n)
|
||||
self._demux_thread = threading.Thread(
|
||||
target=self._demux_loop,
|
||||
daemon=True,
|
||||
name="docker-demuxer",
|
||||
)
|
||||
self._demux_thread.start()
|
||||
|
||||
def read_stderr(self, n: int) -> bytes:
|
||||
return self._read_from_buffer(self._stderr_buf, DockerStreamType.STDERR, n)
|
||||
def _demux_loop(self) -> None:
|
||||
try:
|
||||
while not self._closed:
|
||||
header = self._read_exact(self._HEADER_SIZE)
|
||||
if not header or len(header) < self._HEADER_SIZE:
|
||||
break
|
||||
|
||||
def _read_from_buffer(self, buffer: bytearray, target_type: DockerStreamType, n: int) -> bytes:
|
||||
while len(buffer) < n and not self._eof:
|
||||
self._read_next_frame()
|
||||
frame_type = header[0]
|
||||
payload_size = int.from_bytes(header[4:8], "big")
|
||||
|
||||
if not buffer:
|
||||
raise TransportEOFError("End of demuxed stream")
|
||||
if payload_size == 0:
|
||||
continue
|
||||
|
||||
result = bytes(buffer[:n])
|
||||
del buffer[:n]
|
||||
return result
|
||||
payload = self._read_exact(payload_size)
|
||||
if not payload:
|
||||
break
|
||||
|
||||
def _read_next_frame(self) -> None:
|
||||
header = self._read_exact(self._HEADER_SIZE)
|
||||
if not header or len(header) < self._HEADER_SIZE:
|
||||
self._eof = True
|
||||
return
|
||||
if frame_type == DockerStreamType.STDOUT:
|
||||
self._stdout_queue.put(payload)
|
||||
elif frame_type == DockerStreamType.STDERR:
|
||||
self._stderr_queue.put(payload)
|
||||
|
||||
frame_type = header[0]
|
||||
payload_size = int.from_bytes(header[4:8], "big")
|
||||
|
||||
if payload_size == 0:
|
||||
return
|
||||
|
||||
payload = self._read_exact(payload_size)
|
||||
if not payload:
|
||||
self._eof = True
|
||||
return
|
||||
|
||||
if frame_type == DockerStreamType.STDOUT:
|
||||
self._stdout_buf.extend(payload)
|
||||
elif frame_type == DockerStreamType.STDERR:
|
||||
self._stderr_buf.extend(payload)
|
||||
except BaseException as e:
|
||||
self._error = e
|
||||
finally:
|
||||
self._stdout_queue.put(None)
|
||||
self._stderr_queue.put(None)
|
||||
|
||||
def _read_exact(self, size: int) -> bytes:
|
||||
data = bytearray()
|
||||
@@ -115,18 +116,50 @@ class DockerDemuxer:
|
||||
return bytes(data) if data else b""
|
||||
return bytes(data)
|
||||
|
||||
def read_stdout(self) -> bytes:
|
||||
return self._read_from_queue(self._stdout_queue)
|
||||
|
||||
def read_stderr(self) -> bytes:
|
||||
return self._read_from_queue(self._stderr_queue)
|
||||
|
||||
def _read_from_queue(self, queue: Queue[bytes | None]) -> bytes:
|
||||
if self._error:
|
||||
raise TransportEOFError(f"Demuxer error: {self._error}") from self._error
|
||||
|
||||
chunk = queue.get()
|
||||
if chunk is None:
|
||||
if self._error:
|
||||
raise TransportEOFError(f"Demuxer error: {str(self._error)}")
|
||||
raise TransportEOFError("End of demuxed stream")
|
||||
return chunk
|
||||
|
||||
def close(self) -> None:
|
||||
if not self._closed:
|
||||
self._closed = True
|
||||
self._sock.close()
|
||||
try:
|
||||
self._sock.close()
|
||||
except Exception:
|
||||
logging.error("Failed to close Docker demuxer socket", exc_info=True)
|
||||
|
||||
|
||||
class DemuxedStdoutReader(TransportReadCloser):
|
||||
def __init__(self, demuxer: DockerDemuxer):
|
||||
self._demuxer = demuxer
|
||||
self._buffer = bytearray()
|
||||
|
||||
def read(self, n: int) -> bytes:
|
||||
return self._demuxer.read_stdout(n)
|
||||
if self._buffer:
|
||||
data = bytes(self._buffer[:n])
|
||||
del self._buffer[:n]
|
||||
if data:
|
||||
return data
|
||||
|
||||
chunk = self._demuxer.read_stdout()
|
||||
if len(chunk) <= n:
|
||||
return chunk
|
||||
|
||||
self._buffer.extend(chunk[n:])
|
||||
return chunk[:n]
|
||||
|
||||
def close(self) -> None:
|
||||
self._demuxer.close()
|
||||
@@ -135,9 +168,21 @@ class DemuxedStdoutReader(TransportReadCloser):
|
||||
class DemuxedStderrReader(TransportReadCloser):
|
||||
def __init__(self, demuxer: DockerDemuxer):
|
||||
self._demuxer = demuxer
|
||||
self._buffer = bytearray()
|
||||
|
||||
def read(self, n: int) -> bytes:
|
||||
return self._demuxer.read_stderr(n)
|
||||
if self._buffer:
|
||||
data = bytes(self._buffer[:n])
|
||||
del self._buffer[:n]
|
||||
if data:
|
||||
return data
|
||||
|
||||
chunk = self._demuxer.read_stderr()
|
||||
if len(chunk) <= n:
|
||||
return chunk
|
||||
|
||||
self._buffer.extend(chunk[n:])
|
||||
return chunk[:n]
|
||||
|
||||
def close(self) -> None:
|
||||
self._demuxer.close()
|
||||
|
||||
Reference in New Issue
Block a user