Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com> Co-authored-by: Danylo Jablonski <150933663+DanyloGL@users.noreply.github.com> Co-authored-by: Natik Gadzhi <natik@respawn.io> Co-authored-by: ChristoGrab <christo.grab@gmail.com>
112 lines
4.5 KiB
Python
112 lines
4.5 KiB
Python
#
|
|
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
import datetime
|
|
import operator
|
|
from dataclasses import dataclass, field
|
|
from typing import Any, Iterable, Mapping, MutableMapping, Optional
|
|
|
|
import dpath
|
|
import requests
|
|
|
|
from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
|
|
from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor
|
|
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
|
|
from airbyte_cdk.sources.declarative.requesters import RequestOption
|
|
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType
|
|
from airbyte_cdk.sources.declarative.transformations import AddFields
|
|
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
|
|
|
|
|
|
@dataclass
class TransformToRecordComponent(AddFields):
    """
    ``AddFields`` variant that produces a brand-new record made only of the configured fields.

    The incoming record is never written to; it is only exposed to the field
    interpolation as ``record`` (presumably it may be a non-dict value such as a
    plain string, per the original "incoming string" wording — confirm against
    the manifest).
    """

    def transform(
        self,
        record: Record,
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> Record:
        """
        Build a new dictionary record from the configured (parsed) fields.

        Every parsed field value is evaluated against ``config`` with ``record``,
        ``stream_state`` and ``stream_slice`` available as interpolation context,
        then stored in the new record at that field's path.

        :param record: the extracted item; used only as interpolation context.
        :param config: connector configuration handed to interpolation.
        :param stream_state: current stream state, exposed to interpolation.
        :param stream_slice: current stream slice, exposed to interpolation.
        :return: a new dict containing only the evaluated fields.
        """
        interpolation_context = dict(record=record, stream_state=stream_state, stream_slice=stream_slice)
        new_record = {}
        for added_field in self._parsed_fields:
            evaluated = added_field.value.eval(config, **interpolation_context)
            # dpath.new creates any intermediate dicts needed for nested paths.
            dpath.new(new_record, added_field.path, evaluated)
        return new_record
|
|
|
|
|
|
@dataclass
class DatetimeIncrementalSyncComponent(DatetimeBasedCursor):
    """
    DatetimeBasedCursor adapted to Braze endpoints that take a window length instead of a start time.

    Rather than explicit start_time/end_time parameters, the API expects:
      - an end time (``ending_at``)
      - a length parameter giving how many days before the end time to fetch

    The length counts both the start and end dates of the slice inclusively, so a
    window from 2023-01-01 to 2023-01-03 has a length of 3 (Jan 1, 2 and 3). Braze
    accepts lengths of 1-100 days, so the computed value is clamped to that range.

    Example API request:
        GET /campaigns/data_series?campaign_id=xxx&ending_at=2023-01-03&length=3
    This would fetch data from 2023-01-01 to 2023-01-03 inclusive.

    NOTE(review): a slice spanning more than 100 days is still sent with length=100,
    so presumably only its final 100 days are fetched — confirm the configured
    ``step`` keeps every slice within that limit.

    Args:
        step_option: Where, and under which name, to inject the computed length
            parameter. Required: construction raises ValueError without it.
    """

    step_option: Optional[RequestOption] = field(default=None)

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Run the base cursor's initialisation, then insist that ``step_option`` was supplied."""
        super().__post_init__(parameters=parameters)
        if self.step_option is None:
            raise ValueError("step_option is required for DatetimeIncrementalSyncComponent")

    def _get_request_options(self, option_type: RequestOptionType, stream_slice: Optional[StreamSlice] = None) -> Mapping[str, Any]:
        """
        Merge the base cursor's request options with the computed window length.

        :param option_type: which request component is currently being built.
        :param stream_slice: the current slice; when None, no options are produced at all.
        :return: the base cursor's options for this slice, plus
            ``{<step_option field name>: length_in_days}`` when ``step_option`` targets
            ``option_type`` and both slice boundaries are strings.
        """
        request_options: dict[str, Any] = {}

        # Without a slice (or a step option) nothing — not even the base options — is emitted.
        if stream_slice is None or self.step_option is None:
            return request_options
        request_options.update(super()._get_request_options(option_type, stream_slice))

        step_option = self.step_option
        if step_option.inject_into != option_type:
            return request_options

        # Slice keys are interpolated, so resolve them before reading the boundaries.
        start_key = self._partition_field_start.eval(self.config)
        end_key = self._partition_field_end.eval(self.config)
        slice_start = stream_slice.get(start_key)
        slice_end = stream_slice.get(end_key)
        if not (isinstance(slice_start, str) and isinstance(slice_end, str)):
            return request_options

        window_start = self._parser.parse(slice_start, self.datetime_format)
        window_end = self._parser.parse(slice_end, self.datetime_format)

        # +1 so both boundary dates are counted: 2023-01-01..2023-01-03 -> 3 days.
        inclusive_days = (window_end - window_start).days + 1
        # Braze accepts 1..100; clamp rather than fail.
        window_length = max(1, min(100, inclusive_days))

        length_key = step_option.field_name
        if isinstance(length_key, InterpolatedString):
            length_key = length_key.eval(config=self.config)

        request_options[length_key] = window_length
        return request_options
|
|
|
|
|
|
@dataclass
class EventsRecordExtractor(DpathExtractor):
    """
    Turn the ``events`` list of a response body into one record per entry.

    Each entry is wrapped as ``{"event_name": <entry>}``. The inherited dpath
    ``field_path`` is not consulted by this extractor.
    """

    def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
        """
        Extract event-name records from ``response``.

        :param response: the HTTP response to decode with ``self.decoder``.
        :return: a list of ``{"event_name": ...}`` dicts; empty when the decoder yields
            nothing or the body's ``events`` value is missing or empty.
        """
        # Only the first decoded body is used. Supply a default so an empty decoder
        # output does not leak StopIteration, which PEP 479 would turn into a
        # RuntimeError when this is called from inside a generator.
        response_body = next(self.decoder.decode(response), {})
        events = response_body.get("events")
        if not events:
            return []
        return [{"event_name": event} for event in events]
|