1
0
mirror of synced 2025-12-22 03:21:25 -05:00
Files
airbyte/airbyte-integrations/connectors/source-gong/components.py
2024-11-26 14:40:44 -03:00

28 lines
1.3 KiB
Python

# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
from typing import Any, Mapping, MutableMapping, Optional
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType
from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState
class IncrementalSingleBodyFilterCursor(DatetimeBasedCursor):
    """Cursor that injects the slice start value as a nested request filter.

    The evaluated ``start_time_option.field_name`` is read as two
    comma-separated names, ``"<field>, <sub_field>"`` (spaces are ignored),
    and the slice start value is emitted as ``{<field>: {<sub_field>: value}}``.
    Only the start-time option is handled by this class.
    """

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the JSON-body request options for ``stream_slice``.

        ``stream_state`` and ``next_page_token`` are not used here.
        """
        return self._get_request_filter_options(RequestOptionType.body_json, stream_slice)

    def _get_request_filter_options(self, option_type: RequestOptionType, stream_slice: Optional[StreamSlice]) -> Mapping[str, Any]:
        """Build the nested start-time filter for the given injection target.

        Args:
            option_type: The request part being populated; the filter is only
                produced when ``start_time_option.inject_into`` equals it.
            stream_slice: The slice whose start value is looked up. A missing
                or empty slice yields an empty mapping.

        Returns:
            ``{field: {sub_field: slice_start}}`` or an empty mapping.

        Raises:
            ValueError: If the evaluated field name does not consist of exactly
                two comma-separated names.
        """
        options: MutableMapping[str, Any] = {}
        if not stream_slice:
            return options

        start_option = self.start_time_option
        if start_option and start_option.inject_into == option_type:
            raw_field_name = start_option.field_name.eval(config=self.config)
            parts = raw_field_name.replace(" ", "").split(",")
            # Fail with a message that names the misconfigured option instead of
            # a bare tuple-unpacking error.
            if len(parts) != 2:
                raise ValueError(
                    "start_time_option.field_name must have the form "
                    f"'<field>, <sub_field>', got {raw_field_name!r}"
                )
            field_name, sub_field_name = parts
            start_key = self._partition_field_start.eval(self.config)
            options[field_name] = {sub_field_name: stream_slice.get(start_key)}
        return options