1
0
mirror of synced 2025-12-21 02:51:29 -05:00
Files
airbyte/airbyte-integrations/connectors/source-zenloop/source_zenloop/components.py
2024-12-18 14:05:43 -08:00

35 lines
1.1 KiB
Python

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from dataclasses import dataclass
from typing import Iterable
from airbyte_cdk.sources.declarative.partition_routers import SubstreamPartitionRouter
from airbyte_cdk.sources.declarative.types import Config, StreamSlice
@dataclass
class ZenloopPartitionRouter(SubstreamPartitionRouter):
    """Substream partition router whose partition value can be taken from the connector config."""

    config: Config

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Yield the stream slices for this substream.

        The ``config_parent_field`` parameter names a key in the connector config.
        When the config holds a truthy value under that key, one slice is emitted
        per configured parent stream, using that value as the partition value.
        Otherwise the slices are produced by the parent class implementation.
        """
        config_key = self._parameters.get("config_parent_field")
        configured_value = self.config.get(config_key)

        if configured_value:
            # The value comes straight from the config, so the parent streams
            # are only consulted for the name of the partition field.
            for parent_config in self.parent_stream_configs:
                partition_key = parent_config.partition_field.eval(self.config)
                yield StreamSlice(
                    partition={partition_key: configured_value, "parent_slice": {}},
                    cursor_slice={},
                )
            return

        # Nothing usable in the config: defer to the inherited slicing logic.
        yield from super().stream_slices()