Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com> Co-authored-by: Danylo Jablonski <150933663+DanyloGL@users.noreply.github.com>
117 lines
4.1 KiB
Python
117 lines
4.1 KiB
Python
#
|
|
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
from dataclasses import InitVar, dataclass
|
|
from typing import Any, Iterable, List, Mapping, Optional
|
|
|
|
import dpath.util
|
|
|
|
from airbyte_cdk.models import AirbyteMessage, SyncMode, Type
|
|
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ParentStreamConfig, SubstreamPartitionRouter
|
|
from airbyte_cdk.sources.declarative.transformations import AddFields
|
|
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState
|
|
|
|
|
|
@dataclass
class ListAddFields(AddFields):
    """
    Add a field holding the flat list of ids taken from a list-of-objects field.

    For every configured field, the field's value is evaluated against the
    record, the "id" of each object in the resulting list is collected, and
    the collected ids are written to the field's path. The evaluated list
    itself is not modified.

    input:
    {
        ...,
        "updates": [
            {"id": "some id", ...},
            {"id": "some id", ...},
            ...
        ]
    }

    output:
    {
        ...,
        "updates_ids": ["some id", "some id", ...],
        "updates": [
            {"id": "some id", ...},
            {"id": "some id", ...},
            ...
        ]
    }
    """

    def transform(
        self,
        record: Record,
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> Record:
        """
        Write the list of object ids for every configured field into ``record``.

        :param record: the record to transform; it is updated in place
        :param config: connector configuration passed to the field value evaluation
        :param stream_state: stream state made available to the field value evaluation
        :param stream_slice: stream slice made available to the field value evaluation
        :return: the same ``record`` object, with the id lists added
        """
        kwargs = {"record": record, "stream_state": stream_state, "stream_slice": stream_slice}
        for parsed_field in self._parsed_fields:
            # The evaluated value may be None/empty when the record carries no
            # such list; treat that as "no ids" instead of failing on iteration.
            values = parsed_field.value.eval(config, **kwargs) or []
            id_list = [value.get("id") for value in values]
            dpath.util.new(record, parsed_field.path, id_list)

        return record
|
|
|
|
|
|
@dataclass
class UpdatesSubstreamPartitionRouter(SubstreamPartitionRouter):
    """
    UpdatesSubstreamPartitionRouter iterates over a list of ids to create the correct stream slices.

    When requests have to be made per object id, and the parent stream returns records holding a
    list of objects, use the ListAddFields transformer to put all object ids into a custom list
    field; UpdatesSubstreamPartitionRouter then emits one slice for every id in that list.
    """

    parent_stream_configs: List[ParentStreamConfig]
    parameters: InitVar[Mapping[str, Any]]

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Yield one stream slice per id found in the parent records' id-list field.

        Every parent stream is read in full-refresh mode. For each parent record, the list stored
        under the configured parent key is iterated and each element becomes a slice whose
        partition holds the element (under the configured partition field) together with the
        parent slice, and whose cursor slice holds the parent record's own "id".

        :return: an iterable of stream slices; empty when no parent stream is configured
        """
        if not self.parent_stream_configs:
            return

        for parent_stream_config in self.parent_stream_configs:
            parent_stream = parent_stream_config.stream
            parent_field = parent_stream_config.parent_key.eval(self.config)
            partition_field = parent_stream_config.partition_field.eval(self.config)

            for parent_stream_slice in parent_stream.stream_slices(
                sync_mode=SyncMode.full_refresh, cursor_field=None, stream_state=None
            ):
                for parent_record in parent_stream.read_records(
                    sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=parent_stream_slice, stream_state=None
                ):
                    # Skip non-records (eg AirbyteLogMessage)
                    if isinstance(parent_record, AirbyteMessage):
                        if parent_record.type != Type.RECORD:
                            continue
                        parent_record = parent_record.record.data

                    updates_object_id = parent_record.get("id")
                    # A parent record without the id-list field produces no slices
                    # instead of failing when iterating over None.
                    stream_state_values = parent_record.get(parent_field) or []

                    for stream_state_value in stream_state_values:
                        yield StreamSlice(
                            partition={partition_field: stream_state_value, "parent_slice": parent_stream_slice},
                            cursor_slice={"updates_object_id": updates_object_id},
                        )
|