333 lines
15 KiB
Python
333 lines
15 KiB
Python
#
|
|
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
|
|
#
|
|
import logging
|
|
from abc import ABC
|
|
from dataclasses import dataclass
|
|
from typing import Any, Dict, Iterable, Mapping, Optional, Union
|
|
|
|
import dpath
|
|
import requests
|
|
from requests.exceptions import InvalidURL
|
|
|
|
from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
|
|
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
|
|
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
|
|
from airbyte_cdk.sources.declarative.models.declarative_component_schema import DeclarativeStream as DeclarativeStreamModel
|
|
from airbyte_cdk.sources.declarative.requesters.error_handlers import DefaultErrorHandler
|
|
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies import WaitTimeFromHeaderBackoffStrategy
|
|
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
|
|
from airbyte_cdk.sources.message.repository import InMemoryMessageRepository
|
|
from airbyte_cdk.sources.streams.call_rate import APIBudget
|
|
from airbyte_cdk.sources.streams.http.error_handlers import BackoffStrategy, ErrorHandler, HttpStatusErrorHandler
|
|
from airbyte_cdk.sources.streams.http.error_handlers.default_error_mapping import DEFAULT_ERROR_MAPPING
|
|
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ErrorResolution, FailureType, ResponseAction
|
|
from airbyte_cdk.sources.streams.http.http_client import HttpClient
|
|
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
|
|
|
|
|
|
# Partition descriptors used as the "partition" value of migrated per-partition state.
ARCHIVED = {"archived": "true"}
NOT_ARCHIVED = {"archived": "false"}

# Same partitions, narrowed to the email campaign type (used by the campaigns state migration).
ARCHIVED_EMAIL = {**ARCHIVED, "campaign_type": "email"}
NOT_ARCHIVED_EMAIL = {**NOT_ARCHIVED, "campaign_type": "email"}

# Cursor value used when neither the legacy state nor the config's "start_date" provides one.
DEFAULT_START_DATE = "2012-01-01T00:00:00Z"
|
|
|
|
|
|
class ArchivedToPerPartitionStateMigration(StateMigration, ABC):
    """
    Updates old format state to new per partitioned format.
    Partitions: [{archived: True}, {archived: False}]
    Default built in airbyte cdk migration will recognise only top-level field cursor value(updated_at),
    but for partition {archived: True} source should use cursor value from archived object.

    Example input state:
        {
            "updated_at": "2020-10-10T00:00:00+00:00",
            "archived": {
                "updated_at": "2021-10-10T00:00:00+00:00"
            }
        }

    Example output state:
        {
            "states": [
                {
                    "partition": { "archived": "true" },
                    "cursor": { "updated_at": "2021-10-10T00:00:00+00:00" }
                },
                {
                    "partition": { "archived": "false" },
                    "cursor": { "updated_at": "2020-10-10T00:00:00+00:00" }
                }
            ]
        }
    """

    declarative_stream: DeclarativeStreamModel
    config: Config

    def __init__(self, declarative_stream: DeclarativeStreamModel, config: Config):
        self._config = config
        self.declarative_stream = declarative_stream
        self._cursor = declarative_stream.incremental_sync
        self._parameters = declarative_stream.parameters
        # Resolve the cursor field name once; every lookup and every emitted key below uses
        # this evaluated value rather than the raw (possibly templated) manifest string.
        self._cursor_field = InterpolatedString.create(self._cursor.cursor_field, parameters=self._parameters).eval(self._config)

    def _default_cursor_value(self) -> Any:
        """Return the config's "start_date", or DEFAULT_START_DATE when the config has none."""
        return self._config.get("start_date", DEFAULT_START_DATE)

    def get_archived_cursor_value(self, stream_state: Mapping[str, Any]) -> Any:
        """Return the cursor value stored under the legacy "archived" object, or the default start value."""
        # `or {}` also covers a legacy state where "archived" is present but null.
        archived_state = stream_state.get("archived") or {}
        return archived_state.get(self._cursor_field, self._default_cursor_value())

    def get_not_archived_cursor_value(self, stream_state: Mapping[str, Any]) -> Any:
        """Return the top-level cursor value of the legacy state, or the default start value."""
        return stream_state.get(self._cursor_field, self._default_cursor_value())

    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
        """Return True for a non-empty state that does not already contain a "states" key."""
        return bool("states" not in stream_state and stream_state)

    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Convert a legacy state into the per-partition format.

        States for which `should_migrate` is False are returned unchanged.
        """
        if not self.should_migrate(stream_state):
            return stream_state

        archived_cursor_value = self.get_archived_cursor_value(stream_state)
        not_archived_cursor_value = self.get_not_archived_cursor_value(stream_state)

        # Partition dicts are copied so the returned state never aliases the module-level constants.
        return {
            "states": [
                {"partition": dict(ARCHIVED), "cursor": {self._cursor_field: archived_cursor_value}},
                {"partition": dict(NOT_ARCHIVED), "cursor": {self._cursor_field: not_archived_cursor_value}},
            ]
        }
|
|
|
|
|
|
class CampaignsStateMigration(ArchivedToPerPartitionStateMigration):
    """
    Campaigns stream has 2 partition field: archived and campaign_type(email, sms).
    Previous API version didn't return sms in campaigns output so we need to migrate only email partition.

    Example input state:
        {
            "updated_at": "2020-10-10T00:00:00+00:00",
            "archived": {
                "updated_at": "2021-10-10T00:00:00+00:00"
            }
        }

    Example output state:
        {
            "states": [
                {
                    "partition": { "archived": "true", "campaign_type": "email" },
                    "cursor": { "updated_at": "2021-10-10T00:00:00+00:00" }
                },
                {
                    "partition": { "archived": "false", "campaign_type": "email" },
                    "cursor": { "updated_at": "2020-10-10T00:00:00+00:00" }
                }
            ]
        }
    """

    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Convert a legacy campaigns state into per-partition state for the two email partitions.

        States for which `should_migrate` is False are returned unchanged.
        """
        if not self.should_migrate(stream_state):
            return stream_state

        archived_cursor_value = self.get_archived_cursor_value(stream_state)
        not_archived_cursor_value = self.get_not_archived_cursor_value(stream_state)

        # Key the cursor by the interpolated field name resolved in __init__, not the raw
        # manifest string, and copy the partition dicts so shared constants are never aliased.
        return {
            "states": [
                {"partition": dict(ARCHIVED_EMAIL), "cursor": {self._cursor_field: archived_cursor_value}},
                {"partition": dict(NOT_ARCHIVED_EMAIL), "cursor": {self._cursor_field: not_archived_cursor_value}},
            ]
        }
