From 0bca67e45a06031d520c4133052eca1f2852ba71 Mon Sep 17 00:00:00 2001 From: Harry Date: Thu, 18 Dec 2025 12:24:35 +0800 Subject: [PATCH] feat: add subscription rebuild functionality and enhance trigger subscription handling - Introduced `SubscriptionRebuildRequest` model for rebuilding existing subscriptions. - Implemented `TriggerSubscriptionRebuildApi` to handle rebuild requests, allowing users to recreate subscriptions while retaining identifiers. - Enhanced `TriggerProviderService` with `rebuild_trigger_subscription` method to manage subscription rebuilding logic, including credential and parameter updates. - Updated existing subscription handling to support new parameters and credentials during updates. --- .../console/workspace/trigger_providers.py | 77 ++++++++- .../trigger/trigger_provider_service.py | 157 ++++++++++++++---- 2 files changed, 198 insertions(+), 36 deletions(-) diff --git a/api/controllers/console/workspace/trigger_providers.py b/api/controllers/console/workspace/trigger_providers.py index 78d41aba07..6eab1daca3 100644 --- a/api/controllers/console/workspace/trigger_providers.py +++ b/api/controllers/console/workspace/trigger_providers.py @@ -1,4 +1,5 @@ import logging +from collections.abc import Mapping from typing import Any from flask import make_response, redirect, request @@ -41,11 +42,23 @@ class TriggerSubscriptionUpdateRequest(BaseModel): properties: dict[str, Any] | None = Field(default=None, description="Subscription properties") +class SubscriptionRebuildRequest(BaseModel): + """Request payload for rebuilding an existing subscription.""" + + credentials: Mapping[str, Any] = Field(default_factory=dict, description="The credentials for the subscription") + parameters: Mapping[str, Any] = Field(default_factory=dict, description="The parameters for the subscription") + + console_ns.schema_model( TriggerSubscriptionUpdateRequest.__name__, TriggerSubscriptionUpdateRequest.model_json_schema(ref_template="#/definitions/{model}"), ) +console_ns.schema_model( + SubscriptionRebuildRequest.__name__, + SubscriptionRebuildRequest.model_json_schema(ref_template="#/definitions/{model}"), +) + @console_ns.route("/workspaces/current/trigger-provider//icon") class TriggerProviderIconApi(Resource): @@ -320,15 +333,24 @@ class TriggerSubscriptionUpdateApi(Resource): args = TriggerSubscriptionUpdateRequest.model_validate(console_ns.payload) + subscription = TriggerProviderService.get_subscription_by_id( + tenant_id=user.current_tenant_id, + subscription_id=subscription_id, + ) + if not subscription: + raise NotFoundError(f"Subscription {subscription_id} not found") + + if subscription.credential_type is not CredentialType.UNAUTHORIZED: + raise Forbidden("Only unauthorized subscriptions can be update directly") + try: - return jsonable_encoder( - TriggerProviderService.update_trigger_subscription( - tenant_id=user.current_tenant_id, - subscription_id=subscription_id, - name=args.name, - properties=args.properties, - ) + TriggerProviderService.update_trigger_subscription( + tenant_id=user.current_tenant_id, + subscription_id=subscription_id, + name=args.name, + properties=args.properties, ) + return 200 except ValueError as e: raise BadRequest(str(e)) except Exception as e: @@ -372,6 +394,47 @@ class TriggerSubscriptionDeleteApi(Resource): raise +@console_ns.route( + "/workspaces/current/trigger-provider//subscriptions//rebuild", +) +class TriggerSubscriptionRebuildApi(Resource): + @console_ns.expect(console_ns.models[SubscriptionRebuildRequest.__name__]) + @setup_required + @login_required + @edit_permission_required + @account_initialization_required + def post(self, provider: str, subscription_id: str): + """ + Rebuild an existing subscription instance. + + This will: + 1. Unsubscribe from the provider (delete webhook on provider side) + 2. Create a new subscription with the request data and keep the same subscription_id and endpoint_id + + The user can then go through the normal build flow to re-create the webhook. + """ + user = current_user + assert user.current_tenant_id is not None + + rebuild_request: SubscriptionRebuildRequest = SubscriptionRebuildRequest.model_validate(console_ns.payload) + + try: + TriggerProviderService.rebuild_trigger_subscription( + tenant_id=user.current_tenant_id, + provider_id=TriggerProviderID(provider), + subscription_id=subscription_id, + credentials=rebuild_request.credentials, + parameters=rebuild_request.parameters, + ) + + return 200 + except ValueError as e: + raise BadRequest(str(e)) + except Exception as e: + logger.exception("Error rebuilding subscription", exc_info=e) + raise + + @console_ns.route("/workspaces/current/trigger-provider//subscriptions/oauth/authorize") class TriggerOAuthAuthorizeApi(Resource): @setup_required diff --git a/api/services/trigger/trigger_provider_service.py b/api/services/trigger/trigger_provider_service.py index 791cd74129..9fa3386e5e 100644 --- a/api/services/trigger/trigger_provider_service.py +++ b/api/services/trigger/trigger_provider_service.py @@ -110,9 +110,7 @@ class TriggerProviderService: subscription.properties = dict( properties_encrypter.mask_credentials(dict(properties_encrypter.decrypt(subscription.properties))) ) - subscription.parameters = dict( - credential_encrypter.mask_credentials(dict(credential_encrypter.decrypt(subscription.parameters))) - ) + subscription.parameters = dict(subscription.parameters) count = workflows_in_use_map.get(subscription.id) subscription.workflows_in_use = count if count is not None else 0 @@ -225,7 +223,11 @@ class TriggerProviderService: subscription_id: str, name: str | None = None, properties: Mapping[str, Any] | None = None, - ) -> Mapping[str, Any]: + parameters: Mapping[str, Any] | None = None, + credentials: Mapping[str, Any] | None = None, + credential_expires_at: int | None = None, + expires_at: int | None = None, + ) -> None: """ Update an existing trigger subscription. @@ -233,6 +235,10 @@ class TriggerProviderService: :param subscription_id: Subscription instance ID :param name: Optional new name for this subscription :param properties: Optional new properties + :param parameters: Optional new parameters + :param credentials: Optional new credentials + :param credential_expires_at: Optional new credential expiration timestamp + :param expires_at: Optional new expiration timestamp :return: Success response with updated subscription info """ with Session(db.engine, expire_on_commit=False) as session: @@ -261,7 +267,7 @@ class TriggerProviderService: # Update properties if provided if properties is not None: - properties_encrypter, properties_cache = create_provider_encrypter( + properties_encrypter, _ = create_provider_encrypter( tenant_id=tenant_id, config=provider_controller.get_properties_schema(), cache=NoOpProviderCredentialCache(), @@ -273,7 +279,28 @@ class TriggerProviderService: for key, value in properties.items() } subscription.properties = dict(properties_encrypter.encrypt(new_properties)) - properties_cache.delete() + + # Update parameters if provided + if parameters is not None: + subscription.parameters = dict(parameters) + + # Update credentials if provided + if credentials is not None: + credential_type = CredentialType.of(subscription.credential_type) + credential_encrypter, _ = create_provider_encrypter( + tenant_id=tenant_id, + config=provider_controller.get_credential_schema_config(credential_type), + cache=NoOpProviderCredentialCache(), + ) + subscription.credentials = dict(credential_encrypter.encrypt(dict(credentials))) + + # Update credential expiration timestamp if provided + if credential_expires_at is not None: + subscription.credential_expires_at = credential_expires_at + + # Update expiration timestamp if provided + if expires_at is not None: + subscription.expires_at = expires_at session.commit() @@ -284,11 +311,6 @@ class TriggerProviderService: subscription_id=subscription.id, ) - return { - "result": "success", - "id": str(subscription.id), - } - @classmethod def get_subscription_by_id(cls, tenant_id: str, subscription_id: str | None = None) -> TriggerSubscription | None: """ @@ -339,26 +361,28 @@ class TriggerProviderService: credential_type: CredentialType = CredentialType.of(subscription.credential_type) is_auto_created: bool = credential_type in [CredentialType.OAUTH2, CredentialType.API_KEY] if is_auto_created: - provider_id = TriggerProviderID(subscription.provider_id) - provider_controller: PluginTriggerProviderController = TriggerManager.get_trigger_provider( - tenant_id=tenant_id, provider_id=provider_id - ) - encrypter, _ = create_trigger_provider_encrypter_for_subscription( + return None + + provider_id = TriggerProviderID(subscription.provider_id) + provider_controller: PluginTriggerProviderController = TriggerManager.get_trigger_provider( + tenant_id=tenant_id, provider_id=provider_id + ) + encrypter, _ = create_trigger_provider_encrypter_for_subscription( + tenant_id=tenant_id, + controller=provider_controller, + subscription=subscription, + ) + try: + TriggerManager.unsubscribe_trigger( tenant_id=tenant_id, - controller=provider_controller, - subscription=subscription, + user_id=subscription.user_id, + provider_id=provider_id, + subscription=subscription.to_entity(), + credentials=encrypter.decrypt(subscription.credentials), + credential_type=credential_type, ) - try: - TriggerManager.unsubscribe_trigger( - tenant_id=tenant_id, - user_id=subscription.user_id, - provider_id=provider_id, - subscription=subscription.to_entity(), - credentials=encrypter.decrypt(subscription.credentials), - credential_type=credential_type, - ) - except Exception as e: - logger.exception("Error unsubscribing trigger", exc_info=e) + except Exception as e: + logger.exception("Error unsubscribing trigger", exc_info=e) # Clear cache session.delete(subscription) @@ -768,3 +792,78 @@ class TriggerProviderService: ) subscription.properties = dict(properties_encrypter.decrypt(subscription.properties)) return subscription + + @classmethod + def rebuild_trigger_subscription( + cls, + tenant_id: str, + provider_id: TriggerProviderID, + subscription_id: str, + credentials: Mapping[str, Any], + parameters: Mapping[str, Any], + ) -> None: + """ + Create a subscription builder for rebuilding an existing subscription. + + This method creates a builder pre-filled with data from the rebuild request, + keeping the same subscription_id and endpoint_id so the webhook URL remains unchanged. + + :param tenant_id: Tenant ID + :param subscription_id: Subscription ID + :param provider_id: Provider identifier + :param credentials: Credentials for the subscription + :param parameters: Parameters for the subscription + :return: SubscriptionBuilderApiEntity + """ + provider_controller = TriggerManager.get_trigger_provider(tenant_id, provider_id) + if not provider_controller: + raise ValueError(f"Provider {provider_id} not found") + + subscription = TriggerProviderService.get_subscription_by_id( + tenant_id=tenant_id, + subscription_id=subscription_id, + ) + if not subscription: + raise ValueError(f"Subscription {subscription_id} not found") + + credential_type = CredentialType.of(subscription.credential_type) + if credential_type not in [CredentialType.OAUTH2, CredentialType.API_KEY]: + raise ValueError("Credential type not supported for rebuild") + + # TODO: Tring to invoke update api of the plugin trigger provider + + # FALLBACK: If the update api is not implemented, delete the previous subscription and create a new one + + # Delete the previous subscription + encrypter, _ = create_trigger_provider_encrypter_for_subscription( + tenant_id=tenant_id, + controller=provider_controller, + subscription=subscription, + ) + user_id = subscription.user_id + TriggerManager.unsubscribe_trigger( + tenant_id=tenant_id, + user_id=user_id, + provider_id=provider_id, + subscription=subscription.to_entity(), + credentials=encrypter.decrypt(subscription.credentials), + credential_type=credential_type, + ) + + # Create a new subscription with the same subscription_id and endpoint_id + new_subscription: TriggerSubscriptionEntity = TriggerManager.subscribe_trigger( + tenant_id=tenant_id, + user_id=user_id, + provider_id=provider_id, + endpoint=generate_plugin_trigger_endpoint_url(subscription.endpoint_id), + parameters=parameters, + credentials=credentials, + credential_type=credential_type, + ) + TriggerProviderService.update_trigger_subscription( + tenant_id=tenant_id, + subscription_id=subscription.id, + parameters=parameters, + credentials=credentials, + expires_at=new_subscription.expires_at, + )