242 lines
7.9 KiB
Python
242 lines
7.9 KiB
Python
#
|
|
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
from datetime import datetime, timezone
|
|
from typing import Any, Mapping
|
|
|
|
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
|
|
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
|
|
from airbyte_cdk.utils.datetime_helpers import ab_datetime_format
|
|
|
|
|
|
class TwilioDateTimeTypeTransformer(TypeTransformer):
|
|
"""
|
|
Twilio API returns datetime in two formats:
|
|
- RFC2822, like "Fri, 11 Dec 2020 04:28:40 +0000";
|
|
- ISO8601, like "2020-12-11T04:29:09Z".
|
|
We only transform RFC2822 values (detected by the presence of ", ").
|
|
|
|
Note:
|
|
This could be implemented using a transformation, but to avoid cluttering the manifest with many transformations,
|
|
this normalization is implemented here.
|
|
"""
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
# apply this transformer during schema normalization phase(s)
|
|
config = TransformConfig.DefaultSchemaNormalization | TransformConfig.CustomSchemaNormalization
|
|
super().__init__(config)
|
|
# register our custom transform
|
|
self.registerCustomTransform(self.get_transform_function())
|
|
|
|
@staticmethod
|
|
def get_transform_function():
|
|
def custom_transform_function(original_value: Any, field_schema: Mapping[str, Any]) -> Any:
|
|
if original_value and field_schema.get("format") == "date-time":
|
|
try:
|
|
dt = datetime.strptime(original_value, "%a, %d %b %Y %H:%M:%S %z").astimezone(timezone.utc)
|
|
return ab_datetime_format(dt, "%Y-%m-%dT%H:%M:%SZ")
|
|
except ValueError:
|
|
pass
|
|
return original_value
|
|
|
|
return custom_transform_function
|
|
|
|
|
|
class TwilioStateMigration(StateMigration):
|
|
"""
|
|
Ensure legacy partitions include an empty `parent_slice` required by the SubstreamPartitionRouter.
|
|
|
|
Initial:
|
|
{
|
|
"states": [
|
|
{
|
|
"partition": { "subresource_uri": "/2010-04-01/Accounts/AC123/Addresses.json" },
|
|
"cursor": { "date_created": "2022-01-01T00:00:00Z" }
|
|
}
|
|
]
|
|
}
|
|
|
|
Final:
|
|
{
|
|
"states": [
|
|
{
|
|
"partition": {
|
|
"subresource_uri": "/2010-04-01/Accounts/AC123/Addresses.json",
|
|
"parent_slice": {}
|
|
},
|
|
"cursor": { "date_created": "2022-01-01T00:00:00Z" }
|
|
}
|
|
]
|
|
}
|
|
"""
|
|
|
|
def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
|
|
for state in stream_state.get("states", []):
|
|
state["partition"]["parent_slice"] = {}
|
|
return stream_state
|
|
|
|
def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
|
|
if stream_state and any("parent_slice" not in state["partition"] for state in stream_state.get("states", [])):
|
|
return True
|
|
return False
|
|
|
|
|
|
class TwilioAlertsStateMigration(StateMigration):
|
|
"""
|
|
Migrates legacy `alerts` state to low-code shape. Previously, the stream incorrectly used per partition state.
|
|
|
|
Initial:
|
|
{
|
|
"states" : [
|
|
{
|
|
"partition" : {},
|
|
"cursor" : {
|
|
"date_generated" : "2025-08-05T16:43:50Z"
|
|
}
|
|
}
|
|
]
|
|
}
|
|
|
|
Final:
|
|
{
|
|
"date_generated" : "2025-08-05T16:43:50Z"
|
|
}
|
|
"""
|
|
|
|
def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
|
|
return stream_state["states"][0]["cursor"]
|
|
|
|
def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
|
|
if (
|
|
stream_state
|
|
and "states" in stream_state
|
|
and stream_state["states"]
|
|
and "cursor" in stream_state["states"][0]
|
|
and "date_generated" in stream_state["states"][0]["cursor"]
|
|
):
|
|
return True
|
|
return False
|
|
|
|
|
|
class TwilioUsageRecordsStateMigration(StateMigration):
|
|
"""
|
|
Migrate legacy `usage_records` state to low-code shape.
|
|
|
|
- Add empty `parent_slice` to each partition.
|
|
- Drop legacy `partition.date_created`.
|
|
- Run if any partition lacks `parent_slice`.
|
|
|
|
Initial:
|
|
{
|
|
"states": [
|
|
{
|
|
"cursor": { "start_date": "2025-08-21T00:00:00Z" },
|
|
"partition": {
|
|
"account_sid": "ACdade166c12e160e9ed0a6088226718fb",
|
|
"date_created": "Tue, 17 Nov 2020 04:08:53 +0000"
|
|
}
|
|
},
|
|
{
|
|
"cursor": { "start_date": "2025-08-21T00:00:00Z" },
|
|
"partition": {
|
|
"account_sid": "AC4cac489c46197c9ebc91c840120a4dee",
|
|
"date_created": "Wed, 25 Nov 2020 09:36:42 +0000"
|
|
}
|
|
}
|
|
]
|
|
}
|
|
|
|
Final:
|
|
{
|
|
"states": [
|
|
{
|
|
"cursor": { "start_date": "2025-08-21T00:00:00Z" },
|
|
"partition": { "account_sid": "ACdade166c12e160e9ed0a6088226718fb", "parent_slice": {} }
|
|
},
|
|
{
|
|
"cursor": { "start_date": "2025-08-21T00:00:00Z" },
|
|
"partition": { "account_sid": "AC4cac489c46197c9ebc91c840120a4dee", "parent_slice": {} }
|
|
}
|
|
]
|
|
}
|
|
"""
|
|
|
|
def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
|
|
new_state = {"states": []}
|
|
for state in stream_state.get("states", []):
|
|
partition_state = {}
|
|
if "partition" not in state or "account_sid" not in state["partition"]:
|
|
continue
|
|
|
|
partition_state["partition"] = {"account_sid": state["partition"]["account_sid"], "parent_slice": {}}
|
|
partition_state["cursor"] = state.get("cursor", {})
|
|
|
|
new_state["states"].append(partition_state)
|
|
return new_state
|
|
|
|
def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
|
|
if stream_state and any("parent_slice" not in state["partition"] for state in stream_state.get("states", [])):
|
|
return True
|
|
return False
|
|
|
|
|
|
class TwilioMessageMediaStateMigration(StateMigration):
|
|
"""
|
|
Reshape Message Media state to include hierarchical parent slices back to the
|
|
Messages collection. Low-code derives `message_media` partitions from `messages`,
|
|
so the state must retain the media-level `subresource_uri` and also include
|
|
`parent_slice.subresource_uri` pointing to the Messages collection
|
|
(e.g., “…/Messages.json”). States missing `partition.subresource_uri` are skipped.
|
|
|
|
Initial:
|
|
{
|
|
"states": [
|
|
{
|
|
"partition": { "subresource_uri": "/2010-04-01/Accounts/AC123/Messages/SM123/Media.json" },
|
|
"cursor": { "date_created": "2022-11-01T00:00:00Z" }
|
|
}
|
|
]
|
|
}
|
|
|
|
Final:
|
|
{
|
|
"states": [
|
|
{
|
|
"partition": {
|
|
"subresource_uri": "/2010-04-01/Accounts/AC123/Messages/SM123/Media.json",
|
|
"parent_slice": {
|
|
"subresource_uri": "/2010-04-01/Accounts/AC123/Messages.json",
|
|
"parent_slice": {}
|
|
}
|
|
},
|
|
"cursor": { "date_created": "2022-11-01T00:00:00Z" }
|
|
}
|
|
]
|
|
}
|
|
"""
|
|
|
|
def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
|
|
new_state = {"states": []}
|
|
for state in stream_state.get("states", []):
|
|
partition_state = {}
|
|
if not "partition" in state or "subresource_uri" not in state["partition"]:
|
|
continue
|
|
|
|
partition_state["partition"] = {
|
|
"subresource_uri": state["partition"]["subresource_uri"],
|
|
"parent_slice": {
|
|
"subresource_uri": state["partition"]["subresource_uri"].split("Messages")[0] + "Messages.json",
|
|
"parent_slice": {},
|
|
},
|
|
}
|
|
partition_state["cursor"] = state.get("cursor", {})
|
|
new_state["states"].append(partition_state)
|
|
|
|
return new_state
|
|
|
|
def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
|
|
if stream_state and any("parent_slice" not in state["partition"] for state in stream_state.get("states", [])):
|
|
return True
|
|
return False
|