diff --git a/.github/workflows/main-ci.yml b/.github/workflows/main-ci.yml
index 4f00a9101c..a19cb50abc 100644
--- a/.github/workflows/main-ci.yml
+++ b/.github/workflows/main-ci.yml
@@ -63,8 +63,9 @@ jobs:
if: needs.check-changes.outputs.web-changed == 'true'
uses: ./.github/workflows/web-tests.yml
with:
- base_sha: ${{ github.event_name == 'pull_request' && github.event.pull_request.base.sha || github.event.before }}
- head_sha: ${{ github.event_name == 'pull_request' && github.event.pull_request.head.sha || github.sha }}
+ base_sha: ${{ github.event.before || github.event.pull_request.base.sha }}
+ diff_range_mode: ${{ github.event.before && 'exact' || 'merge-base' }}
+ head_sha: ${{ github.event.after || github.event.pull_request.head.sha || github.sha }}
style-check:
name: Style Check
diff --git a/.github/workflows/web-tests.yml b/.github/workflows/web-tests.yml
index 443a22ea93..11222146cf 100644
--- a/.github/workflows/web-tests.yml
+++ b/.github/workflows/web-tests.yml
@@ -6,6 +6,9 @@ on:
base_sha:
required: false
type: string
+ diff_range_mode:
+ required: false
+ type: string
head_sha:
required: false
type: string
@@ -86,13 +89,24 @@ jobs:
- name: Merge reports
run: vp test --merge-reports --reporter=json --reporter=agent --coverage
- - name: Check app/components diff coverage
+ - name: Report app/components baseline coverage
+ run: node ./scripts/report-components-coverage-baseline.mjs
+
+ - name: Report app/components test touch
env:
BASE_SHA: ${{ inputs.base_sha }}
+ DIFF_RANGE_MODE: ${{ inputs.diff_range_mode }}
+ HEAD_SHA: ${{ inputs.head_sha }}
+ run: node ./scripts/report-components-test-touch.mjs
+
+ - name: Check app/components pure diff coverage
+ env:
+ BASE_SHA: ${{ inputs.base_sha }}
+ DIFF_RANGE_MODE: ${{ inputs.diff_range_mode }}
HEAD_SHA: ${{ inputs.head_sha }}
run: node ./scripts/check-components-diff-coverage.mjs
- - name: Coverage Summary
+ - name: Check Coverage Summary
if: always()
id: coverage-summary
run: |
@@ -101,313 +115,15 @@ jobs:
COVERAGE_FILE="coverage/coverage-final.json"
COVERAGE_SUMMARY_FILE="coverage/coverage-summary.json"
- if [ ! -f "$COVERAGE_FILE" ] && [ ! -f "$COVERAGE_SUMMARY_FILE" ]; then
- echo "has_coverage=false" >> "$GITHUB_OUTPUT"
- echo "### 🚨 Test Coverage Report :test_tube:" >> "$GITHUB_STEP_SUMMARY"
- echo "Coverage data not found. Ensure Vitest runs with coverage enabled." >> "$GITHUB_STEP_SUMMARY"
+ if [ -f "$COVERAGE_FILE" ] || [ -f "$COVERAGE_SUMMARY_FILE" ]; then
+ echo "has_coverage=true" >> "$GITHUB_OUTPUT"
exit 0
fi
- echo "has_coverage=true" >> "$GITHUB_OUTPUT"
-
- node <<'NODE' >> "$GITHUB_STEP_SUMMARY"
- const fs = require('fs');
- const path = require('path');
- let libCoverage = null;
-
- try {
- libCoverage = require('istanbul-lib-coverage');
- } catch (error) {
- libCoverage = null;
- }
-
- const summaryPath = path.join('coverage', 'coverage-summary.json');
- const finalPath = path.join('coverage', 'coverage-final.json');
-
- const hasSummary = fs.existsSync(summaryPath);
- const hasFinal = fs.existsSync(finalPath);
-
- if (!hasSummary && !hasFinal) {
- console.log('### Test Coverage Summary :test_tube:');
- console.log('');
- console.log('No coverage data found.');
- process.exit(0);
- }
-
- const summary = hasSummary
- ? JSON.parse(fs.readFileSync(summaryPath, 'utf8'))
- : null;
- const coverage = hasFinal
- ? JSON.parse(fs.readFileSync(finalPath, 'utf8'))
- : null;
-
- const getLineCoverageFromStatements = (statementMap, statementHits) => {
- const lineHits = {};
-
- if (!statementMap || !statementHits) {
- return lineHits;
- }
-
- Object.entries(statementMap).forEach(([key, statement]) => {
- const line = statement?.start?.line;
- if (!line) {
- return;
- }
- const hits = statementHits[key] ?? 0;
- const previous = lineHits[line];
- lineHits[line] = previous === undefined ? hits : Math.max(previous, hits);
- });
-
- return lineHits;
- };
-
- const getFileCoverage = (entry) => (
- libCoverage ? libCoverage.createFileCoverage(entry) : null
- );
-
- const getLineHits = (entry, fileCoverage) => {
- const lineHits = entry.l ?? {};
- if (Object.keys(lineHits).length > 0) {
- return lineHits;
- }
- if (fileCoverage) {
- return fileCoverage.getLineCoverage();
- }
- return getLineCoverageFromStatements(entry.statementMap ?? {}, entry.s ?? {});
- };
-
- const getUncoveredLines = (entry, fileCoverage, lineHits) => {
- if (lineHits && Object.keys(lineHits).length > 0) {
- return Object.entries(lineHits)
- .filter(([, count]) => count === 0)
- .map(([line]) => Number(line))
- .sort((a, b) => a - b);
- }
- if (fileCoverage) {
- return fileCoverage.getUncoveredLines();
- }
- return [];
- };
-
- const totals = {
- lines: { covered: 0, total: 0 },
- statements: { covered: 0, total: 0 },
- branches: { covered: 0, total: 0 },
- functions: { covered: 0, total: 0 },
- };
- const fileSummaries = [];
-
- if (summary) {
- const totalEntry = summary.total ?? {};
- ['lines', 'statements', 'branches', 'functions'].forEach((key) => {
- if (totalEntry[key]) {
- totals[key].covered = totalEntry[key].covered ?? 0;
- totals[key].total = totalEntry[key].total ?? 0;
- }
- });
-
- Object.entries(summary)
- .filter(([file]) => file !== 'total')
- .forEach(([file, data]) => {
- fileSummaries.push({
- file,
- pct: data.lines?.pct ?? data.statements?.pct ?? 0,
- lines: {
- covered: data.lines?.covered ?? 0,
- total: data.lines?.total ?? 0,
- },
- });
- });
- } else if (coverage) {
- Object.entries(coverage).forEach(([file, entry]) => {
- const fileCoverage = getFileCoverage(entry);
- const lineHits = getLineHits(entry, fileCoverage);
- const statementHits = entry.s ?? {};
- const branchHits = entry.b ?? {};
- const functionHits = entry.f ?? {};
-
- const lineTotal = Object.keys(lineHits).length;
- const lineCovered = Object.values(lineHits).filter((n) => n > 0).length;
-
- const statementTotal = Object.keys(statementHits).length;
- const statementCovered = Object.values(statementHits).filter((n) => n > 0).length;
-
- const branchTotal = Object.values(branchHits).reduce((acc, branches) => acc + branches.length, 0);
- const branchCovered = Object.values(branchHits).reduce(
- (acc, branches) => acc + branches.filter((n) => n > 0).length,
- 0,
- );
-
- const functionTotal = Object.keys(functionHits).length;
- const functionCovered = Object.values(functionHits).filter((n) => n > 0).length;
-
- totals.lines.total += lineTotal;
- totals.lines.covered += lineCovered;
- totals.statements.total += statementTotal;
- totals.statements.covered += statementCovered;
- totals.branches.total += branchTotal;
- totals.branches.covered += branchCovered;
- totals.functions.total += functionTotal;
- totals.functions.covered += functionCovered;
-
- const pct = (covered, tot) => (tot > 0 ? (covered / tot) * 100 : 0);
-
- fileSummaries.push({
- file,
- pct: pct(lineCovered || statementCovered, lineTotal || statementTotal),
- lines: {
- covered: lineCovered || statementCovered,
- total: lineTotal || statementTotal,
- },
- });
- });
- }
-
- const pct = (covered, tot) => (tot > 0 ? ((covered / tot) * 100).toFixed(2) : '0.00');
-
- console.log('### Test Coverage Summary :test_tube:');
- console.log('');
- console.log('| Metric | Coverage | Covered / Total |');
- console.log('|--------|----------|-----------------|');
- console.log(`| Lines | ${pct(totals.lines.covered, totals.lines.total)}% | ${totals.lines.covered} / ${totals.lines.total} |`);
- console.log(`| Statements | ${pct(totals.statements.covered, totals.statements.total)}% | ${totals.statements.covered} / ${totals.statements.total} |`);
- console.log(`| Branches | ${pct(totals.branches.covered, totals.branches.total)}% | ${totals.branches.covered} / ${totals.branches.total} |`);
- console.log(`| Functions | ${pct(totals.functions.covered, totals.functions.total)}% | ${totals.functions.covered} / ${totals.functions.total} |`);
-
- console.log('');
- console.log('File coverage (lowest lines first)
');
- console.log('');
- console.log('```');
- fileSummaries
- .sort((a, b) => (a.pct - b.pct) || (b.lines.total - a.lines.total))
- .slice(0, 25)
- .forEach(({ file, pct, lines }) => {
- console.log(`${pct.toFixed(2)}%\t${lines.covered}/${lines.total}\t${file}`);
- });
- console.log('```');
- console.log(' ');
-
- if (coverage) {
- const pctValue = (covered, tot) => {
- if (tot === 0) {
- return '0';
- }
- return ((covered / tot) * 100)
- .toFixed(2)
- .replace(/\.?0+$/, '');
- };
-
- const formatLineRanges = (lines) => {
- if (lines.length === 0) {
- return '';
- }
- const ranges = [];
- let start = lines[0];
- let end = lines[0];
-
- for (let i = 1; i < lines.length; i += 1) {
- const current = lines[i];
- if (current === end + 1) {
- end = current;
- continue;
- }
- ranges.push(start === end ? `${start}` : `${start}-${end}`);
- start = current;
- end = current;
- }
- ranges.push(start === end ? `${start}` : `${start}-${end}`);
- return ranges.join(',');
- };
-
- const tableTotals = {
- statements: { covered: 0, total: 0 },
- branches: { covered: 0, total: 0 },
- functions: { covered: 0, total: 0 },
- lines: { covered: 0, total: 0 },
- };
- const tableRows = Object.entries(coverage)
- .map(([file, entry]) => {
- const fileCoverage = getFileCoverage(entry);
- const lineHits = getLineHits(entry, fileCoverage);
- const statementHits = entry.s ?? {};
- const branchHits = entry.b ?? {};
- const functionHits = entry.f ?? {};
-
- const lineTotal = Object.keys(lineHits).length;
- const lineCovered = Object.values(lineHits).filter((n) => n > 0).length;
- const statementTotal = Object.keys(statementHits).length;
- const statementCovered = Object.values(statementHits).filter((n) => n > 0).length;
- const branchTotal = Object.values(branchHits).reduce((acc, branches) => acc + branches.length, 0);
- const branchCovered = Object.values(branchHits).reduce(
- (acc, branches) => acc + branches.filter((n) => n > 0).length,
- 0,
- );
- const functionTotal = Object.keys(functionHits).length;
- const functionCovered = Object.values(functionHits).filter((n) => n > 0).length;
-
- tableTotals.lines.total += lineTotal;
- tableTotals.lines.covered += lineCovered;
- tableTotals.statements.total += statementTotal;
- tableTotals.statements.covered += statementCovered;
- tableTotals.branches.total += branchTotal;
- tableTotals.branches.covered += branchCovered;
- tableTotals.functions.total += functionTotal;
- tableTotals.functions.covered += functionCovered;
-
- const uncoveredLines = getUncoveredLines(entry, fileCoverage, lineHits);
-
- const filePath = entry.path ?? file;
- const relativePath = path.isAbsolute(filePath)
- ? path.relative(process.cwd(), filePath)
- : filePath;
-
- return {
- file: relativePath || file,
- statements: pctValue(statementCovered, statementTotal),
- branches: pctValue(branchCovered, branchTotal),
- functions: pctValue(functionCovered, functionTotal),
- lines: pctValue(lineCovered, lineTotal),
- uncovered: formatLineRanges(uncoveredLines),
- };
- })
- .sort((a, b) => a.file.localeCompare(b.file));
-
- const columns = [
- { key: 'file', header: 'File', align: 'left' },
- { key: 'statements', header: '% Stmts', align: 'right' },
- { key: 'branches', header: '% Branch', align: 'right' },
- { key: 'functions', header: '% Funcs', align: 'right' },
- { key: 'lines', header: '% Lines', align: 'right' },
- { key: 'uncovered', header: 'Uncovered Line #s', align: 'left' },
- ];
-
- const allFilesRow = {
- file: 'All files',
- statements: pctValue(tableTotals.statements.covered, tableTotals.statements.total),
- branches: pctValue(tableTotals.branches.covered, tableTotals.branches.total),
- functions: pctValue(tableTotals.functions.covered, tableTotals.functions.total),
- lines: pctValue(tableTotals.lines.covered, tableTotals.lines.total),
- uncovered: '',
- };
-
- const rowsForOutput = [allFilesRow, ...tableRows];
- const formatRow = (row) => `| ${columns
- .map(({ key }) => String(row[key] ?? ''))
- .join(' | ')} |`;
- const headerRow = `| ${columns.map(({ header }) => header).join(' | ')} |`;
- const dividerRow = `| ${columns
- .map(({ align }) => (align === 'right' ? '---:' : ':---'))
- .join(' | ')} |`;
-
- console.log('');
- console.log('Vitest coverage table
');
- console.log('');
- console.log(headerRow);
- console.log(dividerRow);
- rowsForOutput.forEach((row) => console.log(formatRow(row)));
- console.log(' ');
- }
- NODE
+ echo "has_coverage=false" >> "$GITHUB_OUTPUT"
+ echo "### 🚨 app/components Diff Coverage" >> "$GITHUB_STEP_SUMMARY"
+ echo "" >> "$GITHUB_STEP_SUMMARY"
+ echo "Coverage artifacts not found. Ensure Vitest merge reports ran with coverage enabled." >> "$GITHUB_STEP_SUMMARY"
- name: Upload Coverage Artifact
if: steps.coverage-summary.outputs.has_coverage == 'true'
diff --git a/api/.env.example b/api/.env.example
index 8195a3c074..40e1c2dfdf 100644
--- a/api/.env.example
+++ b/api/.env.example
@@ -737,24 +737,25 @@ SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS=30
SANDBOX_EXPIRED_RECORDS_CLEAN_TASK_LOCK_TTL=90000
-# Redis URL used for PubSub between API and
+# Redis URL used for the event bus between the API and
# celery worker
# defaults to url constructed from `REDIS_*`
# configurations
-PUBSUB_REDIS_URL=
-# Pub/sub channel type for streaming events.
-# valid options are:
+EVENT_BUS_REDIS_URL=
+# Event transport type. Options are:
#
-# - pubsub: for normal Pub/Sub
-# - sharded: for sharded Pub/Sub
+# - pubsub: normal Pub/Sub (at-most-once)
+# - sharded: sharded Pub/Sub (at-most-once)
+# - streams: Redis Streams (at-least-once, recommended to avoid subscriber races)
#
-# It's highly recommended to use sharded Pub/Sub AND redis cluster
-# for large deployments.
-PUBSUB_REDIS_CHANNEL_TYPE=pubsub
-# Whether to use Redis cluster mode while running
-# PubSub.
+# Note: Before enabling 'streams' in production, estimate your expected event volume and retention needs.
+# Configure Redis memory limits and stream trimming appropriately (e.g., MAXLEN and key expiry) to reduce
+# the risk of data loss from Redis auto-eviction under memory pressure.
+# The legacy ENV name PUBSUB_REDIS_CHANNEL_TYPE is also accepted.
+EVENT_BUS_REDIS_CHANNEL_TYPE=pubsub
+# Whether to use Redis cluster mode when using Redis as the event bus.
# It's highly recommended to enable this for large deployments.
-PUBSUB_REDIS_USE_CLUSTERS=false
+EVENT_BUS_REDIS_USE_CLUSTERS=false
# Whether to Enable human input timeout check task
ENABLE_HUMAN_INPUT_TIMEOUT_TASK=true
diff --git a/api/configs/middleware/cache/redis_pubsub_config.py b/api/configs/middleware/cache/redis_pubsub_config.py
index 8cddc5677a..d30831a0ec 100644
--- a/api/configs/middleware/cache/redis_pubsub_config.py
+++ b/api/configs/middleware/cache/redis_pubsub_config.py
@@ -41,10 +41,10 @@ class RedisPubSubConfig(BaseSettings, RedisConfigDefaultsMixin):
)
PUBSUB_REDIS_USE_CLUSTERS: bool = Field(
- validation_alias=AliasChoices("EVENT_BUS_REDIS_CLUSTERS", "PUBSUB_REDIS_USE_CLUSTERS"),
+ validation_alias=AliasChoices("EVENT_BUS_REDIS_USE_CLUSTERS", "PUBSUB_REDIS_USE_CLUSTERS"),
description=(
"Enable Redis Cluster mode for pub/sub or streams transport. Recommended for large deployments. "
- "Also accepts ENV: EVENT_BUS_REDIS_CLUSTERS."
+ "Also accepts ENV: EVENT_BUS_REDIS_USE_CLUSTERS."
),
default=False,
)
diff --git a/api/core/indexing_runner.py b/api/core/indexing_runner.py
index 7eebd9ec95..275c1fc110 100644
--- a/api/core/indexing_runner.py
+++ b/api/core/indexing_runner.py
@@ -5,6 +5,7 @@ import re
import threading
import time
import uuid
+from collections.abc import Mapping
from typing import Any
from flask import Flask, current_app
@@ -37,7 +38,7 @@ from extensions.ext_storage import storage
from libs import helper
from libs.datetime_utils import naive_utc_now
from models import Account
-from models.dataset import ChildChunk, Dataset, DatasetProcessRule, DocumentSegment
+from models.dataset import AutomaticRulesConfig, ChildChunk, Dataset, DatasetProcessRule, DocumentSegment
from models.dataset import Document as DatasetDocument
from models.model import UploadFile
from services.feature_service import FeatureService
@@ -265,7 +266,7 @@ class IndexingRunner:
self,
tenant_id: str,
extract_settings: list[ExtractSetting],
- tmp_processing_rule: dict,
+ tmp_processing_rule: Mapping[str, Any],
doc_form: str | None = None,
doc_language: str = "English",
dataset_id: str | None = None,
@@ -376,7 +377,7 @@ class IndexingRunner:
return IndexingEstimate(total_segments=total_segments, preview=preview_texts)
def _extract(
- self, index_processor: BaseIndexProcessor, dataset_document: DatasetDocument, process_rule: dict
+ self, index_processor: BaseIndexProcessor, dataset_document: DatasetDocument, process_rule: Mapping[str, Any]
) -> list[Document]:
data_source_info = dataset_document.data_source_info_dict
text_docs = []
@@ -543,6 +544,7 @@ class IndexingRunner:
"""
Clean the document text according to the processing rules.
"""
+ rules: AutomaticRulesConfig | dict[str, Any]
if processing_rule.mode == "automatic":
rules = DatasetProcessRule.AUTOMATIC_RULES
else:
@@ -756,7 +758,7 @@ class IndexingRunner:
dataset: Dataset,
text_docs: list[Document],
doc_language: str,
- process_rule: dict,
+ process_rule: Mapping[str, Any],
current_user: Account | None = None,
) -> list[Document]:
# get embedding model instance
diff --git a/api/core/mcp/auth/auth_flow.py b/api/core/mcp/auth/auth_flow.py
index aef1afb235..d015769b54 100644
--- a/api/core/mcp/auth/auth_flow.py
+++ b/api/core/mcp/auth/auth_flow.py
@@ -55,15 +55,31 @@ def build_protected_resource_metadata_discovery_urls(
"""
urls = []
+ parsed_server_url = urlparse(server_url)
+ base_url = f"{parsed_server_url.scheme}://{parsed_server_url.netloc}"
+ path = parsed_server_url.path.rstrip("/")
+
# First priority: URL from WWW-Authenticate header
if www_auth_resource_metadata_url:
- urls.append(www_auth_resource_metadata_url)
+ parsed_metadata_url = urlparse(www_auth_resource_metadata_url)
+ normalized_metadata_url = None
+ if parsed_metadata_url.scheme and parsed_metadata_url.netloc:
+ normalized_metadata_url = www_auth_resource_metadata_url
+ elif not parsed_metadata_url.scheme and parsed_metadata_url.netloc:
+ normalized_metadata_url = f"{parsed_server_url.scheme}:{www_auth_resource_metadata_url}"
+ elif (
+ not parsed_metadata_url.scheme
+ and not parsed_metadata_url.netloc
+ and parsed_metadata_url.path.startswith("/")
+ ):
+ first_segment = parsed_metadata_url.path.lstrip("/").split("/", 1)[0]
+ if first_segment == ".well-known" or "." not in first_segment:
+ normalized_metadata_url = urljoin(base_url, parsed_metadata_url.path)
+
+ if normalized_metadata_url:
+ urls.append(normalized_metadata_url)
# Fallback: construct from server URL
- parsed = urlparse(server_url)
- base_url = f"{parsed.scheme}://{parsed.netloc}"
- path = parsed.path.rstrip("/")
-
# Priority 2: With path insertion (e.g., /.well-known/oauth-protected-resource/public/mcp)
if path:
path_url = f"{base_url}/.well-known/oauth-protected-resource{path}"
diff --git a/api/dify_graph/graph_engine/response_coordinator/__init__.py b/api/dify_graph/graph_engine/response_coordinator/__init__.py
index 2a80d316e8..e11d31199c 100644
--- a/api/dify_graph/graph_engine/response_coordinator/__init__.py
+++ b/api/dify_graph/graph_engine/response_coordinator/__init__.py
@@ -6,6 +6,5 @@ of responses based on upstream node outputs and constants.
"""
from .coordinator import ResponseStreamCoordinator
-from .session import RESPONSE_SESSION_NODE_TYPES
-__all__ = ["RESPONSE_SESSION_NODE_TYPES", "ResponseStreamCoordinator"]
+__all__ = ["ResponseStreamCoordinator"]
diff --git a/api/dify_graph/graph_engine/response_coordinator/session.py b/api/dify_graph/graph_engine/response_coordinator/session.py
index 99ac1b5edf..11a9f5dac5 100644
--- a/api/dify_graph/graph_engine/response_coordinator/session.py
+++ b/api/dify_graph/graph_engine/response_coordinator/session.py
@@ -3,10 +3,6 @@ Internal response session management for response coordinator.
This module contains the private ResponseSession class used internally
by ResponseStreamCoordinator to manage streaming sessions.
-
-`RESPONSE_SESSION_NODE_TYPES` is intentionally mutable so downstream applications
-can opt additional response-capable node types into session creation without
-patching the coordinator.
"""
from __future__ import annotations
@@ -14,7 +10,6 @@ from __future__ import annotations
from dataclasses import dataclass
from typing import Protocol, cast
-from dify_graph.enums import BuiltinNodeTypes, NodeType
from dify_graph.nodes.base.template import Template
from dify_graph.runtime.graph_runtime_state import NodeProtocol
@@ -25,12 +20,6 @@ class _ResponseSessionNodeProtocol(NodeProtocol, Protocol):
def get_streaming_template(self) -> Template: ...
-RESPONSE_SESSION_NODE_TYPES: list[NodeType] = [
- BuiltinNodeTypes.ANSWER,
- BuiltinNodeTypes.END,
-]
-
-
@dataclass
class ResponseSession:
"""
@@ -49,8 +38,8 @@ class ResponseSession:
Create a ResponseSession from a response-capable node.
The parameter is typed as `NodeProtocol` because the graph is exposed behind a protocol at the runtime layer.
- At runtime this must be a node whose `node_type` is listed in `RESPONSE_SESSION_NODE_TYPES`
- and which implements `get_streaming_template()`.
+ At runtime this must be a node that implements `get_streaming_template()`. The coordinator decides which
+ graph nodes should be treated as response-capable before they reach this factory.
Args:
node: Node from the materialized workflow graph.
@@ -59,15 +48,8 @@ class ResponseSession:
ResponseSession configured with the node's streaming template
Raises:
- TypeError: If node is not a supported response node type.
+ TypeError: If node does not implement the response-session streaming contract.
"""
- if node.node_type not in RESPONSE_SESSION_NODE_TYPES:
- supported_node_types = ", ".join(RESPONSE_SESSION_NODE_TYPES)
- raise TypeError(
- "ResponseSession.from_node only supports node types in "
- f"RESPONSE_SESSION_NODE_TYPES: {supported_node_types}"
- )
-
response_node = cast(_ResponseSessionNodeProtocol, node)
try:
template = response_node.get_streaming_template()
diff --git a/api/dify_graph/nodes/human_input/entities.py b/api/dify_graph/nodes/human_input/entities.py
index 7936e47213..2a33b4a0a8 100644
--- a/api/dify_graph/nodes/human_input/entities.py
+++ b/api/dify_graph/nodes/human_input/entities.py
@@ -8,6 +8,8 @@ from collections.abc import Mapping, Sequence
from datetime import datetime, timedelta
from typing import Annotated, Any, ClassVar, Literal, Self
+import bleach
+import markdown
from pydantic import BaseModel, Field, field_validator, model_validator
from dify_graph.entities.base_node_data import BaseNodeData
@@ -58,6 +60,39 @@ class EmailDeliveryConfig(BaseModel):
"""Configuration for email delivery method."""
URL_PLACEHOLDER: ClassVar[str] = "{{#url#}}"
+ _SUBJECT_NEWLINE_PATTERN: ClassVar[re.Pattern[str]] = re.compile(r"[\r\n]+")
+ _ALLOWED_HTML_TAGS: ClassVar[list[str]] = [
+ "a",
+ "blockquote",
+ "br",
+ "code",
+ "em",
+ "h1",
+ "h2",
+ "h3",
+ "h4",
+ "h5",
+ "h6",
+ "hr",
+ "li",
+ "ol",
+ "p",
+ "pre",
+ "strong",
+ "table",
+ "tbody",
+ "td",
+ "th",
+ "thead",
+ "tr",
+ "ul",
+ ]
+ _ALLOWED_HTML_ATTRIBUTES: ClassVar[dict[str, list[str]]] = {
+ "a": ["href", "title"],
+ "td": ["align"],
+ "th": ["align"],
+ }
+ _ALLOWED_PROTOCOLS: ClassVar[list[str]] = ["http", "https", "mailto"]
recipients: EmailRecipients
@@ -98,6 +133,43 @@ class EmailDeliveryConfig(BaseModel):
return templated_body
return variable_pool.convert_template(templated_body).text
+ @classmethod
+ def render_markdown_body(cls, body: str) -> str:
+ """Render markdown to safe HTML for email delivery."""
+ sanitized_markdown = bleach.clean(
+ body,
+ tags=[],
+ attributes={},
+ strip=True,
+ strip_comments=True,
+ )
+ rendered_html = markdown.markdown(
+ sanitized_markdown,
+ extensions=["nl2br", "tables"],
+ extension_configs={"tables": {"use_align_attribute": True}},
+ )
+ return bleach.clean(
+ rendered_html,
+ tags=cls._ALLOWED_HTML_TAGS,
+ attributes=cls._ALLOWED_HTML_ATTRIBUTES,
+ protocols=cls._ALLOWED_PROTOCOLS,
+ strip=True,
+ strip_comments=True,
+ )
+
+ @classmethod
+ def sanitize_subject(cls, subject: str) -> str:
+ """Sanitize email subject to plain text and prevent CRLF injection."""
+ sanitized_subject = bleach.clean(
+ subject,
+ tags=[],
+ attributes={},
+ strip=True,
+ strip_comments=True,
+ )
+ sanitized_subject = cls._SUBJECT_NEWLINE_PATTERN.sub(" ", sanitized_subject)
+ return " ".join(sanitized_subject.split())
+
class _DeliveryMethodBase(BaseModel):
"""Base delivery method configuration."""
diff --git a/api/models/dataset.py b/api/models/dataset.py
index b3fa11a58c..8438fda25f 100644
--- a/api/models/dataset.py
+++ b/api/models/dataset.py
@@ -10,7 +10,7 @@ import re
import time
from datetime import datetime
from json import JSONDecodeError
-from typing import Any, cast
+from typing import Any, TypedDict, cast
from uuid import uuid4
import sqlalchemy as sa
@@ -37,6 +37,61 @@ from .types import AdjustedJSON, BinaryData, EnumText, LongText, StringUUID, adj
logger = logging.getLogger(__name__)
+class PreProcessingRuleItem(TypedDict):
+ id: str
+ enabled: bool
+
+
+class SegmentationConfig(TypedDict):
+ delimiter: str
+ max_tokens: int
+ chunk_overlap: int
+
+
+class AutomaticRulesConfig(TypedDict):
+ pre_processing_rules: list[PreProcessingRuleItem]
+ segmentation: SegmentationConfig
+
+
+class ProcessRuleDict(TypedDict):
+ id: str
+ dataset_id: str
+ mode: str
+ rules: dict[str, Any] | None
+
+
+class DocMetadataDetailItem(TypedDict):
+ id: str
+ name: str
+ type: str
+ value: Any
+
+
+class AttachmentItem(TypedDict):
+ id: str
+ name: str
+ size: int
+ extension: str
+ mime_type: str
+ source_url: str
+
+
+class DatasetBindingItem(TypedDict):
+ id: str
+ name: str
+
+
+class ExternalKnowledgeApiDict(TypedDict):
+ id: str
+ tenant_id: str
+ name: str
+ description: str
+ settings: dict[str, Any] | None
+ dataset_bindings: list[DatasetBindingItem]
+ created_by: str
+ created_at: str
+
+
class DatasetPermissionEnum(enum.StrEnum):
ONLY_ME = "only_me"
ALL_TEAM = "all_team_members"
@@ -334,7 +389,7 @@ class DatasetProcessRule(Base): # bug
MODES = ["automatic", "custom", "hierarchical"]
PRE_PROCESSING_RULES = ["remove_stopwords", "remove_extra_spaces", "remove_urls_emails"]
- AUTOMATIC_RULES: dict[str, Any] = {
+ AUTOMATIC_RULES: AutomaticRulesConfig = {
"pre_processing_rules": [
{"id": "remove_extra_spaces", "enabled": True},
{"id": "remove_urls_emails", "enabled": False},
@@ -342,7 +397,7 @@ class DatasetProcessRule(Base): # bug
"segmentation": {"delimiter": "\n", "max_tokens": 500, "chunk_overlap": 50},
}
- def to_dict(self) -> dict[str, Any]:
+ def to_dict(self) -> ProcessRuleDict:
return {
"id": self.id,
"dataset_id": self.dataset_id,
@@ -531,7 +586,7 @@ class Document(Base):
return self.updated_at
@property
- def doc_metadata_details(self) -> list[dict[str, Any]] | None:
+ def doc_metadata_details(self) -> list[DocMetadataDetailItem] | None:
if self.doc_metadata:
document_metadatas = (
db.session.query(DatasetMetadata)
@@ -541,9 +596,9 @@ class Document(Base):
)
.all()
)
- metadata_list: list[dict[str, Any]] = []
+ metadata_list: list[DocMetadataDetailItem] = []
for metadata in document_metadatas:
- metadata_dict: dict[str, Any] = {
+ metadata_dict: DocMetadataDetailItem = {
"id": metadata.id,
"name": metadata.name,
"type": metadata.type,
@@ -557,13 +612,13 @@ class Document(Base):
return None
@property
- def process_rule_dict(self) -> dict[str, Any] | None:
+ def process_rule_dict(self) -> ProcessRuleDict | None:
if self.dataset_process_rule_id and self.dataset_process_rule:
return self.dataset_process_rule.to_dict()
return None
- def get_built_in_fields(self) -> list[dict[str, Any]]:
- built_in_fields: list[dict[str, Any]] = []
+ def get_built_in_fields(self) -> list[DocMetadataDetailItem]:
+ built_in_fields: list[DocMetadataDetailItem] = []
built_in_fields.append(
{
"id": "built-in",
@@ -877,7 +932,7 @@ class DocumentSegment(Base):
return text
@property
- def attachments(self) -> list[dict[str, Any]]:
+ def attachments(self) -> list[AttachmentItem]:
# Use JOIN to fetch attachments in a single query instead of two separate queries
attachments_with_bindings = db.session.execute(
select(SegmentAttachmentBinding, UploadFile)
@@ -891,7 +946,7 @@ class DocumentSegment(Base):
).all()
if not attachments_with_bindings:
return []
- attachment_list = []
+ attachment_list: list[AttachmentItem] = []
for _, attachment in attachments_with_bindings:
upload_file_id = attachment.id
nonce = os.urandom(16).hex()
@@ -1261,7 +1316,7 @@ class ExternalKnowledgeApis(TypeBase):
DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp(), init=False
)
- def to_dict(self) -> dict[str, Any]:
+ def to_dict(self) -> ExternalKnowledgeApiDict:
return {
"id": self.id,
"tenant_id": self.tenant_id,
@@ -1281,13 +1336,13 @@ class ExternalKnowledgeApis(TypeBase):
return None
@property
- def dataset_bindings(self) -> list[dict[str, Any]]:
+ def dataset_bindings(self) -> list[DatasetBindingItem]:
external_knowledge_bindings = db.session.scalars(
select(ExternalKnowledgeBindings).where(ExternalKnowledgeBindings.external_knowledge_api_id == self.id)
).all()
dataset_ids = [binding.dataset_id for binding in external_knowledge_bindings]
datasets = db.session.scalars(select(Dataset).where(Dataset.id.in_(dataset_ids))).all()
- dataset_bindings: list[dict[str, Any]] = []
+ dataset_bindings: list[DatasetBindingItem] = []
for dataset in datasets:
dataset_bindings.append({"id": dataset.id, "name": dataset.name})
diff --git a/api/pyproject.toml b/api/pyproject.toml
index 57d58ce5b8..ac51d10513 100644
--- a/api/pyproject.toml
+++ b/api/pyproject.toml
@@ -40,7 +40,7 @@ dependencies = [
"numpy~=1.26.4",
"openpyxl~=3.1.5",
"opik~=1.10.37",
- "litellm==1.82.2", # Pinned to avoid madoka dependency issue
+ "litellm==1.82.2", # Pinned to avoid madoka dependency issue
"opentelemetry-api==1.28.0",
"opentelemetry-distro==0.49b0",
"opentelemetry-exporter-otlp==1.28.0",
@@ -91,6 +91,7 @@ dependencies = [
"apscheduler>=3.11.0",
"weave>=0.52.16",
"fastopenapi[flask]>=0.7.0",
+ "bleach~=6.2.0",
]
# Before adding new dependency, consider place it in
# alphabet order (a-z) and suitable group.
@@ -118,7 +119,7 @@ dev = [
"pytest~=9.0.2",
"pytest-benchmark~=5.2.3",
"pytest-cov~=7.0.0",
- "pytest-env~=1.1.3",
+ "pytest-env~=1.6.0",
"pytest-mock~=3.15.1",
"testcontainers~=4.14.1",
"types-aiofiles~=25.1.0",
@@ -251,10 +252,7 @@ ignore_errors = true
[tool.pyrefly]
project-includes = ["."]
-project-excludes = [
- ".venv",
- "migrations/",
-]
+project-excludes = [".venv", "migrations/"]
python-platform = "linux"
python-version = "3.11.0"
infer-with-first-use = false
diff --git a/api/services/human_input_delivery_test_service.py b/api/services/human_input_delivery_test_service.py
index 80deb37a56..229e6608da 100644
--- a/api/services/human_input_delivery_test_service.py
+++ b/api/services/human_input_delivery_test_service.py
@@ -155,13 +155,15 @@ class EmailDeliveryTestHandler:
context=context,
recipient_email=recipient_email,
)
- subject = render_email_template(method.config.subject, substitutions)
+ subject_template = render_email_template(method.config.subject, substitutions)
+ subject = EmailDeliveryConfig.sanitize_subject(subject_template)
templated_body = EmailDeliveryConfig.render_body_template(
body=method.config.body,
url=substitutions.get("form_link"),
variable_pool=context.variable_pool,
)
body = render_email_template(templated_body, substitutions)
+ body = EmailDeliveryConfig.render_markdown_body(body)
mail.send(
to=recipient_email,
diff --git a/api/services/vector_service.py b/api/services/vector_service.py
index 73bb46b797..b66fdd7a20 100644
--- a/api/services/vector_service.py
+++ b/api/services/vector_service.py
@@ -156,7 +156,8 @@ class VectorService:
)
# use full doc mode to generate segment's child chunk
processing_rule_dict = processing_rule.to_dict()
- processing_rule_dict["rules"]["parent_mode"] = ParentMode.FULL_DOC
+ if processing_rule_dict["rules"] is not None:
+ processing_rule_dict["rules"]["parent_mode"] = ParentMode.FULL_DOC
documents = index_processor.transform(
[document],
embedding_model_instance=embedding_model_instance,
diff --git a/api/tasks/mail_human_input_delivery_task.py b/api/tasks/mail_human_input_delivery_task.py
index bded4cea2b..d241783359 100644
--- a/api/tasks/mail_human_input_delivery_task.py
+++ b/api/tasks/mail_human_input_delivery_task.py
@@ -111,7 +111,7 @@ def _render_body(
url=form_link,
variable_pool=variable_pool,
)
- return body
+ return EmailDeliveryConfig.render_markdown_body(body)
def _load_variable_pool(workflow_run_id: str | None) -> VariablePool | None:
@@ -173,10 +173,11 @@ def dispatch_human_input_email_task(form_id: str, node_title: str | None = None,
for recipient in job.recipients:
form_link = _build_form_link(recipient.token)
body = _render_body(job.body, form_link, variable_pool=variable_pool)
+ subject = EmailDeliveryConfig.sanitize_subject(job.subject)
mail.send(
to=recipient.email,
- subject=job.subject,
+ subject=subject,
html=body,
)
diff --git a/api/tests/test_containers_integration_tests/conftest.py b/api/tests/test_containers_integration_tests/conftest.py
index 2a23f1ea7d..0bdd3bdc47 100644
--- a/api/tests/test_containers_integration_tests/conftest.py
+++ b/api/tests/test_containers_integration_tests/conftest.py
@@ -186,7 +186,7 @@ class DifyTestContainers:
# Start Dify Plugin Daemon container for plugin management
# Dify Plugin Daemon provides plugin lifecycle management and execution
logger.info("Initializing Dify Plugin Daemon container...")
- self.dify_plugin_daemon = DockerContainer(image="langgenius/dify-plugin-daemon:0.3.0-local").with_network(
+ self.dify_plugin_daemon = DockerContainer(image="langgenius/dify-plugin-daemon:0.5.4-local").with_network(
self.network
)
self.dify_plugin_daemon.with_exposed_ports(5002)
diff --git a/api/tests/unit_tests/controllers/console/test_extension.py b/api/tests/unit_tests/controllers/console/test_extension.py
index 85eb6e7d71..0d1fb39348 100644
--- a/api/tests/unit_tests/controllers/console/test_extension.py
+++ b/api/tests/unit_tests/controllers/console/test_extension.py
@@ -22,7 +22,7 @@ from controllers.console.extension import (
)
if _NEEDS_METHOD_VIEW_CLEANUP:
- delattr(builtins, "MethodView")
+ del builtins.MethodView
from models.account import AccountStatus
from models.api_based_extension import APIBasedExtension
diff --git a/api/tests/unit_tests/core/mcp/auth/test_auth_flow.py b/api/tests/unit_tests/core/mcp/auth/test_auth_flow.py
index abf3c60fe0..fe533e62af 100644
--- a/api/tests/unit_tests/core/mcp/auth/test_auth_flow.py
+++ b/api/tests/unit_tests/core/mcp/auth/test_auth_flow.py
@@ -801,6 +801,27 @@ class TestAuthOrchestration:
urls = build_protected_resource_metadata_discovery_urls(None, "https://api.example.com")
assert urls == ["https://api.example.com/.well-known/oauth-protected-resource"]
+ def test_build_protected_resource_metadata_discovery_urls_with_relative_hint(self):
+ urls = build_protected_resource_metadata_discovery_urls(
+ "/.well-known/oauth-protected-resource/tenant/mcp",
+ "https://api.example.com/tenant/mcp",
+ )
+ assert urls == [
+ "https://api.example.com/.well-known/oauth-protected-resource/tenant/mcp",
+ "https://api.example.com/.well-known/oauth-protected-resource",
+ ]
+
+ def test_build_protected_resource_metadata_discovery_urls_ignores_scheme_less_hint(self):
+ urls = build_protected_resource_metadata_discovery_urls(
+ "/openapi-mcp.cn-hangzhou.aliyuncs.com/.well-known/oauth-protected-resource/tenant/mcp",
+ "https://openapi-mcp.cn-hangzhou.aliyuncs.com/tenant/mcp",
+ )
+
+ assert urls == [
+ "https://openapi-mcp.cn-hangzhou.aliyuncs.com/.well-known/oauth-protected-resource/tenant/mcp",
+ "https://openapi-mcp.cn-hangzhou.aliyuncs.com/.well-known/oauth-protected-resource",
+ ]
+
def test_build_oauth_authorization_server_metadata_discovery_urls(self):
# Case 1: with auth_server_url
urls = build_oauth_authorization_server_metadata_discovery_urls(
diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_response_session.py b/api/tests/unit_tests/core/workflow/graph_engine/test_response_session.py
index 198e133454..cd9d56f683 100644
--- a/api/tests/unit_tests/core/workflow/graph_engine/test_response_session.py
+++ b/api/tests/unit_tests/core/workflow/graph_engine/test_response_session.py
@@ -4,9 +4,7 @@ from __future__ import annotations
import pytest
-import dify_graph.graph_engine.response_coordinator.session as response_session_module
from dify_graph.enums import BuiltinNodeTypes, NodeExecutionType, NodeState, NodeType
-from dify_graph.graph_engine.response_coordinator import RESPONSE_SESSION_NODE_TYPES
from dify_graph.graph_engine.response_coordinator.session import ResponseSession
from dify_graph.nodes.base.template import Template, TextSegment
@@ -35,28 +33,14 @@ class DummyNodeWithoutStreamingTemplate:
self.state = NodeState.UNKNOWN
-def test_response_session_from_node_rejects_node_types_outside_allowlist() -> None:
- """Unsupported node types are rejected even if they expose a template."""
+def test_response_session_from_node_accepts_nodes_outside_previous_allowlist() -> None:
+ """Session creation depends on the streaming-template contract rather than node type."""
node = DummyResponseNode(
node_id="llm-node",
node_type=BuiltinNodeTypes.LLM,
template=Template(segments=[TextSegment(text="hello")]),
)
- with pytest.raises(TypeError, match="RESPONSE_SESSION_NODE_TYPES"):
- ResponseSession.from_node(node)
-
-
-def test_response_session_from_node_supports_downstream_allowlist_extension(monkeypatch) -> None:
- """Downstream applications can extend the supported node-type list."""
- node = DummyResponseNode(
- node_id="llm-node",
- node_type=BuiltinNodeTypes.LLM,
- template=Template(segments=[TextSegment(text="hello")]),
- )
- extended_node_types = [*RESPONSE_SESSION_NODE_TYPES, BuiltinNodeTypes.LLM]
- monkeypatch.setattr(response_session_module, "RESPONSE_SESSION_NODE_TYPES", extended_node_types)
-
session = ResponseSession.from_node(node)
assert session.node_id == "llm-node"
diff --git a/api/tests/unit_tests/core/workflow/nodes/human_input/test_email_delivery_config.py b/api/tests/unit_tests/core/workflow/nodes/human_input/test_email_delivery_config.py
index d4939b1071..d52dfa2a65 100644
--- a/api/tests/unit_tests/core/workflow/nodes/human_input/test_email_delivery_config.py
+++ b/api/tests/unit_tests/core/workflow/nodes/human_input/test_email_delivery_config.py
@@ -14,3 +14,64 @@ def test_render_body_template_replaces_variable_values():
result = config.render_body_template(body=config.body, url="https://example.com", variable_pool=variable_pool)
assert result == "Hello World https://example.com"
+
+
+def test_render_markdown_body_renders_markdown_to_html():
+ rendered = EmailDeliveryConfig.render_markdown_body("**Bold** and [link](https://example.com)")
+
+ assert "Bold" in rendered
+ assert 'link' in rendered
+
+
+def test_render_markdown_body_sanitizes_unsafe_html():
+ rendered = EmailDeliveryConfig.render_markdown_body(
+ 'Click'
+ )
+
+ assert "