airbyte/airbyte-integrations/connectors/source-posthog/source_posthog/components.py
Maxime Carbonneau-Leclerc 96aa72d3b2 🐛 Source Posthog: update following state management changes in the CDK (#27764)
* Update interface based on new Cursor interface

* Update version and changelogs

* Update unit tests

* Fix cursor assignation

* Update update_state to close_slice

* Update considering fix

* Update CDK version to fix bug
2023-06-29 15:03:25 -04:00


#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
from typing import Any, Iterable, Mapping, MutableMapping, Optional

from airbyte_cdk.sources.declarative.incremental import Cursor
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
from airbyte_cdk.sources.declarative.stream_slicers import CartesianProductStreamSlicer
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState


@dataclass
class EventsSimpleRetriever(SimpleRetriever):
    def __post_init__(self, parameters: Mapping[str, Any]):
        super().__post_init__(parameters)
        # Expose the stream slicer as the retriever's cursor when it implements
        # the Cursor interface introduced by the CDK state management changes
        self.cursor = self.stream_slicer if isinstance(self.stream_slicer, Cursor) else None

    def request_params(
        self,
        stream_state: StreamState,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
"""Events API return records in descendent order (newest first).
Default page limit is 100 items.
Even though API mentions such pagination params as 'limit' and 'offset', they are actually ignored.
Instead, response contains 'next' url with datetime range for next OLDER records, like:
response:
{
"next": "https://app.posthog.com/api/projects/2331/events?after=2021-01-01T00%3A00%3A00.000000Z&before=2021-05-29T16%3A44%3A43.175000%2B00%3A00",
"results": [
{id ...},
{id ...},
]
}
So if next_page_token is set (contains 'after'/'before' params),
then stream_slice params ('after'/'before') should be ignored.
"""
        if next_page_token:
            stream_slice = {}

        return self._get_request_options(
            stream_slice,
            next_page_token,
            self.requester.get_request_params,
            self.paginator.get_request_params,
            self.stream_slicer.get_request_params,
            self.requester.get_authenticator().get_request_body_json,
        )
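
    # A hypothetical illustration of the override above (all values made up):
    #
    #   retriever.request_params(
    #       stream_state={},
    #       stream_slice={"after": "2021-01-01T00:00:00Z", "before": "2021-02-01T00:00:00Z"},
    #       next_page_token={"after": "2021-01-10T00:00:00Z", "before": "2021-01-20T00:00:00Z"},
    #   )
    #
    # Because next_page_token is set, the slice's 'after'/'before' window is
    # dropped and only the token's datetime range reaches the request.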


@dataclass
class EventsCartesianProductStreamSlicer(Cursor, CartesianProductStreamSlicer):
    """The connector requires support for nested state: each project should have
    its own timestamp value, like:
    {
        "project_id1": {
            "timestamp": "2021-02-01T10:21:35.003000Z"
        },
        "project_idX": {
            "timestamp": "2022-11-17T00:00:00.000000Z"
        }
    }
    We also have to support old-style (before 0.1.8) states, like:
    {
        "timestamp": "2021-01-17T10:21:35.003000Z"
    }

    The slicer also produces separate datetime slices for each project.
    """

    def __post_init__(self, parameters: Mapping[str, Any]):
        self._cursor = {}
        self._parameters = parameters

    def get_stream_state(self) -> Mapping[str, Any]:
        return self._cursor or {}

    def set_initial_state(self, stream_state: StreamState) -> None:
        self._cursor = stream_state

    def close_slice(self, stream_slice: StreamSlice, most_recent_record: Optional[Record]) -> None:
        project_id = str(stream_slice.get("project_id", ""))
        if project_id and most_recent_record:
            current_cursor_value = self._cursor.get(project_id, {}).get("timestamp", "")
            new_cursor_value = most_recent_record.get("timestamp", "")
            # Lexicographic max() picks the latest value because the timestamps
            # are zero-padded ISO-8601 strings in a single timezone representation
            self._cursor[project_id] = {"timestamp": max(current_cursor_value, new_cursor_value)}
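
    # How close_slice advances the state (hypothetical values): given a stored
    # cursor of
    #   {"42": {"timestamp": "2021-02-01T10:21:35.003000Z"}}
    # closing the slice {"project_id": "42"} with a most recent record stamped
    # "2021-03-05T08:00:00.000000Z" yields
    #   {"42": {"timestamp": "2021-03-05T08:00:00.000000Z"}}
    # while every other project's entry is left untouched.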

    def stream_slices(self) -> Iterable[StreamSlice]:
        """Since each project has its own state, we need separate
        datetime slices for each project.
        """
        slices = []
        project_slicer, datetime_slicer = self.stream_slicers

        # Support for old-style state: it contains only a single 'timestamp' field
        old_style_state = self._cursor if "timestamp" in self._cursor else {}

        for project_slice in project_slicer.stream_slices():
            project_id = str(project_slice.get("project_id", ""))

            # Fall back to old_style_state if the state has no entry for this project
            project_state = self._cursor.get(project_id, {}) or old_style_state

            # Each project gets its own datetime slices, depending on its state
            datetime_slicer.set_initial_state(project_state)
            project_datetime_slices = datetime_slicer.stream_slices()

            # Fix date ranges: the start_time of each slice must equal the end_time
            # of the previous slice, and the first slice resumes from the saved cursor
            if project_datetime_slices and project_state:
                project_datetime_slices[0]["start_time"] = project_state["timestamp"]
            for i, datetime_slice in enumerate(project_datetime_slices[1:], start=1):
                datetime_slice["start_time"] = project_datetime_slices[i - 1]["end_time"]

            # Add the project id to each slice
            for datetime_slice in project_datetime_slices:
                datetime_slice["project_id"] = project_id

            slices.extend(project_datetime_slices)

        return slices
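
    # A sketch of the slices this produces (hypothetical values) for two
    # projects, where project "1" resumes from its saved cursor and project "2"
    # has no saved state and starts from the beginning of the datetime range:
    #   {"project_id": "1", "start_time": "2021-02-01T10:21:35.003000Z", "end_time": ...}
    #   {"project_id": "1", "start_time": ..., "end_time": ...}
    #   {"project_id": "2", "start_time": ..., "end_time": ...}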

    def should_be_synced(self, record: Record) -> bool:
        """
        As of 2023-06-28, the expectation is that this method will only be used for
        semi-incremental and data-feed streams, and therefore the implementation is
        irrelevant for posthog.
        """
        return True

    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
        """
        Evaluates which record is greater in terms of its cursor value. This is used
        to avoid having to capture all the records to close a slice.
        """
        first_cursor_value = first.get("timestamp")
        second_cursor_value = second.get("timestamp")
        if first_cursor_value and second_cursor_value:
            return first_cursor_value >= second_cursor_value
        elif first_cursor_value:
            return True
        else:
            return False
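

if __name__ == "__main__":
    # A minimal sketch, not part of the connector: it exercises the per-project
    # cursor handling above. It assumes airbyte-cdk is installed and that
    # Record / StreamSlice / StreamState are plain mappings, as in the CDK
    # version this file targets. All ids and timestamps below are made up.
    slicer = EventsCartesianProductStreamSlicer(stream_slicers=[], parameters={})

    # Seed the nested per-project state described in the class docstring
    slicer.set_initial_state({"42": {"timestamp": "2021-02-01T10:21:35.003000Z"}})

    # Closing a slice with a newer record advances only project "42"'s cursor
    slicer.close_slice(
        stream_slice={"project_id": "42"},
        most_recent_record={"timestamp": "2021-03-05T08:00:00.000000Z"},
    )
    print(slicer.get_stream_state())
    # Expected: {'42': {'timestamp': '2021-03-05T08:00:00.000000Z'}}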