mirror of
https://github.com/langgenius/dify.git
synced 2026-02-11 10:01:30 -05:00
465 lines
18 KiB
Python
465 lines
18 KiB
Python
import logging
|
|
from collections.abc import Sequence
|
|
|
|
from sqlalchemy import desc, select
|
|
from sqlalchemy.orm import Session, selectinload
|
|
from werkzeug.exceptions import Forbidden, NotFound
|
|
|
|
from extensions.ext_database import db
|
|
from libs.datetime_utils import naive_utc_now
|
|
from libs.helper import uuid_value
|
|
from models import App, TenantAccountJoin, WorkflowComment, WorkflowCommentMention, WorkflowCommentReply
|
|
from models.account import Account
|
|
from tasks.mail_workflow_comment_task import send_workflow_comment_mention_email_task
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class WorkflowCommentService:
|
|
"""Service for managing workflow comments."""
|
|
|
|
@staticmethod
|
|
def _validate_content(content: str) -> None:
|
|
if len(content.strip()) == 0:
|
|
raise ValueError("Comment content cannot be empty")
|
|
|
|
if len(content) > 1000:
|
|
raise ValueError("Comment content cannot exceed 1000 characters")
|
|
|
|
@staticmethod
|
|
def _filter_valid_mentioned_user_ids(mentioned_user_ids: Sequence[str]) -> list[str]:
|
|
"""Return deduplicated UUID user IDs in the order provided."""
|
|
unique_user_ids: list[str] = []
|
|
seen: set[str] = set()
|
|
for user_id in mentioned_user_ids:
|
|
if not isinstance(user_id, str):
|
|
continue
|
|
if not uuid_value(user_id):
|
|
continue
|
|
if user_id in seen:
|
|
continue
|
|
seen.add(user_id)
|
|
unique_user_ids.append(user_id)
|
|
return unique_user_ids
|
|
|
|
@staticmethod
|
|
def _format_comment_excerpt(content: str, max_length: int = 200) -> str:
|
|
"""Trim comment content for email display."""
|
|
trimmed = content.strip()
|
|
if len(trimmed) <= max_length:
|
|
return trimmed
|
|
if max_length <= 3:
|
|
return trimmed[:max_length]
|
|
return f"{trimmed[: max_length - 3].rstrip()}..."
|
|
|
|
@staticmethod
|
|
def _build_mention_email_payloads(
|
|
session: Session,
|
|
tenant_id: str,
|
|
app_id: str,
|
|
mentioner_id: str,
|
|
mentioned_user_ids: Sequence[str],
|
|
content: str,
|
|
) -> list[dict[str, str]]:
|
|
"""Prepare email payloads for mentioned users."""
|
|
if not mentioned_user_ids:
|
|
return []
|
|
|
|
candidate_user_ids = [user_id for user_id in mentioned_user_ids if user_id != mentioner_id]
|
|
if not candidate_user_ids:
|
|
return []
|
|
|
|
app_name = session.scalar(select(App.name).where(App.id == app_id, App.tenant_id == tenant_id)) or "Dify app"
|
|
commenter_name = session.scalar(select(Account.name).where(Account.id == mentioner_id)) or "Dify user"
|
|
comment_excerpt = WorkflowCommentService._format_comment_excerpt(content)
|
|
|
|
accounts = session.scalars(
|
|
select(Account)
|
|
.join(TenantAccountJoin, TenantAccountJoin.account_id == Account.id)
|
|
.where(TenantAccountJoin.tenant_id == tenant_id, Account.id.in_(candidate_user_ids))
|
|
).all()
|
|
|
|
payloads: list[dict[str, str]] = []
|
|
for account in accounts:
|
|
payloads.append(
|
|
{
|
|
"language": account.interface_language or "en-US",
|
|
"to": account.email,
|
|
"mentioned_name": account.name or account.email,
|
|
"commenter_name": commenter_name,
|
|
"app_name": app_name,
|
|
"comment_content": comment_excerpt,
|
|
}
|
|
)
|
|
return payloads
|
|
|
|
@staticmethod
|
|
def _dispatch_mention_emails(payloads: Sequence[dict[str, str]]) -> None:
|
|
"""Enqueue mention notification emails."""
|
|
for payload in payloads:
|
|
send_workflow_comment_mention_email_task.delay(**payload)
|
|
|
|
@staticmethod
|
|
def get_comments(tenant_id: str, app_id: str) -> Sequence[WorkflowComment]:
|
|
"""Get all comments for a workflow."""
|
|
with Session(db.engine) as session:
|
|
# Get all comments with eager loading
|
|
stmt = (
|
|
select(WorkflowComment)
|
|
.options(selectinload(WorkflowComment.replies), selectinload(WorkflowComment.mentions))
|
|
.where(WorkflowComment.tenant_id == tenant_id, WorkflowComment.app_id == app_id)
|
|
.order_by(desc(WorkflowComment.created_at))
|
|
)
|
|
|
|
comments = session.scalars(stmt).all()
|
|
|
|
# Batch preload all Account objects to avoid N+1 queries
|
|
WorkflowCommentService._preload_accounts(session, comments)
|
|
|
|
return comments
|
|
|
|
@staticmethod
|
|
def _preload_accounts(session: Session, comments: Sequence[WorkflowComment]) -> None:
|
|
"""Batch preload Account objects for comments, replies, and mentions."""
|
|
# Collect all user IDs
|
|
user_ids: set[str] = set()
|
|
for comment in comments:
|
|
user_ids.add(comment.created_by)
|
|
if comment.resolved_by:
|
|
user_ids.add(comment.resolved_by)
|
|
user_ids.update(reply.created_by for reply in comment.replies)
|
|
user_ids.update(mention.mentioned_user_id for mention in comment.mentions)
|
|
|
|
if not user_ids:
|
|
return
|
|
|
|
# Batch query all accounts
|
|
accounts = session.scalars(select(Account).where(Account.id.in_(user_ids))).all()
|
|
account_map = {str(account.id): account for account in accounts}
|
|
|
|
# Cache accounts on objects
|
|
for comment in comments:
|
|
comment.cache_created_by_account(account_map.get(comment.created_by))
|
|
comment.cache_resolved_by_account(account_map.get(comment.resolved_by) if comment.resolved_by else None)
|
|
for reply in comment.replies:
|
|
reply.cache_created_by_account(account_map.get(reply.created_by))
|
|
for mention in comment.mentions:
|
|
mention.cache_mentioned_user_account(account_map.get(mention.mentioned_user_id))
|
|
|
|
@staticmethod
|
|
def get_comment(tenant_id: str, app_id: str, comment_id: str, session: Session | None = None) -> WorkflowComment:
|
|
"""Get a specific comment."""
|
|
|
|
def _get_comment(session: Session) -> WorkflowComment:
|
|
stmt = (
|
|
select(WorkflowComment)
|
|
.options(selectinload(WorkflowComment.replies), selectinload(WorkflowComment.mentions))
|
|
.where(
|
|
WorkflowComment.id == comment_id,
|
|
WorkflowComment.tenant_id == tenant_id,
|
|
WorkflowComment.app_id == app_id,
|
|
)
|
|
)
|
|
comment = session.scalar(stmt)
|
|
|
|
if not comment:
|
|
raise NotFound("Comment not found")
|
|
|
|
# Preload accounts to avoid N+1 queries
|
|
WorkflowCommentService._preload_accounts(session, [comment])
|
|
|
|
return comment
|
|
|
|
if session is not None:
|
|
return _get_comment(session)
|
|
else:
|
|
with Session(db.engine, expire_on_commit=False) as session:
|
|
return _get_comment(session)
|
|
|
|
@staticmethod
|
|
def create_comment(
|
|
tenant_id: str,
|
|
app_id: str,
|
|
created_by: str,
|
|
content: str,
|
|
position_x: float,
|
|
position_y: float,
|
|
mentioned_user_ids: list[str] | None = None,
|
|
) -> dict:
|
|
"""Create a new workflow comment and send mention notification emails."""
|
|
WorkflowCommentService._validate_content(content)
|
|
|
|
with Session(db.engine) as session:
|
|
comment = WorkflowComment(
|
|
tenant_id=tenant_id,
|
|
app_id=app_id,
|
|
position_x=position_x,
|
|
position_y=position_y,
|
|
content=content,
|
|
created_by=created_by,
|
|
)
|
|
|
|
session.add(comment)
|
|
session.flush() # Get the comment ID for mentions
|
|
|
|
# Create mentions if specified
|
|
mentioned_user_ids = WorkflowCommentService._filter_valid_mentioned_user_ids(mentioned_user_ids or [])
|
|
for user_id in mentioned_user_ids:
|
|
mention = WorkflowCommentMention(
|
|
comment_id=comment.id,
|
|
reply_id=None, # This is a comment mention, not reply mention
|
|
mentioned_user_id=user_id,
|
|
)
|
|
session.add(mention)
|
|
|
|
mention_email_payloads = WorkflowCommentService._build_mention_email_payloads(
|
|
session=session,
|
|
tenant_id=tenant_id,
|
|
app_id=app_id,
|
|
mentioner_id=created_by,
|
|
mentioned_user_ids=mentioned_user_ids,
|
|
content=content,
|
|
)
|
|
|
|
session.commit()
|
|
WorkflowCommentService._dispatch_mention_emails(mention_email_payloads)
|
|
|
|
# Return only what we need - id and created_at
|
|
return {"id": comment.id, "created_at": comment.created_at}
|
|
|
|
@staticmethod
|
|
def update_comment(
|
|
tenant_id: str,
|
|
app_id: str,
|
|
comment_id: str,
|
|
user_id: str,
|
|
content: str,
|
|
position_x: float | None = None,
|
|
position_y: float | None = None,
|
|
mentioned_user_ids: list[str] | None = None,
|
|
) -> dict:
|
|
"""Update a workflow comment and notify newly mentioned users."""
|
|
WorkflowCommentService._validate_content(content)
|
|
|
|
with Session(db.engine, expire_on_commit=False) as session:
|
|
# Get comment with validation
|
|
stmt = select(WorkflowComment).where(
|
|
WorkflowComment.id == comment_id,
|
|
WorkflowComment.tenant_id == tenant_id,
|
|
WorkflowComment.app_id == app_id,
|
|
)
|
|
comment = session.scalar(stmt)
|
|
|
|
if not comment:
|
|
raise NotFound("Comment not found")
|
|
|
|
# Only the creator can update the comment
|
|
if comment.created_by != user_id:
|
|
raise Forbidden("Only the comment creator can update it")
|
|
|
|
# Update comment fields
|
|
comment.content = content
|
|
if position_x is not None:
|
|
comment.position_x = position_x
|
|
if position_y is not None:
|
|
comment.position_y = position_y
|
|
|
|
# Update mentions - first remove existing mentions for this comment only (not replies)
|
|
existing_mentions = session.scalars(
|
|
select(WorkflowCommentMention).where(
|
|
WorkflowCommentMention.comment_id == comment.id,
|
|
WorkflowCommentMention.reply_id.is_(None), # Only comment mentions, not reply mentions
|
|
)
|
|
).all()
|
|
existing_mentioned_user_ids = {mention.mentioned_user_id for mention in existing_mentions}
|
|
for mention in existing_mentions:
|
|
session.delete(mention)
|
|
|
|
# Add new mentions
|
|
mentioned_user_ids = WorkflowCommentService._filter_valid_mentioned_user_ids(mentioned_user_ids or [])
|
|
new_mentioned_user_ids = [
|
|
user_id for user_id in mentioned_user_ids if user_id not in existing_mentioned_user_ids
|
|
]
|
|
for user_id_str in mentioned_user_ids:
|
|
mention = WorkflowCommentMention(
|
|
comment_id=comment.id,
|
|
reply_id=None, # This is a comment mention
|
|
mentioned_user_id=user_id_str,
|
|
)
|
|
session.add(mention)
|
|
|
|
mention_email_payloads = WorkflowCommentService._build_mention_email_payloads(
|
|
session=session,
|
|
tenant_id=tenant_id,
|
|
app_id=app_id,
|
|
mentioner_id=user_id,
|
|
mentioned_user_ids=new_mentioned_user_ids,
|
|
content=content,
|
|
)
|
|
|
|
session.commit()
|
|
WorkflowCommentService._dispatch_mention_emails(mention_email_payloads)
|
|
|
|
return {"id": comment.id, "updated_at": comment.updated_at}
|
|
|
|
@staticmethod
|
|
def delete_comment(tenant_id: str, app_id: str, comment_id: str, user_id: str) -> None:
|
|
"""Delete a workflow comment."""
|
|
with Session(db.engine, expire_on_commit=False) as session:
|
|
comment = WorkflowCommentService.get_comment(tenant_id, app_id, comment_id, session)
|
|
|
|
# Only the creator can delete the comment
|
|
if comment.created_by != user_id:
|
|
raise Forbidden("Only the comment creator can delete it")
|
|
|
|
# Delete associated mentions (both comment and reply mentions)
|
|
mentions = session.scalars(
|
|
select(WorkflowCommentMention).where(WorkflowCommentMention.comment_id == comment_id)
|
|
).all()
|
|
for mention in mentions:
|
|
session.delete(mention)
|
|
|
|
# Delete associated replies
|
|
replies = session.scalars(
|
|
select(WorkflowCommentReply).where(WorkflowCommentReply.comment_id == comment_id)
|
|
).all()
|
|
for reply in replies:
|
|
session.delete(reply)
|
|
|
|
session.delete(comment)
|
|
session.commit()
|
|
|
|
@staticmethod
|
|
def resolve_comment(tenant_id: str, app_id: str, comment_id: str, user_id: str) -> WorkflowComment:
|
|
"""Resolve a workflow comment."""
|
|
with Session(db.engine, expire_on_commit=False) as session:
|
|
comment = WorkflowCommentService.get_comment(tenant_id, app_id, comment_id, session)
|
|
if comment.resolved:
|
|
return comment
|
|
|
|
comment.resolved = True
|
|
comment.resolved_at = naive_utc_now()
|
|
comment.resolved_by = user_id
|
|
session.commit()
|
|
|
|
return comment
|
|
|
|
@staticmethod
|
|
def create_reply(
|
|
comment_id: str, content: str, created_by: str, mentioned_user_ids: list[str] | None = None
|
|
) -> dict:
|
|
"""Add a reply to a workflow comment and notify mentioned users."""
|
|
WorkflowCommentService._validate_content(content)
|
|
|
|
with Session(db.engine, expire_on_commit=False) as session:
|
|
# Check if comment exists
|
|
comment = session.get(WorkflowComment, comment_id)
|
|
if not comment:
|
|
raise NotFound("Comment not found")
|
|
|
|
reply = WorkflowCommentReply(comment_id=comment_id, content=content, created_by=created_by)
|
|
|
|
session.add(reply)
|
|
session.flush() # Get the reply ID for mentions
|
|
|
|
# Create mentions if specified
|
|
mentioned_user_ids = WorkflowCommentService._filter_valid_mentioned_user_ids(mentioned_user_ids or [])
|
|
for user_id in mentioned_user_ids:
|
|
# Create mention linking to specific reply
|
|
mention = WorkflowCommentMention(comment_id=comment_id, reply_id=reply.id, mentioned_user_id=user_id)
|
|
session.add(mention)
|
|
|
|
mention_email_payloads = WorkflowCommentService._build_mention_email_payloads(
|
|
session=session,
|
|
tenant_id=comment.tenant_id,
|
|
app_id=comment.app_id,
|
|
mentioner_id=created_by,
|
|
mentioned_user_ids=mentioned_user_ids,
|
|
content=content,
|
|
)
|
|
|
|
session.commit()
|
|
WorkflowCommentService._dispatch_mention_emails(mention_email_payloads)
|
|
|
|
return {"id": reply.id, "created_at": reply.created_at}
|
|
|
|
@staticmethod
|
|
def update_reply(reply_id: str, user_id: str, content: str, mentioned_user_ids: list[str] | None = None) -> dict:
|
|
"""Update a comment reply and notify newly mentioned users."""
|
|
WorkflowCommentService._validate_content(content)
|
|
|
|
with Session(db.engine, expire_on_commit=False) as session:
|
|
reply = session.get(WorkflowCommentReply, reply_id)
|
|
if not reply:
|
|
raise NotFound("Reply not found")
|
|
|
|
# Only the creator can update the reply
|
|
if reply.created_by != user_id:
|
|
raise Forbidden("Only the reply creator can update it")
|
|
|
|
reply.content = content
|
|
|
|
# Update mentions - first remove existing mentions for this reply
|
|
existing_mentions = session.scalars(
|
|
select(WorkflowCommentMention).where(WorkflowCommentMention.reply_id == reply.id)
|
|
).all()
|
|
existing_mentioned_user_ids = {mention.mentioned_user_id for mention in existing_mentions}
|
|
for mention in existing_mentions:
|
|
session.delete(mention)
|
|
|
|
# Add mentions
|
|
mentioned_user_ids = WorkflowCommentService._filter_valid_mentioned_user_ids(mentioned_user_ids or [])
|
|
new_mentioned_user_ids = [
|
|
user_id for user_id in mentioned_user_ids if user_id not in existing_mentioned_user_ids
|
|
]
|
|
for user_id_str in mentioned_user_ids:
|
|
mention = WorkflowCommentMention(
|
|
comment_id=reply.comment_id, reply_id=reply.id, mentioned_user_id=user_id_str
|
|
)
|
|
session.add(mention)
|
|
|
|
mention_email_payloads: list[dict[str, str]] = []
|
|
comment = session.get(WorkflowComment, reply.comment_id)
|
|
if comment:
|
|
mention_email_payloads = WorkflowCommentService._build_mention_email_payloads(
|
|
session=session,
|
|
tenant_id=comment.tenant_id,
|
|
app_id=comment.app_id,
|
|
mentioner_id=user_id,
|
|
mentioned_user_ids=new_mentioned_user_ids,
|
|
content=content,
|
|
)
|
|
|
|
session.commit()
|
|
session.refresh(reply) # Refresh to get updated timestamp
|
|
WorkflowCommentService._dispatch_mention_emails(mention_email_payloads)
|
|
|
|
return {"id": reply.id, "updated_at": reply.updated_at}
|
|
|
|
@staticmethod
|
|
def delete_reply(reply_id: str, user_id: str) -> None:
|
|
"""Delete a comment reply."""
|
|
with Session(db.engine, expire_on_commit=False) as session:
|
|
reply = session.get(WorkflowCommentReply, reply_id)
|
|
if not reply:
|
|
raise NotFound("Reply not found")
|
|
|
|
# Only the creator can delete the reply
|
|
if reply.created_by != user_id:
|
|
raise Forbidden("Only the reply creator can delete it")
|
|
|
|
# Delete associated mentions first
|
|
mentions = session.scalars(
|
|
select(WorkflowCommentMention).where(WorkflowCommentMention.reply_id == reply_id)
|
|
).all()
|
|
for mention in mentions:
|
|
session.delete(mention)
|
|
|
|
session.delete(reply)
|
|
session.commit()
|
|
|
|
@staticmethod
|
|
def validate_comment_access(comment_id: str, tenant_id: str, app_id: str) -> WorkflowComment:
|
|
"""Validate that a comment belongs to the specified tenant and app."""
|
|
return WorkflowCommentService.get_comment(tenant_id, app_id, comment_id)
|