* Update interface based on new Cursor interface * Update version and changelogs * Update unit tests * Fix cursor assignation * Update update_state to close_slice * Update considering fix * Update CDK version to fix bug
150 lines
5.8 KiB
Python
150 lines
5.8 KiB
Python
#
|
|
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Any, Iterable, Mapping, MutableMapping, Optional
|
|
|
|
from airbyte_cdk.sources.declarative.incremental import Cursor
|
|
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
|
|
from airbyte_cdk.sources.declarative.stream_slicers import CartesianProductStreamSlicer
|
|
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
|
|
|
|
|
|
@dataclass
|
|
class EventsSimpleRetriever(SimpleRetriever):
|
|
def __post_init__(self, parameters: Mapping[str, Any]):
|
|
super().__post_init__(parameters)
|
|
self.cursor = self.stream_slicer if isinstance(self.stream_slicer, Cursor) else None
|
|
|
|
def request_params(
|
|
self,
|
|
stream_state: StreamSlice,
|
|
stream_slice: Optional[StreamSlice] = None,
|
|
next_page_token: Optional[Mapping[str, Any]] = None,
|
|
) -> MutableMapping[str, Any]:
|
|
"""Events API return records in descendent order (newest first).
|
|
Default page limit is 100 items.
|
|
|
|
Even though API mentions such pagination params as 'limit' and 'offset', they are actually ignored.
|
|
Instead, response contains 'next' url with datetime range for next OLDER records, like:
|
|
|
|
response:
|
|
{
|
|
"next": "https://app.posthog.com/api/projects/2331/events?after=2021-01-01T00%3A00%3A00.000000Z&before=2021-05-29T16%3A44%3A43.175000%2B00%3A00",
|
|
"results": [
|
|
{id ...},
|
|
{id ...},
|
|
]
|
|
}
|
|
|
|
So if next_page_token is set (contains 'after'/'before' params),
|
|
then stream_slice params ('after'/'before') should be ignored.
|
|
"""
|
|
|
|
if next_page_token:
|
|
stream_slice = {}
|
|
|
|
return self._get_request_options(
|
|
stream_slice,
|
|
next_page_token,
|
|
self.requester.get_request_params,
|
|
self.paginator.get_request_params,
|
|
self.stream_slicer.get_request_params,
|
|
self.requester.get_authenticator().get_request_body_json,
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class EventsCartesianProductStreamSlicer(Cursor, CartesianProductStreamSlicer):
|
|
"""Connector requires support of nested state - each project should have own timestamp value, like:
|
|
{
|
|
"project_id1": {
|
|
"timestamp": "2021-02-01T10:21:35.003000Z"
|
|
},
|
|
"project_idX": {
|
|
"timestamp": "2022-11-17:00:00.000000Z"
|
|
}
|
|
}
|
|
we also have to support old-style (before 0.1.8) states, like:
|
|
{
|
|
"timestamp": "2021-17-01T10:21:35.003000Z"
|
|
}
|
|
|
|
Slicer also produces separate datetime slices for each project
|
|
"""
|
|
|
|
def __post_init__(self, parameters: Mapping[str, Any]):
|
|
self._cursor = {}
|
|
self._parameters = parameters
|
|
|
|
def get_stream_state(self) -> Mapping[str, Any]:
|
|
return self._cursor or {}
|
|
|
|
def set_initial_state(self, stream_state: StreamState) -> None:
|
|
self._cursor = stream_state
|
|
|
|
def close_slice(self, stream_slice: StreamSlice, most_recent_record: Optional[Record]) -> None:
|
|
project_id = str(stream_slice.get("project_id", ""))
|
|
if project_id and most_recent_record:
|
|
current_cursor_value = self._cursor.get(project_id, {}).get("timestamp", "")
|
|
new_cursor_value = most_recent_record.get("timestamp", "")
|
|
|
|
self._cursor[project_id] = {"timestamp": max(current_cursor_value, new_cursor_value)}
|
|
|
|
def stream_slices(self) -> Iterable[StreamSlice]:
|
|
"""Since each project has its own state, then we need to have a separate
|
|
datetime slices for each project
|
|
"""
|
|
|
|
slices = []
|
|
|
|
project_slicer, datetime_slicer = self.stream_slicers
|
|
|
|
# support of old style state: it contains only a single 'timestamp' field
|
|
old_style_state = self._cursor if "timestamp" in self._cursor else {}
|
|
|
|
for project_slice in project_slicer.stream_slices():
|
|
project_id = str(project_slice.get("project_id", ""))
|
|
|
|
# use old_style_state if state does not contain states for each project
|
|
project_state = self._cursor.get(project_id, {}) or old_style_state
|
|
|
|
# Each project should have own datetime slices depends on its state
|
|
datetime_slicer.set_initial_state(project_state)
|
|
project_datetime_slices = datetime_slicer.stream_slices()
|
|
|
|
# fix date ranges: start_time of next slice must be equal to end_time of previous slice
|
|
if project_datetime_slices and project_state:
|
|
project_datetime_slices[0]["start_time"] = project_state["timestamp"]
|
|
for i, datetime_slice in enumerate(project_datetime_slices[1:], start=1):
|
|
datetime_slice["start_time"] = project_datetime_slices[i - 1]["end_time"]
|
|
|
|
# Add project id to each slice
|
|
for datetime_slice in project_datetime_slices:
|
|
datetime_slice["project_id"] = project_id
|
|
|
|
slices.extend(project_datetime_slices)
|
|
|
|
return slices
|
|
|
|
def should_be_synced(self, record: Record) -> bool:
|
|
"""
|
|
As of 2023-06-28, the expectation is that this method will only be used for semi-incremental and data feed and therefore the
|
|
implementation is irrelevant for posthog
|
|
"""
|
|
return True
|
|
|
|
def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
|
|
"""
|
|
Evaluating which record is greater in terms of cursor. This is used to avoid having to capture all the records to close a slice
|
|
"""
|
|
first_cursor_value = first.get("timestamp")
|
|
second_cursor_value = second.get("timestamp")
|
|
if first_cursor_value and second_cursor_value:
|
|
return first_cursor_value >= second_cursor_value
|
|
elif first_cursor_value:
|
|
return True
|
|
else:
|
|
return False
|