fix: resolve human input workflow ci regressions

This commit is contained in:
-LAN-
2026-03-17 19:04:13 +08:00
parent 1dd706f1f1
commit 5cdce96580
28 changed files with 251 additions and 86 deletions

View File

@@ -6,13 +6,13 @@ from collections.abc import Mapping, Sequence
from copy import deepcopy
from typing import Annotated, Any, Union, cast
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, model_validator
from dify_graph.file import File, FileAttribute, file_manager
from dify_graph.variables import Segment, SegmentGroup, VariableBase, build_segment, segment_to_variable
from dify_graph.variables.consts import SELECTORS_LENGTH
from dify_graph.variables.segments import FileSegment, ObjectSegment
from dify_graph.variables.variables import Variable
from dify_graph.variables.variables import RAGPipelineVariableInput, Variable
VariableValue = Union[str, int, float, dict[str, object], list[object], File]
@@ -20,14 +20,66 @@ VARIABLE_PATTERN = re.compile(r"\{\{#([a-zA-Z0-9_]{1,50}(?:\.[a-zA-Z_][a-zA-Z0-9
class VariablePool(BaseModel):
_SYSTEM_VARIABLE_NODE_ID = "sys"
_ENVIRONMENT_VARIABLE_NODE_ID = "env"
_CONVERSATION_VARIABLE_NODE_ID = "conversation"
_RAG_PIPELINE_VARIABLE_NODE_ID = "rag"
# Variable dictionary is a dictionary for looking up variables by their selector.
# The first element of the selector is the node id, it's the first-level key in the dictionary.
# Other elements of the selector are the keys in the second-level dictionary. To get the key, we hash the
# elements of the selector except the first one.
variable_dictionary: defaultdict[str, Annotated[dict[str, Variable], Field(default_factory=dict)]] = Field(
description="Variables mapping",
default=defaultdict(dict),
default_factory=lambda: defaultdict(dict),
)
system_variables: Sequence[Variable] = Field(default_factory=tuple, exclude=True)
environment_variables: Sequence[Variable] = Field(default_factory=tuple, exclude=True)
conversation_variables: Sequence[Variable] = Field(default_factory=tuple, exclude=True)
rag_pipeline_variables: Sequence[RAGPipelineVariableInput] = Field(default_factory=tuple, exclude=True)
user_inputs: Mapping[str, Any] = Field(default_factory=dict, exclude=True)
@model_validator(mode="after")
def _load_legacy_bootstrap_inputs(self) -> VariablePool:
"""
Accept legacy constructor kwargs that still appear throughout the workflow
layer while keeping serialized state focused on `variable_dictionary`.
"""
self._ingest_legacy_variables(self.system_variables, node_id=self._SYSTEM_VARIABLE_NODE_ID)
self._ingest_legacy_variables(self.environment_variables, node_id=self._ENVIRONMENT_VARIABLE_NODE_ID)
self._ingest_legacy_variables(self.conversation_variables, node_id=self._CONVERSATION_VARIABLE_NODE_ID)
self._ingest_legacy_rag_variables(self.rag_pipeline_variables)
# These kwargs are accepted for compatibility but should not affect the
# stable serialized form or model equality.
self.system_variables = ()
self.environment_variables = ()
self.conversation_variables = ()
self.rag_pipeline_variables = ()
self.user_inputs = {}
return self
def _ingest_legacy_variables(self, variables: Sequence[Variable], *, node_id: str) -> None:
for variable in variables:
selector = [node_id, variable.name]
normalized_variable = variable
if list(variable.selector) != selector:
normalized_variable = variable.model_copy(update={"selector": selector})
self.add(normalized_variable.selector, normalized_variable)
def _ingest_legacy_rag_variables(self, rag_pipeline_variables: Sequence[RAGPipelineVariableInput]) -> None:
if not rag_pipeline_variables:
return
values_by_node_id: defaultdict[str, dict[str, Any]] = defaultdict(dict)
for rag_variable_input in rag_pipeline_variables:
values_by_node_id[rag_variable_input.variable.belong_to_node_id][rag_variable_input.variable.variable] = (
rag_variable_input.value
)
for node_id, value in values_by_node_id.items():
self.add((self._RAG_PIPELINE_VARIABLE_NODE_ID, node_id), value)
def add(self, selector: Sequence[str], value: Any, /):
"""