dify/api/core/skill/skill_compiler.py
import hashlib
import logging
import re
from collections.abc import Mapping
from datetime import UTC, datetime
from typing import Any
from core.app.entities.app_asset_entities import AppAssetFileTree
from core.skill.entities.file_artifact import FilesArtifact
from core.skill.entities.skill_artifact import SkillArtifact, SkillSourceInfo
from core.skill.entities.skill_artifact_set import SkillArtifactSet
from core.skill.entities.skill_document import SkillDocument
from core.skill.entities.skill_metadata import (
    FileReference,
    SkillMetadata,
    ToolConfiguration,
    ToolReference,
)
from core.skill.entities.tool_artifact import ToolArtifact, ToolDependency
from core.tools.entities.tool_entities import ToolProviderType
logger = logging.getLogger(__name__)
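
# Reference markers embedded in skill content:
#   §[tool].[provider].[tool_name].[tool_id]
#   §[file].[source].[asset_id]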
TOOL_REFERENCE_PATTERN = re.compile(r"§\[tool\]\.\[([^\]]+)\]\.\[([^\]]+)\]\.\[([^\]]+)\]")
FILE_REFERENCE_PATTERN = re.compile(r"§\[file\]\.\[([^\]]+)\]\.\[([^\]]+)\]")


class SkillCompiler:
    """
    Stateless skill compiler.

    Responsibilities:
    - Parse raw metadata dict into SkillMetadata
    - Parse direct dependencies from skill content
    - Compute transitive closure based on existing artifact set
    - Resolve content by replacing references
    - Generate SkillArtifact
    """

    def _parse_metadata(self, content: str, raw_metadata: Mapping[str, Any]) -> SkillMetadata:
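        """
        Extract tool and file references from the skill content, resolving tool
        details from the raw metadata mapping. Tools absent from the metadata
        are skipped.
        """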
        tools_raw: dict[str, Any] = dict(raw_metadata.get("tools", {}))
        tools: dict[str, ToolReference] = {}
        files: list[FileReference] = []

        for match in TOOL_REFERENCE_PATTERN.finditer(content):
            tool_id = match.group(3)
            tool_name = match.group(2)
            tool_provider = match.group(1)
            tool_meta = tools_raw.get(tool_id)
            if tool_meta is None:
                continue
            config_raw = tool_meta.get("configuration", {})
            configuration = ToolConfiguration.model_validate(config_raw) if config_raw else None
            tools[tool_id] = ToolReference(
                uuid=tool_id,
                type=ToolProviderType.value_of(tool_meta.get("type")),
                provider=tool_provider,
                tool_name=tool_name,
                credential_id=tool_meta.get("credential_id"),
                configuration=configuration,
            )

        for match in FILE_REFERENCE_PATTERN.finditer(content):
            files.append(
                FileReference(
                    source=match.group(1),
                    asset_id=match.group(2),
                )
            )

        return SkillMetadata(tools=tools, files=files)

    def compile_all(
        self,
        documents: list[SkillDocument],
        file_tree: AppAssetFileTree,
        assets_id: str,
    ) -> SkillArtifactSet:
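        """
        Compile every document into an artifact and return the full artifact set.
        A first pass parses metadata and builds the dependency / reverse-dependency
        graphs; a second pass compiles each document against them.
        """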
        artifact_set = SkillArtifactSet(
            assets_id=assets_id,
            built_at=datetime.now(UTC),
        )
        doc_map: dict[str, SkillDocument] = {doc.skill_id: doc for doc in documents}
        parsed_metadata: dict[str, SkillMetadata] = {}

        for doc in documents:
            metadata = self._parse_metadata(doc.content, doc.metadata)
            parsed_metadata[doc.skill_id] = metadata
            direct_skill_refs = self._extract_skill_refs(metadata, doc_map)
            artifact_set.dependency_graph[doc.skill_id] = list(direct_skill_refs)
            for ref_id in direct_skill_refs:
                if ref_id not in artifact_set.reverse_graph:
                    artifact_set.reverse_graph[ref_id] = []
                artifact_set.reverse_graph[ref_id].append(doc.skill_id)

        for doc in documents:
            metadata = parsed_metadata[doc.skill_id]
            artifact = self._compile_single(doc, metadata, artifact_set, parsed_metadata, file_tree)
            artifact_set.upsert(artifact)

        return artifact_set

    def compile_one(
        self,
        artifact_set: SkillArtifactSet,
        document: SkillDocument,
        file_tree: AppAssetFileTree,
        all_documents: dict[str, SkillDocument] | None = None,
    ) -> SkillArtifact:
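        """
        Compile a single document against an existing artifact set, updating the
        set's dependency and reverse-dependency graphs. `all_documents` can supply
        sibling documents so their metadata is available for the transitive closure.
        """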
        doc_map = all_documents or {}
        if document.skill_id not in doc_map:
            doc_map[document.skill_id] = document

        parsed_metadata: dict[str, SkillMetadata] = {}
        for skill_id, doc in doc_map.items():
            parsed_metadata[skill_id] = self._parse_metadata(doc.content, doc.metadata)

        metadata = parsed_metadata[document.skill_id]
        direct_skill_refs = self._extract_skill_refs(metadata, doc_map)
        artifact_set.dependency_graph[document.skill_id] = list(direct_skill_refs)
        for ref_id in direct_skill_refs:
            if ref_id not in artifact_set.reverse_graph:
                artifact_set.reverse_graph[ref_id] = []
            if document.skill_id not in artifact_set.reverse_graph[ref_id]:
                artifact_set.reverse_graph[ref_id].append(document.skill_id)

        return self._compile_single(document, metadata, artifact_set, parsed_metadata, file_tree)

    def _compile_single(
        self,
        document: SkillDocument,
        metadata: SkillMetadata,
        artifact_set: SkillArtifactSet,
        parsed_metadata: dict[str, SkillMetadata],
        file_tree: AppAssetFileTree,
    ) -> SkillArtifact:
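        """
        Build the artifact for one document: gather its transitive tool and file
        dependencies, resolve references inside its content, and record a digest
        of the original content.
        """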
        all_tools, all_files = self._compute_transitive_closure(
            document.skill_id, artifact_set, parsed_metadata
        )
        current_node = file_tree.get(document.skill_id)
        resolved_content = self._resolve_content(
            document.content, metadata, current_node, file_tree
        )
        content_digest = hashlib.sha256(document.content.encode("utf-8")).hexdigest()

        return SkillArtifact(
            skill_id=document.skill_id,
            source=SkillSourceInfo(
                asset_id=document.skill_id,
                content_digest=content_digest,
            ),
            tools=ToolArtifact(
                dependencies=list(all_tools.values()),
                references=list(metadata.tools.values()),
            ),
            files=FilesArtifact(
                references=list(all_files.values()),
            ),
            content=resolved_content,
        )

    def _extract_skill_refs(
        self,
        metadata: SkillMetadata,
        doc_map: dict[str, SkillDocument],
    ) -> set[str]:
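        """Return the asset IDs of file references that point at other skill documents."""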
        skill_refs: set[str] = set()
        for file_ref in metadata.files:
            if file_ref.asset_id in doc_map:
                skill_refs.add(file_ref.asset_id)
        return skill_refs

    def _compute_transitive_closure(
        self,
        skill_id: str,
        artifact_set: SkillArtifactSet,
        parsed_metadata: dict[str, SkillMetadata],
    ) -> tuple[dict[str, ToolDependency], dict[str, FileReference]]:
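        """
        Breadth-first walk of the dependency graph collecting deduplicated tool and
        file dependencies. Skills with no freshly parsed metadata fall back to their
        previously compiled artifact, if one exists.
        """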
        all_tools: dict[str, ToolDependency] = {}
        all_files: dict[str, FileReference] = {}
        visited: set[str] = set()
        queue = [skill_id]

        while queue:
            current_id = queue.pop(0)
            if current_id in visited:
                continue
            visited.add(current_id)

            metadata = parsed_metadata.get(current_id)
            if metadata is None:
                existing_artifact = artifact_set.get(current_id)
                if existing_artifact:
                    for dep in existing_artifact.tools.dependencies:
                        key = f"{dep.provider}.{dep.tool_name}"
                        if key not in all_tools:
                            all_tools[key] = dep
                    for file_ref in existing_artifact.files.references:
                        if file_ref.asset_id not in all_files:
                            all_files[file_ref.asset_id] = file_ref
                continue

            for tool_ref in metadata.tools.values():
                key = f"{tool_ref.provider}.{tool_ref.tool_name}"
                if key not in all_tools:
                    all_tools[key] = ToolDependency(
                        type=tool_ref.type,
                        provider=tool_ref.provider,
                        tool_name=tool_ref.tool_name,
                    )
            for file_ref in metadata.files:
                if file_ref.asset_id not in all_files:
                    all_files[file_ref.asset_id] = file_ref

            for dep_id in artifact_set.dependency_graph.get(current_id, []):
                if dep_id not in visited:
                    queue.append(dep_id)

        return all_tools, all_files

    def _resolve_content(
        self,
        content: str,
        metadata: SkillMetadata,
        current_node: Any,
        file_tree: AppAssetFileTree,
    ) -> str:
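        """
        Rewrite file references as paths relative to the current node (or a
        placeholder) and tool references as bash-command placeholders.
        """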
        if not content:
            return content

        for match in FILE_REFERENCE_PATTERN.finditer(content):
            file_id = match.group(2)
            file_node = file_tree.get(file_id)
            if file_node is None:
                logger.warning("File not found for id=%s, skipping", file_id)
                content = content.replace(match.group(0), "[File not found]")
                continue
            if current_node is not None:
                content = content.replace(match.group(0), file_tree.relative_path(current_node, file_node))
            else:
                content = content.replace(match.group(0), f"[{file_node.name}]")

        for match in TOOL_REFERENCE_PATTERN.finditer(content):
            tool_id = match.group(3)
            tool = metadata.tools.get(tool_id)
            if tool is None:
                logger.warning("Tool not found for id=%s, skipping", tool_id)
                content = content.replace(match.group(0), f"[Tool not found: {tool_id}]")
                continue
            content = content.replace(match.group(0), f"[Bash Command: {tool.tool_name}_{tool_id}]")

        return content