|
|
|
|
|
|
class CampaignsDetailedTransformation(RecordTransformation):
    """
    Campaigns detailed stream fetches detailed campaigns info:
        estimated_recipient_count: integer
        campaign_messages: list of objects.

    To get this data CampaignsDetailedTransformation makes extra API requests:
        https://a.klaviyo.com/api/campaign-recipient-estimations/{campaign_id}
        https://developers.klaviyo.com/en/v2024-10-15/reference/get_messages_for_campaign
    """

    config: Config

    api_revision = "2024-10-15"
    url_base = "https://a.klaviyo.com/api/"
    name = "campaigns_detailed"
    max_retries = 5
    max_time = 60 * 10

    def __init__(self, config: Config, **kwargs):
        self.logger = logging.getLogger("airbyte")
        self.config = config
        self._api_key = self.config["api_key"]
        self._http_client = HttpClient(
            name=self.name,
            logger=self.logger,
            error_handler=self.get_error_handler(),
            api_budget=APIBudget(policies=[]),
            backoff_strategy=self.get_backoff_strategy(),
            message_repository=InMemoryMessageRepository(),
        )

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """Enrich `record` in place with the recipient estimation and the campaign messages."""
        self._set_recipient_count(record)
        self._set_campaign_message(record)

    def _set_recipient_count(self, record: Dict[str, Any]) -> None:
        """Fetch the recipient estimation for `record["id"]` and store it under "estimated_recipient_count"."""
        campaign_id = record["id"]
        _, recipient_count_response = self._http_client.send_request(
            url=f"{self.url_base}campaign-recipient-estimations/{campaign_id}",
            request_kwargs={},
            headers=self.request_headers(),
            http_method="GET",
        )
        # `or {}` keeps the lookup chain safe when a key is present with a JSON null value.
        data = recipient_count_response.json().get("data") or {}
        attributes = data.get("attributes") or {}
        record["estimated_recipient_count"] = attributes.get("estimated_recipient_count", 0)

    def _set_campaign_message(self, record: Dict[str, Any]) -> None:
        """
        Follow the record's "campaign-messages" related link and store the response's "data"
        under "campaign_messages". The record is left untouched when no link is present.
        """
        relationships = record.get("relationships") or {}
        campaign_messages = relationships.get("campaign-messages") or {}
        links = campaign_messages.get("links") or {}
        messages_link = links.get("related")
        if not messages_link:
            return

        _, campaign_message_response = self._http_client.send_request(
            url=messages_link, request_kwargs={}, headers=self.request_headers(), http_method="GET"
        )
        record["campaign_messages"] = campaign_message_response.json().get("data")

    def get_backoff_strategy(self) -> BackoffStrategy:
        """Build a backoff strategy driven by the "Retry-After" response header, capped at `max_time` seconds."""
        return WaitTimeFromHeaderBackoffStrategy(header="Retry-After", max_waiting_time_in_seconds=self.max_time, parameters={}, config={})

    def request_headers(self) -> Dict[str, str]:
        """Return the headers sent with every extra request: accept type, API revision and API key."""
        return {
            "Accept": "application/json",
            "Revision": self.api_revision,
            "Authorization": f"Klaviyo-API-Key {self._api_key}",
        }

    def get_error_handler(self) -> ErrorHandler:
        """Return the default status-code error mapping, extended so that HTTP 404 responses are ignored."""
        error_mapping = DEFAULT_ERROR_MAPPING | {
            404: ErrorResolution(ResponseAction.IGNORE, FailureType.config_error, "Resource not found. Ignoring.")
        }

        return HttpStatusErrorHandler(logger=self.logger, error_mapping=error_mapping, max_retries=self.max_retries)
