Files
dify/api/core/rag/pipeline/queue.py
longbingljw 36e6205c6c feat:update latest commit (#51)
* test: adding some web tests (#27792)

* feat: add validation to prevent saving empty opening statement in conversation opener modal (#27843)

* fix(web): improve the consistency of the inputs-form UI (#27837)

* fix(web): increase z-index of PortalToFollowElemContent (#27823)

* fix: installation_id is missing when in tools page (#27849)

* fix: avoid passing empty uniqueIdentifier to InstallFromMarketplace (#27802)

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* test: create new test scripts and update some existing test scripts o… (#27850)

* feat: change feedback to forum (#27862)

* chore: translate i18n files and update type definitions (#27868)

Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com>

* Fix/template transformer line number (#27867)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com>

* bump vite to 6.4.1 (#27877)

* Add WEAVIATE_GRPC_ENDPOINT as designed in weaviate migration guide (#27861)

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>

* Fix: correct DraftWorkflowApi.post response model (#27289)

Signed-off-by: Yongtao Huang <yongtaoh2022@gmail.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>

* fix Version 2.0.0-beta.2: Chat annotations Api Error #25506  (#27206)

Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: Asuka Minato <i@asukaminato.eu.org>

* fix jina reader creadential migration command (#27883)

* fix agent putout the output of workflow-tool twice (#26835) (#27087)

* fix jina reader transform (#27922)

* fix: prevent fetch version info in enterprise edition (#27923)

* fix(api): fix `VariablePool.get` adding unexpected keys to variable_dictionary (#26767)

Co-authored-by: -LAN- <laipz8200@outlook.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* refactor: implement tenant self queue for rag tasks (#27559)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: -LAN- <laipz8200@outlook.com>

* fix: bump brotli to 1.2.0 resloved CVE-2025-6176 (#27950)

Signed-off-by: kenwoodjw <blackxin55+@gmail.com>

---------

Signed-off-by: Yongtao Huang <yongtaoh2022@gmail.com>
Signed-off-by: kenwoodjw <blackxin55+@gmail.com>
Co-authored-by: aka James4u <smart.jamesjin@gmail.com>
Co-authored-by: Novice <novice12185727@gmail.com>
Co-authored-by: yangzheli <43645580+yangzheli@users.noreply.github.com>
Co-authored-by: Elliott <105957288+Elliott-byte@users.noreply.github.com>
Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com>
Co-authored-by: johnny0120 <johnny0120@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: Gritty_dev <101377478+codomposer@users.noreply.github.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: wangjifeng <163279492+kk-wangjifeng@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Boris Polonsky <BorisPolonsky@users.noreply.github.com>
Co-authored-by: Yongtao Huang <yongtaoh2022@gmail.com>
Co-authored-by: Cursx <33718736+Cursx@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Asuka Minato <i@asukaminato.eu.org>
Co-authored-by: Jyong <76649700+JohnJyong@users.noreply.github.com>
Co-authored-by: red_sun <56100962+redSun64@users.noreply.github.com>
Co-authored-by: NFish <douxc512@gmail.com>
Co-authored-by: QuantumGhost <obelisk.reg+git@gmail.com>
Co-authored-by: -LAN- <laipz8200@outlook.com>
Co-authored-by: hj24 <huangjian@dify.ai>
Co-authored-by: kenwoodjw <blackxin55+@gmail.com>
2025-11-07 17:25:58 +08:00

80 lines
2.6 KiB
Python

import json
from collections.abc import Sequence
from typing import Any
from pydantic import BaseModel, ValidationError
from extensions.ext_redis import redis_client
_DEFAULT_TASK_TTL = 60 * 60 # 1 hour
class TaskWrapper(BaseModel):
data: Any
def serialize(self) -> str:
return self.model_dump_json()
@classmethod
def deserialize(cls, serialized_data: str) -> "TaskWrapper":
return cls.model_validate_json(serialized_data)
class TenantIsolatedTaskQueue:
"""
Simple queue for tenant isolated tasks, used for rag related tenant tasks isolation.
It uses Redis list to store tasks, and Redis key to store task waiting flag.
Support tasks that can be serialized by json.
"""
def __init__(self, tenant_id: str, unique_key: str):
self._tenant_id = tenant_id
self._unique_key = unique_key
self._queue = f"tenant_self_{unique_key}_task_queue:{tenant_id}"
self._task_key = f"tenant_{unique_key}_task:{tenant_id}"
def get_task_key(self):
return redis_client.get(self._task_key)
def set_task_waiting_time(self, ttl: int = _DEFAULT_TASK_TTL):
redis_client.setex(self._task_key, ttl, 1)
def delete_task_key(self):
redis_client.delete(self._task_key)
def push_tasks(self, tasks: Sequence[Any]):
serialized_tasks = []
for task in tasks:
# Store str list directly, maintaining full compatibility for pipeline scenarios
if isinstance(task, str):
serialized_tasks.append(task)
else:
# Use TaskWrapper to do JSON serialization for non-string tasks
wrapper = TaskWrapper(data=task)
serialized_data = wrapper.serialize()
serialized_tasks.append(serialized_data)
redis_client.lpush(self._queue, *serialized_tasks)
def pull_tasks(self, count: int = 1) -> Sequence[Any]:
if count <= 0:
return []
tasks = []
for _ in range(count):
serialized_task = redis_client.rpop(self._queue)
if not serialized_task:
break
if isinstance(serialized_task, bytes):
serialized_task = serialized_task.decode("utf-8")
try:
wrapper = TaskWrapper.deserialize(serialized_task)
tasks.append(wrapper.data)
except (json.JSONDecodeError, ValidationError, TypeError, ValueError):
# Fall back to raw string for legacy format or invalid JSON
tasks.append(serialized_task)
return tasks