1
0
mirror of synced 2025-12-19 18:14:56 -05:00
Files
2025-09-03 10:20:07 +03:00

242 lines
7.9 KiB
Python

#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#
from datetime import datetime, timezone
from typing import Any, Mapping
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from airbyte_cdk.utils.datetime_helpers import ab_datetime_format
class TwilioDateTimeTypeTransformer(TypeTransformer):
"""
Twilio API returns datetime in two formats:
- RFC2822, like "Fri, 11 Dec 2020 04:28:40 +0000";
- ISO8601, like "2020-12-11T04:29:09Z".
We only transform RFC2822 values (detected by the presence of ", ").
Note:
This could be implemented using a transformation, but to avoid cluttering the manifest with many transformations,
this normalization is implemented here.
"""
def __init__(self, *args, **kwargs):
# apply this transformer during schema normalization phase(s)
config = TransformConfig.DefaultSchemaNormalization | TransformConfig.CustomSchemaNormalization
super().__init__(config)
# register our custom transform
self.registerCustomTransform(self.get_transform_function())
@staticmethod
def get_transform_function():
def custom_transform_function(original_value: Any, field_schema: Mapping[str, Any]) -> Any:
if original_value and field_schema.get("format") == "date-time":
try:
dt = datetime.strptime(original_value, "%a, %d %b %Y %H:%M:%S %z").astimezone(timezone.utc)
return ab_datetime_format(dt, "%Y-%m-%dT%H:%M:%SZ")
except ValueError:
pass
return original_value
return custom_transform_function
class TwilioStateMigration(StateMigration):
"""
Ensure legacy partitions include an empty `parent_slice` required by the SubstreamPartitionRouter.
Initial:
{
"states": [
{
"partition": { "subresource_uri": "/2010-04-01/Accounts/AC123/Addresses.json" },
"cursor": { "date_created": "2022-01-01T00:00:00Z" }
}
]
}
Final:
{
"states": [
{
"partition": {
"subresource_uri": "/2010-04-01/Accounts/AC123/Addresses.json",
"parent_slice": {}
},
"cursor": { "date_created": "2022-01-01T00:00:00Z" }
}
]
}
"""
def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
for state in stream_state.get("states", []):
state["partition"]["parent_slice"] = {}
return stream_state
def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
if stream_state and any("parent_slice" not in state["partition"] for state in stream_state.get("states", [])):
return True
return False
class TwilioAlertsStateMigration(StateMigration):
"""
Migrates legacy `alerts` state to low-code shape. Previously, the stream incorrectly used per partition state.
Initial:
{
"states" : [
{
"partition" : {},
"cursor" : {
"date_generated" : "2025-08-05T16:43:50Z"
}
}
]
}
Final:
{
"date_generated" : "2025-08-05T16:43:50Z"
}
"""
def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
return stream_state["states"][0]["cursor"]
def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
if (
stream_state
and "states" in stream_state
and stream_state["states"]
and "cursor" in stream_state["states"][0]
and "date_generated" in stream_state["states"][0]["cursor"]
):
return True
return False
class TwilioUsageRecordsStateMigration(StateMigration):
"""
Migrate legacy `usage_records` state to low-code shape.
- Add empty `parent_slice` to each partition.
- Drop legacy `partition.date_created`.
- Run if any partition lacks `parent_slice`.
Initial:
{
"states": [
{
"cursor": { "start_date": "2025-08-21T00:00:00Z" },
"partition": {
"account_sid": "ACdade166c12e160e9ed0a6088226718fb",
"date_created": "Tue, 17 Nov 2020 04:08:53 +0000"
}
},
{
"cursor": { "start_date": "2025-08-21T00:00:00Z" },
"partition": {
"account_sid": "AC4cac489c46197c9ebc91c840120a4dee",
"date_created": "Wed, 25 Nov 2020 09:36:42 +0000"
}
}
]
}
Final:
{
"states": [
{
"cursor": { "start_date": "2025-08-21T00:00:00Z" },
"partition": { "account_sid": "ACdade166c12e160e9ed0a6088226718fb", "parent_slice": {} }
},
{
"cursor": { "start_date": "2025-08-21T00:00:00Z" },
"partition": { "account_sid": "AC4cac489c46197c9ebc91c840120a4dee", "parent_slice": {} }
}
]
}
"""
def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
new_state = {"states": []}
for state in stream_state.get("states", []):
partition_state = {}
if "partition" not in state or "account_sid" not in state["partition"]:
continue
partition_state["partition"] = {"account_sid": state["partition"]["account_sid"], "parent_slice": {}}
partition_state["cursor"] = state.get("cursor", {})
new_state["states"].append(partition_state)
return new_state
def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
if stream_state and any("parent_slice" not in state["partition"] for state in stream_state.get("states", [])):
return True
return False
class TwilioMessageMediaStateMigration(StateMigration):
"""
Reshape Message Media state to include hierarchical parent slices back to the
Messages collection. Low-code derives `message_media` partitions from `messages`,
so the state must retain the media-level `subresource_uri` and also include
`parent_slice.subresource_uri` pointing to the Messages collection
(e.g., “…/Messages.json”). States missing `partition.subresource_uri` are skipped.
Initial:
{
"states": [
{
"partition": { "subresource_uri": "/2010-04-01/Accounts/AC123/Messages/SM123/Media.json" },
"cursor": { "date_created": "2022-11-01T00:00:00Z" }
}
]
}
Final:
{
"states": [
{
"partition": {
"subresource_uri": "/2010-04-01/Accounts/AC123/Messages/SM123/Media.json",
"parent_slice": {
"subresource_uri": "/2010-04-01/Accounts/AC123/Messages.json",
"parent_slice": {}
}
},
"cursor": { "date_created": "2022-11-01T00:00:00Z" }
}
]
}
"""
def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
new_state = {"states": []}
for state in stream_state.get("states", []):
partition_state = {}
if not "partition" in state or "subresource_uri" not in state["partition"]:
continue
partition_state["partition"] = {
"subresource_uri": state["partition"]["subresource_uri"],
"parent_slice": {
"subresource_uri": state["partition"]["subresource_uri"].split("Messages")[0] + "Messages.json",
"parent_slice": {},
},
}
partition_state["cursor"] = state.get("cursor", {})
new_state["states"].append(partition_state)
return new_state
def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
if stream_state and any("parent_slice" not in state["partition"] for state in stream_state.get("states", [])):
return True
return False