mirror of
https://github.com/langgenius/dify.git
synced 2025-12-19 17:27:16 -05:00
if session unauthorized, rejoin
This commit is contained in:
@@ -1,6 +1,7 @@
|
|||||||
import { LoroDoc, LoroList, LoroMap, UndoManager } from 'loro-crdt'
|
import { LoroDoc, LoroList, LoroMap, UndoManager } from 'loro-crdt'
|
||||||
import { cloneDeep, isEqual } from 'lodash-es'
|
import { cloneDeep, isEqual } from 'lodash-es'
|
||||||
import { webSocketClient } from './websocket-manager'
|
import type { Socket } from 'socket.io-client'
|
||||||
|
import { emitWithAuthGuard, webSocketClient } from './websocket-manager'
|
||||||
import { CRDTProvider } from './crdt-provider'
|
import { CRDTProvider } from './crdt-provider'
|
||||||
import { EventEmitter } from './event-emitter'
|
import { EventEmitter } from './event-emitter'
|
||||||
import type { Edge, Node } from '../../types'
|
import type { Edge, Node } from '../../types'
|
||||||
@@ -36,6 +37,58 @@ export class CollaborationManager {
|
|||||||
private activeConnections = new Set<string>()
|
private activeConnections = new Set<string>()
|
||||||
private isUndoRedoInProgress = false
|
private isUndoRedoInProgress = false
|
||||||
private pendingInitialSync = false
|
private pendingInitialSync = false
|
||||||
|
private rejoinInProgress = false
|
||||||
|
|
||||||
|
private getActiveSocket(): Socket | null {
|
||||||
|
if (!this.currentAppId)
|
||||||
|
return null
|
||||||
|
return webSocketClient.getSocket(this.currentAppId)
|
||||||
|
}
|
||||||
|
|
||||||
|
private handleSessionUnauthorized = (): void => {
|
||||||
|
if (this.rejoinInProgress)
|
||||||
|
return
|
||||||
|
if (!this.currentAppId)
|
||||||
|
return
|
||||||
|
|
||||||
|
const socket = this.getActiveSocket()
|
||||||
|
if (!socket)
|
||||||
|
return
|
||||||
|
|
||||||
|
this.rejoinInProgress = true
|
||||||
|
console.warn('Collaboration session expired, attempting to rejoin workflow.')
|
||||||
|
emitWithAuthGuard(
|
||||||
|
socket,
|
||||||
|
'user_connect',
|
||||||
|
{ workflow_id: this.currentAppId },
|
||||||
|
{
|
||||||
|
onAck: () => {
|
||||||
|
this.rejoinInProgress = false
|
||||||
|
},
|
||||||
|
onUnauthorized: () => {
|
||||||
|
this.rejoinInProgress = false
|
||||||
|
console.error('Rejoin failed due to authorization error, forcing disconnect.')
|
||||||
|
this.forceDisconnect()
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
private sendCollaborationEvent(payload: any): void {
|
||||||
|
const socket = this.getActiveSocket()
|
||||||
|
if (!socket)
|
||||||
|
return
|
||||||
|
|
||||||
|
emitWithAuthGuard(socket, 'collaboration_event', payload, { onUnauthorized: this.handleSessionUnauthorized })
|
||||||
|
}
|
||||||
|
|
||||||
|
private sendGraphEvent(payload: any): void {
|
||||||
|
const socket = this.getActiveSocket()
|
||||||
|
if (!socket)
|
||||||
|
return
|
||||||
|
|
||||||
|
emitWithAuthGuard(socket, 'graph_event', payload, { onUnauthorized: this.handleSessionUnauthorized })
|
||||||
|
}
|
||||||
|
|
||||||
private getNodeContainer(nodeId: string): LoroMap<any> {
|
private getNodeContainer(nodeId: string): LoroMap<any> {
|
||||||
if (!this.nodesMap)
|
if (!this.nodesMap)
|
||||||
@@ -362,13 +415,13 @@ export class CollaborationManager {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
this.provider = new CRDTProvider(socket, this.doc)
|
this.provider = new CRDTProvider(socket, this.doc, this.handleSessionUnauthorized)
|
||||||
|
|
||||||
this.setupSubscriptions()
|
this.setupSubscriptions()
|
||||||
|
|
||||||
// Force user_connect if already connected
|
// Force user_connect if already connected
|
||||||
if (socket.connected)
|
if (socket.connected)
|
||||||
socket.emit('user_connect', { workflow_id: appId })
|
emitWithAuthGuard(socket, 'user_connect', { workflow_id: appId }, { onUnauthorized: this.handleSessionUnauthorized })
|
||||||
|
|
||||||
return connectionId
|
return connectionId
|
||||||
}
|
}
|
||||||
@@ -397,6 +450,7 @@ export class CollaborationManager {
|
|||||||
this.cursors = {}
|
this.cursors = {}
|
||||||
this.nodePanelPresence = {}
|
this.nodePanelPresence = {}
|
||||||
this.isUndoRedoInProgress = false
|
this.isUndoRedoInProgress = false
|
||||||
|
this.rejoinInProgress = false
|
||||||
|
|
||||||
// Only reset leader status when actually disconnecting
|
// Only reset leader status when actually disconnecting
|
||||||
const wasLeader = this.isLeader
|
const wasLeader = this.isLeader
|
||||||
@@ -426,49 +480,44 @@ export class CollaborationManager {
|
|||||||
emitCursorMove(position: CursorPosition): void {
|
emitCursorMove(position: CursorPosition): void {
|
||||||
if (!this.currentAppId || !webSocketClient.isConnected(this.currentAppId)) return
|
if (!this.currentAppId || !webSocketClient.isConnected(this.currentAppId)) return
|
||||||
|
|
||||||
const socket = webSocketClient.getSocket(this.currentAppId)
|
const socket = this.getActiveSocket()
|
||||||
if (socket) {
|
if (!socket)
|
||||||
socket.emit('collaboration_event', {
|
return
|
||||||
type: 'mouse_move',
|
|
||||||
userId: socket.id,
|
this.sendCollaborationEvent({
|
||||||
data: { x: position.x, y: position.y },
|
type: 'mouse_move',
|
||||||
timestamp: Date.now(),
|
userId: socket.id,
|
||||||
})
|
data: { x: position.x, y: position.y },
|
||||||
}
|
timestamp: Date.now(),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
emitSyncRequest(): void {
|
emitSyncRequest(): void {
|
||||||
if (!this.currentAppId || !webSocketClient.isConnected(this.currentAppId)) return
|
if (!this.currentAppId || !webSocketClient.isConnected(this.currentAppId)) return
|
||||||
|
|
||||||
const socket = webSocketClient.getSocket(this.currentAppId)
|
console.log('Emitting sync request to leader')
|
||||||
if (socket) {
|
this.sendCollaborationEvent({
|
||||||
console.log('Emitting sync request to leader')
|
type: 'sync_request',
|
||||||
socket.emit('collaboration_event', {
|
data: { timestamp: Date.now() },
|
||||||
type: 'sync_request',
|
timestamp: Date.now(),
|
||||||
data: { timestamp: Date.now() },
|
})
|
||||||
timestamp: Date.now(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
emitWorkflowUpdate(appId: string): void {
|
emitWorkflowUpdate(appId: string): void {
|
||||||
if (!this.currentAppId || !webSocketClient.isConnected(this.currentAppId)) return
|
if (!this.currentAppId || !webSocketClient.isConnected(this.currentAppId)) return
|
||||||
|
|
||||||
const socket = webSocketClient.getSocket(this.currentAppId)
|
console.log('Emitting Workflow update event')
|
||||||
if (socket) {
|
this.sendCollaborationEvent({
|
||||||
console.log('Emitting Workflow update event')
|
type: 'workflow_update',
|
||||||
socket.emit('collaboration_event', {
|
data: { appId, timestamp: Date.now() },
|
||||||
type: 'workflow_update',
|
timestamp: Date.now(),
|
||||||
data: { appId, timestamp: Date.now() },
|
})
|
||||||
timestamp: Date.now(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
emitNodePanelPresence(nodeId: string, isOpen: boolean, user: NodePanelPresenceUser): void {
|
emitNodePanelPresence(nodeId: string, isOpen: boolean, user: NodePanelPresenceUser): void {
|
||||||
if (!this.currentAppId || !webSocketClient.isConnected(this.currentAppId)) return
|
if (!this.currentAppId || !webSocketClient.isConnected(this.currentAppId)) return
|
||||||
|
|
||||||
const socket = webSocketClient.getSocket(this.currentAppId)
|
const socket = this.getActiveSocket()
|
||||||
if (!socket || !nodeId || !user?.userId) return
|
if (!socket || !nodeId || !user?.userId) return
|
||||||
|
|
||||||
const payload: NodePanelPresenceEventData = {
|
const payload: NodePanelPresenceEventData = {
|
||||||
@@ -479,7 +528,7 @@ export class CollaborationManager {
|
|||||||
timestamp: Date.now(),
|
timestamp: Date.now(),
|
||||||
}
|
}
|
||||||
|
|
||||||
socket.emit('collaboration_event', {
|
this.sendCollaborationEvent({
|
||||||
type: 'node_panel_presence',
|
type: 'node_panel_presence',
|
||||||
data: payload,
|
data: payload,
|
||||||
timestamp: payload.timestamp,
|
timestamp: payload.timestamp,
|
||||||
@@ -545,15 +594,12 @@ export class CollaborationManager {
|
|||||||
emitCommentsUpdate(appId: string): void {
|
emitCommentsUpdate(appId: string): void {
|
||||||
if (!this.currentAppId || !webSocketClient.isConnected(this.currentAppId)) return
|
if (!this.currentAppId || !webSocketClient.isConnected(this.currentAppId)) return
|
||||||
|
|
||||||
const socket = webSocketClient.getSocket(this.currentAppId)
|
console.log('Emitting Comments update event')
|
||||||
if (socket) {
|
this.sendCollaborationEvent({
|
||||||
console.log('Emitting Comments update event')
|
type: 'comments_update',
|
||||||
socket.emit('collaboration_event', {
|
data: { appId, timestamp: Date.now() },
|
||||||
type: 'comments_update',
|
timestamp: Date.now(),
|
||||||
data: { appId, timestamp: Date.now() },
|
})
|
||||||
timestamp: Date.now(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
onUndoRedoStateChange(callback: (state: { canUndo: boolean; canRedo: boolean }) => void): () => void {
|
onUndoRedoStateChange(callback: (state: { canUndo: boolean; canRedo: boolean }) => void): () => void {
|
||||||
@@ -994,10 +1040,7 @@ export class CollaborationManager {
|
|||||||
private emitGraphResyncRequest(): void {
|
private emitGraphResyncRequest(): void {
|
||||||
if (!this.currentAppId || !webSocketClient.isConnected(this.currentAppId)) return
|
if (!this.currentAppId || !webSocketClient.isConnected(this.currentAppId)) return
|
||||||
|
|
||||||
const socket = webSocketClient.getSocket(this.currentAppId)
|
this.sendCollaborationEvent({
|
||||||
if (!socket) return
|
|
||||||
|
|
||||||
socket.emit('collaboration_event', {
|
|
||||||
type: 'graph_resync_request',
|
type: 'graph_resync_request',
|
||||||
data: { timestamp: Date.now() },
|
data: { timestamp: Date.now() },
|
||||||
timestamp: Date.now(),
|
timestamp: Date.now(),
|
||||||
@@ -1013,7 +1056,7 @@ export class CollaborationManager {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
const snapshot = this.doc.export({ mode: 'snapshot' })
|
const snapshot = this.doc.export({ mode: 'snapshot' })
|
||||||
socket.emit('graph_event', snapshot)
|
this.sendGraphEvent(snapshot)
|
||||||
}
|
}
|
||||||
catch (error) {
|
catch (error) {
|
||||||
console.error('Failed to broadcast graph snapshot:', error)
|
console.error('Failed to broadcast graph snapshot:', error)
|
||||||
|
|||||||
@@ -1,13 +1,16 @@
|
|||||||
import type { LoroDoc } from 'loro-crdt'
|
import type { LoroDoc } from 'loro-crdt'
|
||||||
import type { Socket } from 'socket.io-client'
|
import type { Socket } from 'socket.io-client'
|
||||||
|
import { emitWithAuthGuard } from './websocket-manager'
|
||||||
|
|
||||||
export class CRDTProvider {
|
export class CRDTProvider {
|
||||||
private doc: LoroDoc
|
private doc: LoroDoc
|
||||||
private socket: Socket
|
private socket: Socket
|
||||||
|
private onUnauthorized?: () => void
|
||||||
|
|
||||||
constructor(socket: Socket, doc: LoroDoc) {
|
constructor(socket: Socket, doc: LoroDoc, onUnauthorized?: () => void) {
|
||||||
this.socket = socket
|
this.socket = socket
|
||||||
this.doc = doc
|
this.doc = doc
|
||||||
|
this.onUnauthorized = onUnauthorized
|
||||||
this.setupEventListeners()
|
this.setupEventListeners()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -15,7 +18,7 @@ export class CRDTProvider {
|
|||||||
this.doc.subscribe((event: any) => {
|
this.doc.subscribe((event: any) => {
|
||||||
if (event.by === 'local') {
|
if (event.by === 'local') {
|
||||||
const update = this.doc.export({ mode: 'update' })
|
const update = this.doc.export({ mode: 'update' })
|
||||||
this.socket.emit('graph_event', update)
|
emitWithAuthGuard(this.socket, 'graph_event', update, { onUnauthorized: this.onUnauthorized })
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|||||||
@@ -3,6 +3,43 @@ import { io } from 'socket.io-client'
|
|||||||
import { ACCESS_TOKEN_LOCAL_STORAGE_NAME } from '@/config'
|
import { ACCESS_TOKEN_LOCAL_STORAGE_NAME } from '@/config'
|
||||||
import type { DebugInfo, WebSocketConfig } from '../types/websocket'
|
import type { DebugInfo, WebSocketConfig } from '../types/websocket'
|
||||||
|
|
||||||
|
const isUnauthorizedAck = (...ackArgs: any[]): boolean => {
|
||||||
|
const [first, second] = ackArgs
|
||||||
|
|
||||||
|
if (second === 401 || first === 401)
|
||||||
|
return true
|
||||||
|
|
||||||
|
if (first && typeof first === 'object' && first.msg === 'unauthorized')
|
||||||
|
return true
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
export type EmitAckOptions = {
|
||||||
|
onAck?: (...ackArgs: any[]) => void
|
||||||
|
onUnauthorized?: (...ackArgs: any[]) => void
|
||||||
|
}
|
||||||
|
|
||||||
|
export const emitWithAuthGuard = (
|
||||||
|
socket: Socket | null | undefined,
|
||||||
|
event: string,
|
||||||
|
payload: any,
|
||||||
|
options?: EmitAckOptions,
|
||||||
|
): void => {
|
||||||
|
if (!socket)
|
||||||
|
return
|
||||||
|
|
||||||
|
socket.emit(
|
||||||
|
event,
|
||||||
|
payload,
|
||||||
|
(...ackArgs: any[]) => {
|
||||||
|
options?.onAck?.(...ackArgs)
|
||||||
|
if (isUnauthorizedAck(...ackArgs))
|
||||||
|
options?.onUnauthorized?.(...ackArgs)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
export class WebSocketClient {
|
export class WebSocketClient {
|
||||||
private connections: Map<string, Socket> = new Map()
|
private connections: Map<string, Socket> = new Map()
|
||||||
private connecting: Set<string> = new Set()
|
private connecting: Set<string> = new Set()
|
||||||
@@ -115,7 +152,7 @@ export class WebSocketClient {
|
|||||||
private setupBaseEventListeners(socket: Socket, appId: string): void {
|
private setupBaseEventListeners(socket: Socket, appId: string): void {
|
||||||
socket.on('connect', () => {
|
socket.on('connect', () => {
|
||||||
this.connecting.delete(appId)
|
this.connecting.delete(appId)
|
||||||
socket.emit('user_connect', { workflow_id: appId })
|
emitWithAuthGuard(socket, 'user_connect', { workflow_id: appId })
|
||||||
})
|
})
|
||||||
|
|
||||||
socket.on('disconnect', () => {
|
socket.on('disconnect', () => {
|
||||||
|
|||||||
Reference in New Issue
Block a user