1
0
mirror of synced 2025-12-21 02:51:29 -05:00
Files
airbyte/airbyte-integrations/connectors/source-instatus/components.py
2025-01-20 11:09:18 -08:00

117 lines
4.1 KiB
Python

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from dataclasses import InitVar, dataclass
from typing import Any, Iterable, List, Mapping, Optional
import dpath.util
from airbyte_cdk.models import AirbyteMessage, SyncMode, Type
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ParentStreamConfig, SubstreamPartitionRouter
from airbyte_cdk.sources.declarative.transformations import AddFields
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState
@dataclass
class ListAddFields(AddFields):
    """
    Transformation that gathers the ``id`` of every object in a list field into one flat list.

    For each configured field, the field's value expression is evaluated against the record and is
    expected to produce a list of objects. The ``id`` of each object is collected, and the resulting
    list is written into the record at the field's configured path.

    input:
    {
        ...,
        "updates": [
            {"id": "some id", ...},
            {"id": "some id", ...},
            ...
        ]
    }
    output:
    {
        ...,
        "updates_ids": ["some id", "some id", ...],
        "updates": [
            {"id": "some id", ...},
            {"id": "some id", ...},
            ...
        ]
    }
    """

    def transform(
        self,
        record: Record,
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> Record:
        """
        Add, for every configured field, the list of ``id`` values taken from the evaluated list of objects.

        :param record: the record to transform; it is modified in place
        :param config: the connector configuration passed to the value interpolation
        :param stream_state: the current stream state, exposed to the value interpolation
        :param stream_slice: the current stream slice, exposed to the value interpolation
        :return: the same record instance with the id lists added
        """
        kwargs = {"record": record, "stream_state": stream_state, "stream_slice": stream_slice}
        for parsed_field in self._parsed_fields:
            # A falsy result (e.g. None, presumably when the record lacks the source list) is treated
            # as "no objects" so the transformation writes an empty list instead of raising TypeError.
            values = parsed_field.value.eval(config, **kwargs) or []
            id_list = [value.get("id") for value in values]
            dpath.util.new(record, parsed_field.path, id_list)
        return record
@dataclass
class UpdatesSubstreamPartitionRouter(SubstreamPartitionRouter):
    """
    Partition router that emits one stream slice per id found in a list field of each parent record.

    When a substream must be requested once per object nested in a parent record, the parent stream
    first uses the ListAddFields transformation to put all object ids into a custom list field.
    This router then reads that list field (the configured ``parent_key``) from every parent record
    and yields a separate slice for each id it contains.
    """

    parent_stream_configs: List[ParentStreamConfig]
    parameters: InitVar[Mapping[str, Any]]

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Yield one slice for every id listed under the parent key of every parent record.

        Each slice's partition holds the id under the configured partition field together with the
        originating ``parent_slice``; its cursor slice carries the parent record's own ``id`` as
        ``updates_object_id``. Parent records without a value for the parent key produce no slices.
        """
        if not self.parent_stream_configs:
            return

        for parent_stream_config in self.parent_stream_configs:
            parent_stream = parent_stream_config.stream
            parent_field = parent_stream_config.parent_key.eval(self.config)
            partition_field = parent_stream_config.partition_field.eval(self.config)
            for parent_slice in parent_stream.stream_slices(sync_mode=SyncMode.full_refresh, cursor_field=None, stream_state=None):
                for parent_record in parent_stream.read_records(
                    sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=parent_slice, stream_state=None
                ):
                    # Skip non-records (eg AirbyteLogMessage) and unwrap record messages to their data
                    if isinstance(parent_record, AirbyteMessage):
                        if parent_record.type != Type.RECORD:
                            continue
                        parent_record = parent_record.record.data

                    updates_object_id = parent_record.get("id")
                    # A missing or null list field means there is nothing to slice on for this record;
                    # falling back to an empty list avoids iterating over None.
                    for stream_state_value in parent_record.get(parent_field) or []:
                        yield StreamSlice(
                            partition={partition_field: stream_state_value, "parent_slice": parent_slice},
                            cursor_slice={"updates_object_id": updates_object_id},
                        )