mirror of
https://github.com/langgenius/dify.git
synced 2026-05-01 10:00:11 -04:00
refactor: quota v3 integration (#35436)
Co-authored-by: Yansong Zhang <916125788@qq.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
@@ -27,7 +27,7 @@ from core.trigger.entities.entities import TriggerProviderEntity
|
||||
from core.trigger.provider import PluginTriggerProviderController
|
||||
from core.trigger.trigger_manager import TriggerManager
|
||||
from core.workflow.nodes.trigger_plugin.entities import TriggerEventNodeData
|
||||
from enums.quota_type import QuotaType, unlimited
|
||||
from enums.quota_type import QuotaType
|
||||
from graphon.enums import WorkflowExecutionStatus
|
||||
from models.enums import (
|
||||
AppTriggerType,
|
||||
@@ -42,6 +42,7 @@ from models.workflow import Workflow, WorkflowAppLog, WorkflowAppLogCreatedFrom,
|
||||
from services.async_workflow_service import AsyncWorkflowService
|
||||
from services.end_user_service import EndUserService
|
||||
from services.errors.app import QuotaExceededError
|
||||
from services.quota_service import QuotaService, unlimited
|
||||
from services.trigger.app_trigger_service import AppTriggerService
|
||||
from services.trigger.trigger_provider_service import TriggerProviderService
|
||||
from services.trigger.trigger_request_service import TriggerHttpRequestCachingService
|
||||
@@ -258,59 +259,58 @@ def dispatch_triggered_workflow(
|
||||
tenant_id=subscription.tenant_id, provider_id=TriggerProviderID(subscription.provider_id)
|
||||
)
|
||||
trigger_entity: TriggerProviderEntity = provider_controller.entity
|
||||
|
||||
# Ensure expire_on_commit is set to False to remain workflows available
|
||||
with session_factory.create_session() as session:
|
||||
workflows: Mapping[str, Workflow] = _get_latest_workflows_by_app_ids(session, subscribers)
|
||||
|
||||
end_users: Mapping[str, EndUser] = EndUserService.create_end_user_batch(
|
||||
type=InvokeFrom.TRIGGER,
|
||||
tenant_id=subscription.tenant_id,
|
||||
app_ids=[plugin_trigger.app_id for plugin_trigger in subscribers],
|
||||
user_id=user_id,
|
||||
)
|
||||
for plugin_trigger in subscribers:
|
||||
# Get workflow from mapping
|
||||
workflow: Workflow | None = workflows.get(plugin_trigger.app_id)
|
||||
if not workflow:
|
||||
logger.error(
|
||||
"Workflow not found for app %s",
|
||||
plugin_trigger.app_id,
|
||||
)
|
||||
continue
|
||||
end_users: Mapping[str, EndUser] = EndUserService.create_end_user_batch(
|
||||
type=InvokeFrom.TRIGGER,
|
||||
tenant_id=subscription.tenant_id,
|
||||
app_ids=[plugin_trigger.app_id for plugin_trigger in subscribers],
|
||||
user_id=user_id,
|
||||
)
|
||||
|
||||
# Find the trigger node in the workflow
|
||||
event_node = None
|
||||
for node_id, node_config in workflow.walk_nodes(TRIGGER_PLUGIN_NODE_TYPE):
|
||||
if node_id == plugin_trigger.node_id:
|
||||
event_node = node_config
|
||||
break
|
||||
|
||||
if not event_node:
|
||||
logger.error("Trigger event node not found for app %s", plugin_trigger.app_id)
|
||||
continue
|
||||
|
||||
# invoke trigger
|
||||
trigger_metadata = PluginTriggerMetadata(
|
||||
plugin_unique_identifier=provider_controller.plugin_unique_identifier or "",
|
||||
endpoint_id=subscription.endpoint_id,
|
||||
provider_id=subscription.provider_id,
|
||||
event_name=event_name,
|
||||
icon_filename=trigger_entity.identity.icon or "",
|
||||
icon_dark_filename=trigger_entity.identity.icon_dark or "",
|
||||
for plugin_trigger in subscribers:
|
||||
workflow: Workflow | None = workflows.get(plugin_trigger.app_id)
|
||||
if not workflow:
|
||||
logger.error(
|
||||
"Workflow not found for app %s",
|
||||
plugin_trigger.app_id,
|
||||
)
|
||||
continue
|
||||
|
||||
# consume quota before invoking trigger
|
||||
quota_charge = unlimited()
|
||||
try:
|
||||
quota_charge = QuotaType.TRIGGER.consume(subscription.tenant_id)
|
||||
except QuotaExceededError:
|
||||
AppTriggerService.mark_tenant_triggers_rate_limited(subscription.tenant_id)
|
||||
logger.info(
|
||||
"Tenant %s rate limited, skipping plugin trigger %s", subscription.tenant_id, plugin_trigger.id
|
||||
)
|
||||
return 0
|
||||
event_node = None
|
||||
for node_id, node_config in workflow.walk_nodes(TRIGGER_PLUGIN_NODE_TYPE):
|
||||
if node_id == plugin_trigger.node_id:
|
||||
event_node = node_config
|
||||
break
|
||||
|
||||
node_data: TriggerEventNodeData = TriggerEventNodeData.model_validate(event_node)
|
||||
invoke_response: TriggerInvokeEventResponse | None = None
|
||||
if not event_node:
|
||||
logger.error("Trigger event node not found for app %s", plugin_trigger.app_id)
|
||||
continue
|
||||
|
||||
trigger_metadata = PluginTriggerMetadata(
|
||||
plugin_unique_identifier=provider_controller.plugin_unique_identifier or "",
|
||||
endpoint_id=subscription.endpoint_id,
|
||||
provider_id=subscription.provider_id,
|
||||
event_name=event_name,
|
||||
icon_filename=trigger_entity.identity.icon or "",
|
||||
icon_dark_filename=trigger_entity.identity.icon_dark or "",
|
||||
)
|
||||
|
||||
quota_charge = unlimited()
|
||||
try:
|
||||
quota_charge = QuotaService.reserve(QuotaType.TRIGGER, subscription.tenant_id)
|
||||
except QuotaExceededError:
|
||||
AppTriggerService.mark_tenant_triggers_rate_limited(subscription.tenant_id)
|
||||
logger.info("Tenant %s rate limited, skipping plugin trigger %s", subscription.tenant_id, plugin_trigger.id)
|
||||
return dispatched_count
|
||||
|
||||
node_data: TriggerEventNodeData = TriggerEventNodeData.model_validate(event_node)
|
||||
invoke_response: TriggerInvokeEventResponse | None = None
|
||||
|
||||
with session_factory.create_session() as session:
|
||||
try:
|
||||
invoke_response = TriggerManager.invoke_trigger_event(
|
||||
tenant_id=subscription.tenant_id,
|
||||
@@ -387,6 +387,7 @@ def dispatch_triggered_workflow(
|
||||
raise ValueError(f"End user not found for app {plugin_trigger.app_id}")
|
||||
|
||||
AsyncWorkflowService.trigger_workflow_async(session=session, user=end_user, trigger_data=trigger_data)
|
||||
quota_charge.commit()
|
||||
dispatched_count += 1
|
||||
logger.info(
|
||||
"Triggered workflow for app %s with trigger event %s",
|
||||
@@ -401,7 +402,7 @@ def dispatch_triggered_workflow(
|
||||
plugin_trigger.app_id,
|
||||
)
|
||||
|
||||
return dispatched_count
|
||||
return dispatched_count
|
||||
|
||||
|
||||
def dispatch_triggered_workflows(
|
||||
|
||||
Reference in New Issue
Block a user