#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import csv
import json
from abc import ABC, abstractmethod
from io import StringIO
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Union
import pendulum
import requests
from pendulum.datetime import DateTime
from requests import HTTPError
from requests.exceptions import ChunkedEncodingError
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.core import CheckpointMixin, package_name_from_class
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException, UserDefinedBackoffException
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
from source_iterable.slice_generators import AdjustableSliceGenerator, RangeSliceGenerator, StreamSlice
from source_iterable.utils import dateutil_parse
EVENT_ROWS_LIMIT = 200
CAMPAIGNS_PER_REQUEST = 20
class IterableStream(HttpStream, ABC):
    # In case we get a 401 error (API token disabled or deleted) on a stream slice, do not make further requests
    # within the current stream to prevent 429 errors on other streams.
ignore_further_slices = False
url_base = "https://api.iterable.com/api/"
primary_key = "id"
def __init__(self, authenticator):
self._cred = authenticator
self._slice_retry = 0
super().__init__(authenticator)
@property
def retry_factor(self) -> int:
return 20
    # With a factor of 20 the delay ranges from 20 to 400 seconds.
@property
def max_retries(self) -> Union[int, None]:
return 10
@property
@abstractmethod
def data_field(self) -> str:
"""
:return: Default field name to get data from response
"""
@property
def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
return None
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
"""
Iterable API does not support pagination
"""
return None
def check_generic_error(self, response: requests.Response) -> bool:
"""
https://github.com/airbytehq/oncall/issues/1592#issuecomment-1499109251
https://github.com/airbytehq/oncall/issues/1985
"""
codes = ["Generic Error", "GenericError"]
msg_pattern = "Please try again later"
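        # Illustrative only (the exact body shape is an assumption, not taken from the API docs):
        # a matching 500 response looks roughly like {"msg": "Please try again later", "code": "GenericError"}.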
        if response.status_code == 500:
            # Not every 500 response is guaranteed to carry a valid JSON body.
            try:
                response_json = json.loads(response.text)
            except ValueError:
                return False
            if response_json.get("code") in codes and msg_pattern in response_json.get("msg", ""):
                return True
        return False
def request_kwargs(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Mapping[str, Any]:
"""
https://requests.readthedocs.io/en/latest/user/advanced/#timeouts
https://github.com/airbytehq/oncall/issues/1985#issuecomment-1559276465
"""
return {"timeout": (60, 300)}
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_json = response.json() or {}
records = response_json.get(self.data_field, [])
for record in records:
yield record
def should_retry(self, response: requests.Response) -> bool:
if self.check_generic_error(response):
self._slice_retry += 1
if self._slice_retry < 3:
return True
return False
return response.status_code == 429 or 500 <= response.status_code < 600
def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
self._slice_retry = 0
if self.ignore_further_slices:
return
try:
yield from super().read_records(sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state)
except (HTTPError, UserDefinedBackoffException, DefaultBackoffException) as e:
response = e.response
if self.check_generic_error(response):
return
raise e
class IterableExportStream(IterableStream, CheckpointMixin, ABC):
"""
    This stream uses Iterable's "export" API to fetch large amounts of data.
    The response body consists of newline-separated strings, each representing
    a JSON object.
    Data can be windowed by date range via the startDateTime and endDateTime
    parameters. A single request can return large volumes of data, and the
    request rate is limited to 4 requests per minute.
    Details: https://api.iterable.com/api/docs#export_exportDataJson
"""
cursor_field = "createdAt"
primary_key = None
@property
def state(self) -> MutableMapping[str, Any]:
return self._state
@state.setter
def state(self, value: MutableMapping[str, Any]):
self._state = value
def __init__(self, start_date=None, end_date=None, **kwargs):
super().__init__(**kwargs)
self._start_date = pendulum.parse(start_date)
self._end_date = end_date and pendulum.parse(end_date)
self.stream_params = {"dataTypeName": self.data_field}
def path(self, **kwargs) -> str:
return "export/data.json"
@staticmethod
    def _field_to_datetime(value: Union[int, str]) -> DateTime:
if isinstance(value, int):
value = pendulum.from_timestamp(value / 1000.0)
elif isinstance(value, str):
value = dateutil_parse(value)
else:
raise ValueError(f"Unsupported type of datetime field {type(value)}")
return value
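    # Illustrative examples (values are hypothetical):
    #   _field_to_datetime(1609459200000)                -> 2021-01-01T00:00:00+00:00 (epoch milliseconds)
    #   _field_to_datetime("2021-01-01 00:00:00 +00:00") -> the same instant, parsed from a string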
def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
for record in super().read_records(**kwargs):
self.state = self._get_updated_state(self.state, record)
yield record
def _get_updated_state(
self,
current_stream_state: MutableMapping[str, Any],
latest_record: Mapping[str, Any],
) -> Mapping[str, Any]:
"""
        Compare the cursor value in the latest record with the stream's most recent state
        and return an updated state object.
"""
latest_benchmark = latest_record[self.cursor_field]
if current_stream_state.get(self.cursor_field):
return {
self.cursor_field: str(
max(
latest_benchmark,
self._field_to_datetime(current_stream_state[self.cursor_field]),
)
)
}
return {self.cursor_field: str(latest_benchmark)}
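    # For example (illustrative values): with state {"createdAt": "2024-01-01 00:00:00+00:00"} and a
    # record whose createdAt parses to 2024-02-01, the later of the two values is kept and stringified.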
def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: StreamSlice,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
params = super().request_params(stream_state=stream_state)
params.update(
{
"startDateTime": stream_slice.start_date.strftime("%Y-%m-%d %H:%M:%S"),
"endDateTime": stream_slice.end_date.strftime("%Y-%m-%d %H:%M:%S"),
},
**self.stream_params,
)
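        # Typical resulting query parameters (values illustrative):
        #   {"dataTypeName": "emailSend", "startDateTime": "2024-01-01 00:00:00", "endDateTime": "2024-03-31 00:00:00"}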
return params
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
for obj in response.iter_lines():
record = json.loads(obj)
record[self.cursor_field] = self._field_to_datetime(record[self.cursor_field])
yield record
def request_kwargs(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Mapping[str, Any]:
"""
https://api.iterable.com/api/docs#export_exportDataJson
        These requests can download a large body of newline-separated JSON objects.
        Pass stream=True to requests so the whole export is not loaded into memory
        at once.
"""
return {
**super().request_kwargs(stream_state, stream_slice, next_page_token),
"stream": True,
}
def get_start_date(self, stream_state: Mapping[str, Any]) -> DateTime:
stream_state = stream_state or {}
start_datetime = self._start_date
if stream_state.get(self.cursor_field):
start_datetime = pendulum.parse(stream_state[self.cursor_field])
return start_datetime
def stream_slices(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Optional[StreamSlice]]:
start_datetime = self.get_start_date(stream_state)
return [StreamSlice(start_datetime, self._end_date or pendulum.now("UTC"))]
class IterableExportStreamRanged(IterableExportStream, ABC):
"""
    This class uses RangeSliceGenerator to break a single request into ranges
    covering the same number of days (the final range may be shorter). The
    default range is 90 days.
"""
def stream_slices(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Optional[StreamSlice]]:
start_datetime = self.get_start_date(stream_state)
return RangeSliceGenerator(start_datetime, self._end_date)
class IterableExportStreamAdjustableRange(IterableExportStream, ABC):
"""
    For streams that can produce a large amount of data in a single request, so
    we can't just use IterableExportStreamRanged to split it into even ranges.
    If processing a request takes too long, the API server may close the
    connection and the connector code would fail with ChunkedEncodingError.
    To solve this problem we use AdjustableSliceGenerator, which adjusts the
    next slice range based on two factors:
    1. The previous slice range / time-to-process ratio.
    2. Whether the previous request failed with ChunkedEncodingError.
    If a slice request fails with ChunkedEncodingError (meaning the API server
    closed the connection because the request took too long), retry up to
    CHUNKED_ENCODING_ERROR_RETRIES (6) times, reducing the slice length each time.
    See the AdjustableSliceGenerator description for more details on the slice
    length adjustment algorithm.
"""
_adjustable_generator: AdjustableSliceGenerator = None
CHUNKED_ENCODING_ERROR_RETRIES = 6
def stream_slices(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Optional[StreamSlice]]:
start_datetime = self.get_start_date(stream_state)
self._adjustable_generator = AdjustableSliceGenerator(start_datetime, self._end_date)
return self._adjustable_generator
def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str],
stream_slice: StreamSlice,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
start_time = pendulum.now()
for _ in range(self.CHUNKED_ENCODING_ERROR_RETRIES):
try:
self.logger.info(
f"Processing slice of {(stream_slice.end_date - stream_slice.start_date).total_days()} days for stream {self.name}"
)
for record in super().read_records(
sync_mode=sync_mode,
cursor_field=cursor_field,
stream_slice=stream_slice,
stream_state=stream_state,
):
now = pendulum.now()
self._adjustable_generator.adjust_range(now - start_time)
yield record
start_time = now
break
except ChunkedEncodingError:
                self.logger.warning("ChunkedEncodingError occurred, decreasing the days range and trying again")
stream_slice = self._adjustable_generator.reduce_range()
else:
raise Exception(f"ChunkedEncodingError: Reached maximum number of retires: {self.CHUNKED_ENCODING_ERROR_RETRIES}")
class IterableExportEventsStreamAdjustableRange(IterableExportStreamAdjustableRange, ABC):
def get_json_schema(self) -> Mapping[str, Any]:
"""All child stream share the same 'events' schema"""
return ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema("events")
class Campaigns(IterableStream):
data_field = "campaigns"
def path(self, **kwargs) -> str:
return "campaigns"
class CampaignsMetrics(IterableStream):
name = "campaigns_metrics"
primary_key = None
data_field = None
def __init__(self, start_date: str, end_date: Optional[str] = None, **kwargs):
"""
https://api.iterable.com/api/docs#campaigns_metrics
"""
super().__init__(**kwargs)
self.start_date = start_date
self.end_date = end_date
def path(self, **kwargs) -> str:
return "campaigns/metrics"
def request_params(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> MutableMapping[str, Any]:
params = super().request_params(**kwargs)
params["campaignId"] = stream_slice.get("campaign_ids")
params["startDateTime"] = self.start_date
if self.end_date:
params["endDateTime"] = self.end_date
return params
    def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
        campaigns_stream = Campaigns(authenticator=self._cred)
        campaign_ids = []
        for campaign in campaigns_stream.read_records(sync_mode=kwargs.get("sync_mode", SyncMode.full_refresh)):
            campaign_ids.append(campaign["id"])
if len(campaign_ids) == CAMPAIGNS_PER_REQUEST:
yield {"campaign_ids": campaign_ids}
campaign_ids = []
if campaign_ids:
yield {"campaign_ids": campaign_ids}
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
content = response.content.decode()
records = self._parse_csv_string_to_dict(content)
for record in records:
yield {"data": record}
@staticmethod
def _parse_csv_string_to_dict(csv_string: str) -> List[Dict[str, Any]]:
"""
        Parse a CSV response body into a list of dicts.
Example:
csv_string = "a,b,c,d
1,2,,3
6,,1,2"
output = [{"a": 1, "b": 2, "d": 3},
{"a": 6, "c": 1, "d": 2}]
        :param csv_string: API endpoint response in CSV format
:return: parsed API response
"""
reader = csv.DictReader(StringIO(csv_string), delimiter=",")
result = []
for row in reader:
for key, value in row.items():
if value == "":
continue
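                # Metric values are expected to be numeric; coerce to int when possible, otherwise fall back to float.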
try:
row[key] = int(value)
except ValueError:
row[key] = float(value)
row = {k: v for k, v in row.items() if v != ""}
result.append(row)
return result
class EmailBounce(IterableExportStreamAdjustableRange):
data_field = "emailBounce"
class EmailClick(IterableExportStreamAdjustableRange):
data_field = "emailClick"
class EmailComplaint(IterableExportStreamAdjustableRange):
data_field = "emailComplaint"
class EmailOpen(IterableExportStreamAdjustableRange):
data_field = "emailOpen"
class EmailSend(IterableExportStreamAdjustableRange):
data_field = "emailSend"
class EmailSendSkip(IterableExportStreamAdjustableRange):
data_field = "emailSendSkip"
class EmailSubscribe(IterableExportStreamAdjustableRange):
data_field = "emailSubscribe"
class EmailUnsubscribe(IterableExportStreamAdjustableRange):
data_field = "emailUnsubscribe"
class PushSend(IterableExportEventsStreamAdjustableRange):
data_field = "pushSend"
class PushSendSkip(IterableExportEventsStreamAdjustableRange):
data_field = "pushSendSkip"
class PushOpen(IterableExportEventsStreamAdjustableRange):
data_field = "pushOpen"
class PushUninstall(IterableExportEventsStreamAdjustableRange):
data_field = "pushUninstall"
class PushBounce(IterableExportEventsStreamAdjustableRange):
data_field = "pushBounce"
class WebPushSend(IterableExportEventsStreamAdjustableRange):
data_field = "webPushSend"
class WebPushClick(IterableExportEventsStreamAdjustableRange):
data_field = "webPushClick"
class WebPushSendSkip(IterableExportEventsStreamAdjustableRange):
data_field = "webPushSendSkip"
class InAppSend(IterableExportEventsStreamAdjustableRange):
data_field = "inAppSend"
class InAppOpen(IterableExportEventsStreamAdjustableRange):
data_field = "inAppOpen"
class InAppClick(IterableExportEventsStreamAdjustableRange):
data_field = "inAppClick"
class InAppClose(IterableExportEventsStreamAdjustableRange):
data_field = "inAppClose"
class InAppDelete(IterableExportEventsStreamAdjustableRange):
data_field = "inAppDelete"
class InAppDelivery(IterableExportEventsStreamAdjustableRange):
data_field = "inAppDelivery"
class InAppSendSkip(IterableExportEventsStreamAdjustableRange):
data_field = "inAppSendSkip"
class InboxSession(IterableExportEventsStreamAdjustableRange):
data_field = "inboxSession"
class InboxMessageImpression(IterableExportEventsStreamAdjustableRange):
data_field = "inboxMessageImpression"
class SmsSend(IterableExportEventsStreamAdjustableRange):
data_field = "smsSend"
class SmsBounce(IterableExportEventsStreamAdjustableRange):
data_field = "smsBounce"
class SmsClick(IterableExportEventsStreamAdjustableRange):
data_field = "smsClick"
class SmsReceived(IterableExportEventsStreamAdjustableRange):
data_field = "smsReceived"
class SmsSendSkip(IterableExportEventsStreamAdjustableRange):
data_field = "smsSendSkip"
class SmsUsageInfo(IterableExportEventsStreamAdjustableRange):
data_field = "smsUsageInfo"
class Purchase(IterableExportEventsStreamAdjustableRange):
data_field = "purchase"
class CustomEvent(IterableExportEventsStreamAdjustableRange):
data_field = "customEvent"
class HostedUnsubscribeClick(IterableExportEventsStreamAdjustableRange):
data_field = "hostedUnsubscribeClick"
class Templates(IterableExportStreamRanged):
data_field = "templates"
template_types = ["Base", "Blast", "Triggered", "Workflow"]
message_types = ["Email", "Push", "InApp", "SMS"]
def path(self, **kwargs) -> str:
return "templates"
def read_records(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping[str, Any]]:
for template in self.template_types:
for message in self.message_types:
self.stream_params = {"templateType": template, "messageMedium": message}
yield from super().read_records(stream_slice=stream_slice, **kwargs)
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_json = response.json()
records = response_json.get(self.data_field, [])
for record in records:
record[self.cursor_field] = self._field_to_datetime(record[self.cursor_field])
yield record