1
0
mirror of synced 2025-12-25 02:09:19 -05:00

Source gong : add incremental Feature (#48572)

Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com>
This commit is contained in:
FilahAnas
2024-11-26 18:40:44 +01:00
committed by GitHub
parent 0b367704d4
commit a186813ae7
8 changed files with 157 additions and 27 deletions

View File

@@ -0,0 +1,27 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
from typing import Any, Mapping, MutableMapping, Optional
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType
from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState
class IncrementalSingleBodyFilterCursor(DatetimeBasedCursor):
def get_request_body_json(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._get_request_filter_options(RequestOptionType.body_json, stream_slice)
def _get_request_filter_options(self, option_type: RequestOptionType, stream_slice: Optional[StreamSlice]) -> Mapping[str, Any]:
options: MutableMapping[str, Any] = {}
if not stream_slice:
return options
if self.start_time_option and self.start_time_option.inject_into == option_type:
field_name, sub_field_name = self.start_time_option.field_name.eval(config=self.config).replace(" ", "").split(",")
options[field_name] = {sub_field_name: stream_slice.get(self._partition_field_start.eval(self.config))}
return options