|
|
|
|
|
|
@dataclass
class KlaviyoIncludedFieldExtractor(DpathExtractor):
    """
    Record extractor that merges the response's top-level "included" resources into the
    relationships of the extracted records.
    """

    def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]:
        """Yield the records found at the configured field path, enriched with matching "included" data."""
        # Evaluate and retrieve the extraction paths
        evaluated_field_paths = [field_path.eval(self.config) for field_path in self._field_path]
        target_records = self.extract_records_by_path(response, evaluated_field_paths)
        included_relations = list(self.extract_records_by_path(response, ["included"]))

        # Update target records with included records
        yield from self.update_target_records_with_included(target_records, included_relations)

    @staticmethod
    def update_target_records_with_included(
        target_records: Iterable[Mapping[str, Any]], included_relations: Iterable[Mapping[str, Any]]
    ) -> Iterable[Mapping[str, Any]]:
        """
        Yield each target record after merging matching included resources into its relationships.

        A relationship's "data" may be a single object or a list of objects. Every such object
        is matched against the included resources on ("type", "id"); on a match the included
        resource's "attributes" are merged into the object and its "relationships" (when present)
        are copied over. Records are mutated in place.
        """
        # Index the included resources once instead of rescanning them for every relationship.
        # Besides avoiding quadratic work, this lets `included_relations` be any one-shot iterable.
        # Duplicates are kept in a list so they are applied in their original order.
        # NOTE(review): assumes "type" and "id" are hashable (strings in JSON:API) - confirm.
        included_by_key: Dict[Any, list] = {}
        for included_relation in included_relations:
            key = (included_relation.get("type"), included_relation.get("id"))
            included_by_key.setdefault(key, []).append(included_relation)

        def merge_included(resource_identifier: Dict[str, Any]) -> None:
            # Capture the key before merging: the merged attributes may overwrite "type"/"id".
            key = (resource_identifier.get("type"), resource_identifier.get("id"))
            for included_relation in included_by_key.get(key, ()):
                resource_identifier.update(included_relation.get("attributes", {}))
                if "relationships" in included_relation:
                    resource_identifier["relationships"] = included_relation["relationships"]

        for target_record in target_records:
            target_relationships = target_record.get("relationships", {})
            for relationship in target_relationships.values():
                relationship_data = relationship.get("data", {})
                if isinstance(relationship_data, dict):
                    # Single object relationship (e.g., metric, profile)
                    merge_included(relationship_data)
                elif isinstance(relationship_data, list):
                    # Array of objects relationship (e.g., attributions)
                    for item in relationship_data:
                        merge_included(item)
            yield target_record

    def extract_records_by_path(self, response: requests.Response, field_paths: Optional[list] = None) -> Iterable[Mapping[str, Any]]:
        """
        Yield the data found in the JSON response body at `field_paths`.

        With no field path the whole body is used. A path containing "*" is resolved with
        `dpath.values`; any other path with `dpath.get`, defaulting to an empty list.
        A list result is yielded item by item, any other truthy result is yielded as a single
        record, and a falsy result yields nothing.

        Raises:
            Exception: when the response body cannot be parsed as JSON.
        """
        try:
            response_body = response.json()
        except Exception as e:
            # Chain the original error so the root cause stays visible in the traceback.
            raise Exception(f"Failed to parse response body as JSON: {e}") from e

        # Extract data from the response body based on the provided field paths
        if not field_paths:
            extracted_data = response_body
        else:
            field_path_str = "/".join(field_paths)  # Convert list of field paths to a single string path for dpath
            if "*" in field_path_str:
                extracted_data = dpath.values(response_body, field_path_str)
            else:
                extracted_data = dpath.get(response_body, field_path_str, default=[])

        # Yield extracted data as individual records
        if isinstance(extracted_data, list):
            yield from extracted_data
        elif extracted_data:
            yield extracted_data
