|
|
|
|
@@ -16,6 +16,7 @@ from airbyte_cdk.sources.streams.core import package_name_from_class
|
|
|
|
|
from airbyte_cdk.sources.streams.http import HttpStream
|
|
|
|
|
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
|
|
|
|
|
from pendulum.datetime import DateTime
|
|
|
|
|
from requests import codes
|
|
|
|
|
from requests.exceptions import ChunkedEncodingError
|
|
|
|
|
from source_iterable.slice_generators import AdjustableSliceGenerator, RangeSliceGenerator, StreamSlice
|
|
|
|
|
|
|
|
|
|
@@ -24,6 +25,7 @@ CAMPAIGNS_PER_REQUEST = 20
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class IterableStream(HttpStream, ABC):
|
|
|
|
|
raise_on_http_errors = True
|
|
|
|
|
|
|
|
|
|
# Hardcode the value because it is not returned from the API
|
|
|
|
|
BACKOFF_TIME_CONSTANT = 10.0
|
|
|
|
|
@@ -43,6 +45,13 @@ class IterableStream(HttpStream, ABC):
|
|
|
|
|
:return: Default field name to get data from response
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def check_unauthorized_key(self, response: requests.Response) -> bool:
|
|
|
|
|
if response.status_code == codes.UNAUTHORIZED:
|
|
|
|
|
self.logger.warn(f"Provided API Key has not sufficient permissions to read from stream: {self.data_field}")
|
|
|
|
|
setattr(self, "raise_on_http_errors", False)
|
|
|
|
|
return False
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
def backoff_time(self, response: requests.Response) -> Optional[float]:
|
|
|
|
|
return self.BACKOFF_TIME_CONSTANT
|
|
|
|
|
|
|
|
|
|
@@ -53,12 +62,20 @@ class IterableStream(HttpStream, ABC):
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
|
|
|
|
|
if not self.check_unauthorized_key(response):
|
|
|
|
|
yield from []
|
|
|
|
|
response_json = response.json()
|
|
|
|
|
records = response_json.get(self.data_field, [])
|
|
|
|
|
|
|
|
|
|
for record in records:
|
|
|
|
|
yield record
|
|
|
|
|
|
|
|
|
|
def should_retry(self, response: requests.Response) -> bool:
|
|
|
|
|
if not self.check_unauthorized_key(response):
|
|
|
|
|
return False
|
|
|
|
|
else:
|
|
|
|
|
return super().should_retry(response)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class IterableExportStream(IterableStream, ABC):
|
|
|
|
|
"""
|
|
|
|
|
@@ -151,6 +168,8 @@ class IterableExportStream(IterableStream, ABC):
|
|
|
|
|
return params
|
|
|
|
|
|
|
|
|
|
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
|
|
|
|
|
if not self.check_unauthorized_key(response):
|
|
|
|
|
return None
|
|
|
|
|
for obj in response.iter_lines():
|
|
|
|
|
record = json.loads(obj)
|
|
|
|
|
record[self.cursor_field] = self._field_to_datetime(record[self.cursor_field])
|
|
|
|
|
@@ -301,6 +320,8 @@ class ListUsers(IterableStream):
|
|
|
|
|
yield {"list_id": list_record["id"]}
|
|
|
|
|
|
|
|
|
|
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
|
|
|
|
|
if not self.check_unauthorized_key(response):
|
|
|
|
|
yield from []
|
|
|
|
|
list_id = self._get_list_id(response.url)
|
|
|
|
|
for user in response.iter_lines():
|
|
|
|
|
yield {"email": user.decode(), "listId": list_id}
|
|
|
|
|
@@ -359,6 +380,8 @@ class CampaignsMetrics(IterableStream):
|
|
|
|
|
yield {"campaign_ids": campaign_ids}
|
|
|
|
|
|
|
|
|
|
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
|
|
|
|
|
if not self.check_unauthorized_key(response):
|
|
|
|
|
yield from []
|
|
|
|
|
content = response.content.decode()
|
|
|
|
|
records = self._parse_csv_string_to_dict(content)
|
|
|
|
|
|
|
|
|
|
@@ -456,7 +479,8 @@ class Events(IterableStream):
|
|
|
|
|
Put common event fields at the top level.
|
|
|
|
|
Put the rest of the fields in the `data` subobject.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
if not self.check_unauthorized_key(response):
|
|
|
|
|
yield from []
|
|
|
|
|
jsonl_records = StringIO(response.text)
|
|
|
|
|
for record in jsonl_records:
|
|
|
|
|
record_dict = json.loads(record)
|
|
|
|
|
@@ -618,6 +642,8 @@ class Templates(IterableExportStreamRanged):
|
|
|
|
|
yield from super().read_records(stream_slice=stream_slice, **kwargs)
|
|
|
|
|
|
|
|
|
|
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
|
|
|
|
|
if not self.check_unauthorized_key(response):
|
|
|
|
|
yield from []
|
|
|
|
|
response_json = response.json()
|
|
|
|
|
records = response_json.get(self.data_field, [])
|
|
|
|
|
|
|
|
|
|
|