#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import csv
import json
from abc import ABC, abstractmethod
from io import StringIO
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Union

import pendulum
import requests
from pendulum.datetime import DateTime
from requests import HTTPError
from requests.exceptions import ChunkedEncodingError

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.core import CheckpointMixin, package_name_from_class
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException, UserDefinedBackoffException
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
from source_iterable.slice_generators import AdjustableSliceGenerator, RangeSliceGenerator, StreamSlice
from source_iterable.utils import dateutil_parse


EVENT_ROWS_LIMIT = 200
CAMPAIGNS_PER_REQUEST = 20


class IterableStream(HttpStream, ABC):
    # If we get a 401 error (API token disabled or deleted) on a stream slice, do not make further requests
    # within the current stream, to avoid triggering 429 errors on other streams.
    ignore_further_slices = False

    url_base = "https://api.iterable.com/api/"
    primary_key = "id"

    def __init__(self, authenticator):
        self._cred = authenticator
        self._slice_retry = 0
        super().__init__(authenticator)

    @property
    def retry_factor(self) -> int:
        return 20

    # With factor 20 it would be from 20 to 400 seconds delay
    @property
    def max_retries(self) -> Union[int, None]:
        return 10

    @property
    @abstractmethod
    def data_field(self) -> str:
        """
        :return: Default field name to get data from response
        """

    @property
    def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
        return None

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        Iterable API does not support pagination
        """
        return None

    def check_generic_error(self, response: requests.Response) -> bool:
        """
        https://github.com/airbytehq/oncall/issues/1592#issuecomment-1499109251
        https://github.com/airbytehq/oncall/issues/1985
        """
        codes = ["Generic Error", "GenericError"]
        msg_pattern = "Please try again later"

        if response.status_code == 500:
            # Not every 500 error is guaranteed to return valid JSON
            try:
                response_json = json.loads(response.text)
            except ValueError:
                return False
            if response_json.get("code") in codes and msg_pattern in response_json.get("msg", ""):
                return True
        return False

    def request_kwargs(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> Mapping[str, Any]:
        """
        https://requests.readthedocs.io/en/latest/user/advanced/#timeouts
        https://github.com/airbytehq/oncall/issues/1985#issuecomment-1559276465
        """
        return {"timeout": (60, 300)}

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        response_json = response.json() or {}
        records = response_json.get(self.data_field, [])
        for record in records:
            yield record

    def should_retry(self, response: requests.Response) -> bool:
        if self.check_generic_error(response):
            self._slice_retry += 1
            if self._slice_retry < 3:
                return True
            return False
        return response.status_code == 429 or 500 <= response.status_code < 600

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[Mapping[str, Any]]:
        self._slice_retry = 0
        if self.ignore_further_slices:
            return

        try:
            yield from super().read_records(sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state)
        except (HTTPError, UserDefinedBackoffException, DefaultBackoffException) as e:
            response = e.response
            if self.check_generic_error(response):
                return
            raise e


class IterableExportStream(IterableStream, CheckpointMixin, ABC):
    """
    This stream uses Iterable's "export" API to fetch large amounts of data.
    The endpoint returns newline-delimited strings, each representing a JSON object.
    Data can be windowed by date range via the startDateTime and endDateTime
    parameters. A single request can return a large volume of data, and the
    request rate is limited to 4 requests per minute.

    Details: https://api.iterable.com/api/docs#export_exportDataJson
    """

    cursor_field = "createdAt"
    primary_key = None

    @property
    def state(self) -> MutableMapping[str, Any]:
        return self._state

    @state.setter
    def state(self, value: MutableMapping[str, Any]):
        self._state = value

    def __init__(self, start_date=None, end_date=None, **kwargs):
        super().__init__(**kwargs)
        self._start_date = pendulum.parse(start_date)
        self._end_date = end_date and pendulum.parse(end_date)
        self.stream_params = {"dataTypeName": self.data_field}
        # Default to an empty state so reading self.state does not fail on a fresh sync
        self._state = {}

    def path(self, **kwargs) -> str:
        return "export/data.json"

    @staticmethod
    def _field_to_datetime(value: Union[int, str]) -> DateTime:
        if isinstance(value, int):
            value = pendulum.from_timestamp(value / 1000.0)
        elif isinstance(value, str):
            value = dateutil_parse(value)
        else:
            raise ValueError(f"Unsupported type of datetime field {type(value)}")
        return value

    def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
        for record in super().read_records(**kwargs):
            self.state = self._get_updated_state(self.state, record)
            yield record

    def _get_updated_state(
        self,
        current_stream_state: MutableMapping[str, Any],
        latest_record: Mapping[str, Any],
    ) -> Mapping[str, Any]:
        """
        Return the latest state by comparing the cursor value in the latest record with the stream's most recent state object
        and returning an updated state object.
        """
        latest_benchmark = latest_record[self.cursor_field]
        if current_stream_state.get(self.cursor_field):
            return {
                self.cursor_field: str(
                    max(
                        latest_benchmark,
                        self._field_to_datetime(current_stream_state[self.cursor_field]),
                    )
                )
            }
        return {self.cursor_field: str(latest_benchmark)}

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
        next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        params = super().request_params(stream_state=stream_state)
        params.update(
            {
                "startDateTime": stream_slice.start_date.strftime("%Y-%m-%d %H:%M:%S"),
                "endDateTime": stream_slice.end_date.strftime("%Y-%m-%d %H:%M:%S"),
            },
            **self.stream_params,
        )
        return params

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        for obj in response.iter_lines():
            record = json.loads(obj)
            record[self.cursor_field] = self._field_to_datetime(record[self.cursor_field])
            yield record

    def request_kwargs(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> Mapping[str, Any]:
"""
|
|
https://api.iterable.com/api/docs#export_exportDataJson
|
|
Sending those type of requests could download large piece of json
|
|
objects splitted with newline character.
|
|
Passing stream=True argument to requests.session.send method to avoid
|
|
loading whole analytics report content into memory.
|
|
"""
|
|
        return {
            **super().request_kwargs(stream_state, stream_slice, next_page_token),
            "stream": True,
        }

    def get_start_date(self, stream_state: Mapping[str, Any]) -> DateTime:
        stream_state = stream_state or {}
        start_datetime = self._start_date
        if stream_state.get(self.cursor_field):
            start_datetime = pendulum.parse(stream_state[self.cursor_field])
        return start_datetime

    def stream_slices(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[Optional[StreamSlice]]:
        start_datetime = self.get_start_date(stream_state)
        return [StreamSlice(start_datetime, self._end_date or pendulum.now("UTC"))]


class IterableExportStreamRanged(IterableExportStream, ABC):
"""
|
|
This class use RangeSliceGenerator class to break single request into
|
|
ranges with same (or less for final range) number of days. By default it 90
|
|
days.
|
|
"""

    def stream_slices(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[Optional[StreamSlice]]:
        start_datetime = self.get_start_date(stream_state)

        return RangeSliceGenerator(start_datetime, self._end_date)


class IterableExportStreamAdjustableRange(IterableExportStream, ABC):
"""
|
|
For streams that could produce large amount of data in single request so we
|
|
cant just use IterableExportStreamRanged to split it in even ranges. If
|
|
request processing takes a lot of time API server could just close
|
|
connection and connector code would fail with ChunkedEncodingError.
|
|
|
|
To solve this problem we use AdjustableSliceGenerator that able to adjust
|
|
next slice range based on two factor:
|
|
1. Previous slice range / time to process ratio.
|
|
2. Had previous request failed with ChunkedEncodingError
|
|
|
|
In case of slice processing request failed with ChunkedEncodingError (which
|
|
means that API server closed connection cause of request takes to much
|
|
time) make CHUNKED_ENCODING_ERROR_RETRIES (6) retries each time reducing
|
|
slice length.
|
|
|
|
See AdjustableSliceGenerator description for more details on next slice length adjustment alghorithm.
|
|
"""

    _adjustable_generator: AdjustableSliceGenerator = None
    CHUNKED_ENCODING_ERROR_RETRIES = 6

    def stream_slices(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[Optional[StreamSlice]]:
        start_datetime = self.get_start_date(stream_state)
        self._adjustable_generator = AdjustableSliceGenerator(start_datetime, self._end_date)
        return self._adjustable_generator

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str],
        stream_slice: StreamSlice,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[Mapping[str, Any]]:
        start_time = pendulum.now()
        for _ in range(self.CHUNKED_ENCODING_ERROR_RETRIES):
            try:
                self.logger.info(
                    f"Processing slice of {(stream_slice.end_date - stream_slice.start_date).total_days()} days for stream {self.name}"
                )
                for record in super().read_records(
                    sync_mode=sync_mode,
                    cursor_field=cursor_field,
                    stream_slice=stream_slice,
                    stream_state=stream_state,
                ):
                    now = pendulum.now()
                    self._adjustable_generator.adjust_range(now - start_time)
                    yield record
                    start_time = now
                break
            except ChunkedEncodingError:
                self.logger.warning("ChunkedEncodingError occurred, decreasing the days range and trying again")
                stream_slice = self._adjustable_generator.reduce_range()
        else:
            raise Exception(f"ChunkedEncodingError: Reached maximum number of retries: {self.CHUNKED_ENCODING_ERROR_RETRIES}")


class IterableExportEventsStreamAdjustableRange(IterableExportStreamAdjustableRange, ABC):
    def get_json_schema(self) -> Mapping[str, Any]:
        """All child streams share the same 'events' schema"""
        return ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema("events")


class Campaigns(IterableStream):
    data_field = "campaigns"

    def path(self, **kwargs) -> str:
        return "campaigns"


class CampaignsMetrics(IterableStream):
    name = "campaigns_metrics"
    primary_key = None
    data_field = None

    def __init__(self, start_date: str, end_date: Optional[str] = None, **kwargs):
        """
        https://api.iterable.com/api/docs#campaigns_metrics
        """
        super().__init__(**kwargs)
        self.start_date = start_date
        self.end_date = end_date

    def path(self, **kwargs) -> str:
        return "campaigns/metrics"

    def request_params(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> MutableMapping[str, Any]:
        params = super().request_params(**kwargs)
        params["campaignId"] = stream_slice.get("campaign_ids")
        params["startDateTime"] = self.start_date
        if self.end_date:
            params["endDateTime"] = self.end_date
        return params

    def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
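        # Sketch of the batching below: with CAMPAIGNS_PER_REQUEST = 20 and, say, 45 campaign ids,
        # this yields {"campaign_ids": [20 ids]}, {"campaign_ids": [20 ids]}, {"campaign_ids": [5 ids]}.
        # The 45-id figure is only an illustrative assumption.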
        campaigns_stream = Campaigns(authenticator=self._cred)
        campaign_ids = []
        for campaign_record in campaigns_stream.read_records(sync_mode=kwargs.get("sync_mode", SyncMode.full_refresh)):
            campaign_ids.append(campaign_record["id"])

            if len(campaign_ids) == CAMPAIGNS_PER_REQUEST:
                yield {"campaign_ids": campaign_ids}
                campaign_ids = []

        if campaign_ids:
            yield {"campaign_ids": campaign_ids}

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        content = response.content.decode()
        records = self._parse_csv_string_to_dict(content)

        for record in records:
            yield {"data": record}

    @staticmethod
    def _parse_csv_string_to_dict(csv_string: str) -> List[Dict[str, Any]]:
        """
        Parse a CSV response body into a list of dicts.

        Example:
        csv_string = "a,b,c,d
                      1,2,,3
                      6,,1,2"

        output = [{"a": 1, "b": 2, "d": 3},
                  {"a": 6, "c": 1, "d": 2}]

        :param csv_string: API endpoint response in CSV format
        :return: parsed API response
        """

        reader = csv.DictReader(StringIO(csv_string), delimiter=",")
        result = []

        for row in reader:
            for key, value in row.items():
                if value == "":
                    continue
                try:
                    row[key] = int(value)
                except ValueError:
                    row[key] = float(value)
            row = {k: v for k, v in row.items() if v != ""}

            result.append(row)

        return result


class EmailBounce(IterableExportStreamAdjustableRange):
    data_field = "emailBounce"


class EmailClick(IterableExportStreamAdjustableRange):
    data_field = "emailClick"


class EmailComplaint(IterableExportStreamAdjustableRange):
    data_field = "emailComplaint"


class EmailOpen(IterableExportStreamAdjustableRange):
    data_field = "emailOpen"


class EmailSend(IterableExportStreamAdjustableRange):
    data_field = "emailSend"


class EmailSendSkip(IterableExportStreamAdjustableRange):
    data_field = "emailSendSkip"


class EmailSubscribe(IterableExportStreamAdjustableRange):
    data_field = "emailSubscribe"


class EmailUnsubscribe(IterableExportStreamAdjustableRange):
    data_field = "emailUnsubscribe"


class PushSend(IterableExportEventsStreamAdjustableRange):
    data_field = "pushSend"


class PushSendSkip(IterableExportEventsStreamAdjustableRange):
    data_field = "pushSendSkip"


class PushOpen(IterableExportEventsStreamAdjustableRange):
    data_field = "pushOpen"


class PushUninstall(IterableExportEventsStreamAdjustableRange):
    data_field = "pushUninstall"


class PushBounce(IterableExportEventsStreamAdjustableRange):
    data_field = "pushBounce"


class WebPushSend(IterableExportEventsStreamAdjustableRange):
    data_field = "webPushSend"


class WebPushClick(IterableExportEventsStreamAdjustableRange):
    data_field = "webPushClick"


class WebPushSendSkip(IterableExportEventsStreamAdjustableRange):
    data_field = "webPushSendSkip"


class InAppSend(IterableExportEventsStreamAdjustableRange):
    data_field = "inAppSend"


class InAppOpen(IterableExportEventsStreamAdjustableRange):
    data_field = "inAppOpen"


class InAppClick(IterableExportEventsStreamAdjustableRange):
    data_field = "inAppClick"


class InAppClose(IterableExportEventsStreamAdjustableRange):
    data_field = "inAppClose"


class InAppDelete(IterableExportEventsStreamAdjustableRange):
    data_field = "inAppDelete"


class InAppDelivery(IterableExportEventsStreamAdjustableRange):
    data_field = "inAppDelivery"


class InAppSendSkip(IterableExportEventsStreamAdjustableRange):
    data_field = "inAppSendSkip"


class InboxSession(IterableExportEventsStreamAdjustableRange):
    data_field = "inboxSession"


class InboxMessageImpression(IterableExportEventsStreamAdjustableRange):
    data_field = "inboxMessageImpression"


class SmsSend(IterableExportEventsStreamAdjustableRange):
    data_field = "smsSend"


class SmsBounce(IterableExportEventsStreamAdjustableRange):
    data_field = "smsBounce"


class SmsClick(IterableExportEventsStreamAdjustableRange):
    data_field = "smsClick"


class SmsReceived(IterableExportEventsStreamAdjustableRange):
    data_field = "smsReceived"


class SmsSendSkip(IterableExportEventsStreamAdjustableRange):
    data_field = "smsSendSkip"


class SmsUsageInfo(IterableExportEventsStreamAdjustableRange):
    data_field = "smsUsageInfo"


class Purchase(IterableExportEventsStreamAdjustableRange):
    data_field = "purchase"


class CustomEvent(IterableExportEventsStreamAdjustableRange):
    data_field = "customEvent"


class HostedUnsubscribeClick(IterableExportEventsStreamAdjustableRange):
    data_field = "hostedUnsubscribeClick"


class Templates(IterableExportStreamRanged):
    data_field = "templates"
    template_types = ["Base", "Blast", "Triggered", "Workflow"]
    message_types = ["Email", "Push", "InApp", "SMS"]

    def path(self, **kwargs) -> str:
        return "templates"

    def read_records(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping[str, Any]]:
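        # Iterates over every template_types x message_types combination (4 x 4 = 16 pairs),
        # issuing a separate export request for each pair via the parent class.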
        for template in self.template_types:
            for message in self.message_types:
                self.stream_params = {"templateType": template, "messageMedium": message}
                yield from super().read_records(stream_slice=stream_slice, **kwargs)

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        response_json = response.json()
        records = response_json.get(self.data_field, [])

        for record in records:
            record[self.cursor_field] = self._field_to_datetime(record[self.cursor_field])
            yield record