feat: add subscription rebuild functionality and enhance trigger subscription handling

- Introduced `SubscriptionRebuildRequest` model for rebuilding existing subscriptions.
- Implemented `TriggerSubscriptionRebuildApi` to handle rebuild requests, allowing users to recreate subscriptions while retaining identifiers.
- Enhanced `TriggerProviderService` with `rebuild_trigger_subscription` method to manage subscription rebuilding logic, including credential and parameter updates.
- Updated existing subscription handling to support new parameters and credentials during updates.
This commit is contained in:
Harry
2025-12-18 12:24:35 +08:00
parent 40f8a5e755
commit 0bca67e45a
2 changed files with 198 additions and 36 deletions

View File

@@ -1,4 +1,5 @@
import logging
from collections.abc import Mapping
from typing import Any
from flask import make_response, redirect, request
@@ -41,11 +42,23 @@ class TriggerSubscriptionUpdateRequest(BaseModel):
properties: dict[str, Any] | None = Field(default=None, description="Subscription properties")
class SubscriptionRebuildRequest(BaseModel):
"""Request payload for rebuilding an existing subscription."""
credentials: Mapping[str, Any] = Field(default_factory=dict, description="The credentials for the subscription")
parameters: Mapping[str, Any] = Field(default_factory=dict, description="The parameters for the subscription")
console_ns.schema_model(
TriggerSubscriptionUpdateRequest.__name__,
TriggerSubscriptionUpdateRequest.model_json_schema(ref_template="#/definitions/{model}"),
)
console_ns.schema_model(
SubscriptionRebuildRequest.__name__,
SubscriptionRebuildRequest.model_json_schema(ref_template="#/definitions/{model}"),
)
@console_ns.route("/workspaces/current/trigger-provider/<path:provider>/icon")
class TriggerProviderIconApi(Resource):
@@ -320,15 +333,24 @@ class TriggerSubscriptionUpdateApi(Resource):
args = TriggerSubscriptionUpdateRequest.model_validate(console_ns.payload)
subscription = TriggerProviderService.get_subscription_by_id(
tenant_id=user.current_tenant_id,
subscription_id=subscription_id,
)
if not subscription:
raise NotFoundError(f"Subscription {subscription_id} not found")
if subscription.credential_type is not CredentialType.UNAUTHORIZED:
raise Forbidden("Only unauthorized subscriptions can be update directly")
try:
return jsonable_encoder(
TriggerProviderService.update_trigger_subscription(
tenant_id=user.current_tenant_id,
subscription_id=subscription_id,
name=args.name,
properties=args.properties,
)
TriggerProviderService.update_trigger_subscription(
tenant_id=user.current_tenant_id,
subscription_id=subscription_id,
name=args.name,
properties=args.properties,
)
return 200
except ValueError as e:
raise BadRequest(str(e))
except Exception as e:
@@ -372,6 +394,47 @@ class TriggerSubscriptionDeleteApi(Resource):
raise
@console_ns.route(
"/workspaces/current/trigger-provider/<path:provider>/subscriptions/<path:subscription_id>/rebuild",
)
class TriggerSubscriptionRebuildApi(Resource):
@console_ns.expect(console_ns.models[SubscriptionRebuildRequest.__name__])
@setup_required
@login_required
@edit_permission_required
@account_initialization_required
def post(self, provider: str, subscription_id: str):
"""
Rebuild an existing subscription instance.
This will:
1. Unsubscribe from the provider (delete webhook on provider side)
2. Create a new subscription with the request data and keep the same subscription_id and endpoint_id
The user can then go through the normal build flow to re-create the webhook.
"""
user = current_user
assert user.current_tenant_id is not None
rebuild_request: SubscriptionRebuildRequest = SubscriptionRebuildRequest.model_validate(console_ns.payload)
try:
TriggerProviderService.rebuild_trigger_subscription(
tenant_id=user.current_tenant_id,
provider_id=TriggerProviderID(provider),
subscription_id=subscription_id,
credentials=rebuild_request.credentials,
parameters=rebuild_request.parameters,
)
return 200
except ValueError as e:
raise BadRequest(str(e))
except Exception as e:
logger.exception("Error rebuilding subscription", exc_info=e)
raise
@console_ns.route("/workspaces/current/trigger-provider/<path:provider>/subscriptions/oauth/authorize")
class TriggerOAuthAuthorizeApi(Resource):
@setup_required

View File

@@ -110,9 +110,7 @@ class TriggerProviderService:
subscription.properties = dict(
properties_encrypter.mask_credentials(dict(properties_encrypter.decrypt(subscription.properties)))
)
subscription.parameters = dict(
credential_encrypter.mask_credentials(dict(credential_encrypter.decrypt(subscription.parameters)))
)
subscription.parameters = dict(subscription.parameters)
count = workflows_in_use_map.get(subscription.id)
subscription.workflows_in_use = count if count is not None else 0
@@ -225,7 +223,11 @@ class TriggerProviderService:
subscription_id: str,
name: str | None = None,
properties: Mapping[str, Any] | None = None,
) -> Mapping[str, Any]:
parameters: Mapping[str, Any] | None = None,
credentials: Mapping[str, Any] | None = None,
credential_expires_at: int | None = None,
expires_at: int | None = None,
) -> None:
"""
Update an existing trigger subscription.
@@ -233,6 +235,10 @@ class TriggerProviderService:
:param subscription_id: Subscription instance ID
:param name: Optional new name for this subscription
:param properties: Optional new properties
:param parameters: Optional new parameters
:param credentials: Optional new credentials
:param credential_expires_at: Optional new credential expiration timestamp
:param expires_at: Optional new expiration timestamp
:return: Success response with updated subscription info
"""
with Session(db.engine, expire_on_commit=False) as session:
@@ -261,7 +267,7 @@ class TriggerProviderService:
# Update properties if provided
if properties is not None:
properties_encrypter, properties_cache = create_provider_encrypter(
properties_encrypter, _ = create_provider_encrypter(
tenant_id=tenant_id,
config=provider_controller.get_properties_schema(),
cache=NoOpProviderCredentialCache(),
@@ -273,7 +279,28 @@ class TriggerProviderService:
for key, value in properties.items()
}
subscription.properties = dict(properties_encrypter.encrypt(new_properties))
properties_cache.delete()
# Update parameters if provided
if parameters is not None:
subscription.parameters = dict(parameters)
# Update credentials if provided
if credentials is not None:
credential_type = CredentialType.of(subscription.credential_type)
credential_encrypter, _ = create_provider_encrypter(
tenant_id=tenant_id,
config=provider_controller.get_credential_schema_config(credential_type),
cache=NoOpProviderCredentialCache(),
)
subscription.credentials = dict(credential_encrypter.encrypt(dict(credentials)))
# Update credential expiration timestamp if provided
if credential_expires_at is not None:
subscription.credential_expires_at = credential_expires_at
# Update expiration timestamp if provided
if expires_at is not None:
subscription.expires_at = expires_at
session.commit()
@@ -284,11 +311,6 @@ class TriggerProviderService:
subscription_id=subscription.id,
)
return {
"result": "success",
"id": str(subscription.id),
}
@classmethod
def get_subscription_by_id(cls, tenant_id: str, subscription_id: str | None = None) -> TriggerSubscription | None:
"""
@@ -339,26 +361,28 @@ class TriggerProviderService:
credential_type: CredentialType = CredentialType.of(subscription.credential_type)
is_auto_created: bool = credential_type in [CredentialType.OAUTH2, CredentialType.API_KEY]
if is_auto_created:
provider_id = TriggerProviderID(subscription.provider_id)
provider_controller: PluginTriggerProviderController = TriggerManager.get_trigger_provider(
tenant_id=tenant_id, provider_id=provider_id
)
encrypter, _ = create_trigger_provider_encrypter_for_subscription(
return None
provider_id = TriggerProviderID(subscription.provider_id)
provider_controller: PluginTriggerProviderController = TriggerManager.get_trigger_provider(
tenant_id=tenant_id, provider_id=provider_id
)
encrypter, _ = create_trigger_provider_encrypter_for_subscription(
tenant_id=tenant_id,
controller=provider_controller,
subscription=subscription,
)
try:
TriggerManager.unsubscribe_trigger(
tenant_id=tenant_id,
controller=provider_controller,
subscription=subscription,
user_id=subscription.user_id,
provider_id=provider_id,
subscription=subscription.to_entity(),
credentials=encrypter.decrypt(subscription.credentials),
credential_type=credential_type,
)
try:
TriggerManager.unsubscribe_trigger(
tenant_id=tenant_id,
user_id=subscription.user_id,
provider_id=provider_id,
subscription=subscription.to_entity(),
credentials=encrypter.decrypt(subscription.credentials),
credential_type=credential_type,
)
except Exception as e:
logger.exception("Error unsubscribing trigger", exc_info=e)
except Exception as e:
logger.exception("Error unsubscribing trigger", exc_info=e)
# Clear cache
session.delete(subscription)
@@ -768,3 +792,78 @@ class TriggerProviderService:
)
subscription.properties = dict(properties_encrypter.decrypt(subscription.properties))
return subscription
@classmethod
def rebuild_trigger_subscription(
cls,
tenant_id: str,
provider_id: TriggerProviderID,
subscription_id: str,
credentials: Mapping[str, Any],
parameters: Mapping[str, Any],
) -> None:
"""
Create a subscription builder for rebuilding an existing subscription.
This method creates a builder pre-filled with data from the rebuild request,
keeping the same subscription_id and endpoint_id so the webhook URL remains unchanged.
:param tenant_id: Tenant ID
:param subscription_id: Subscription ID
:param provider_id: Provider identifier
:param credentials: Credentials for the subscription
:param parameters: Parameters for the subscription
:return: SubscriptionBuilderApiEntity
"""
provider_controller = TriggerManager.get_trigger_provider(tenant_id, provider_id)
if not provider_controller:
raise ValueError(f"Provider {provider_id} not found")
subscription = TriggerProviderService.get_subscription_by_id(
tenant_id=tenant_id,
subscription_id=subscription_id,
)
if not subscription:
raise ValueError(f"Subscription {subscription_id} not found")
credential_type = CredentialType.of(subscription.credential_type)
if credential_type not in [CredentialType.OAUTH2, CredentialType.API_KEY]:
raise ValueError("Credential type not supported for rebuild")
# TODO: Tring to invoke update api of the plugin trigger provider
# FALLBACK: If the update api is not implemented, delete the previous subscription and create a new one
# Delete the previous subscription
encrypter, _ = create_trigger_provider_encrypter_for_subscription(
tenant_id=tenant_id,
controller=provider_controller,
subscription=subscription,
)
user_id = subscription.user_id
TriggerManager.unsubscribe_trigger(
tenant_id=tenant_id,
user_id=user_id,
provider_id=provider_id,
subscription=subscription.to_entity(),
credentials=encrypter.decrypt(subscription.credentials),
credential_type=credential_type,
)
# Create a new subscription with the same subscription_id and endpoint_id
new_subscription: TriggerSubscriptionEntity = TriggerManager.subscribe_trigger(
tenant_id=tenant_id,
user_id=user_id,
provider_id=provider_id,
endpoint=generate_plugin_trigger_endpoint_url(subscription.endpoint_id),
parameters=parameters,
credentials=credentials,
credential_type=credential_type,
)
TriggerProviderService.update_trigger_subscription(
tenant_id=tenant_id,
subscription_id=subscription.id,
parameters=parameters,
credentials=credentials,
expires_at=new_subscription.expires_at,
)