|
|
|
|
|
|
class KlaviyoErrorHandler(DefaultErrorHandler):
    def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]]) -> ErrorResolution:
        """
        Retry `InvalidURL` exceptions as transient errors; defer everything else to the default handler.

        `[Errno -3] Temporary failure in name resolution` has been observed a couple of times on two
        different connections (1fed2ede-2d33-4543-85e3-7d6e5736075d and 1b276f7d-358a-4fe3-a437-6747fd780eed).
        The same requests succeed on later syncs, which suggests the failure is transient.
        """
        # Anything that is not the DNS-related exception keeps the stock behaviour.
        if not isinstance(response_or_exception, InvalidURL):
            return super().interpret_response(response_or_exception)

        retry_message = "source-klaviyo has faced a temporary DNS resolution issue. Retrying..."
        return ErrorResolution(
            response_action=ResponseAction.RETRY,
            failure_type=FailureType.transient_error,
            error_message=retry_message,
        )
|
|
|
|
|
|
class PerPartitionToSingleStateMigration(StateMigration):
    """
    Transforms the input state for per-partitioned streams into a single (non-partitioned) cursor state.
    The cursor field name is resolved from the stream's incremental sync definition, and the cursor
    of the partition with the smallest value for that field becomes the new state.

    Example input state:
        {
            "states": [
                {
                    "partition": {"event_id": "13506132"},
                    "cursor": {"datetime": "2120-10-10 00:00:00+00:00"}
                }
            ]
        }

    Example output state:
        {
            "datetime": "2120-10-10 00:00:00+00:00"
        }
    """

    declarative_stream: DeclarativeStreamModel
    config: Config

    def __init__(self, declarative_stream: DeclarativeStreamModel, config: Config):
        self._config = config
        self.declarative_stream = declarative_stream
        self._cursor = declarative_stream.incremental_sync
        self._parameters = declarative_stream.parameters
        self._cursor_field = InterpolatedString.create(self._cursor.cursor_field, parameters=self._parameters).eval(self._config)

    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
        """Return True when the state is in the per-partition format, i.e. has a "states" key."""
        return "states" in stream_state

    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Collapse a per-partition state into the cursor of its least advanced partition.

        States without a "states" key are returned unchanged. An empty (or null) "states"
        list yields an empty state instead of raising from `min()` on an empty sequence.

        Raises:
            KeyError: when a partition entry lacks "cursor" or the cursor field.
        """
        if not self.should_migrate(stream_state):
            return stream_state

        partition_states = stream_state.get("states") or []
        if not partition_states:
            # No partition carries a cursor, so there is nothing to resume from.
            return {}

        # Values are compared as stored in the state.
        # NOTE(review): presumably timestamp strings in one consistent format - confirm.
        min_state = min(partition_states, key=lambda state: state["cursor"][self._cursor_field])
        return min_state.get("cursor")
|