diff --git a/api/repositories/workflow_collaboration_repository.py b/api/repositories/workflow_collaboration_repository.py index 000f80496d..c6af15b434 100644 --- a/api/repositories/workflow_collaboration_repository.py +++ b/api/repositories/workflow_collaboration_repository.py @@ -17,6 +17,7 @@ class WorkflowSessionInfo(TypedDict): avatar: str | None sid: str connected_at: int + graph_active: bool class SidMapping(TypedDict): @@ -69,6 +70,44 @@ class WorkflowCollaborationRepository: ) self.refresh_session_state(workflow_id, session_info["sid"]) + def get_session_info(self, workflow_id: str, sid: str) -> WorkflowSessionInfo | None: + raw = self._redis.hget(self.workflow_key(workflow_id), sid) + value = self._decode(raw) + if not value: + return None + try: + session_info = json.loads(value) + except (TypeError, json.JSONDecodeError): + return None + + if not isinstance(session_info, dict): + return None + if "user_id" not in session_info or "username" not in session_info or "sid" not in session_info: + return None + + return { + "user_id": str(session_info["user_id"]), + "username": str(session_info["username"]), + "avatar": session_info.get("avatar"), + "sid": str(session_info["sid"]), + "connected_at": int(session_info.get("connected_at") or 0), + "graph_active": bool(session_info.get("graph_active")), + } + + def set_graph_active(self, workflow_id: str, sid: str, active: bool) -> None: + session_info = self.get_session_info(workflow_id, sid) + if not session_info: + return + session_info["graph_active"] = bool(active) + self._redis.hset(self.workflow_key(workflow_id), sid, json.dumps(session_info)) + self.refresh_session_state(workflow_id, sid) + + def is_graph_active(self, workflow_id: str, sid: str) -> bool: + session_info = self.get_session_info(workflow_id, sid) + if not session_info: + return False + return bool(session_info.get("graph_active") or False) + def get_sid_mapping(self, sid: str) -> SidMapping | None: raw = self._redis.get(self.sid_key(sid)) if not raw: @@ -125,6 +164,7 @@ class WorkflowCollaborationRepository: "avatar": session_info.get("avatar"), "sid": str(session_info["sid"]), "connected_at": int(session_info.get("connected_at") or 0), + "graph_active": bool(session_info.get("graph_active")), } ) diff --git a/api/services/workflow_collaboration_service.py b/api/services/workflow_collaboration_service.py index 02d08fcaff..d72299592b 100644 --- a/api/services/workflow_collaboration_service.py +++ b/api/services/workflow_collaboration_service.py @@ -38,12 +38,13 @@ class WorkflowCollaborationService: "avatar": session.get("avatar"), "sid": sid, "connected_at": int(time.time()), + "graph_active": True, } self._repository.set_session_info(workflow_id, session_info) leader_sid = self.get_or_set_leader(workflow_id, sid) - is_leader = leader_sid == sid + is_leader = leader_sid == sid if leader_sid else False self._socketio.enter_room(sid, workflow_id) self.broadcast_online_users(workflow_id) @@ -79,6 +80,15 @@ class WorkflowCollaborationService: if not event_type: return {"msg": "invalid event type"}, 400 + if event_type == "graph_view_active": + is_active = False + if isinstance(event_data, dict): + is_active = bool(event_data.get("active") or False) + self._repository.set_graph_active(workflow_id, sid, is_active) + self.refresh_session_state(workflow_id, sid) + self.broadcast_online_users(workflow_id) + return {"msg": "graph_view_active_updated"}, 200 + self._socketio.emit( "collaboration_update", {"type": event_type, "userId": user_id, "data": event_data, "timestamp": timestamp}, @@ -100,27 +110,33 @@ class WorkflowCollaborationService: return {"msg": "graph_update_broadcasted"}, 200 - def get_or_set_leader(self, workflow_id: str, sid: str) -> str: + def get_or_set_leader(self, workflow_id: str, sid: str) -> str | None: current_leader = self._repository.get_current_leader(workflow_id) if current_leader: - if self.is_session_active(workflow_id, current_leader): + if self.is_session_active(workflow_id, current_leader) and self._repository.is_graph_active( + workflow_id, current_leader + ): return current_leader self._repository.delete_session(workflow_id, current_leader) self._repository.delete_leader(workflow_id) - was_set = self._repository.set_leader_if_absent(workflow_id, sid) + new_leader_sid = self._select_graph_leader(workflow_id, preferred_sid=sid) + if not new_leader_sid: + return None + + was_set = self._repository.set_leader_if_absent(workflow_id, new_leader_sid) if was_set: if current_leader: - self.broadcast_leader_change(workflow_id, sid) - return sid + self.broadcast_leader_change(workflow_id, new_leader_sid) + return new_leader_sid current_leader = self._repository.get_current_leader(workflow_id) if current_leader: return current_leader - return sid + return new_leader_sid def handle_leader_disconnect(self, workflow_id: str, disconnected_sid: str) -> None: current_leader = self._repository.get_current_leader(workflow_id) @@ -130,18 +146,18 @@ class WorkflowCollaborationService: if current_leader != disconnected_sid: return - session_sids = self._repository.get_session_sids(workflow_id) - if session_sids: - new_leader_sid = session_sids[0] + new_leader_sid = self._select_graph_leader(workflow_id) + if new_leader_sid: self._repository.set_leader(workflow_id, new_leader_sid) self.broadcast_leader_change(workflow_id, new_leader_sid) else: self._repository.delete_leader(workflow_id) + self.broadcast_leader_change(workflow_id, None) - def broadcast_leader_change(self, workflow_id: str, new_leader_sid: str) -> None: + def broadcast_leader_change(self, workflow_id: str, new_leader_sid: str | None) -> None: for sid in self._repository.get_session_sids(workflow_id): try: - is_leader = sid == new_leader_sid + is_leader = new_leader_sid is not None and sid == new_leader_sid self._socketio.emit("status", {"isLeader": is_leader}, room=sid) except Exception: logging.exception("Failed to emit leader status to session %s", sid) @@ -167,15 +183,34 @@ class WorkflowCollaborationService: def _ensure_leader(self, workflow_id: str, sid: str) -> None: current_leader = self._repository.get_current_leader(workflow_id) - if current_leader and self.is_session_active(workflow_id, current_leader): + if current_leader and self.is_session_active(workflow_id, current_leader) and self._repository.is_graph_active( + workflow_id, current_leader + ): self._repository.expire_leader(workflow_id) return if current_leader: self._repository.delete_leader(workflow_id) - self._repository.set_leader(workflow_id, sid) - self.broadcast_leader_change(workflow_id, sid) + new_leader_sid = self._select_graph_leader(workflow_id, preferred_sid=sid) + if not new_leader_sid: + self.broadcast_leader_change(workflow_id, None) + return + + self._repository.set_leader(workflow_id, new_leader_sid) + self.broadcast_leader_change(workflow_id, new_leader_sid) + + def _select_graph_leader(self, workflow_id: str, preferred_sid: str | None = None) -> str | None: + session_sids = [ + session["sid"] + for session in self._repository.list_sessions(workflow_id) + if session.get("graph_active") + ] + if not session_sids: + return None + if preferred_sid and preferred_sid in session_sids: + return preferred_sid + return session_sids[0] def is_session_active(self, workflow_id: str, sid: str) -> bool: if not sid: diff --git a/web/app/components/workflow-app/index.tsx b/web/app/components/workflow-app/index.tsx index a153145c76..5117dec637 100644 --- a/web/app/components/workflow-app/index.tsx +++ b/web/app/components/workflow-app/index.tsx @@ -18,6 +18,7 @@ import { FeaturesProvider } from '@/app/components/base/features' import Loading from '@/app/components/base/loading' import { FILE_EXTS } from '@/app/components/base/prompt-editor/constants' import WorkflowWithDefaultContext from '@/app/components/workflow' +import { useCollaboration } from '@/app/components/workflow/collaboration' import { collaborationManager } from '@/app/components/workflow/collaboration/core/collaboration-manager' import { WorkflowContextProvider, @@ -52,6 +53,12 @@ const SkillMain = dynamic(() => import('@/app/components/workflow/skill/main'), ssr: false, }) +const CollaborationSession = () => { + const appId = useStore(s => s.appId) + useCollaboration(appId || '') + return null +} + type WorkflowViewContentProps = { renderGraph: (headerLeftSlot: ReactNode) => ReactNode reload: () => Promise @@ -95,6 +102,20 @@ const WorkflowViewContent = ({ } }, [doSetViewType, refreshGraph, syncWorkflowDraftImmediately, viewType]) + useEffect(() => { + if (!isSupportSandbox) { + collaborationManager.emitGraphViewActive(true) + return () => { + collaborationManager.emitGraphViewActive(false) + } + } + + collaborationManager.emitGraphViewActive(viewType === ViewType.graph) + return () => { + collaborationManager.emitGraphViewActive(false) + } + }, [isSupportSandbox, viewType]) + if (!isSupportSandbox) return renderGraph(null) @@ -321,17 +342,20 @@ const WorkflowAppWithAdditionalContext = () => { } return ( - - - - - + <> + + + + + + + ) } diff --git a/web/app/components/workflow/collaboration/core/collaboration-manager.ts b/web/app/components/workflow/collaboration/core/collaboration-manager.ts index 0f0546ae8c..f7690f010e 100644 --- a/web/app/components/workflow/collaboration/core/collaboration-manager.ts +++ b/web/app/components/workflow/collaboration/core/collaboration-manager.ts @@ -71,11 +71,13 @@ export class CollaborationManager { private leaderId: string | null = null private cursors: Record = {} private nodePanelPresence: NodePanelPresenceMap = {} + private onlineUsers: OnlineUser[] = [] private activeConnections = new Set() private isUndoRedoInProgress = false private pendingInitialSync = false private rejoinInProgress = false private pendingGraphImportEmit = false + private graphViewActive: boolean | null = null private getActiveSocket(): Socket | null { if (!this.currentAppId) @@ -83,6 +85,10 @@ export class CollaborationManager { return webSocketClient.getSocket(this.currentAppId) } + setReactFlowStore(store: ReactFlowStore | null): void { + this.reactFlowStore = store + } + private handleSessionUnauthorized = (): void => { if (this.rejoinInProgress) return @@ -495,6 +501,7 @@ export class CollaborationManager { this.reactFlowStore = null this.cursors = {} this.nodePanelPresence = {} + this.onlineUsers = [] this.isUndoRedoInProgress = false this.rejoinInProgress = false @@ -600,11 +607,15 @@ export class CollaborationManager { } onCursorUpdate(callback: (cursors: Record) => void): () => void { - return this.eventEmitter.on('cursors', callback) + const off = this.eventEmitter.on('cursors', callback) + callback({ ...this.cursors }) + return off } onOnlineUsersUpdate(callback: (users: OnlineUser[]) => void): () => void { - return this.eventEmitter.on('onlineUsers', callback) + const off = this.eventEmitter.on('onlineUsers', callback) + callback([...this.onlineUsers]) + return off } onWorkflowUpdate(callback: (update: { appId: string, timestamp: number }) => void): () => void { @@ -656,6 +667,18 @@ export class CollaborationManager { }) } + emitGraphViewActive(isActive: boolean): void { + this.graphViewActive = isActive + if (!this.currentAppId || !webSocketClient.isConnected(this.currentAppId)) + return + + this.sendCollaborationEvent({ + type: 'graph_view_active', + data: { active: isActive }, + timestamp: Date.now(), + }) + } + onUndoRedoStateChange(callback: (state: { canUndo: boolean, canRedo: boolean }) => void): () => void { return this.eventEmitter.on('undoRedoStateChange', callback) } @@ -1081,6 +1104,7 @@ export class CollaborationManager { if (data.leader && typeof data.leader === 'string') this.leaderId = data.leader + this.onlineUsers = data.users this.eventEmitter.emit('onlineUsers', data.users) this.eventEmitter.emit('cursors', { ...this.cursors }) } @@ -1115,6 +1139,8 @@ export class CollaborationManager { socket.on('connect', () => { this.eventEmitter.emit('stateChange', { isConnected: true }) this.pendingInitialSync = true + if (this.graphViewActive !== null) + this.emitGraphViewActive(this.graphViewActive) }) socket.on('disconnect', () => { diff --git a/web/app/components/workflow/collaboration/hooks/use-collaboration.ts b/web/app/components/workflow/collaboration/hooks/use-collaboration.ts index a8715d7571..4752a1edf7 100644 --- a/web/app/components/workflow/collaboration/hooks/use-collaboration.ts +++ b/web/app/components/workflow/collaboration/hooks/use-collaboration.ts @@ -51,7 +51,7 @@ export function useCollaboration(appId: string, reactFlowStore?: ReactFlowStore) const initCollaboration = async () => { try { - const id = await collaborationManager.connect(appId, reactFlowStore) + const id = await collaborationManager.connect(appId) if (isUnmounted) { collaborationManager.disconnect(id) return @@ -100,7 +100,17 @@ export function useCollaboration(appId: string, reactFlowStore?: ReactFlowStore) if (connectionId) collaborationManager.disconnect(connectionId) } - }, [appId, reactFlowStore, isCollaborationEnabled]) + }, [appId, isCollaborationEnabled]) + + useEffect(() => { + if (!reactFlowStore) + return + + collaborationManager.setReactFlowStore(reactFlowStore) + return () => { + collaborationManager.setReactFlowStore(null) + } + }, [reactFlowStore]) const prevIsConnected = useRef(false) useEffect(() => { diff --git a/web/app/components/workflow/collaboration/types/collaboration.ts b/web/app/components/workflow/collaboration/types/collaboration.ts index d94ad25759..75d6c079dc 100644 --- a/web/app/components/workflow/collaboration/types/collaboration.ts +++ b/web/app/components/workflow/collaboration/types/collaboration.ts @@ -62,6 +62,7 @@ export type CollaborationEventType | 'comments_update' | 'node_panel_presence' | 'app_publish_update' + | 'graph_view_active' | 'graph_resync_request' | 'workflow_restore_request' | 'workflow_restore_intent'