From e650109f94c9b35abd02b26837fa8bc22d6d4c2c Mon Sep 17 00:00:00 2001 From: CodingOnStar Date: Tue, 31 Mar 2026 17:57:10 +0800 Subject: [PATCH] feat(collaboration): enhance collaboration features with Loro integration and improved hooks - Introduced a new module to manage Loro integration for web environments. - Updated to ensure Loro is initialized before establishing connections. - Enhanced hook to conditionally enable collaboration based on workflow state. - Added tests for to verify connection handling and initialization behavior. - Refactored components to utilize the updated collaboration hooks, improving overall workflow collaboration experience. --- .../prompt-editor/__tests__/index.spec.tsx | 16 ++ .../components/base/prompt-editor/index.tsx | 12 +- .../workflow-app/__tests__/index.spec.tsx | 30 ++- .../__tests__/workflow-main.spec.tsx | 59 +++++- .../workflow-app/components/workflow-main.tsx | 3 +- web/app/components/workflow-app/index.tsx | 3 +- .../collaboration-manager.connect.spec.ts | 123 +++++++++++ ...llaboration-manager.merge-behavior.test.ts | 16 +- .../__tests__/collaboration-manager.test.ts | 14 +- .../core/collaboration-manager.ts | 198 ++++++++++++------ .../collaboration/core/crdt-provider.ts | 6 +- .../workflow/collaboration/core/loro-web.ts | 43 ++++ .../__tests__/use-collaboration.spec.tsx | 105 ++++++++++ .../collaboration/hooks/use-collaboration.ts | 12 +- .../workflow/header/online-users.tsx | 3 +- .../_base/components/workflow-panel/index.tsx | 3 +- .../components/workflow/nodes/_base/node.tsx | 3 +- .../nodes/agent/__tests__/use-config.spec.tsx | 138 ++++++++++++ .../workflow/nodes/agent/use-config.ts | 12 +- 19 files changed, 687 insertions(+), 112 deletions(-) create mode 100644 web/app/components/workflow/collaboration/core/__tests__/collaboration-manager.connect.spec.ts create mode 100644 web/app/components/workflow/collaboration/core/loro-web.ts create mode 100644 web/app/components/workflow/collaboration/hooks/__tests__/use-collaboration.spec.tsx create mode 100644 web/app/components/workflow/nodes/agent/__tests__/use-config.spec.tsx diff --git a/web/app/components/base/prompt-editor/__tests__/index.spec.tsx b/web/app/components/base/prompt-editor/__tests__/index.spec.tsx index e8d7bc716f..e8bc8194c6 100644 --- a/web/app/components/base/prompt-editor/__tests__/index.spec.tsx +++ b/web/app/components/base/prompt-editor/__tests__/index.spec.tsx @@ -365,6 +365,22 @@ describe('PromptEditor', () => { expect(() => unmount()).not.toThrow() }) + it('should rerender rapidly without triggering a ref update loop', () => { + const { rerender } = render( + + + , + ) + + expect(() => { + rerender( + + + , + ) + }).not.toThrow() + }) + it('should render hitl block when show=true', () => { render( = ({ const [floatingAnchorElem, setFloatingAnchorElem] = useState(null) - const onRef = (floatingAnchorElement: HTMLDivElement | null) => { - if (floatingAnchorElement !== null) - setFloatingAnchorElem(floatingAnchorElement) - } + const onRef = useCallback((floatingAnchorElement: HTMLDivElement | null) => { + if (floatingAnchorElement === null) + return + + setFloatingAnchorElem(prev => prev === floatingAnchorElement ? prev : floatingAnchorElement) + }, []) return ( diff --git a/web/app/components/workflow-app/__tests__/index.spec.tsx b/web/app/components/workflow-app/__tests__/index.spec.tsx index edba570fd6..d2c1246343 100644 --- a/web/app/components/workflow-app/__tests__/index.spec.tsx +++ b/web/app/components/workflow-app/__tests__/index.spec.tsx @@ -19,6 +19,7 @@ const mockUseSubscription = vi.fn() const mockCollaborationSetNodes = vi.fn() const mockCollaborationSetEdges = vi.fn() const mockEmitGraphViewActive = vi.fn() +const mockUseCollaboration = vi.fn() let appStoreState: { appDetail?: { @@ -63,6 +64,8 @@ let searchParamsValue: string | null = null const workflowUiState = { appId: 'app-1', isResponding: false, + isRestoring: false, + historyWorkflowData: undefined as Record | undefined, showUpgradeRuntimeModal: false, setShowUpgradeRuntimeModal: mockSetShowUpgradeRuntimeModal, } @@ -145,7 +148,7 @@ vi.mock('@/context/event-emitter', () => ({ })) vi.mock('@/app/components/workflow/collaboration', () => ({ - useCollaboration: () => undefined, + useCollaboration: (...args: unknown[]) => mockUseCollaboration(...args), })) vi.mock('@/app/components/workflow/collaboration/core/collaboration-manager', () => ({ @@ -293,6 +296,8 @@ describe('WorkflowApp', () => { mockInitialEdges.mockReturnValue([{ id: 'edge-1' }]) mockGetWorkflowRunAndTraceUrl.mockReturnValue({ runUrl: '/runs/run-1' }) mockSyncWorkflowDraftImmediately.mockResolvedValue(undefined) + workflowUiState.isRestoring = false + workflowUiState.historyWorkflowData = undefined }) it('should render the loading shell while workflow data is still loading', () => { @@ -427,4 +432,27 @@ describe('WorkflowApp', () => { expect(mockSetShowInputsPanel).not.toHaveBeenCalled() expect(mockSetShowDebugAndPreviewPanel).not.toHaveBeenCalled() }) + + it.each([ + { + description: 'the workflow is restoring', + workflowState: { + isRestoring: true, + historyWorkflowData: undefined, + }, + }, + { + description: 'a historical workflow version is selected', + workflowState: { + isRestoring: false, + historyWorkflowData: { id: 'history-1' }, + }, + }, + ])('should disable the collaboration session when $description', ({ workflowState }) => { + Object.assign(workflowUiState, workflowState) + + render() + + expect(mockUseCollaboration).toHaveBeenCalledWith('app-1', undefined, false) + }) }) diff --git a/web/app/components/workflow-app/components/__tests__/workflow-main.spec.tsx b/web/app/components/workflow-app/components/__tests__/workflow-main.spec.tsx index af17db7bff..331032ed98 100644 --- a/web/app/components/workflow-app/components/__tests__/workflow-main.spec.tsx +++ b/web/app/components/workflow-app/components/__tests__/workflow-main.spec.tsx @@ -14,9 +14,12 @@ const mockGetNodes = vi.fn() const mockSetNodes = vi.fn() const mockGetEdges = vi.fn() const mockSetEdges = vi.fn() +const mockUseCollaboration = vi.fn() const workflowUiState = { appId: 'app-1', + isRestoring: false, + historyWorkflowData: undefined as Record | undefined, } const hookFns = { @@ -97,14 +100,17 @@ vi.mock('@/app/components/workflow/collaboration', () => ({ onWorkflowUpdate: vi.fn(() => vi.fn()), onSyncRequest: vi.fn(() => vi.fn()), }, - useCollaboration: () => ({ - startCursorTracking: mockStartCursorTracking, - stopCursorTracking: mockStopCursorTracking, - onlineUsers: [], - cursors: {}, - isConnected: false, - isEnabled: false, - }), + useCollaboration: (...args: unknown[]) => { + mockUseCollaboration(...args) + return { + startCursorTracking: mockStartCursorTracking, + stopCursorTracking: mockStopCursorTracking, + onlineUsers: [], + cursors: {}, + isConnected: false, + isEnabled: false, + } + }, })) vi.mock('@/app/components/workflow/block-selector/context/mcp-tool-availability-context', () => ({ @@ -215,6 +221,8 @@ describe('WorkflowMain', () => { capturedContextProps = null mockGetNodes.mockReturnValue([]) mockGetEdges.mockReturnValue([]) + workflowUiState.isRestoring = false + workflowUiState.historyWorkflowData = undefined }) it('should render the inner workflow context with children and forwarded graph props', () => { @@ -331,4 +339,39 @@ describe('WorkflowMain', () => { configsMap: { flowId: 'app-1', flowType: 'app-flow', fileSettings: { enabled: true } }, }) }) + + it.each([ + { + description: 'the workflow is restoring', + workflowState: { + isRestoring: true, + historyWorkflowData: undefined, + }, + }, + { + description: 'viewing workflow history', + workflowState: { + isRestoring: false, + historyWorkflowData: { id: 'history-1' }, + }, + }, + ])('should disable collaboration when $description', ({ workflowState }) => { + Object.assign(workflowUiState, workflowState) + + render( + , + ) + + expect(mockUseCollaboration).toHaveBeenCalledWith( + 'app-1', + expect.objectContaining({ + getState: expect.any(Function), + }), + false, + ) + }) }) diff --git a/web/app/components/workflow-app/components/workflow-main.tsx b/web/app/components/workflow-app/components/workflow-main.tsx index 59476544e0..09d543dc58 100644 --- a/web/app/components/workflow-app/components/workflow-main.tsx +++ b/web/app/components/workflow-app/components/workflow-main.tsx @@ -50,6 +50,7 @@ const WorkflowMain = ({ const featuresStore = useFeaturesStore() const workflowStore = useWorkflowStore() const appId = useStore(s => s.appId) + const isWorkflowCollaborationEnabled = useStore(s => !s.isRestoring && !s.historyWorkflowData) const containerRef = useRef(null) const reactFlow = useReactFlow() @@ -68,7 +69,7 @@ const WorkflowMain = ({ cursors, isConnected, isEnabled: isCollaborationEnabled, - } = useCollaboration(appId || '', reactFlowStore) + } = useCollaboration(appId || '', reactFlowStore, isWorkflowCollaborationEnabled) const myUserId = useMemo( () => (isCollaborationEnabled && isConnected ? 'current-user' : null), [isCollaborationEnabled, isConnected], diff --git a/web/app/components/workflow-app/index.tsx b/web/app/components/workflow-app/index.tsx index cfab038e18..be92ca7a82 100644 --- a/web/app/components/workflow-app/index.tsx +++ b/web/app/components/workflow-app/index.tsx @@ -65,7 +65,8 @@ const SkillMain = dynamic(() => import('@/app/components/workflow/skill/main'), const CollaborationSession = () => { const appId = useStore(s => s.appId) - useCollaboration(appId || '') + const isCollaborationSessionEnabled = useStore(s => !s.isRestoring && !s.historyWorkflowData) + useCollaboration(appId || '', undefined, isCollaborationSessionEnabled) return null } diff --git a/web/app/components/workflow/collaboration/core/__tests__/collaboration-manager.connect.spec.ts b/web/app/components/workflow/collaboration/core/__tests__/collaboration-manager.connect.spec.ts new file mode 100644 index 0000000000..aabb83a749 --- /dev/null +++ b/web/app/components/workflow/collaboration/core/__tests__/collaboration-manager.connect.spec.ts @@ -0,0 +1,123 @@ +import { CollaborationManager } from '../collaboration-manager' + +const mocks = vi.hoisted(() => { + const socket = { + connected: false, + emit: vi.fn(), + id: 'socket-1', + off: vi.fn(), + on: vi.fn(), + } + + return { + connectSocket: vi.fn(() => socket), + disconnectSocket: vi.fn(), + emitWithAuthGuard: vi.fn(), + getSocket: vi.fn(() => socket), + initLoro: vi.fn<() => Promise>(), + isConnected: vi.fn(() => false), + LoroDoc: vi.fn(function MockLoroDoc(this: { getMap: ReturnType }) { + this.getMap = vi.fn(() => ({ + get: vi.fn(), + keys: vi.fn(() => []), + subscribe: vi.fn(), + values: vi.fn(() => []), + })) + }), + providerDestroy: vi.fn(), + UndoManager: vi.fn(function MockUndoManager(this: { canRedo: ReturnType, canUndo: ReturnType }) { + this.canRedo = vi.fn(() => false) + this.canUndo = vi.fn(() => false) + }), + } +}) + +vi.mock('../loro-web', () => ({ + default: () => mocks.initLoro(), + LoroDoc: mocks.LoroDoc, + LoroList: class {}, + LoroMap: class {}, + UndoManager: mocks.UndoManager, +})) + +vi.mock('../crdt-provider', () => ({ + CRDTProvider: vi.fn(function MockCRDTProvider(this: { destroy: typeof mocks.providerDestroy }) { + this.destroy = mocks.providerDestroy + }), +})) + +vi.mock('../websocket-manager', () => ({ + emitWithAuthGuard: (...args: unknown[]) => mocks.emitWithAuthGuard(...args), + webSocketClient: { + connect: mocks.connectSocket, + disconnect: (appId?: string) => mocks.disconnectSocket(appId), + getSocket: mocks.getSocket, + isConnected: mocks.isConnected, + }, +})) + +type CollaborationManagerInternals = { + activeConnections: Set + connectionInitializationPromise: Promise | null + currentAppId: string | null + doc: unknown +} + +const getManagerInternals = (manager: CollaborationManager): CollaborationManagerInternals => + manager as unknown as CollaborationManagerInternals + +const createDeferred = () => { + let resolve!: () => void + const promise = new Promise((res) => { + resolve = res + }) + + return { + promise, + resolve, + } +} + +// Covers Loro wasm bootstrapping during workflow collaboration startup. +describe('CollaborationManager connect', () => { + beforeEach(() => { + vi.clearAllMocks() + mocks.initLoro.mockResolvedValue(undefined) + }) + + it('should rollback connection state when Loro initialization fails', async () => { + const manager = new CollaborationManager() + const internals = getManagerInternals(manager) + const initError = new Error('init failed') + mocks.initLoro.mockRejectedValueOnce(initError) + + await expect(manager.connect('app-1')).rejects.toThrow(initError) + + expect(mocks.connectSocket).toHaveBeenCalledWith('app-1') + expect(mocks.disconnectSocket).toHaveBeenCalledWith('app-1') + expect(mocks.LoroDoc).not.toHaveBeenCalled() + expect(internals.currentAppId).toBeNull() + expect(internals.doc).toBeNull() + expect(internals.connectionInitializationPromise).toBeNull() + expect(internals.activeConnections.size).toBe(0) + }) + + it('should reuse the in-flight initialization for concurrent callers', async () => { + const manager = new CollaborationManager() + const deferred = createDeferred() + mocks.initLoro.mockReturnValueOnce(deferred.promise) + + const firstConnect = manager.connect('app-1') + const secondConnect = manager.connect('app-1') + + expect(mocks.initLoro).toHaveBeenCalledTimes(1) + expect(mocks.connectSocket).toHaveBeenCalledTimes(1) + + deferred.resolve() + + await expect(firstConnect).resolves.toMatch(/[a-z0-9]{9}/) + await expect(secondConnect).resolves.toMatch(/[a-z0-9]{9}/) + expect(mocks.LoroDoc).toHaveBeenCalledTimes(1) + expect(mocks.UndoManager).toHaveBeenCalledTimes(1) + }) +}) diff --git a/web/app/components/workflow/collaboration/core/__tests__/collaboration-manager.merge-behavior.test.ts b/web/app/components/workflow/collaboration/core/__tests__/collaboration-manager.merge-behavior.test.ts index 93d1c6b746..819e843b7c 100644 --- a/web/app/components/workflow/collaboration/core/__tests__/collaboration-manager.merge-behavior.test.ts +++ b/web/app/components/workflow/collaboration/core/__tests__/collaboration-manager.merge-behavior.test.ts @@ -1,8 +1,8 @@ -import type { LoroMap } from 'loro-crdt' +import type { LoroDocInstance, LoroMapInstance } from '../loro-web' import type { Node } from '@/app/components/workflow/types' -import { LoroDoc } from 'loro-crdt' import { BlockEnum } from '@/app/components/workflow/types' import { CollaborationManager } from '../collaboration-manager' +import initLoro, { LoroDoc } from '../loro-web' const NODE_ID = 'node-1' const LLM_NODE_ID = 'llm-node' @@ -74,9 +74,9 @@ type ParameterExtractorNodeData = { } type CollaborationManagerInternals = { - doc: LoroDoc - nodesMap: LoroMap - edgesMap: LoroMap + doc: LoroDocInstance + nodesMap: LoroMapInstance + edgesMap: LoroMapInstance syncNodes: (oldNodes: Node[], newNodes: Node[]) => void } @@ -159,7 +159,7 @@ const createParameterExtractorNode = (parameters: ParameterItem[]): Node manager as unknown as CollaborationManagerInternals -const getManager = (doc: LoroDoc) => { +const getManager = (doc: LoroDocInstance) => { const manager = new CollaborationManager() const internals = getManagerInternals(manager) internals.doc = doc @@ -177,6 +177,10 @@ const syncNodes = (manager: CollaborationManager, previous: Node[], next: Node[] const exportNodes = (manager: CollaborationManager) => manager.getNodes() +beforeAll(async () => { + await initLoro() +}) + describe('Loro merge behavior smoke test', () => { it('inspects concurrent edits after merge', () => { const docA = new LoroDoc() diff --git a/web/app/components/workflow/collaboration/core/__tests__/collaboration-manager.test.ts b/web/app/components/workflow/collaboration/core/__tests__/collaboration-manager.test.ts index 1728bcad55..4f4cb0e798 100644 --- a/web/app/components/workflow/collaboration/core/__tests__/collaboration-manager.test.ts +++ b/web/app/components/workflow/collaboration/core/__tests__/collaboration-manager.test.ts @@ -1,13 +1,13 @@ -import type { LoroMap } from 'loro-crdt' +import type { LoroDocInstance, LoroMapInstance } from '../loro-web' import type { NodePanelPresenceMap, NodePanelPresenceUser, } from '@/app/components/workflow/collaboration/types/collaboration' import type { CommonNodeType, Edge, Node } from '@/app/components/workflow/types' -import { LoroDoc } from 'loro-crdt' import { Position } from 'reactflow' import { CollaborationManager } from '@/app/components/workflow/collaboration/core/collaboration-manager' import { BlockEnum } from '@/app/components/workflow/types' +import initLoro, { LoroDoc } from '../loro-web' const NODE_ID = '1760342909316' @@ -88,12 +88,12 @@ type LLMNodeDataWithUnknownTemplate = Omit & { prompt_template: unknown } -type ManagerDoc = LoroDoc | { commit: () => void } +type ManagerDoc = LoroDocInstance | { commit: () => void } type CollaborationManagerInternals = { doc: ManagerDoc - nodesMap: LoroMap - edgesMap: LoroMap + nodesMap: LoroMapInstance + edgesMap: LoroMapInstance syncNodes: (oldNodes: Node[], newNodes: Node[]) => void syncEdges: (oldEdges: Edge[], newEdges: Edge[]) => void applyNodePanelPresenceUpdate: (update: NodePanelPresenceEventData) => void @@ -246,6 +246,10 @@ const setupManager = (): { manager: CollaborationManager, internals: Collaborati return { manager, internals } } +beforeAll(async () => { + await initLoro() +}) + describe('CollaborationManager syncNodes', () => { let manager: CollaborationManager let internals: CollaborationManagerInternals diff --git a/web/app/components/workflow/collaboration/core/collaboration-manager.ts b/web/app/components/workflow/collaboration/core/collaboration-manager.ts index ba84d6ea2a..29edb6bb14 100644 --- a/web/app/components/workflow/collaboration/core/collaboration-manager.ts +++ b/web/app/components/workflow/collaboration/core/collaboration-manager.ts @@ -1,4 +1,3 @@ -import type { Value } from 'loro-crdt' import type { Socket } from 'socket.io-client' import type { CommonNodeType, @@ -16,11 +15,18 @@ import type { RestoreIntentData, RestoreRequestData, } from '../types/collaboration' +import type { + LoroDocInstance, + LoroListInstance, + LoroMapInstance, + UndoManagerInstance, + Value, +} from './loro-web' import { cloneDeep } from 'es-toolkit/object' import { isEqual } from 'es-toolkit/predicate' -import { LoroDoc, LoroList, LoroMap, UndoManager } from 'loro-crdt' import { CRDTProvider } from './crdt-provider' import { EventEmitter } from './event-emitter' +import initLoro, { LoroDoc, LoroList, LoroMap, UndoManager } from './loro-web' import { emitWithAuthGuard, webSocketClient } from './websocket-manager' type NodePanelPresenceEventData = { @@ -105,12 +111,28 @@ const SET_NODES_ANOMALY_LOG_LIMIT = 100 const toLoroValue = (value: unknown): Value => cloneDeep(value) as Value const toLoroRecord = (value: unknown): Record => cloneDeep(value) as Record + +let loroInitializationPromise: Promise | null = null + +const ensureLoroReady = async (): Promise => { + if (!loroInitializationPromise) { + loroInitializationPromise = Promise.resolve(initLoro()) + .then(() => undefined) + .catch((error) => { + loroInitializationPromise = null + throw error + }) + } + + await loroInitializationPromise +} + export class CollaborationManager { - private doc: LoroDoc | null = null - private undoManager: UndoManager | null = null + private doc: LoroDocInstance | null = null + private undoManager: UndoManagerInstance | null = null private provider: CRDTProvider | null = null - private nodesMap: LoroMap> | null = null - private edgesMap: LoroMap> | null = null + private nodesMap: LoroMapInstance> | null = null + private edgesMap: LoroMapInstance> | null = null private eventEmitter = new EventEmitter() private currentAppId: string | null = null private reactFlowStore: ReactFlowStore | null = null @@ -127,6 +149,7 @@ export class CollaborationManager { private graphViewActive: boolean | null = null private graphImportLogs: GraphImportLogEntry[] = [] private setNodesAnomalyLogs: SetNodesAnomalyLogEntry[] = [] + private connectionInitializationPromise: Promise | null = null private pendingImportLog: { timestamp: number sources: Set<'nodes' | 'edges'> @@ -191,13 +214,13 @@ export class CollaborationManager { emitWithAuthGuard(socket, 'graph_event', payload, { onUnauthorized: this.handleSessionUnauthorized }) } - private getNodeContainer(nodeId: string): LoroMap> { + private getNodeContainer(nodeId: string): LoroMapInstance> { if (!this.nodesMap) throw new Error('Nodes map not initialized') let container = this.nodesMap.get(nodeId) as unknown - const isMapContainer = (value: unknown): value is LoroMap> & LoroContainer => { + const isMapContainer = (value: unknown): value is LoroMapInstance> & LoroContainer => { return !!value && typeof (value as LoroContainer).kind === 'function' && (value as LoroContainer).kind?.() === 'Map' } @@ -207,27 +230,27 @@ export class CollaborationManager { const attached = (newContainer as LoroContainer).getAttached?.() ?? newContainer container = attached if (previousValue && typeof previousValue === 'object') - this.populateNodeContainer(container as LoroMap>, previousValue as Node) + this.populateNodeContainer(container as LoroMapInstance>, previousValue as Node) } else { const attached = (container as LoroContainer).getAttached?.() ?? container container = attached } - return container as LoroMap> + return container as LoroMapInstance> } - private ensureDataContainer(nodeContainer: LoroMap>): LoroMap> { + private ensureDataContainer(nodeContainer: LoroMapInstance>): LoroMapInstance> { let dataContainer = nodeContainer.get('data') as unknown if (!dataContainer || typeof (dataContainer as LoroContainer).kind !== 'function' || (dataContainer as LoroContainer).kind?.() !== 'Map') dataContainer = nodeContainer.setContainer('data', new LoroMap()) const attached = (dataContainer as LoroContainer).getAttached?.() ?? dataContainer - return attached as LoroMap> + return attached as LoroMapInstance> } - private ensureList(nodeContainer: LoroMap>, key: string): LoroList { + private ensureList(nodeContainer: LoroMapInstance>, key: string): LoroListInstance { const dataContainer = this.ensureDataContainer(nodeContainer) let list = dataContainer.get(key) as unknown @@ -235,7 +258,7 @@ export class CollaborationManager { list = dataContainer.setContainer(key, new LoroList()) const attached = (list as LoroContainer).getAttached?.() ?? list - return attached as LoroList + return attached as LoroListInstance } private exportNode(nodeId: string): Node { @@ -247,7 +270,7 @@ export class CollaborationManager { } } - private populateNodeContainer(container: LoroMap>, node: Node): void { + private populateNodeContainer(container: LoroMapInstance>, node: Node): void { const listFields = new Set(['variables', 'prompt_template', 'parameters']) container.set('id', node.id) container.set('type', node.type) @@ -325,7 +348,7 @@ export class CollaborationManager { return (syncDataAllowList.has(key) || !key.startsWith('_')) && key !== 'selected' } - private syncList(nodeContainer: LoroMap>, key: string, desired: Array): void { + private syncList(nodeContainer: LoroMapInstance>, key: string, desired: Array): void { const list = this.ensureList(nodeContainer, key) const current = list.toJSON() as Array const target = Array.isArray(desired) ? desired : [] @@ -471,71 +494,107 @@ export class CollaborationManager { if (reactFlowStore) this.reactFlowStore = reactFlowStore + if (this.connectionInitializationPromise) { + try { + await this.connectionInitializationPromise + return connectionId + } + catch (error) { + this.activeConnections.delete(connectionId) + throw error + } + } + + const initializationPromise = this.initializeConnection(appId) + this.connectionInitializationPromise = initializationPromise + + try { + await initializationPromise + } + catch (error) { + this.activeConnections.delete(connectionId) + throw error + } + finally { + if (this.connectionInitializationPromise === initializationPromise) + this.connectionInitializationPromise = null + } + + return connectionId + } + + private async initializeConnection(appId: string): Promise { const socket = webSocketClient.connect(appId) // Setup event listeners BEFORE any other operations this.setupSocketEventListeners(socket) - this.doc = new LoroDoc() - this.nodesMap = this.doc.getMap('nodes') as LoroMap> - this.edgesMap = this.doc.getMap('edges') as LoroMap> + try { + await ensureLoroReady() - // Initialize UndoManager for collaborative undo/redo - this.undoManager = new UndoManager(this.doc, { - maxUndoSteps: 100, - mergeInterval: 500, // Merge operations within 500ms - excludeOriginPrefixes: [], // Don't exclude anything - let UndoManager track all local operations - onPush: (_isUndo, _range, _event) => { - // Store current selection state when an operation is pushed - const selectedNode = this.reactFlowStore?.getState().getNodes().find((n: Node) => n.data?.selected) + this.doc = new LoroDoc() + this.nodesMap = this.doc.getMap('nodes') as LoroMapInstance> + this.edgesMap = this.doc.getMap('edges') as LoroMapInstance> - // Emit event to update UI button states when new operation is pushed - setTimeout(() => { - this.eventEmitter.emit('undoRedoStateChange', { - canUndo: this.undoManager?.canUndo() || false, - canRedo: this.undoManager?.canRedo() || false, - }) - }, 0) + // Initialize UndoManager for collaborative undo/redo + this.undoManager = new UndoManager(this.doc, { + maxUndoSteps: 100, + mergeInterval: 500, // Merge operations within 500ms + excludeOriginPrefixes: [], // Don't exclude anything - let UndoManager track all local operations + onPush: (_isUndo, _range, _event) => { + // Store current selection state when an operation is pushed + const selectedNode = this.reactFlowStore?.getState().getNodes().find((n: Node) => n.data?.selected) - return { - value: { - selectedNodeId: selectedNode?.id || null, - timestamp: Date.now(), - }, - cursors: [], - } - }, - onPop: (_isUndo, value, _counterRange) => { - // Restore selection state when undoing/redoing - if (value?.value && typeof value.value === 'object' && 'selectedNodeId' in value.value && this.reactFlowStore) { - const selectedNodeId = (value.value as { selectedNodeId?: string | null }).selectedNodeId - if (selectedNodeId) { - const state = this.reactFlowStore.getState() - const { setNodes } = state - const nodes = state.getNodes() - const newNodes = nodes.map((n: Node) => ({ - ...n, - data: { - ...n.data, - selected: n.id === selectedNodeId, - }, - })) - this.captureSetNodesAnomaly(nodes, newNodes, 'reactflow-native:undo-redo-selection-restore') - setNodes(newNodes) + // Emit event to update UI button states when new operation is pushed + setTimeout(() => { + this.eventEmitter.emit('undoRedoStateChange', { + canUndo: this.undoManager?.canUndo() || false, + canRedo: this.undoManager?.canRedo() || false, + }) + }, 0) + + return { + value: { + selectedNodeId: selectedNode?.id || null, + timestamp: Date.now(), + }, + cursors: [], } - } - }, - }) + }, + onPop: (_isUndo, value, _counterRange) => { + // Restore selection state when undoing/redoing + if (value?.value && typeof value.value === 'object' && 'selectedNodeId' in value.value && this.reactFlowStore) { + const selectedNodeId = (value.value as { selectedNodeId?: string | null }).selectedNodeId + if (selectedNodeId) { + const state = this.reactFlowStore.getState() + const { setNodes } = state + const nodes = state.getNodes() + const newNodes = nodes.map((n: Node) => ({ + ...n, + data: { + ...n.data, + selected: n.id === selectedNodeId, + }, + })) + this.captureSetNodesAnomaly(nodes, newNodes, 'reactflow-native:undo-redo-selection-restore') + setNodes(newNodes) + } + } + }, + }) - this.provider = new CRDTProvider(socket, this.doc, this.handleSessionUnauthorized) + this.provider = new CRDTProvider(socket, this.doc, this.handleSessionUnauthorized) - this.setupSubscriptions() + this.setupSubscriptions() - // Force user_connect if already connected - if (socket.connected) - emitWithAuthGuard(socket, 'user_connect', { workflow_id: appId }, { onUnauthorized: this.handleSessionUnauthorized }) - - return connectionId + // Force user_connect if already connected + if (socket.connected) + emitWithAuthGuard(socket, 'user_connect', { workflow_id: appId }, { onUnauthorized: this.handleSessionUnauthorized }) + } + catch (error) { + this.forceDisconnect() + throw error + } } disconnect = (connectionId?: string): void => { @@ -564,6 +623,7 @@ export class CollaborationManager { this.onlineUsers = [] this.isUndoRedoInProgress = false this.rejoinInProgress = false + this.connectionInitializationPromise = null this.clearGraphImportLog() // Only reset leader status when actually disconnecting diff --git a/web/app/components/workflow/collaboration/core/crdt-provider.ts b/web/app/components/workflow/collaboration/core/crdt-provider.ts index ce3fff4b32..93b8187d4d 100644 --- a/web/app/components/workflow/collaboration/core/crdt-provider.ts +++ b/web/app/components/workflow/collaboration/core/crdt-provider.ts @@ -1,13 +1,13 @@ -import type { LoroDoc } from 'loro-crdt' import type { Socket } from 'socket.io-client' +import type { LoroDocInstance } from './loro-web' import { emitWithAuthGuard } from './websocket-manager' export class CRDTProvider { - private doc: LoroDoc + private doc: LoroDocInstance private socket: Socket private onUnauthorized?: () => void - constructor(socket: Socket, doc: LoroDoc, onUnauthorized?: () => void) { + constructor(socket: Socket, doc: LoroDocInstance, onUnauthorized?: () => void) { this.socket = socket this.doc = doc this.onUnauthorized = onUnauthorized diff --git a/web/app/components/workflow/collaboration/core/loro-web.ts b/web/app/components/workflow/collaboration/core/loro-web.ts new file mode 100644 index 0000000000..45769dc1a4 --- /dev/null +++ b/web/app/components/workflow/collaboration/core/loro-web.ts @@ -0,0 +1,43 @@ +import type { + Container, + LoroDoc as LoroDocShape, + LoroList as LoroListShape, + LoroMap as LoroMapShape, + UndoManager as UndoManagerShape, + Value, +} from 'loro-crdt' +import { + LoroDoc as NodeLoroDoc, + LoroList as NodeLoroList, + LoroMap as NodeLoroMap, + UndoManager as NodeUndoManager, +} from 'loro-crdt' +// eslint-disable-next-line antfu/no-import-node-modules-by-path -- loro-crdt does not export a browser-ready wasm entry, so collaboration must target the web bundle file directly. +import initWebLoro, { + LoroDoc as WebLoroDoc, + LoroList as WebLoroList, + LoroMap as WebLoroMap, + UndoManager as WebUndoManager, +} from '../../../../../node_modules/loro-crdt/web/loro_wasm.js' + +const shouldUseWebLoro = typeof window !== 'undefined' && !import.meta.env?.VITEST + +export type LoroDocInstance = Record> = LoroDocShape +export type LoroListInstance = LoroListShape +export type LoroMapInstance = Record> = LoroMapShape +export type UndoManagerInstance = UndoManagerShape +export type { Value } + +export const LoroDoc = (shouldUseWebLoro ? WebLoroDoc : NodeLoroDoc) as typeof NodeLoroDoc +export const LoroList = (shouldUseWebLoro ? WebLoroList : NodeLoroList) as typeof NodeLoroList +export const LoroMap = (shouldUseWebLoro ? WebLoroMap : NodeLoroMap) as typeof NodeLoroMap +export const UndoManager = (shouldUseWebLoro ? WebUndoManager : NodeUndoManager) as typeof NodeUndoManager + +const initLoro = async (): Promise => { + if (!shouldUseWebLoro) + return + + await initWebLoro() +} + +export default initLoro diff --git a/web/app/components/workflow/collaboration/hooks/__tests__/use-collaboration.spec.tsx b/web/app/components/workflow/collaboration/hooks/__tests__/use-collaboration.spec.tsx new file mode 100644 index 0000000000..6fe572fb5a --- /dev/null +++ b/web/app/components/workflow/collaboration/hooks/__tests__/use-collaboration.spec.tsx @@ -0,0 +1,105 @@ +import { renderHook, waitFor } from '@testing-library/react' +import { useCollaboration } from '../use-collaboration' + +const mocks = vi.hoisted(() => ({ + connect: vi.fn<(appId: string) => Promise>().mockResolvedValue('connection-1'), + disconnect: vi.fn<(connectionId?: string) => void>(), + onStateChange: vi.fn(() => vi.fn()), + onCursorUpdate: vi.fn(() => vi.fn()), + onOnlineUsersUpdate: vi.fn(() => vi.fn()), + onNodePanelPresenceUpdate: vi.fn(() => vi.fn()), + onLeaderChange: vi.fn(() => vi.fn()), + setReactFlowStore: vi.fn<(store: unknown) => void>(), + isConnected: vi.fn<() => boolean>(() => false), + getLeaderId: vi.fn<() => string | null>(() => null), + emitCursorMove: vi.fn<(position: unknown) => void>(), +})) + +vi.mock('@/context/global-public-context', () => ({ + useGlobalPublicStore: (selector: (state: { systemFeatures: { enable_collaboration_mode: boolean } }) => T) => selector({ + systemFeatures: { + enable_collaboration_mode: true, + }, + }), +})) + +vi.mock('../../core/collaboration-manager', () => ({ + collaborationManager: { + connect: (appId: string) => mocks.connect(appId), + disconnect: (connectionId?: string) => mocks.disconnect(connectionId), + onStateChange: () => mocks.onStateChange(), + onCursorUpdate: () => mocks.onCursorUpdate(), + onOnlineUsersUpdate: () => mocks.onOnlineUsersUpdate(), + onNodePanelPresenceUpdate: () => mocks.onNodePanelPresenceUpdate(), + onLeaderChange: () => mocks.onLeaderChange(), + setReactFlowStore: (store: unknown) => mocks.setReactFlowStore(store), + isConnected: () => mocks.isConnected(), + getLeaderId: () => mocks.getLeaderId(), + emitCursorMove: (position: unknown) => mocks.emitCursorMove(position), + }, +})) + +describe('useCollaboration', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + it('should skip collaboration setup when disabled by the caller', async () => { + const reactFlowStore = { + getState: vi.fn(), + } + + const { result } = renderHook(() => useCollaboration('app-1', reactFlowStore as never, false)) + + await waitFor(() => { + expect(mocks.setReactFlowStore).toHaveBeenCalledWith(null) + }) + + expect(mocks.connect).not.toHaveBeenCalled() + expect(result.current.isEnabled).toBe(false) + expect(result.current.onlineUsers).toEqual([]) + expect(result.current.nodePanelPresence).toEqual({}) + }) + + it('should connect and attach the react flow store when collaboration is enabled', async () => { + const reactFlowStore = { + getState: vi.fn(), + } + + const { result } = renderHook(() => useCollaboration('app-1', reactFlowStore as never, true)) + + await waitFor(() => { + expect(mocks.connect).toHaveBeenCalledWith('app-1') + expect(mocks.setReactFlowStore).toHaveBeenCalledWith(reactFlowStore) + }) + + expect(result.current.isEnabled).toBe(true) + }) + + it('should disconnect and clear the react flow store when collaboration gets disabled', async () => { + const reactFlowStore = { + getState: vi.fn(), + } + + const { rerender } = renderHook( + ({ enabled }) => useCollaboration('app-1', reactFlowStore as never, enabled), + { + initialProps: { + enabled: true, + }, + }, + ) + + await waitFor(() => { + expect(mocks.connect).toHaveBeenCalledWith('app-1') + expect(mocks.setReactFlowStore).toHaveBeenCalledWith(reactFlowStore) + }) + + rerender({ enabled: false }) + + await waitFor(() => { + expect(mocks.disconnect).toHaveBeenCalledWith('connection-1') + expect(mocks.setReactFlowStore).toHaveBeenLastCalledWith(null) + }) + }) +}) diff --git a/web/app/components/workflow/collaboration/hooks/use-collaboration.ts b/web/app/components/workflow/collaboration/hooks/use-collaboration.ts index 03a9749961..5b621a5250 100644 --- a/web/app/components/workflow/collaboration/hooks/use-collaboration.ts +++ b/web/app/components/workflow/collaboration/hooks/use-collaboration.ts @@ -28,15 +28,17 @@ const initialState: CollaborationViewState = { isLeader: false, } -export function useCollaboration(appId: string, reactFlowStore?: ReactFlowStore) { +export function useCollaboration(appId: string, reactFlowStore?: ReactFlowStore, enabled = true) { const [state, setState] = useState(initialState) const cursorServiceRef = useRef(null) const lastDisconnectReasonRef = useRef(null) - const isCollaborationEnabled = useGlobalPublicStore(s => s.systemFeatures.enable_collaboration_mode) + const isCollaborationFeatureEnabled = useGlobalPublicStore(s => s.systemFeatures.enable_collaboration_mode) + const isCollaborationEnabled = isCollaborationFeatureEnabled && enabled useEffect(() => { if (!appId || !isCollaborationEnabled) { + lastDisconnectReasonRef.current = null Promise.resolve().then(() => { setState(initialState) }) @@ -109,14 +111,16 @@ export function useCollaboration(appId: string, reactFlowStore?: ReactFlowStore) }, [appId, isCollaborationEnabled]) useEffect(() => { - if (!reactFlowStore) + if (!isCollaborationEnabled || !reactFlowStore) { + collaborationManager.setReactFlowStore(null) return + } collaborationManager.setReactFlowStore(reactFlowStore) return () => { collaborationManager.setReactFlowStore(null) } - }, [reactFlowStore]) + }, [isCollaborationEnabled, reactFlowStore]) const prevIsConnected = useRef(false) useEffect(() => { diff --git a/web/app/components/workflow/header/online-users.tsx b/web/app/components/workflow/header/online-users.tsx index 5b80ea0e90..2b35258d5f 100644 --- a/web/app/components/workflow/header/online-users.tsx +++ b/web/app/components/workflow/header/online-users.tsx @@ -83,7 +83,8 @@ const OnlineUserAvatar = ({ const OnlineUsers = () => { const { t } = useTranslation() const appId = useStore(s => s.appId) - const { onlineUsers, cursors, isEnabled: isCollaborationEnabled } = useCollaboration(appId as string) + const isWorkflowCollaborationEnabled = useStore(s => !s.isRestoring && !s.historyWorkflowData) + const { onlineUsers, cursors, isEnabled: isCollaborationEnabled } = useCollaboration(appId as string, undefined, isWorkflowCollaborationEnabled) const { userProfile } = useAppContext() const reactFlow = useReactFlow() const [dropdownOpen, setDropdownOpen] = useState(false) diff --git a/web/app/components/workflow/nodes/_base/components/workflow-panel/index.tsx b/web/app/components/workflow/nodes/_base/components/workflow-panel/index.tsx index 80ba0e5a3f..f40d27d113 100644 --- a/web/app/components/workflow/nodes/_base/components/workflow-panel/index.tsx +++ b/web/app/components/workflow/nodes/_base/components/workflow-panel/index.tsx @@ -102,8 +102,9 @@ const BasePanel: FC = ({ const { t } = useTranslation() const language = useLanguage() const appId = useStore(s => s.appId) + const isWorkflowCollaborationEnabled = useStore(s => !s.isRestoring && !s.historyWorkflowData) const { userProfile } = useAppContext() - const { isConnected, nodePanelPresence } = useCollaboration(appId as string) + const { isConnected, nodePanelPresence } = useCollaboration(appId as string, undefined, isWorkflowCollaborationEnabled) const { showMessageLogModal } = useAppStore(useShallow(state => ({ showMessageLogModal: state.showMessageLogModal, }))) diff --git a/web/app/components/workflow/nodes/_base/node.tsx b/web/app/components/workflow/nodes/_base/node.tsx index 07bd28f45c..4c2d0f7a90 100644 --- a/web/app/components/workflow/nodes/_base/node.tsx +++ b/web/app/components/workflow/nodes/_base/node.tsx @@ -88,7 +88,8 @@ const BaseNode: FC = ({ const resolvedWorkflowStore = workflowStore ?? fallbackWorkflowStoreRef.current const appId = useZustandStore(resolvedWorkflowStore, s => s.appId) const controlMode = useZustandStore(resolvedWorkflowStore, s => s.controlMode) - const { nodePanelPresence } = useCollaboration(appId as string) + const isWorkflowCollaborationEnabled = useZustandStore(resolvedWorkflowStore, s => !s.isRestoring && !s.historyWorkflowData) + const { nodePanelPresence } = useCollaboration(appId as string, undefined, isWorkflowCollaborationEnabled) const { shouldDim: pluginDimmed, isChecking: pluginIsChecking, isMissing: pluginIsMissing, canInstall: pluginCanInstall, uniqueIdentifier: pluginUniqueIdentifier } = useNodePluginInstallation(data) const pluginInstallLocked = !pluginIsChecking && pluginIsMissing && pluginCanInstall && Boolean(pluginUniqueIdentifier) diff --git a/web/app/components/workflow/nodes/agent/__tests__/use-config.spec.tsx b/web/app/components/workflow/nodes/agent/__tests__/use-config.spec.tsx new file mode 100644 index 0000000000..b4ba9a593f --- /dev/null +++ b/web/app/components/workflow/nodes/agent/__tests__/use-config.spec.tsx @@ -0,0 +1,138 @@ +import type { AgentNodeType } from '../types' +import { renderHook } from '@testing-library/react' +import { FormTypeEnum } from '@/app/components/header/account-setting/model-provider-page/declarations' +import { BlockEnum } from '@/app/components/workflow/types' +import { VarType as ToolVarType } from '../../tool/types' +import useConfig from '../use-config' + +const mockSetInputs = vi.fn() +const mockUseNodesReadOnly = vi.fn() +const mockUseNodeCrud = vi.fn() +const mockUseVarList = vi.fn() +const mockUseStrategyProviderDetail = vi.fn() +const mockUseFetchPluginsInMarketPlaceByIds = vi.fn() +const mockUseCheckInstalled = vi.fn() +const mockUseAvailableVarList = vi.fn() +const mockUseIsChatMode = vi.fn() + +vi.mock('@/app/components/workflow/hooks', () => ({ + useIsChatMode: () => mockUseIsChatMode(), + useNodesReadOnly: () => mockUseNodesReadOnly(), +})) + +vi.mock('@/app/components/workflow/nodes/_base/hooks/use-node-crud', () => ({ + default: (...args: unknown[]) => mockUseNodeCrud(...args), +})) + +vi.mock('@/app/components/workflow/nodes/_base/hooks/use-var-list', () => ({ + default: (...args: unknown[]) => mockUseVarList(...args), +})) + +vi.mock('@/app/components/workflow/nodes/_base/hooks/use-available-var-list', () => ({ + default: (...args: unknown[]) => mockUseAvailableVarList(...args), +})) + +vi.mock('@/service/use-strategy', () => ({ + useStrategyProviderDetail: (...args: unknown[]) => mockUseStrategyProviderDetail(...args), +})) + +vi.mock('@/service/use-plugins', () => ({ + useFetchPluginsInMarketPlaceByIds: (...args: unknown[]) => mockUseFetchPluginsInMarketPlaceByIds(...args), + useCheckInstalled: (...args: unknown[]) => mockUseCheckInstalled(...args), +})) + +const createPayload = (overrides: Partial = {}): AgentNodeType => ({ + title: 'Agent', + desc: '', + type: BlockEnum.Agent, + output_schema: {}, + agent_strategy_provider_name: 'provider/agent', + agent_strategy_name: 'react', + agent_strategy_label: 'React Agent', + agent_parameters: { + toolParam: { + type: ToolVarType.constant, + value: { + settings: {}, + parameters: {}, + schemas: [], + }, + }, + }, + ...overrides, +}) + +const createStrategyProviderDetail = () => ({ + declaration: { + strategies: [{ + identity: { + name: 'react', + }, + parameters: [{ + name: 'toolParam', + type: FormTypeEnum.toolSelector, + }], + }], + }, +}) + +describe('agent useConfig', () => { + beforeEach(() => { + vi.clearAllMocks() + + mockUseNodesReadOnly.mockReturnValue({ + nodesReadOnly: false, + }) + mockUseNodeCrud.mockImplementation((_id: string, payload: AgentNodeType) => ({ + inputs: payload, + setInputs: mockSetInputs, + })) + mockUseVarList.mockReturnValue({ + handleVarListChange: vi.fn(), + handleAddVariable: vi.fn(), + }) + mockUseStrategyProviderDetail.mockReturnValue({ + data: createStrategyProviderDetail(), + isLoading: false, + isError: false, + refetch: vi.fn(), + }) + mockUseFetchPluginsInMarketPlaceByIds.mockReturnValue({ + data: { + data: { + plugins: [], + }, + }, + isLoading: false, + refetch: vi.fn(), + }) + mockUseCheckInstalled.mockReturnValue({ + data: { + plugins: [], + }, + }) + mockUseAvailableVarList.mockReturnValue({ + availableVars: [], + availableNodesWithParent: [], + }) + mockUseIsChatMode.mockReturnValue(true) + }) + + it('should skip legacy migration when the node is read-only', () => { + mockUseNodesReadOnly.mockReturnValue({ + nodesReadOnly: true, + }) + + renderHook(() => useConfig('agent-node', createPayload())) + + expect(mockSetInputs).not.toHaveBeenCalled() + }) + + it('should migrate legacy agent tool data when the node is editable', () => { + renderHook(() => useConfig('agent-node', createPayload())) + + expect(mockSetInputs).toHaveBeenCalledWith(expect.objectContaining({ + tool_node_version: '2', + })) + }) +}) diff --git a/web/app/components/workflow/nodes/agent/use-config.ts b/web/app/components/workflow/nodes/agent/use-config.ts index 49311f4d6d..5f61a0972c 100644 --- a/web/app/components/workflow/nodes/agent/use-config.ts +++ b/web/app/components/workflow/nodes/agent/use-config.ts @@ -129,9 +129,7 @@ const useConfig = (id: string, payload: AgentNodeType) => { return res } - const formattingLegacyData = () => { - if (inputs.version || inputs.tool_node_version) - return inputs + const formattingLegacyData = useCallback(() => { const newData = produce(inputs, (draft) => { const schemas = currentStrategy?.parameters || [] Object.keys(draft.agent_parameters || {}).forEach((key) => { @@ -144,15 +142,17 @@ const useConfig = (id: string, payload: AgentNodeType) => { draft.tool_node_version = '2' }) return newData - } + }, [currentStrategy?.parameters, inputs]) + + const shouldFormatLegacyData = Boolean(currentStrategy) && !readOnly && !inputs.version && !inputs.tool_node_version // formatting legacy data useEffect(() => { - if (!currentStrategy) + if (!shouldFormatLegacyData) return const newData = formattingLegacyData() setInputs(newData) - }, [currentStrategy]) + }, [formattingLegacyData, setInputs, shouldFormatLegacyData]) // vars