Files
dify/api/services/trigger/trigger_request_service.py
Yeuoly b76e17b25d feat: introduce trigger functionality (#27644)
Signed-off-by: lyzno1 <yuanyouhuilyz@gmail.com>
Co-authored-by: Stream <Stream_2@qq.com>
Co-authored-by: lyzno1 <92089059+lyzno1@users.noreply.github.com>
Co-authored-by: zhsama <torvalds@linux.do>
Co-authored-by: Harry <xh001x@hotmail.com>
Co-authored-by: lyzno1 <yuanyouhuilyz@gmail.com>
Co-authored-by: yessenia <yessenia.contact@gmail.com>
Co-authored-by: hjlarry <hjlarry@163.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: WTW0313 <twwu@dify.ai>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-11-12 17:59:37 +08:00

66 lines
1.8 KiB
Python

from collections.abc import Mapping
from typing import Any
from flask import Request
from pydantic import TypeAdapter
from core.plugin.utils.http_parser import deserialize_request, serialize_request
from extensions.ext_storage import storage
class TriggerHttpRequestCachingService:
    """
    Service for caching trigger requests.

    Two objects are kept per request ID, both under the ``triggers/`` prefix
    of the configured storage:

    * ``triggers/<request_id>.raw`` - the request as produced by
      ``serialize_request``.
    * ``triggers/<request_id>.payload`` - the payload mapping dumped as JSON.

    No exceptions are caught here; errors raised by the storage backend or by
    (de)serialization propagate to the caller unchanged.
    """

    _TRIGGER_STORAGE_PATH = "triggers"

    # Built once and shared by get_payload / persist_payload: constructing a
    # TypeAdapter builds its validator and serializer, so creating a new one on
    # every call is repeated work with no benefit.
    _PAYLOAD_ADAPTER = TypeAdapter(Mapping[str, Any])

    @classmethod
    def _storage_key(cls, request_id: str, suffix: str) -> str:
        """
        Build the storage key for one cached artifact of a request.

        Args:
            request_id: The ID of the request.
            suffix: The artifact extension without the dot ("raw" or "payload").

        Returns:
            The key in the form ``<storage path>/<request_id>.<suffix>``.
        """
        # NOTE(review): request_id is interpolated as-is; presumably it is
        # generated internally and never contains path separators - confirm.
        return f"{cls._TRIGGER_STORAGE_PATH}/{request_id}.{suffix}"

    @classmethod
    def get_request(cls, request_id: str) -> Request:
        """
        Get the request object from the storage.

        Args:
            request_id: The ID of the request.

        Returns:
            The request object rebuilt by ``deserialize_request`` from the
            stored ``.raw`` entry.
        """
        return deserialize_request(storage.load_once(cls._storage_key(request_id, "raw")))

    @classmethod
    def get_payload(cls, request_id: str) -> Mapping[str, Any]:
        """
        Get the payload from the storage.

        Args:
            request_id: The ID of the request.

        Returns:
            The payload, validated as a string-keyed mapping from the JSON
            stored in the ``.payload`` entry.
        """
        return cls._PAYLOAD_ADAPTER.validate_json(storage.load_once(cls._storage_key(request_id, "payload")))

    @classmethod
    def persist_request(cls, request_id: str, request: Request) -> None:
        """
        Persist the request in the storage.

        Args:
            request_id: The ID of the request.
            request: The request object.
        """
        storage.save(cls._storage_key(request_id, "raw"), serialize_request(request))

    @classmethod
    def persist_payload(cls, request_id: str, payload: Mapping[str, Any]) -> None:
        """
        Persist the payload in the storage.

        Args:
            request_id: The ID of the request.
            payload: The payload mapping; it is dumped to JSON before saving.
        """
        storage.save(
            cls._storage_key(request_id, "payload"),
            cls._PAYLOAD_ADAPTER.dump_json(payload),  # type: ignore
        )