diff --git a/api/services/trigger/trigger_debug_service.py b/api/services/trigger/trigger_debug_service.py index 0f2b3b3f40..cd786bd6d1 100644 --- a/api/services/trigger/trigger_debug_service.py +++ b/api/services/trigger/trigger_debug_service.py @@ -57,7 +57,7 @@ class PluginTriggerDebugEvent(BaseDebugEvent): provider_id = kwargs["provider_id"] subscription_id = kwargs["subscription_id"] event_name = kwargs["event_name"] - return f"trigger_debug_waiting_pool:{tenant_id}:{str(provider_id)}:{subscription_id}:{event_name}" + return f"plugin_trigger_debug_waiting_pool:{tenant_id}:{str(provider_id)}:{subscription_id}:{event_name}" class WebhookDebugEvent(BaseDebugEvent): @@ -79,7 +79,7 @@ class WebhookDebugEvent(BaseDebugEvent): tenant_id = kwargs["tenant_id"] app_id = kwargs["app_id"] node_id = kwargs["node_id"] - return f"trigger_debug_waiting_pool:{tenant_id}:{app_id}:{node_id}" + return f"webhook_trigger_debug_waiting_pool:{tenant_id}:{app_id}:{node_id}" class TriggerDebugService: diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 30aeb20881..8dc1f64045 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -38,6 +38,7 @@ from models.workflow import Workflow, WorkflowNodeExecutionModel, WorkflowNodeEx from repositories.factory import DifyAPIRepositoryFactory from services.enterprise.plugin_manager_service import PluginCredentialType from services.errors.app import IsDraftWorkflowError, WorkflowHashNotEqualError +from services.workflow.entities import PluginTriggerData, ScheduleTriggerData from services.workflow.workflow_converter import WorkflowConverter from .errors.workflow_service import DraftWorkflowDeletionError, WorkflowInUseError @@ -633,6 +634,10 @@ class WorkflowService: ) if node_type == NodeType.TRIGGER_WEBHOOK: start_data = WebhookData.model_validate(node_data) + elif node_type == NodeType.TRIGGER_PLUGIN: + start_data = PluginTriggerData.model_validate(node_data) + elif node_type == NodeType.TRIGGER_SCHEDULE: + start_data = ScheduleTriggerData.model_validate(node_data) else: start_data = StartNodeData.model_validate(node_data) user_inputs = _rebuild_file_for_user_inputs_in_start_node( diff --git a/api/tasks/trigger_processing_tasks.py b/api/tasks/trigger_processing_tasks.py index fc48c8b3b0..cee064f88a 100644 --- a/api/tasks/trigger_processing_tasks.py +++ b/api/tasks/trigger_processing_tasks.py @@ -58,11 +58,12 @@ def dispatch_triggered_workflows_async( try: logger.info( - "Starting async trigger dispatching for endpoint=%s, events=%s, request_id=%s, timestamp=%s", + "Starting trigger dispatching endpoint=%s, events=%s, request_id=%s, subscription_id=%s, provider_id=%s", endpoint_id, events, request_id, - timestamp, + subscription_id, + provider_id, ) # Verify request exists in storage @@ -155,14 +156,25 @@ def dispatch_triggered_workflows_async( event=event, pool_key=pool_key, ) + logger.debug( + "Trigger debug dispatched %d sessions to pool %s for event %s for subscription %s provider %s", + debug_dispatched, + pool_key, + event_name, + subscription_id, + provider_id, + ) + except Exception: # Silent failure for debug dispatch logger.exception("Failed to dispatch to debug sessions") logger.info( - "Completed async trigger dispatching: processed %d/%d triggers", + "Completed async trigger dispatching: processed %d/%d triggers for subscription %s and provider %s", dispatched_count, len(events), + subscription_id, + provider_id, ) return { @@ -174,9 +186,11 @@ def dispatch_triggered_workflows_async( except Exception as e: logger.exception( - "Error in async trigger dispatching for endpoint %s data %s", + "Error in async trigger dispatching for endpoint %s data %s for subscription %s and provider %s", endpoint_id, dispatch_data, + subscription_id, + provider_id, ) return { "status": "failed",