feat(collaboration): enhance collaboration features with Loro integration and improved hooks

- Introduced a new  module to manage Loro integration for web environments.
- Updated  to ensure Loro is initialized before establishing connections.
- Enhanced  hook to conditionally enable collaboration based on workflow state.
- Added tests for  to verify connection handling and initialization behavior.
- Refactored components to utilize the updated collaboration hooks, improving overall workflow collaboration experience.
This commit is contained in:
CodingOnStar
2026-03-31 17:57:10 +08:00
parent 8843a62b4e
commit e650109f94
19 changed files with 687 additions and 112 deletions

View File

@@ -365,6 +365,22 @@ describe('PromptEditor', () => {
expect(() => unmount()).not.toThrow()
})
it('should rerender rapidly without triggering a ref update loop', () => {
const { rerender } = render(
<React.StrictMode>
<PromptEditor value="first" />
</React.StrictMode>,
)
expect(() => {
rerender(
<React.StrictMode>
<PromptEditor value="second" />
</React.StrictMode>,
)
}).not.toThrow()
})
it('should render hitl block when show=true', () => {
render(
<PromptEditor

View File

@@ -38,7 +38,7 @@ import {
TextNode,
} from 'lexical'
import * as React from 'react'
import { useEffect, useState } from 'react'
import { useCallback, useEffect, useState } from 'react'
import { WorkflowContext } from '@/app/components/workflow/context'
import { HooksStoreContext } from '@/app/components/workflow/hooks-store/provider'
import { FileReferenceNode } from '@/app/components/workflow/skill/editor/skill-editor/plugins/file-reference-block/node'
@@ -344,10 +344,12 @@ const PromptEditorContent: FC<PromptEditorContentProps> = ({
const [floatingAnchorElem, setFloatingAnchorElem] = useState<HTMLDivElement | null>(null)
const onRef = (floatingAnchorElement: HTMLDivElement | null) => {
if (floatingAnchorElement !== null)
setFloatingAnchorElem(floatingAnchorElement)
}
const onRef = useCallback((floatingAnchorElement: HTMLDivElement | null) => {
if (floatingAnchorElement === null)
return
setFloatingAnchorElem(prev => prev === floatingAnchorElement ? prev : floatingAnchorElement)
}, [])
return (
<LexicalComposer initialConfig={{ ...initialConfig, editable }}>

View File

@@ -19,6 +19,7 @@ const mockUseSubscription = vi.fn()
const mockCollaborationSetNodes = vi.fn()
const mockCollaborationSetEdges = vi.fn()
const mockEmitGraphViewActive = vi.fn()
const mockUseCollaboration = vi.fn()
let appStoreState: {
appDetail?: {
@@ -63,6 +64,8 @@ let searchParamsValue: string | null = null
const workflowUiState = {
appId: 'app-1',
isResponding: false,
isRestoring: false,
historyWorkflowData: undefined as Record<string, unknown> | undefined,
showUpgradeRuntimeModal: false,
setShowUpgradeRuntimeModal: mockSetShowUpgradeRuntimeModal,
}
@@ -145,7 +148,7 @@ vi.mock('@/context/event-emitter', () => ({
}))
vi.mock('@/app/components/workflow/collaboration', () => ({
useCollaboration: () => undefined,
useCollaboration: (...args: unknown[]) => mockUseCollaboration(...args),
}))
vi.mock('@/app/components/workflow/collaboration/core/collaboration-manager', () => ({
@@ -293,6 +296,8 @@ describe('WorkflowApp', () => {
mockInitialEdges.mockReturnValue([{ id: 'edge-1' }])
mockGetWorkflowRunAndTraceUrl.mockReturnValue({ runUrl: '/runs/run-1' })
mockSyncWorkflowDraftImmediately.mockResolvedValue(undefined)
workflowUiState.isRestoring = false
workflowUiState.historyWorkflowData = undefined
})
it('should render the loading shell while workflow data is still loading', () => {
@@ -427,4 +432,27 @@ describe('WorkflowApp', () => {
expect(mockSetShowInputsPanel).not.toHaveBeenCalled()
expect(mockSetShowDebugAndPreviewPanel).not.toHaveBeenCalled()
})
it.each([
{
description: 'the workflow is restoring',
workflowState: {
isRestoring: true,
historyWorkflowData: undefined,
},
},
{
description: 'a historical workflow version is selected',
workflowState: {
isRestoring: false,
historyWorkflowData: { id: 'history-1' },
},
},
])('should disable the collaboration session when $description', ({ workflowState }) => {
Object.assign(workflowUiState, workflowState)
render(<WorkflowApp />)
expect(mockUseCollaboration).toHaveBeenCalledWith('app-1', undefined, false)
})
})

View File

@@ -14,9 +14,12 @@ const mockGetNodes = vi.fn()
const mockSetNodes = vi.fn()
const mockGetEdges = vi.fn()
const mockSetEdges = vi.fn()
const mockUseCollaboration = vi.fn()
const workflowUiState = {
appId: 'app-1',
isRestoring: false,
historyWorkflowData: undefined as Record<string, unknown> | undefined,
}
const hookFns = {
@@ -97,14 +100,17 @@ vi.mock('@/app/components/workflow/collaboration', () => ({
onWorkflowUpdate: vi.fn(() => vi.fn()),
onSyncRequest: vi.fn(() => vi.fn()),
},
useCollaboration: () => ({
startCursorTracking: mockStartCursorTracking,
stopCursorTracking: mockStopCursorTracking,
onlineUsers: [],
cursors: {},
isConnected: false,
isEnabled: false,
}),
useCollaboration: (...args: unknown[]) => {
mockUseCollaboration(...args)
return {
startCursorTracking: mockStartCursorTracking,
stopCursorTracking: mockStopCursorTracking,
onlineUsers: [],
cursors: {},
isConnected: false,
isEnabled: false,
}
},
}))
vi.mock('@/app/components/workflow/block-selector/context/mcp-tool-availability-context', () => ({
@@ -215,6 +221,8 @@ describe('WorkflowMain', () => {
capturedContextProps = null
mockGetNodes.mockReturnValue([])
mockGetEdges.mockReturnValue([])
workflowUiState.isRestoring = false
workflowUiState.historyWorkflowData = undefined
})
it('should render the inner workflow context with children and forwarded graph props', () => {
@@ -331,4 +339,39 @@ describe('WorkflowMain', () => {
configsMap: { flowId: 'app-1', flowType: 'app-flow', fileSettings: { enabled: true } },
})
})
it.each([
{
description: 'the workflow is restoring',
workflowState: {
isRestoring: true,
historyWorkflowData: undefined,
},
},
{
description: 'viewing workflow history',
workflowState: {
isRestoring: false,
historyWorkflowData: { id: 'history-1' },
},
},
])('should disable collaboration when $description', ({ workflowState }) => {
Object.assign(workflowUiState, workflowState)
render(
<WorkflowMain
nodes={[]}
edges={[]}
viewport={{ x: 0, y: 0, zoom: 1 }}
/>,
)
expect(mockUseCollaboration).toHaveBeenCalledWith(
'app-1',
expect.objectContaining({
getState: expect.any(Function),
}),
false,
)
})
})

View File

@@ -50,6 +50,7 @@ const WorkflowMain = ({
const featuresStore = useFeaturesStore()
const workflowStore = useWorkflowStore()
const appId = useStore(s => s.appId)
const isWorkflowCollaborationEnabled = useStore(s => !s.isRestoring && !s.historyWorkflowData)
const containerRef = useRef<HTMLDivElement>(null)
const reactFlow = useReactFlow()
@@ -68,7 +69,7 @@ const WorkflowMain = ({
cursors,
isConnected,
isEnabled: isCollaborationEnabled,
} = useCollaboration(appId || '', reactFlowStore)
} = useCollaboration(appId || '', reactFlowStore, isWorkflowCollaborationEnabled)
const myUserId = useMemo(
() => (isCollaborationEnabled && isConnected ? 'current-user' : null),
[isCollaborationEnabled, isConnected],

View File

@@ -65,7 +65,8 @@ const SkillMain = dynamic(() => import('@/app/components/workflow/skill/main'),
const CollaborationSession = () => {
const appId = useStore(s => s.appId)
useCollaboration(appId || '')
const isCollaborationSessionEnabled = useStore(s => !s.isRestoring && !s.historyWorkflowData)
useCollaboration(appId || '', undefined, isCollaborationSessionEnabled)
return null
}

View File

@@ -0,0 +1,123 @@
import { CollaborationManager } from '../collaboration-manager'
const mocks = vi.hoisted(() => {
const socket = {
connected: false,
emit: vi.fn(),
id: 'socket-1',
off: vi.fn(),
on: vi.fn(),
}
return {
connectSocket: vi.fn(() => socket),
disconnectSocket: vi.fn(),
emitWithAuthGuard: vi.fn(),
getSocket: vi.fn(() => socket),
initLoro: vi.fn<() => Promise<void>>(),
isConnected: vi.fn(() => false),
LoroDoc: vi.fn(function MockLoroDoc(this: { getMap: ReturnType<typeof vi.fn> }) {
this.getMap = vi.fn(() => ({
get: vi.fn(),
keys: vi.fn(() => []),
subscribe: vi.fn(),
values: vi.fn(() => []),
}))
}),
providerDestroy: vi.fn(),
UndoManager: vi.fn(function MockUndoManager(this: { canRedo: ReturnType<typeof vi.fn>, canUndo: ReturnType<typeof vi.fn> }) {
this.canRedo = vi.fn(() => false)
this.canUndo = vi.fn(() => false)
}),
}
})
vi.mock('../loro-web', () => ({
default: () => mocks.initLoro(),
LoroDoc: mocks.LoroDoc,
LoroList: class {},
LoroMap: class {},
UndoManager: mocks.UndoManager,
}))
vi.mock('../crdt-provider', () => ({
CRDTProvider: vi.fn(function MockCRDTProvider(this: { destroy: typeof mocks.providerDestroy }) {
this.destroy = mocks.providerDestroy
}),
}))
vi.mock('../websocket-manager', () => ({
emitWithAuthGuard: (...args: unknown[]) => mocks.emitWithAuthGuard(...args),
webSocketClient: {
connect: mocks.connectSocket,
disconnect: (appId?: string) => mocks.disconnectSocket(appId),
getSocket: mocks.getSocket,
isConnected: mocks.isConnected,
},
}))
type CollaborationManagerInternals = {
activeConnections: Set<string>
connectionInitializationPromise: Promise<void> | null
currentAppId: string | null
doc: unknown
}
const getManagerInternals = (manager: CollaborationManager): CollaborationManagerInternals =>
manager as unknown as CollaborationManagerInternals
const createDeferred = () => {
let resolve!: () => void
const promise = new Promise<void>((res) => {
resolve = res
})
return {
promise,
resolve,
}
}
// Covers Loro wasm bootstrapping during workflow collaboration startup.
describe('CollaborationManager connect', () => {
beforeEach(() => {
vi.clearAllMocks()
mocks.initLoro.mockResolvedValue(undefined)
})
it('should rollback connection state when Loro initialization fails', async () => {
const manager = new CollaborationManager()
const internals = getManagerInternals(manager)
const initError = new Error('init failed')
mocks.initLoro.mockRejectedValueOnce(initError)
await expect(manager.connect('app-1')).rejects.toThrow(initError)
expect(mocks.connectSocket).toHaveBeenCalledWith('app-1')
expect(mocks.disconnectSocket).toHaveBeenCalledWith('app-1')
expect(mocks.LoroDoc).not.toHaveBeenCalled()
expect(internals.currentAppId).toBeNull()
expect(internals.doc).toBeNull()
expect(internals.connectionInitializationPromise).toBeNull()
expect(internals.activeConnections.size).toBe(0)
})
it('should reuse the in-flight initialization for concurrent callers', async () => {
const manager = new CollaborationManager()
const deferred = createDeferred()
mocks.initLoro.mockReturnValueOnce(deferred.promise)
const firstConnect = manager.connect('app-1')
const secondConnect = manager.connect('app-1')
expect(mocks.initLoro).toHaveBeenCalledTimes(1)
expect(mocks.connectSocket).toHaveBeenCalledTimes(1)
deferred.resolve()
await expect(firstConnect).resolves.toMatch(/[a-z0-9]{9}/)
await expect(secondConnect).resolves.toMatch(/[a-z0-9]{9}/)
expect(mocks.LoroDoc).toHaveBeenCalledTimes(1)
expect(mocks.UndoManager).toHaveBeenCalledTimes(1)
})
})

View File

@@ -1,8 +1,8 @@
import type { LoroMap } from 'loro-crdt'
import type { LoroDocInstance, LoroMapInstance } from '../loro-web'
import type { Node } from '@/app/components/workflow/types'
import { LoroDoc } from 'loro-crdt'
import { BlockEnum } from '@/app/components/workflow/types'
import { CollaborationManager } from '../collaboration-manager'
import initLoro, { LoroDoc } from '../loro-web'
const NODE_ID = 'node-1'
const LLM_NODE_ID = 'llm-node'
@@ -74,9 +74,9 @@ type ParameterExtractorNodeData = {
}
type CollaborationManagerInternals = {
doc: LoroDoc
nodesMap: LoroMap
edgesMap: LoroMap
doc: LoroDocInstance
nodesMap: LoroMapInstance
edgesMap: LoroMapInstance
syncNodes: (oldNodes: Node[], newNodes: Node[]) => void
}
@@ -159,7 +159,7 @@ const createParameterExtractorNode = (parameters: ParameterItem[]): Node<Paramet
const getManagerInternals = (manager: CollaborationManager): CollaborationManagerInternals =>
manager as unknown as CollaborationManagerInternals
const getManager = (doc: LoroDoc) => {
const getManager = (doc: LoroDocInstance) => {
const manager = new CollaborationManager()
const internals = getManagerInternals(manager)
internals.doc = doc
@@ -177,6 +177,10 @@ const syncNodes = (manager: CollaborationManager, previous: Node[], next: Node[]
const exportNodes = (manager: CollaborationManager) => manager.getNodes()
beforeAll(async () => {
await initLoro()
})
describe('Loro merge behavior smoke test', () => {
it('inspects concurrent edits after merge', () => {
const docA = new LoroDoc()

View File

@@ -1,13 +1,13 @@
import type { LoroMap } from 'loro-crdt'
import type { LoroDocInstance, LoroMapInstance } from '../loro-web'
import type {
NodePanelPresenceMap,
NodePanelPresenceUser,
} from '@/app/components/workflow/collaboration/types/collaboration'
import type { CommonNodeType, Edge, Node } from '@/app/components/workflow/types'
import { LoroDoc } from 'loro-crdt'
import { Position } from 'reactflow'
import { CollaborationManager } from '@/app/components/workflow/collaboration/core/collaboration-manager'
import { BlockEnum } from '@/app/components/workflow/types'
import initLoro, { LoroDoc } from '../loro-web'
const NODE_ID = '1760342909316'
@@ -88,12 +88,12 @@ type LLMNodeDataWithUnknownTemplate = Omit<LLMNodeData, 'prompt_template'> & {
prompt_template: unknown
}
type ManagerDoc = LoroDoc | { commit: () => void }
type ManagerDoc = LoroDocInstance | { commit: () => void }
type CollaborationManagerInternals = {
doc: ManagerDoc
nodesMap: LoroMap
edgesMap: LoroMap
nodesMap: LoroMapInstance
edgesMap: LoroMapInstance
syncNodes: (oldNodes: Node[], newNodes: Node[]) => void
syncEdges: (oldEdges: Edge[], newEdges: Edge[]) => void
applyNodePanelPresenceUpdate: (update: NodePanelPresenceEventData) => void
@@ -246,6 +246,10 @@ const setupManager = (): { manager: CollaborationManager, internals: Collaborati
return { manager, internals }
}
beforeAll(async () => {
await initLoro()
})
describe('CollaborationManager syncNodes', () => {
let manager: CollaborationManager
let internals: CollaborationManagerInternals

View File

@@ -1,4 +1,3 @@
import type { Value } from 'loro-crdt'
import type { Socket } from 'socket.io-client'
import type {
CommonNodeType,
@@ -16,11 +15,18 @@ import type {
RestoreIntentData,
RestoreRequestData,
} from '../types/collaboration'
import type {
LoroDocInstance,
LoroListInstance,
LoroMapInstance,
UndoManagerInstance,
Value,
} from './loro-web'
import { cloneDeep } from 'es-toolkit/object'
import { isEqual } from 'es-toolkit/predicate'
import { LoroDoc, LoroList, LoroMap, UndoManager } from 'loro-crdt'
import { CRDTProvider } from './crdt-provider'
import { EventEmitter } from './event-emitter'
import initLoro, { LoroDoc, LoroList, LoroMap, UndoManager } from './loro-web'
import { emitWithAuthGuard, webSocketClient } from './websocket-manager'
type NodePanelPresenceEventData = {
@@ -105,12 +111,28 @@ const SET_NODES_ANOMALY_LOG_LIMIT = 100
const toLoroValue = (value: unknown): Value => cloneDeep(value) as Value
const toLoroRecord = (value: unknown): Record<string, Value> => cloneDeep(value) as Record<string, Value>
let loroInitializationPromise: Promise<void> | null = null
const ensureLoroReady = async (): Promise<void> => {
if (!loroInitializationPromise) {
loroInitializationPromise = Promise.resolve(initLoro())
.then(() => undefined)
.catch((error) => {
loroInitializationPromise = null
throw error
})
}
await loroInitializationPromise
}
export class CollaborationManager {
private doc: LoroDoc | null = null
private undoManager: UndoManager | null = null
private doc: LoroDocInstance | null = null
private undoManager: UndoManagerInstance | null = null
private provider: CRDTProvider | null = null
private nodesMap: LoroMap<Record<string, Value>> | null = null
private edgesMap: LoroMap<Record<string, Value>> | null = null
private nodesMap: LoroMapInstance<Record<string, Value>> | null = null
private edgesMap: LoroMapInstance<Record<string, Value>> | null = null
private eventEmitter = new EventEmitter()
private currentAppId: string | null = null
private reactFlowStore: ReactFlowStore | null = null
@@ -127,6 +149,7 @@ export class CollaborationManager {
private graphViewActive: boolean | null = null
private graphImportLogs: GraphImportLogEntry[] = []
private setNodesAnomalyLogs: SetNodesAnomalyLogEntry[] = []
private connectionInitializationPromise: Promise<void> | null = null
private pendingImportLog: {
timestamp: number
sources: Set<'nodes' | 'edges'>
@@ -191,13 +214,13 @@ export class CollaborationManager {
emitWithAuthGuard(socket, 'graph_event', payload, { onUnauthorized: this.handleSessionUnauthorized })
}
private getNodeContainer(nodeId: string): LoroMap<Record<string, Value>> {
private getNodeContainer(nodeId: string): LoroMapInstance<Record<string, Value>> {
if (!this.nodesMap)
throw new Error('Nodes map not initialized')
let container = this.nodesMap.get(nodeId) as unknown
const isMapContainer = (value: unknown): value is LoroMap<Record<string, Value>> & LoroContainer => {
const isMapContainer = (value: unknown): value is LoroMapInstance<Record<string, Value>> & LoroContainer => {
return !!value && typeof (value as LoroContainer).kind === 'function' && (value as LoroContainer).kind?.() === 'Map'
}
@@ -207,27 +230,27 @@ export class CollaborationManager {
const attached = (newContainer as LoroContainer).getAttached?.() ?? newContainer
container = attached
if (previousValue && typeof previousValue === 'object')
this.populateNodeContainer(container as LoroMap<Record<string, Value>>, previousValue as Node)
this.populateNodeContainer(container as LoroMapInstance<Record<string, Value>>, previousValue as Node)
}
else {
const attached = (container as LoroContainer).getAttached?.() ?? container
container = attached
}
return container as LoroMap<Record<string, Value>>
return container as LoroMapInstance<Record<string, Value>>
}
private ensureDataContainer(nodeContainer: LoroMap<Record<string, Value>>): LoroMap<Record<string, Value>> {
private ensureDataContainer(nodeContainer: LoroMapInstance<Record<string, Value>>): LoroMapInstance<Record<string, Value>> {
let dataContainer = nodeContainer.get('data') as unknown
if (!dataContainer || typeof (dataContainer as LoroContainer).kind !== 'function' || (dataContainer as LoroContainer).kind?.() !== 'Map')
dataContainer = nodeContainer.setContainer('data', new LoroMap())
const attached = (dataContainer as LoroContainer).getAttached?.() ?? dataContainer
return attached as LoroMap<Record<string, Value>>
return attached as LoroMapInstance<Record<string, Value>>
}
private ensureList(nodeContainer: LoroMap<Record<string, Value>>, key: string): LoroList<unknown> {
private ensureList(nodeContainer: LoroMapInstance<Record<string, Value>>, key: string): LoroListInstance<unknown> {
const dataContainer = this.ensureDataContainer(nodeContainer)
let list = dataContainer.get(key) as unknown
@@ -235,7 +258,7 @@ export class CollaborationManager {
list = dataContainer.setContainer(key, new LoroList())
const attached = (list as LoroContainer).getAttached?.() ?? list
return attached as LoroList<unknown>
return attached as LoroListInstance<unknown>
}
private exportNode(nodeId: string): Node {
@@ -247,7 +270,7 @@ export class CollaborationManager {
}
}
private populateNodeContainer(container: LoroMap<Record<string, Value>>, node: Node): void {
private populateNodeContainer(container: LoroMapInstance<Record<string, Value>>, node: Node): void {
const listFields = new Set(['variables', 'prompt_template', 'parameters'])
container.set('id', node.id)
container.set('type', node.type)
@@ -325,7 +348,7 @@ export class CollaborationManager {
return (syncDataAllowList.has(key) || !key.startsWith('_')) && key !== 'selected'
}
private syncList(nodeContainer: LoroMap<Record<string, Value>>, key: string, desired: Array<unknown>): void {
private syncList(nodeContainer: LoroMapInstance<Record<string, Value>>, key: string, desired: Array<unknown>): void {
const list = this.ensureList(nodeContainer, key)
const current = list.toJSON() as Array<unknown>
const target = Array.isArray(desired) ? desired : []
@@ -471,71 +494,107 @@ export class CollaborationManager {
if (reactFlowStore)
this.reactFlowStore = reactFlowStore
if (this.connectionInitializationPromise) {
try {
await this.connectionInitializationPromise
return connectionId
}
catch (error) {
this.activeConnections.delete(connectionId)
throw error
}
}
const initializationPromise = this.initializeConnection(appId)
this.connectionInitializationPromise = initializationPromise
try {
await initializationPromise
}
catch (error) {
this.activeConnections.delete(connectionId)
throw error
}
finally {
if (this.connectionInitializationPromise === initializationPromise)
this.connectionInitializationPromise = null
}
return connectionId
}
private async initializeConnection(appId: string): Promise<void> {
const socket = webSocketClient.connect(appId)
// Setup event listeners BEFORE any other operations
this.setupSocketEventListeners(socket)
this.doc = new LoroDoc()
this.nodesMap = this.doc.getMap('nodes') as LoroMap<Record<string, Value>>
this.edgesMap = this.doc.getMap('edges') as LoroMap<Record<string, Value>>
try {
await ensureLoroReady()
// Initialize UndoManager for collaborative undo/redo
this.undoManager = new UndoManager(this.doc, {
maxUndoSteps: 100,
mergeInterval: 500, // Merge operations within 500ms
excludeOriginPrefixes: [], // Don't exclude anything - let UndoManager track all local operations
onPush: (_isUndo, _range, _event) => {
// Store current selection state when an operation is pushed
const selectedNode = this.reactFlowStore?.getState().getNodes().find((n: Node) => n.data?.selected)
this.doc = new LoroDoc()
this.nodesMap = this.doc.getMap('nodes') as LoroMapInstance<Record<string, Value>>
this.edgesMap = this.doc.getMap('edges') as LoroMapInstance<Record<string, Value>>
// Emit event to update UI button states when new operation is pushed
setTimeout(() => {
this.eventEmitter.emit('undoRedoStateChange', {
canUndo: this.undoManager?.canUndo() || false,
canRedo: this.undoManager?.canRedo() || false,
})
}, 0)
// Initialize UndoManager for collaborative undo/redo
this.undoManager = new UndoManager(this.doc, {
maxUndoSteps: 100,
mergeInterval: 500, // Merge operations within 500ms
excludeOriginPrefixes: [], // Don't exclude anything - let UndoManager track all local operations
onPush: (_isUndo, _range, _event) => {
// Store current selection state when an operation is pushed
const selectedNode = this.reactFlowStore?.getState().getNodes().find((n: Node) => n.data?.selected)
return {
value: {
selectedNodeId: selectedNode?.id || null,
timestamp: Date.now(),
},
cursors: [],
}
},
onPop: (_isUndo, value, _counterRange) => {
// Restore selection state when undoing/redoing
if (value?.value && typeof value.value === 'object' && 'selectedNodeId' in value.value && this.reactFlowStore) {
const selectedNodeId = (value.value as { selectedNodeId?: string | null }).selectedNodeId
if (selectedNodeId) {
const state = this.reactFlowStore.getState()
const { setNodes } = state
const nodes = state.getNodes()
const newNodes = nodes.map((n: Node) => ({
...n,
data: {
...n.data,
selected: n.id === selectedNodeId,
},
}))
this.captureSetNodesAnomaly(nodes, newNodes, 'reactflow-native:undo-redo-selection-restore')
setNodes(newNodes)
// Emit event to update UI button states when new operation is pushed
setTimeout(() => {
this.eventEmitter.emit('undoRedoStateChange', {
canUndo: this.undoManager?.canUndo() || false,
canRedo: this.undoManager?.canRedo() || false,
})
}, 0)
return {
value: {
selectedNodeId: selectedNode?.id || null,
timestamp: Date.now(),
},
cursors: [],
}
}
},
})
},
onPop: (_isUndo, value, _counterRange) => {
// Restore selection state when undoing/redoing
if (value?.value && typeof value.value === 'object' && 'selectedNodeId' in value.value && this.reactFlowStore) {
const selectedNodeId = (value.value as { selectedNodeId?: string | null }).selectedNodeId
if (selectedNodeId) {
const state = this.reactFlowStore.getState()
const { setNodes } = state
const nodes = state.getNodes()
const newNodes = nodes.map((n: Node) => ({
...n,
data: {
...n.data,
selected: n.id === selectedNodeId,
},
}))
this.captureSetNodesAnomaly(nodes, newNodes, 'reactflow-native:undo-redo-selection-restore')
setNodes(newNodes)
}
}
},
})
this.provider = new CRDTProvider(socket, this.doc, this.handleSessionUnauthorized)
this.provider = new CRDTProvider(socket, this.doc, this.handleSessionUnauthorized)
this.setupSubscriptions()
this.setupSubscriptions()
// Force user_connect if already connected
if (socket.connected)
emitWithAuthGuard(socket, 'user_connect', { workflow_id: appId }, { onUnauthorized: this.handleSessionUnauthorized })
return connectionId
// Force user_connect if already connected
if (socket.connected)
emitWithAuthGuard(socket, 'user_connect', { workflow_id: appId }, { onUnauthorized: this.handleSessionUnauthorized })
}
catch (error) {
this.forceDisconnect()
throw error
}
}
disconnect = (connectionId?: string): void => {
@@ -564,6 +623,7 @@ export class CollaborationManager {
this.onlineUsers = []
this.isUndoRedoInProgress = false
this.rejoinInProgress = false
this.connectionInitializationPromise = null
this.clearGraphImportLog()
// Only reset leader status when actually disconnecting

View File

@@ -1,13 +1,13 @@
import type { LoroDoc } from 'loro-crdt'
import type { Socket } from 'socket.io-client'
import type { LoroDocInstance } from './loro-web'
import { emitWithAuthGuard } from './websocket-manager'
export class CRDTProvider {
private doc: LoroDoc
private doc: LoroDocInstance
private socket: Socket
private onUnauthorized?: () => void
constructor(socket: Socket, doc: LoroDoc, onUnauthorized?: () => void) {
constructor(socket: Socket, doc: LoroDocInstance, onUnauthorized?: () => void) {
this.socket = socket
this.doc = doc
this.onUnauthorized = onUnauthorized

View File

@@ -0,0 +1,43 @@
import type {
Container,
LoroDoc as LoroDocShape,
LoroList as LoroListShape,
LoroMap as LoroMapShape,
UndoManager as UndoManagerShape,
Value,
} from 'loro-crdt'
import {
LoroDoc as NodeLoroDoc,
LoroList as NodeLoroList,
LoroMap as NodeLoroMap,
UndoManager as NodeUndoManager,
} from 'loro-crdt'
// eslint-disable-next-line antfu/no-import-node-modules-by-path -- loro-crdt does not export a browser-ready wasm entry, so collaboration must target the web bundle file directly.
import initWebLoro, {
LoroDoc as WebLoroDoc,
LoroList as WebLoroList,
LoroMap as WebLoroMap,
UndoManager as WebUndoManager,
} from '../../../../../node_modules/loro-crdt/web/loro_wasm.js'
const shouldUseWebLoro = typeof window !== 'undefined' && !import.meta.env?.VITEST
export type LoroDocInstance<T extends Record<string, Container> = Record<string, Container>> = LoroDocShape<T>
export type LoroListInstance<T = unknown> = LoroListShape<T>
export type LoroMapInstance<T extends Record<string, unknown> = Record<string, unknown>> = LoroMapShape<T>
export type UndoManagerInstance = UndoManagerShape
export type { Value }
export const LoroDoc = (shouldUseWebLoro ? WebLoroDoc : NodeLoroDoc) as typeof NodeLoroDoc
export const LoroList = (shouldUseWebLoro ? WebLoroList : NodeLoroList) as typeof NodeLoroList
export const LoroMap = (shouldUseWebLoro ? WebLoroMap : NodeLoroMap) as typeof NodeLoroMap
export const UndoManager = (shouldUseWebLoro ? WebUndoManager : NodeUndoManager) as typeof NodeUndoManager
const initLoro = async (): Promise<void> => {
if (!shouldUseWebLoro)
return
await initWebLoro()
}
export default initLoro

View File

@@ -0,0 +1,105 @@
import { renderHook, waitFor } from '@testing-library/react'
import { useCollaboration } from '../use-collaboration'
const mocks = vi.hoisted(() => ({
connect: vi.fn<(appId: string) => Promise<string>>().mockResolvedValue('connection-1'),
disconnect: vi.fn<(connectionId?: string) => void>(),
onStateChange: vi.fn(() => vi.fn()),
onCursorUpdate: vi.fn(() => vi.fn()),
onOnlineUsersUpdate: vi.fn(() => vi.fn()),
onNodePanelPresenceUpdate: vi.fn(() => vi.fn()),
onLeaderChange: vi.fn(() => vi.fn()),
setReactFlowStore: vi.fn<(store: unknown) => void>(),
isConnected: vi.fn<() => boolean>(() => false),
getLeaderId: vi.fn<() => string | null>(() => null),
emitCursorMove: vi.fn<(position: unknown) => void>(),
}))
vi.mock('@/context/global-public-context', () => ({
useGlobalPublicStore: <T,>(selector: (state: { systemFeatures: { enable_collaboration_mode: boolean } }) => T) => selector({
systemFeatures: {
enable_collaboration_mode: true,
},
}),
}))
vi.mock('../../core/collaboration-manager', () => ({
collaborationManager: {
connect: (appId: string) => mocks.connect(appId),
disconnect: (connectionId?: string) => mocks.disconnect(connectionId),
onStateChange: () => mocks.onStateChange(),
onCursorUpdate: () => mocks.onCursorUpdate(),
onOnlineUsersUpdate: () => mocks.onOnlineUsersUpdate(),
onNodePanelPresenceUpdate: () => mocks.onNodePanelPresenceUpdate(),
onLeaderChange: () => mocks.onLeaderChange(),
setReactFlowStore: (store: unknown) => mocks.setReactFlowStore(store),
isConnected: () => mocks.isConnected(),
getLeaderId: () => mocks.getLeaderId(),
emitCursorMove: (position: unknown) => mocks.emitCursorMove(position),
},
}))
describe('useCollaboration', () => {
beforeEach(() => {
vi.clearAllMocks()
})
it('should skip collaboration setup when disabled by the caller', async () => {
const reactFlowStore = {
getState: vi.fn(),
}
const { result } = renderHook(() => useCollaboration('app-1', reactFlowStore as never, false))
await waitFor(() => {
expect(mocks.setReactFlowStore).toHaveBeenCalledWith(null)
})
expect(mocks.connect).not.toHaveBeenCalled()
expect(result.current.isEnabled).toBe(false)
expect(result.current.onlineUsers).toEqual([])
expect(result.current.nodePanelPresence).toEqual({})
})
it('should connect and attach the react flow store when collaboration is enabled', async () => {
const reactFlowStore = {
getState: vi.fn(),
}
const { result } = renderHook(() => useCollaboration('app-1', reactFlowStore as never, true))
await waitFor(() => {
expect(mocks.connect).toHaveBeenCalledWith('app-1')
expect(mocks.setReactFlowStore).toHaveBeenCalledWith(reactFlowStore)
})
expect(result.current.isEnabled).toBe(true)
})
it('should disconnect and clear the react flow store when collaboration gets disabled', async () => {
const reactFlowStore = {
getState: vi.fn(),
}
const { rerender } = renderHook(
({ enabled }) => useCollaboration('app-1', reactFlowStore as never, enabled),
{
initialProps: {
enabled: true,
},
},
)
await waitFor(() => {
expect(mocks.connect).toHaveBeenCalledWith('app-1')
expect(mocks.setReactFlowStore).toHaveBeenCalledWith(reactFlowStore)
})
rerender({ enabled: false })
await waitFor(() => {
expect(mocks.disconnect).toHaveBeenCalledWith('connection-1')
expect(mocks.setReactFlowStore).toHaveBeenLastCalledWith(null)
})
})
})

View File

@@ -28,15 +28,17 @@ const initialState: CollaborationViewState = {
isLeader: false,
}
export function useCollaboration(appId: string, reactFlowStore?: ReactFlowStore) {
export function useCollaboration(appId: string, reactFlowStore?: ReactFlowStore, enabled = true) {
const [state, setState] = useState<CollaborationViewState>(initialState)
const cursorServiceRef = useRef<CursorService | null>(null)
const lastDisconnectReasonRef = useRef<string | null>(null)
const isCollaborationEnabled = useGlobalPublicStore(s => s.systemFeatures.enable_collaboration_mode)
const isCollaborationFeatureEnabled = useGlobalPublicStore(s => s.systemFeatures.enable_collaboration_mode)
const isCollaborationEnabled = isCollaborationFeatureEnabled && enabled
useEffect(() => {
if (!appId || !isCollaborationEnabled) {
lastDisconnectReasonRef.current = null
Promise.resolve().then(() => {
setState(initialState)
})
@@ -109,14 +111,16 @@ export function useCollaboration(appId: string, reactFlowStore?: ReactFlowStore)
}, [appId, isCollaborationEnabled])
useEffect(() => {
if (!reactFlowStore)
if (!isCollaborationEnabled || !reactFlowStore) {
collaborationManager.setReactFlowStore(null)
return
}
collaborationManager.setReactFlowStore(reactFlowStore)
return () => {
collaborationManager.setReactFlowStore(null)
}
}, [reactFlowStore])
}, [isCollaborationEnabled, reactFlowStore])
const prevIsConnected = useRef(false)
useEffect(() => {

View File

@@ -83,7 +83,8 @@ const OnlineUserAvatar = ({
const OnlineUsers = () => {
const { t } = useTranslation()
const appId = useStore(s => s.appId)
const { onlineUsers, cursors, isEnabled: isCollaborationEnabled } = useCollaboration(appId as string)
const isWorkflowCollaborationEnabled = useStore(s => !s.isRestoring && !s.historyWorkflowData)
const { onlineUsers, cursors, isEnabled: isCollaborationEnabled } = useCollaboration(appId as string, undefined, isWorkflowCollaborationEnabled)
const { userProfile } = useAppContext()
const reactFlow = useReactFlow()
const [dropdownOpen, setDropdownOpen] = useState(false)

View File

@@ -102,8 +102,9 @@ const BasePanel: FC<BasePanelProps> = ({
const { t } = useTranslation()
const language = useLanguage()
const appId = useStore(s => s.appId)
const isWorkflowCollaborationEnabled = useStore(s => !s.isRestoring && !s.historyWorkflowData)
const { userProfile } = useAppContext()
const { isConnected, nodePanelPresence } = useCollaboration(appId as string)
const { isConnected, nodePanelPresence } = useCollaboration(appId as string, undefined, isWorkflowCollaborationEnabled)
const { showMessageLogModal } = useAppStore(useShallow(state => ({
showMessageLogModal: state.showMessageLogModal,
})))

View File

@@ -88,7 +88,8 @@ const BaseNode: FC<BaseNodeProps> = ({
const resolvedWorkflowStore = workflowStore ?? fallbackWorkflowStoreRef.current
const appId = useZustandStore(resolvedWorkflowStore, s => s.appId)
const controlMode = useZustandStore(resolvedWorkflowStore, s => s.controlMode)
const { nodePanelPresence } = useCollaboration(appId as string)
const isWorkflowCollaborationEnabled = useZustandStore(resolvedWorkflowStore, s => !s.isRestoring && !s.historyWorkflowData)
const { nodePanelPresence } = useCollaboration(appId as string, undefined, isWorkflowCollaborationEnabled)
const { shouldDim: pluginDimmed, isChecking: pluginIsChecking, isMissing: pluginIsMissing, canInstall: pluginCanInstall, uniqueIdentifier: pluginUniqueIdentifier } = useNodePluginInstallation(data)
const pluginInstallLocked = !pluginIsChecking && pluginIsMissing && pluginCanInstall && Boolean(pluginUniqueIdentifier)

View File

@@ -0,0 +1,138 @@
import type { AgentNodeType } from '../types'
import { renderHook } from '@testing-library/react'
import { FormTypeEnum } from '@/app/components/header/account-setting/model-provider-page/declarations'
import { BlockEnum } from '@/app/components/workflow/types'
import { VarType as ToolVarType } from '../../tool/types'
import useConfig from '../use-config'
const mockSetInputs = vi.fn()
const mockUseNodesReadOnly = vi.fn()
const mockUseNodeCrud = vi.fn()
const mockUseVarList = vi.fn()
const mockUseStrategyProviderDetail = vi.fn()
const mockUseFetchPluginsInMarketPlaceByIds = vi.fn()
const mockUseCheckInstalled = vi.fn()
const mockUseAvailableVarList = vi.fn()
const mockUseIsChatMode = vi.fn()
vi.mock('@/app/components/workflow/hooks', () => ({
useIsChatMode: () => mockUseIsChatMode(),
useNodesReadOnly: () => mockUseNodesReadOnly(),
}))
vi.mock('@/app/components/workflow/nodes/_base/hooks/use-node-crud', () => ({
default: (...args: unknown[]) => mockUseNodeCrud(...args),
}))
vi.mock('@/app/components/workflow/nodes/_base/hooks/use-var-list', () => ({
default: (...args: unknown[]) => mockUseVarList(...args),
}))
vi.mock('@/app/components/workflow/nodes/_base/hooks/use-available-var-list', () => ({
default: (...args: unknown[]) => mockUseAvailableVarList(...args),
}))
vi.mock('@/service/use-strategy', () => ({
useStrategyProviderDetail: (...args: unknown[]) => mockUseStrategyProviderDetail(...args),
}))
vi.mock('@/service/use-plugins', () => ({
useFetchPluginsInMarketPlaceByIds: (...args: unknown[]) => mockUseFetchPluginsInMarketPlaceByIds(...args),
useCheckInstalled: (...args: unknown[]) => mockUseCheckInstalled(...args),
}))
const createPayload = (overrides: Partial<AgentNodeType> = {}): AgentNodeType => ({
title: 'Agent',
desc: '',
type: BlockEnum.Agent,
output_schema: {},
agent_strategy_provider_name: 'provider/agent',
agent_strategy_name: 'react',
agent_strategy_label: 'React Agent',
agent_parameters: {
toolParam: {
type: ToolVarType.constant,
value: {
settings: {},
parameters: {},
schemas: [],
},
},
},
...overrides,
})
const createStrategyProviderDetail = () => ({
declaration: {
strategies: [{
identity: {
name: 'react',
},
parameters: [{
name: 'toolParam',
type: FormTypeEnum.toolSelector,
}],
}],
},
})
describe('agent useConfig', () => {
beforeEach(() => {
vi.clearAllMocks()
mockUseNodesReadOnly.mockReturnValue({
nodesReadOnly: false,
})
mockUseNodeCrud.mockImplementation((_id: string, payload: AgentNodeType) => ({
inputs: payload,
setInputs: mockSetInputs,
}))
mockUseVarList.mockReturnValue({
handleVarListChange: vi.fn(),
handleAddVariable: vi.fn(),
})
mockUseStrategyProviderDetail.mockReturnValue({
data: createStrategyProviderDetail(),
isLoading: false,
isError: false,
refetch: vi.fn(),
})
mockUseFetchPluginsInMarketPlaceByIds.mockReturnValue({
data: {
data: {
plugins: [],
},
},
isLoading: false,
refetch: vi.fn(),
})
mockUseCheckInstalled.mockReturnValue({
data: {
plugins: [],
},
})
mockUseAvailableVarList.mockReturnValue({
availableVars: [],
availableNodesWithParent: [],
})
mockUseIsChatMode.mockReturnValue(true)
})
it('should skip legacy migration when the node is read-only', () => {
mockUseNodesReadOnly.mockReturnValue({
nodesReadOnly: true,
})
renderHook(() => useConfig('agent-node', createPayload()))
expect(mockSetInputs).not.toHaveBeenCalled()
})
it('should migrate legacy agent tool data when the node is editable', () => {
renderHook(() => useConfig('agent-node', createPayload()))
expect(mockSetInputs).toHaveBeenCalledWith(expect.objectContaining({
tool_node_version: '2',
}))
})
})

View File

@@ -129,9 +129,7 @@ const useConfig = (id: string, payload: AgentNodeType) => {
return res
}
const formattingLegacyData = () => {
if (inputs.version || inputs.tool_node_version)
return inputs
const formattingLegacyData = useCallback(() => {
const newData = produce(inputs, (draft) => {
const schemas = currentStrategy?.parameters || []
Object.keys(draft.agent_parameters || {}).forEach((key) => {
@@ -144,15 +142,17 @@ const useConfig = (id: string, payload: AgentNodeType) => {
draft.tool_node_version = '2'
})
return newData
}
}, [currentStrategy?.parameters, inputs])
const shouldFormatLegacyData = Boolean(currentStrategy) && !readOnly && !inputs.version && !inputs.tool_node_version
// formatting legacy data
useEffect(() => {
if (!currentStrategy)
if (!shouldFormatLegacyData)
return
const newData = formattingLegacyData()
setInputs(newData)
}, [currentStrategy])
}, [formattingLegacyData, setInputs, shouldFormatLegacyData])
// vars