Files
dify/api/services/workflow_comment_service.py
2026-02-03 14:42:30 +08:00

465 lines
18 KiB
Python

import logging
from collections.abc import Sequence
from sqlalchemy import desc, select
from sqlalchemy.orm import Session, selectinload
from werkzeug.exceptions import Forbidden, NotFound
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
from libs.helper import uuid_value
from models import App, TenantAccountJoin, WorkflowComment, WorkflowCommentMention, WorkflowCommentReply
from models.account import Account
from tasks.mail_workflow_comment_task import send_workflow_comment_mention_email_task
logger = logging.getLogger(__name__)
class WorkflowCommentService:
"""Service for managing workflow comments."""
@staticmethod
def _validate_content(content: str) -> None:
if len(content.strip()) == 0:
raise ValueError("Comment content cannot be empty")
if len(content) > 1000:
raise ValueError("Comment content cannot exceed 1000 characters")
@staticmethod
def _filter_valid_mentioned_user_ids(mentioned_user_ids: Sequence[str]) -> list[str]:
"""Return deduplicated UUID user IDs in the order provided."""
unique_user_ids: list[str] = []
seen: set[str] = set()
for user_id in mentioned_user_ids:
if not isinstance(user_id, str):
continue
if not uuid_value(user_id):
continue
if user_id in seen:
continue
seen.add(user_id)
unique_user_ids.append(user_id)
return unique_user_ids
@staticmethod
def _format_comment_excerpt(content: str, max_length: int = 200) -> str:
"""Trim comment content for email display."""
trimmed = content.strip()
if len(trimmed) <= max_length:
return trimmed
if max_length <= 3:
return trimmed[:max_length]
return f"{trimmed[: max_length - 3].rstrip()}..."
@staticmethod
def _build_mention_email_payloads(
session: Session,
tenant_id: str,
app_id: str,
mentioner_id: str,
mentioned_user_ids: Sequence[str],
content: str,
) -> list[dict[str, str]]:
"""Prepare email payloads for mentioned users."""
if not mentioned_user_ids:
return []
candidate_user_ids = [user_id for user_id in mentioned_user_ids if user_id != mentioner_id]
if not candidate_user_ids:
return []
app_name = session.scalar(select(App.name).where(App.id == app_id, App.tenant_id == tenant_id)) or "Dify app"
commenter_name = session.scalar(select(Account.name).where(Account.id == mentioner_id)) or "Dify user"
comment_excerpt = WorkflowCommentService._format_comment_excerpt(content)
accounts = session.scalars(
select(Account)
.join(TenantAccountJoin, TenantAccountJoin.account_id == Account.id)
.where(TenantAccountJoin.tenant_id == tenant_id, Account.id.in_(candidate_user_ids))
).all()
payloads: list[dict[str, str]] = []
for account in accounts:
payloads.append(
{
"language": account.interface_language or "en-US",
"to": account.email,
"mentioned_name": account.name or account.email,
"commenter_name": commenter_name,
"app_name": app_name,
"comment_content": comment_excerpt,
}
)
return payloads
@staticmethod
def _dispatch_mention_emails(payloads: Sequence[dict[str, str]]) -> None:
"""Enqueue mention notification emails."""
for payload in payloads:
send_workflow_comment_mention_email_task.delay(**payload)
@staticmethod
def get_comments(tenant_id: str, app_id: str) -> Sequence[WorkflowComment]:
"""Get all comments for a workflow."""
with Session(db.engine) as session:
# Get all comments with eager loading
stmt = (
select(WorkflowComment)
.options(selectinload(WorkflowComment.replies), selectinload(WorkflowComment.mentions))
.where(WorkflowComment.tenant_id == tenant_id, WorkflowComment.app_id == app_id)
.order_by(desc(WorkflowComment.created_at))
)
comments = session.scalars(stmt).all()
# Batch preload all Account objects to avoid N+1 queries
WorkflowCommentService._preload_accounts(session, comments)
return comments
@staticmethod
def _preload_accounts(session: Session, comments: Sequence[WorkflowComment]) -> None:
"""Batch preload Account objects for comments, replies, and mentions."""
# Collect all user IDs
user_ids: set[str] = set()
for comment in comments:
user_ids.add(comment.created_by)
if comment.resolved_by:
user_ids.add(comment.resolved_by)
user_ids.update(reply.created_by for reply in comment.replies)
user_ids.update(mention.mentioned_user_id for mention in comment.mentions)
if not user_ids:
return
# Batch query all accounts
accounts = session.scalars(select(Account).where(Account.id.in_(user_ids))).all()
account_map = {str(account.id): account for account in accounts}
# Cache accounts on objects
for comment in comments:
comment.cache_created_by_account(account_map.get(comment.created_by))
comment.cache_resolved_by_account(account_map.get(comment.resolved_by) if comment.resolved_by else None)
for reply in comment.replies:
reply.cache_created_by_account(account_map.get(reply.created_by))
for mention in comment.mentions:
mention.cache_mentioned_user_account(account_map.get(mention.mentioned_user_id))
@staticmethod
def get_comment(tenant_id: str, app_id: str, comment_id: str, session: Session | None = None) -> WorkflowComment:
"""Get a specific comment."""
def _get_comment(session: Session) -> WorkflowComment:
stmt = (
select(WorkflowComment)
.options(selectinload(WorkflowComment.replies), selectinload(WorkflowComment.mentions))
.where(
WorkflowComment.id == comment_id,
WorkflowComment.tenant_id == tenant_id,
WorkflowComment.app_id == app_id,
)
)
comment = session.scalar(stmt)
if not comment:
raise NotFound("Comment not found")
# Preload accounts to avoid N+1 queries
WorkflowCommentService._preload_accounts(session, [comment])
return comment
if session is not None:
return _get_comment(session)
else:
with Session(db.engine, expire_on_commit=False) as session:
return _get_comment(session)
@staticmethod
def create_comment(
tenant_id: str,
app_id: str,
created_by: str,
content: str,
position_x: float,
position_y: float,
mentioned_user_ids: list[str] | None = None,
) -> dict:
"""Create a new workflow comment and send mention notification emails."""
WorkflowCommentService._validate_content(content)
with Session(db.engine) as session:
comment = WorkflowComment(
tenant_id=tenant_id,
app_id=app_id,
position_x=position_x,
position_y=position_y,
content=content,
created_by=created_by,
)
session.add(comment)
session.flush() # Get the comment ID for mentions
# Create mentions if specified
mentioned_user_ids = WorkflowCommentService._filter_valid_mentioned_user_ids(mentioned_user_ids or [])
for user_id in mentioned_user_ids:
mention = WorkflowCommentMention(
comment_id=comment.id,
reply_id=None, # This is a comment mention, not reply mention
mentioned_user_id=user_id,
)
session.add(mention)
mention_email_payloads = WorkflowCommentService._build_mention_email_payloads(
session=session,
tenant_id=tenant_id,
app_id=app_id,
mentioner_id=created_by,
mentioned_user_ids=mentioned_user_ids,
content=content,
)
session.commit()
WorkflowCommentService._dispatch_mention_emails(mention_email_payloads)
# Return only what we need - id and created_at
return {"id": comment.id, "created_at": comment.created_at}
@staticmethod
def update_comment(
tenant_id: str,
app_id: str,
comment_id: str,
user_id: str,
content: str,
position_x: float | None = None,
position_y: float | None = None,
mentioned_user_ids: list[str] | None = None,
) -> dict:
"""Update a workflow comment and notify newly mentioned users."""
WorkflowCommentService._validate_content(content)
with Session(db.engine, expire_on_commit=False) as session:
# Get comment with validation
stmt = select(WorkflowComment).where(
WorkflowComment.id == comment_id,
WorkflowComment.tenant_id == tenant_id,
WorkflowComment.app_id == app_id,
)
comment = session.scalar(stmt)
if not comment:
raise NotFound("Comment not found")
# Only the creator can update the comment
if comment.created_by != user_id:
raise Forbidden("Only the comment creator can update it")
# Update comment fields
comment.content = content
if position_x is not None:
comment.position_x = position_x
if position_y is not None:
comment.position_y = position_y
# Update mentions - first remove existing mentions for this comment only (not replies)
existing_mentions = session.scalars(
select(WorkflowCommentMention).where(
WorkflowCommentMention.comment_id == comment.id,
WorkflowCommentMention.reply_id.is_(None), # Only comment mentions, not reply mentions
)
).all()
existing_mentioned_user_ids = {mention.mentioned_user_id for mention in existing_mentions}
for mention in existing_mentions:
session.delete(mention)
# Add new mentions
mentioned_user_ids = WorkflowCommentService._filter_valid_mentioned_user_ids(mentioned_user_ids or [])
new_mentioned_user_ids = [
user_id for user_id in mentioned_user_ids if user_id not in existing_mentioned_user_ids
]
for user_id_str in mentioned_user_ids:
mention = WorkflowCommentMention(
comment_id=comment.id,
reply_id=None, # This is a comment mention
mentioned_user_id=user_id_str,
)
session.add(mention)
mention_email_payloads = WorkflowCommentService._build_mention_email_payloads(
session=session,
tenant_id=tenant_id,
app_id=app_id,
mentioner_id=user_id,
mentioned_user_ids=new_mentioned_user_ids,
content=content,
)
session.commit()
WorkflowCommentService._dispatch_mention_emails(mention_email_payloads)
return {"id": comment.id, "updated_at": comment.updated_at}
@staticmethod
def delete_comment(tenant_id: str, app_id: str, comment_id: str, user_id: str) -> None:
"""Delete a workflow comment."""
with Session(db.engine, expire_on_commit=False) as session:
comment = WorkflowCommentService.get_comment(tenant_id, app_id, comment_id, session)
# Only the creator can delete the comment
if comment.created_by != user_id:
raise Forbidden("Only the comment creator can delete it")
# Delete associated mentions (both comment and reply mentions)
mentions = session.scalars(
select(WorkflowCommentMention).where(WorkflowCommentMention.comment_id == comment_id)
).all()
for mention in mentions:
session.delete(mention)
# Delete associated replies
replies = session.scalars(
select(WorkflowCommentReply).where(WorkflowCommentReply.comment_id == comment_id)
).all()
for reply in replies:
session.delete(reply)
session.delete(comment)
session.commit()
@staticmethod
def resolve_comment(tenant_id: str, app_id: str, comment_id: str, user_id: str) -> WorkflowComment:
"""Resolve a workflow comment."""
with Session(db.engine, expire_on_commit=False) as session:
comment = WorkflowCommentService.get_comment(tenant_id, app_id, comment_id, session)
if comment.resolved:
return comment
comment.resolved = True
comment.resolved_at = naive_utc_now()
comment.resolved_by = user_id
session.commit()
return comment
@staticmethod
def create_reply(
comment_id: str, content: str, created_by: str, mentioned_user_ids: list[str] | None = None
) -> dict:
"""Add a reply to a workflow comment and notify mentioned users."""
WorkflowCommentService._validate_content(content)
with Session(db.engine, expire_on_commit=False) as session:
# Check if comment exists
comment = session.get(WorkflowComment, comment_id)
if not comment:
raise NotFound("Comment not found")
reply = WorkflowCommentReply(comment_id=comment_id, content=content, created_by=created_by)
session.add(reply)
session.flush() # Get the reply ID for mentions
# Create mentions if specified
mentioned_user_ids = WorkflowCommentService._filter_valid_mentioned_user_ids(mentioned_user_ids or [])
for user_id in mentioned_user_ids:
# Create mention linking to specific reply
mention = WorkflowCommentMention(comment_id=comment_id, reply_id=reply.id, mentioned_user_id=user_id)
session.add(mention)
mention_email_payloads = WorkflowCommentService._build_mention_email_payloads(
session=session,
tenant_id=comment.tenant_id,
app_id=comment.app_id,
mentioner_id=created_by,
mentioned_user_ids=mentioned_user_ids,
content=content,
)
session.commit()
WorkflowCommentService._dispatch_mention_emails(mention_email_payloads)
return {"id": reply.id, "created_at": reply.created_at}
@staticmethod
def update_reply(reply_id: str, user_id: str, content: str, mentioned_user_ids: list[str] | None = None) -> dict:
"""Update a comment reply and notify newly mentioned users."""
WorkflowCommentService._validate_content(content)
with Session(db.engine, expire_on_commit=False) as session:
reply = session.get(WorkflowCommentReply, reply_id)
if not reply:
raise NotFound("Reply not found")
# Only the creator can update the reply
if reply.created_by != user_id:
raise Forbidden("Only the reply creator can update it")
reply.content = content
# Update mentions - first remove existing mentions for this reply
existing_mentions = session.scalars(
select(WorkflowCommentMention).where(WorkflowCommentMention.reply_id == reply.id)
).all()
existing_mentioned_user_ids = {mention.mentioned_user_id for mention in existing_mentions}
for mention in existing_mentions:
session.delete(mention)
# Add mentions
mentioned_user_ids = WorkflowCommentService._filter_valid_mentioned_user_ids(mentioned_user_ids or [])
new_mentioned_user_ids = [
user_id for user_id in mentioned_user_ids if user_id not in existing_mentioned_user_ids
]
for user_id_str in mentioned_user_ids:
mention = WorkflowCommentMention(
comment_id=reply.comment_id, reply_id=reply.id, mentioned_user_id=user_id_str
)
session.add(mention)
mention_email_payloads: list[dict[str, str]] = []
comment = session.get(WorkflowComment, reply.comment_id)
if comment:
mention_email_payloads = WorkflowCommentService._build_mention_email_payloads(
session=session,
tenant_id=comment.tenant_id,
app_id=comment.app_id,
mentioner_id=user_id,
mentioned_user_ids=new_mentioned_user_ids,
content=content,
)
session.commit()
session.refresh(reply) # Refresh to get updated timestamp
WorkflowCommentService._dispatch_mention_emails(mention_email_payloads)
return {"id": reply.id, "updated_at": reply.updated_at}
@staticmethod
def delete_reply(reply_id: str, user_id: str) -> None:
"""Delete a comment reply."""
with Session(db.engine, expire_on_commit=False) as session:
reply = session.get(WorkflowCommentReply, reply_id)
if not reply:
raise NotFound("Reply not found")
# Only the creator can delete the reply
if reply.created_by != user_id:
raise Forbidden("Only the reply creator can delete it")
# Delete associated mentions first
mentions = session.scalars(
select(WorkflowCommentMention).where(WorkflowCommentMention.reply_id == reply_id)
).all()
for mention in mentions:
session.delete(mention)
session.delete(reply)
session.commit()
@staticmethod
def validate_comment_access(comment_id: str, tenant_id: str, app_id: str) -> WorkflowComment:
"""Validate that a comment belongs to the specified tenant and app."""
return WorkflowCommentService.get_comment(tenant_id, app_id, comment_id)