52 lines
2.1 KiB
Python
52 lines
2.1 KiB
Python
#
|
|
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Iterable
|
|
|
|
from airbyte_cdk.models import SyncMode
|
|
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import SubstreamPartitionRouter
|
|
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
|
|
|
|
|
|
class SurveyIdPartitionRouter(SubstreamPartitionRouter):
    """
    Partition router for survey substreams that can bypass the parent stream.

    The stock SubstreamPartitionRouter always derives partitions by reading the
    parent stream. This router additionally honours an explicit list of survey IDs
    supplied in the connector config under ``survey_ids``:

    * If ``config["survey_ids"]`` is non-empty, one slice is emitted per listed ID
      and no parent stream is read.
    * Otherwise, every configured parent stream is read in full-refresh mode and one
      slice is emitted per parent record, using that parent config's ``parent_key``
      and ``partition_field``.

    Inherits from:
        SubstreamPartitionRouter

    Custom Methods:
        stream_slices: Generates stream slices for partitioning.
    """

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Generate stream slices from configured survey IDs or from parent stream records.

        Yields:
            StreamSlice: a slice whose ``partition`` maps a ``partition_field`` name to a
            survey ID, with an empty ``cursor_slice``.

        Raises:
            IndexError: if survey IDs are configured but ``parent_stream_configs`` is
                empty (the first parent config supplies the partition field name).
            KeyError: if a parent record does not contain the configured ``parent_key``.
        """
        survey_ids = self.config.get("survey_ids", [])

        if survey_ids:
            # Explicit IDs take precedence; the parent stream is never read in this case.
            # The first parent config only supplies the name of the slice key.
            partition_field = self.parent_stream_configs[0].partition_field.string
            for survey_id in survey_ids:
                yield StreamSlice(partition={partition_field: survey_id}, cursor_slice={})
            return

        # No explicit IDs: derive them from each parent stream. Key names are read per
        # parent config so that every parent uses its own parent_key / partition_field
        # (previously the first config's names were applied to all parents).
        for parent_stream_config in self.parent_stream_configs:
            parent_key = parent_stream_config.parent_key.string
            partition_field = parent_stream_config.partition_field.string
            for parent_record in parent_stream_config.stream.read_records(sync_mode=SyncMode.full_refresh):
                yield StreamSlice(partition={partition_field: parent_record[parent_key]}, cursor_slice={})
|