1
0
mirror of synced 2025-12-25 02:09:19 -05:00

Source Iterable: use optional end_date for SATs (#17573)

* #17506 source iterable: use optional end_date for SATs

* source iterable: upd changelog

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
This commit is contained in:
Denys Davydov
2022-10-05 09:20:08 +03:00
committed by GitHub
parent c886dc9da4
commit 5ba79894f2
7 changed files with 58 additions and 54 deletions

View File

@@ -4,7 +4,7 @@
import math
from dataclasses import dataclass
from typing import Iterable, List, Tuple
from typing import Iterable, List, Optional, Tuple
import pendulum
from pendulum.datetime import DateTime, Period
@@ -24,9 +24,9 @@ class SliceGenerator:
_start_date: DateTime = None
_end_data: DateTime = None
def __init__(self, start_date: DateTime):
def __init__(self, start_date: DateTime, end_date: Optional[DateTime] = None):
self._start_date = start_date
self._end_date = pendulum.now("UTC")
self._end_date = end_date or pendulum.now("UTC")
def __iter__(self):
return self
@@ -41,8 +41,8 @@ class RangeSliceGenerator(SliceGenerator):
RANGE_LENGTH_DAYS: int = 90
_slices: List[StreamSlice] = []
def __init__(self, start_date: DateTime):
super().__init__(start_date)
def __init__(self, start_date: DateTime, end_date: Optional[DateTime] = None):
super().__init__(start_date, end_date)
self._slices = [
StreamSlice(start_date=start, end_date=end)
for start, end in self.make_datetime_ranges(self._start_date, self._end_date, self.RANGE_LENGTH_DAYS)
@@ -120,9 +120,6 @@ class AdjustableSliceGenerator(SliceGenerator):
# Default is True so for first slice it would length would be INITIAL_RANGE_DAYS (30 days)
_range_adjusted = True
def __init__(self, start_date: DateTime):
super().__init__(start_date)
def adjust_range(self, previous_request_time: Period):
"""
Calculate next slice length in days based on previous slice length and

View File

@@ -76,49 +76,52 @@ class SourceIterable(AbstractSource):
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
authenticator = TokenAuthenticator(token=config["api_key"], auth_header="Api-Key", auth_method="")
# end date is provided for integration tests only
start_date, end_date = config["start_date"], config.get("end_date")
date_range = {"start_date": start_date, "end_date": end_date}
return [
Campaigns(authenticator=authenticator),
CampaignsMetrics(authenticator=authenticator, start_date=config["start_date"]),
CampaignsMetrics(authenticator=authenticator, **date_range),
Channels(authenticator=authenticator),
EmailBounce(authenticator=authenticator, start_date=config["start_date"]),
EmailClick(authenticator=authenticator, start_date=config["start_date"]),
EmailComplaint(authenticator=authenticator, start_date=config["start_date"]),
EmailOpen(authenticator=authenticator, start_date=config["start_date"]),
EmailSend(authenticator=authenticator, start_date=config["start_date"]),
EmailSendSkip(authenticator=authenticator, start_date=config["start_date"]),
EmailSubscribe(authenticator=authenticator, start_date=config["start_date"]),
EmailUnsubscribe(authenticator=authenticator, start_date=config["start_date"]),
PushSend(authenticator=authenticator, start_date=config["start_date"]),
PushSendSkip(authenticator=authenticator, start_date=config["start_date"]),
PushOpen(authenticator=authenticator, start_date=config["start_date"]),
PushUninstall(authenticator=authenticator, start_date=config["start_date"]),
PushBounce(authenticator=authenticator, start_date=config["start_date"]),
WebPushSend(authenticator=authenticator, start_date=config["start_date"]),
WebPushClick(authenticator=authenticator, start_date=config["start_date"]),
WebPushSendSkip(authenticator=authenticator, start_date=config["start_date"]),
InAppSend(authenticator=authenticator, start_date=config["start_date"]),
InAppOpen(authenticator=authenticator, start_date=config["start_date"]),
InAppClick(authenticator=authenticator, start_date=config["start_date"]),
InAppClose(authenticator=authenticator, start_date=config["start_date"]),
InAppDelete(authenticator=authenticator, start_date=config["start_date"]),
InAppDelivery(authenticator=authenticator, start_date=config["start_date"]),
InAppSendSkip(authenticator=authenticator, start_date=config["start_date"]),
InboxSession(authenticator=authenticator, start_date=config["start_date"]),
InboxMessageImpression(authenticator=authenticator, start_date=config["start_date"]),
SmsSend(authenticator=authenticator, start_date=config["start_date"]),
SmsBounce(authenticator=authenticator, start_date=config["start_date"]),
SmsClick(authenticator=authenticator, start_date=config["start_date"]),
SmsReceived(authenticator=authenticator, start_date=config["start_date"]),
SmsSendSkip(authenticator=authenticator, start_date=config["start_date"]),
SmsUsageInfo(authenticator=authenticator, start_date=config["start_date"]),
Purchase(authenticator=authenticator, start_date=config["start_date"]),
CustomEvent(authenticator=authenticator, start_date=config["start_date"]),
HostedUnsubscribeClick(authenticator=authenticator, start_date=config["start_date"]),
EmailBounce(authenticator=authenticator, **date_range),
EmailClick(authenticator=authenticator, **date_range),
EmailComplaint(authenticator=authenticator, **date_range),
EmailOpen(authenticator=authenticator, **date_range),
EmailSend(authenticator=authenticator, **date_range),
EmailSendSkip(authenticator=authenticator, **date_range),
EmailSubscribe(authenticator=authenticator, **date_range),
EmailUnsubscribe(authenticator=authenticator, **date_range),
PushSend(authenticator=authenticator, **date_range),
PushSendSkip(authenticator=authenticator, **date_range),
PushOpen(authenticator=authenticator, **date_range),
PushUninstall(authenticator=authenticator, **date_range),
PushBounce(authenticator=authenticator, **date_range),
WebPushSend(authenticator=authenticator, **date_range),
WebPushClick(authenticator=authenticator, **date_range),
WebPushSendSkip(authenticator=authenticator, **date_range),
InAppSend(authenticator=authenticator, **date_range),
InAppOpen(authenticator=authenticator, **date_range),
InAppClick(authenticator=authenticator, **date_range),
InAppClose(authenticator=authenticator, **date_range),
InAppDelete(authenticator=authenticator, **date_range),
InAppDelivery(authenticator=authenticator, **date_range),
InAppSendSkip(authenticator=authenticator, **date_range),
InboxSession(authenticator=authenticator, **date_range),
InboxMessageImpression(authenticator=authenticator, **date_range),
SmsSend(authenticator=authenticator, **date_range),
SmsBounce(authenticator=authenticator, **date_range),
SmsClick(authenticator=authenticator, **date_range),
SmsReceived(authenticator=authenticator, **date_range),
SmsSendSkip(authenticator=authenticator, **date_range),
SmsUsageInfo(authenticator=authenticator, **date_range),
Purchase(authenticator=authenticator, **date_range),
CustomEvent(authenticator=authenticator, **date_range),
HostedUnsubscribeClick(authenticator=authenticator, **date_range),
Events(authenticator=authenticator),
Lists(authenticator=authenticator),
ListUsers(authenticator=authenticator),
MessageTypes(authenticator=authenticator),
Metadata(authenticator=authenticator),
Templates(authenticator=authenticator, start_date=config["start_date"]),
Users(authenticator=authenticator, start_date=config["start_date"]),
Templates(authenticator=authenticator, **date_range),
Users(authenticator=authenticator, **date_range),
]

View File

@@ -75,9 +75,10 @@ class IterableExportStream(IterableStream, ABC):
cursor_field = "createdAt"
primary_key = None
def __init__(self, start_date=None, **kwargs):
def __init__(self, start_date=None, end_date=None, **kwargs):
super().__init__(**kwargs)
self._start_date = pendulum.parse(start_date)
self._end_date = end_date and pendulum.parse(end_date)
self.stream_params = {"dataTypeName": self.data_field}
def path(self, **kwargs) -> str:
@@ -185,7 +186,7 @@ class IterableExportStream(IterableStream, ABC):
) -> Iterable[Optional[StreamSlice]]:
start_datetime = self.get_start_date(stream_state)
return [StreamSlice(start_datetime, pendulum.now("UTC"))]
return [StreamSlice(start_datetime, self._end_date or pendulum.now("UTC"))]
class IterableExportStreamRanged(IterableExportStream, ABC):
@@ -204,7 +205,7 @@ class IterableExportStreamRanged(IterableExportStream, ABC):
start_datetime = self.get_start_date(stream_state)
return RangeSliceGenerator(start_datetime)
return RangeSliceGenerator(start_datetime, self._end_date)
class IterableExportStreamAdjustableRange(IterableExportStream, ABC):
@@ -238,7 +239,7 @@ class IterableExportStreamAdjustableRange(IterableExportStream, ABC):
) -> Iterable[Optional[StreamSlice]]:
start_datetime = self.get_start_date(stream_state)
self._adjustable_generator = AdjustableSliceGenerator(start_datetime)
self._adjustable_generator = AdjustableSliceGenerator(start_datetime, self._end_date)
return self._adjustable_generator
def read_records(
@@ -325,12 +326,13 @@ class CampaignsMetrics(IterableStream):
primary_key = None
data_field = None
def __init__(self, start_date: str, **kwargs):
def __init__(self, start_date: str, end_date: Optional[str] = None, **kwargs):
"""
https://api.iterable.com/api/docs#campaigns_metrics
"""
super().__init__(**kwargs)
self.start_date = start_date
self.end_date = end_date
def path(self, **kwargs) -> str:
return "campaigns/metrics"
@@ -339,7 +341,8 @@ class CampaignsMetrics(IterableStream):
params = super().request_params(**kwargs)
params["campaignId"] = stream_slice.get("campaign_ids")
params["startDateTime"] = self.start_date
if self.end_date:
params["endDateTime"] = self.end_date
return params
def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: