refactor: streamline trigger event node metadata handling and update async workflow service for JSON serialization

- Removed unnecessary input data from the TriggerEventNode's metadata.
- Updated AsyncWorkflowService to use model_dump_json() for trigger metadata serialization.
- Added a comment in WorkflowAppService noting that the workflow_app_log table has grown past 100 million records, which is why an additional details field is used to extend it rather than new columns.
Harry
2025-10-28 13:54:44 +08:00
parent db352c0a18
commit 0d686fc6ae
9 changed files with 125 additions and 5 deletions
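The switch to `model_dump_json()` in the second bullet is worth a note: Pydantic's serializer handles datetimes, enums, and nested models that a plain `json.dumps` over `__dict__` would reject. A minimal sketch of the difference; the `Example` model and its values are hypothetical, not from this commit:

```python
from datetime import datetime
from pydantic import BaseModel


class Example(BaseModel):
    created_at: datetime
    tags: list[str]


m = Example(created_at=datetime(2025, 10, 28, 13, 54), tags=["webhook"])

# json.dumps(m.__dict__) would raise TypeError: datetime is not JSON serializable.
# model_dump_json() applies Pydantic's per-field serializers instead.
print(m.model_dump_json())  # {"created_at":"2025-10-28T13:54:00","tags":["webhook"]}
```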

View File

@@ -28,6 +28,7 @@ class WorkflowAppLogApi(Resource):
"created_at__after": "Filter logs created after this timestamp",
"created_by_end_user_session_id": "Filter by end user session ID",
"created_by_account": "Filter by account",
"detail": "Whether to return detailed logs",
"page": "Page number (1-99999)",
"limit": "Number of items per page (1-100)",
}
@@ -68,6 +69,7 @@ class WorkflowAppLogApi(Resource):
                required=False,
                default=None,
            )
            .add_argument("detail", type=bool, location="args", required=False, default=False)
            .add_argument("page", type=int_range(1, 99999), default=1, location="args")
            .add_argument("limit", type=int_range(1, 100), default=20, location="args")
        )
@@ -92,6 +94,7 @@ class WorkflowAppLogApi(Resource):
            created_at_after=args.created_at__after,
            page=args.page,
            limit=args.limit,
            detail=args.detail,
            created_by_end_user_session_id=args.created_by_end_user_session_id,
            created_by_account=args.created_by_account,
        )
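One caveat on the new `detail` flag: with reqparse, `type=bool` coerces the raw query-string value via `bool(...)`, and depending on the library version any non-empty string (including `"false"`) may parse as `True`. The stock `inputs.boolean` parser is the usual safeguard; a hedged sketch assuming plain flask_restful rather than Dify's wrapped parser:

```python
from flask_restful import inputs, reqparse

parser = reqparse.RequestParser()
# inputs.boolean understands "true"/"false"/"1"/"0" and rejects junk,
# whereas bool("false") would silently evaluate to True.
parser.add_argument("detail", type=inputs.boolean, location="args", required=False, default=False)
```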

View File

@@ -68,7 +68,6 @@ class TriggerEventNode(Node):
        inputs = dict(self.graph_runtime_state.variable_pool.user_inputs)
        metadata = {
            WorkflowNodeExecutionMetadataKey.TRIGGER_INFO: {
                **inputs,
                "provider_id": self._node_data.provider_id,
                "event_name": self._node_data.event_name,
                "plugin_unique_identifier": self._node_data.plugin_unique_identifier,

View File

@@ -8,6 +8,7 @@ from libs.helper import TimestampField
workflow_app_log_partial_fields = {
"id": fields.String,
"workflow_run": fields.Nested(workflow_run_for_log_fields, attribute="workflow_run", allow_null=True),
"details": fields.Raw(attribute="details"),
"created_from": fields.String,
"created_by_role": fields.String,
"created_by_account": fields.Nested(simple_account_fields, attribute="created_by_account", allow_null=True),

View File

@@ -0,0 +1,32 @@
"""trigger_log_metadata
Revision ID: 5ed4b21dbb8d
Revises: 132392a2635f
Create Date: 2025-10-27 17:52:35.658975
"""
from alembic import op
import models as models
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '5ed4b21dbb8d'
down_revision = '132392a2635f'
branch_labels = None
depends_on = None
def upgrade():
with op.batch_alter_table('workflow_trigger_logs', schema=None) as batch_op:
batch_op.add_column(sa.Column('trigger_metadata', sa.Text(), nullable=True))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('workflow_trigger_logs', schema=None) as batch_op:
batch_op.drop_column('trigger_metadata')
# ### end Alembic commands ###
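Because the new column is nullable with no default, the migration is purely additive and should apply in place without rewriting existing `workflow_trigger_logs` rows. Assuming the project's usual Flask-Migrate setup, `flask db upgrade` (or `alembic upgrade head` directly) applies it, and `downgrade()` cleanly drops the column.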

View File

@@ -161,6 +161,7 @@ class WorkflowTriggerLog(Base):
    - workflow_id (uuid) Workflow ID
    - workflow_run_id (uuid) Optional - Associated workflow run ID when execution starts
    - root_node_id (string) Optional - Custom starting node ID for workflow execution
    - trigger_metadata (text) Optional - Trigger metadata (JSON)
    - trigger_type (string) Type of trigger: webhook, schedule, plugin
    - trigger_data (text) Full trigger data including inputs (JSON)
    - inputs (text) Input parameters (JSON)
@@ -195,7 +196,7 @@ class WorkflowTriggerLog(Base):
    workflow_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    workflow_run_id: Mapped[Optional[str]] = mapped_column(StringUUID, nullable=True)
    root_node_id: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
    trigger_metadata: Mapped[Optional[str]] = mapped_column(sa.Text, nullable=True)
    trigger_type: Mapped[str] = mapped_column(EnumText(AppTriggerType, length=50), nullable=False)
    trigger_data: Mapped[str] = mapped_column(sa.Text, nullable=False)  # Full TriggerData as JSON
    inputs: Mapped[str] = mapped_column(sa.Text, nullable=False)  # Just inputs for easy viewing
@@ -240,6 +241,8 @@ class WorkflowTriggerLog(Base):
"app_id": self.app_id,
"workflow_id": self.workflow_id,
"workflow_run_id": self.workflow_run_id,
"root_node_id": self.root_node_id,
"trigger_metadata": json.loads(self.trigger_metadata) if self.trigger_metadata else None,
"trigger_type": self.trigger_type,
"trigger_data": json.loads(self.trigger_data),
"inputs": json.loads(self.inputs),

View File

@@ -111,6 +111,7 @@ class AsyncWorkflowService:
            app_id=trigger_data.app_id,
            workflow_id=workflow.id,
            root_node_id=trigger_data.root_node_id,
            trigger_metadata=trigger_data.trigger_metadata.model_dump_json(),
            trigger_type=trigger_data.trigger_type,
            trigger_data=trigger_data.model_dump_json(),
            inputs=json.dumps(dict(trigger_data.inputs)),

View File

@@ -19,6 +19,12 @@ class AsyncTriggerStatus(StrEnum):
    TIMEOUT = "timeout"


class TriggerMetadata(BaseModel):
    """Trigger metadata"""

    pass


class TriggerData(BaseModel):
    """Base trigger data model for async workflow execution"""
@@ -30,6 +36,7 @@ class TriggerData(BaseModel):
    files: Sequence[Mapping[str, Any]] = Field(default_factory=list)
    trigger_type: AppTriggerType
    trigger_from: WorkflowRunTriggeredFrom
    trigger_metadata: TriggerMetadata = Field(default_factory=TriggerMetadata)

    model_config = ConfigDict(use_enum_values=True)
@@ -48,6 +55,17 @@ class ScheduleTriggerData(TriggerData):
    trigger_from: WorkflowRunTriggeredFrom = WorkflowRunTriggeredFrom.SCHEDULE


class PluginTriggerMetadata(TriggerMetadata):
    """Plugin trigger metadata"""

    plugin_id: str
    endpoint_id: str
    plugin_unique_identifier: str
    provider_id: str
    icon_url: str
    icon_dark_url: str


class PluginTriggerData(TriggerData):
    """Plugin webhook trigger data"""

View File

@@ -1,3 +1,4 @@
import json
import uuid
from datetime import datetime
@@ -7,6 +8,35 @@ from sqlalchemy.orm import Session
from core.workflow.enums import WorkflowExecutionStatus
from models import Account, App, EndUser, WorkflowAppLog, WorkflowRun
from models.enums import CreatorUserRole
from models.trigger import WorkflowTriggerLog


# Since the workflow_app_log table has exceeded 100 million records, we use an additional details field to extend it
class LogView:
    """Lightweight wrapper for WorkflowAppLog with computed details.

    - Exposes `details_` for marshalling to `details` in API response
    - Proxies all other attributes to the underlying `WorkflowAppLog`
    """

    def __init__(self, log: WorkflowAppLog, details: dict | None):
        self.log = log
        self.details_ = details

    def __getattr__(self, name):
        return getattr(self.log, name)
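Hypothetical usage of the wrapper: `__getattr__` is only invoked when normal attribute lookup fails, so `details_` (set in `__init__`) is answered by the view itself while everything else falls through to the ORM row:

```python
# `log` is a WorkflowAppLog row fetched elsewhere (hypothetical)
view = LogView(log, {"trigger_metadata": {"provider_id": "acme/webhook"}})
view.details_    # the computed dict, served by the wrapper
view.created_at  # not on the wrapper, proxied to the WorkflowAppLog row
```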
# Helpers
def _safe_json_loads(val):
    if not val:
        return None
    if isinstance(val, str):
        try:
            return json.loads(val)
        except Exception:
            return None
    return val


class WorkflowAppService:
@@ -21,6 +51,7 @@ class WorkflowAppService:
        created_at_after: datetime | None = None,
        page: int = 1,
        limit: int = 20,
        detail: bool = False,
        created_by_end_user_session_id: str | None = None,
        created_by_account: str | None = None,
    ):
@@ -34,6 +65,7 @@ class WorkflowAppService:
        :param created_at_after: filter logs created after this timestamp
        :param page: page number
        :param limit: items per page
        :param detail: whether to return detailed logs
        :param created_by_end_user_session_id: filter by end user session id
        :param created_by_account: filter by account email
        :return: Pagination object
@@ -43,8 +75,24 @@ class WorkflowAppService:
            WorkflowAppLog.tenant_id == app_model.tenant_id, WorkflowAppLog.app_id == app_model.id
        )

        if detail:
            # Correlated scalar subquery: fetch latest trigger_metadata per workflow_run_id
            meta_expr = (
                select(WorkflowTriggerLog.trigger_metadata)
                .where(
                    WorkflowTriggerLog.workflow_run_id == WorkflowAppLog.workflow_run_id,
                    WorkflowTriggerLog.app_id == app_model.id,
                    WorkflowTriggerLog.tenant_id == app_model.tenant_id,
                )
                .order_by(WorkflowTriggerLog.created_at.desc())
                .limit(1)
                .scalar_subquery()
            )
            stmt = stmt.add_columns(meta_expr)
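SQLAlchemy correlates the subquery to the outer SELECT automatically because it references `WorkflowAppLog`; for each outer row it yields the newest matching `trigger_metadata`, or `NULL` when no trigger log exists. Note that `add_columns` also changes the result shape from bare `WorkflowAppLog` objects to `(WorkflowAppLog, str | None)` tuples, which is why the execution path further down branches on `detail`.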
        # Join to workflow run for filtering when needed.
        if keyword or status:
            stmt = stmt.join(WorkflowRun, WorkflowRun.id == WorkflowAppLog.workflow_run_id)

        if keyword:
            keyword_like_val = f"%{keyword[:30].encode('unicode_escape').decode('utf-8')}%".replace(r"\u", r"\\u")
@@ -108,9 +156,14 @@ class WorkflowAppService:
        # Apply pagination limits
        offset_stmt = stmt.offset((page - 1) * limit).limit(limit)

        # Execute query and get items
        items = list(session.scalars(offset_stmt).all())

        # wrapper moved to module scope as `LogView`
        # Execute query and get items
        if detail:
            rows = session.execute(offset_stmt).all()
            items = [LogView(log, {"trigger_metadata": _safe_json_loads(meta_val)}) for log, meta_val in rows]
        else:
            items = [LogView(log, None) for log in session.scalars(offset_stmt).all()]

        return {
            "page": page,
            "limit": limit,

View File

@@ -18,6 +18,7 @@ from core.plugin.entities.plugin_daemon import CredentialType
from core.plugin.entities.request import TriggerInvokeEventResponse
from core.trigger.debug.event_bus import TriggerDebugEventBus
from core.trigger.debug.events import PluginTriggerDebugEvent, build_plugin_pool_key
from core.trigger.entities.api_entities import TriggerProviderApiEntity
from core.trigger.provider import PluginTriggerProviderController
from core.trigger.trigger_manager import TriggerManager
from core.workflow.enums import NodeType
@@ -31,7 +32,7 @@ from services.async_workflow_service import AsyncWorkflowService
from services.end_user_service import EndUserService
from services.trigger.trigger_provider_service import TriggerProviderService
from services.trigger.trigger_request_service import TriggerHttpRequestCachingService
from services.workflow.entities import PluginTriggerData, PluginTriggerDispatchData
from services.workflow.entities import PluginTriggerData, PluginTriggerDispatchData, PluginTriggerMetadata
logger = logging.getLogger(__name__)
@@ -138,6 +139,7 @@ def dispatch_triggered_workflow(
    provider_controller: PluginTriggerProviderController = TriggerManager.get_trigger_provider(
        tenant_id=subscription.tenant_id, provider_id=TriggerProviderID(subscription.provider_id)
    )
    trigger_entity: TriggerProviderApiEntity = provider_controller.to_api_entity()

    with Session(db.engine) as session:
        workflows: Mapping[str, Workflow] = _get_latest_workflows_by_app_ids(session, subscribers)
@@ -201,6 +203,14 @@ def dispatch_triggered_workflow(
                plugin_id=subscription.provider_id,
                endpoint_id=subscription.endpoint_id,
                inputs=invoke_response.variables,
                trigger_metadata=PluginTriggerMetadata(
                    plugin_id=trigger_entity.plugin_id or "",
                    plugin_unique_identifier=trigger_entity.plugin_unique_identifier or "",
                    endpoint_id=subscription.endpoint_id,
                    provider_id=subscription.provider_id,
                    icon_url=trigger_entity.icon or "",
                    icon_dark_url=trigger_entity.icon_dark or "",
                ),
            )

            # Trigger async workflow