94 lines
3.7 KiB
Python
94 lines
3.7 KiB
Python
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional
|
|
|
|
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
|
|
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
|
|
from airbyte_cdk.sources.declarative.types import StreamSlice
|
|
from airbyte_cdk.sources.streams.core import StreamData
|
|
|
|
|
|
# Module-level logger, registered under the "airbyte" logger name.
logger = logging.getLogger("airbyte")

# Maximum depth of recursive block-children requests (checked in BlocksRetriever.read_records).
MAX_BLOCK_DEPTH: int = 30
|
|
|
|
|
|
@dataclass
class NotionUserTransformation(RecordTransformation):
    """
    Custom transformation that conditionally transforms Notion User records of type "bot",
    only when the record contains additional nested "owner" info.

    This transformation moves the data in the `owner.{owner_type}` field into a new
    `owner.info` field for clarity. For example,
    ``{"bot": {"owner": {"type": "user", "user": {...}}}}`` becomes
    ``{"bot": {"owner": {"type": "user", "info": {...}}}}``.
    """

    def transform(self, record: MutableMapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
        """
        Move ``record["bot"]["owner"][owner_type]`` to ``record["bot"]["owner"]["info"]``.

        Records without a truthy ``bot.owner``, without an owner ``type``, or without a
        truthy payload under that type are returned unchanged.

        :param record: the user record; it is mutated in place.
        :param kwargs: ignored; accepted for compatibility with the RecordTransformation interface.
        :return: the same (possibly mutated) record.
        """
        # `or {}` also covers an explicit `"bot": null`, which `.get("bot", {})` would pass
        # through and then crash on `.get("owner")`.
        bot = record.get("bot") or {}
        owner = bot.get("owner")
        if not owner:
            return record

        owner_type = owner.get("type")
        owner_info = owner.get(owner_type)
        # When the type is literally "info", moving the payload onto itself and then deleting
        # the source key would drop the data, so leave such records alone.
        if owner_type and owner_info and owner_type != "info":
            # `owner` is the same dict object as record["bot"]["owner"], so this edits the record.
            owner["info"] = owner_info
            del owner[owner_type]
        return record
|
|
|
|
|
|
@dataclass
class NotionPropertiesTransformation(RecordTransformation):
    """
    Transforms the nested 'properties' object within a Notion Page/Database record into a more
    normalized form. In Notion's API response, 'properties' is a dictionary where each key
    represents the name of a property and its value contains various metadata and the property's
    actual value.

    The transformed 'properties' will consist of an array where each element is a dictionary
    with two keys: 'name', holding the original property name, and 'value', containing the
    property's content. For example, ``{"Title": {...}}`` becomes
    ``[{"name": "Title", "value": {...}}]``.
    """

    def transform(self, record: MutableMapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
        """
        Replace ``record["properties"]`` with a list of ``{"name": ..., "value": ...}`` entries.

        A missing or null ``properties`` field becomes an empty list. Entries keep the
        iteration order of the original mapping.

        :param record: the page/database record; it is mutated in place.
        :param kwargs: ignored; accepted for compatibility with the RecordTransformation interface.
        :return: the same record, with ``properties`` normalized.
        """
        # `or {}` also covers an explicit `"properties": null`, which `.get(..., {})` would
        # pass through and then crash on `.items()`.
        properties = record.get("properties") or {}
        record["properties"] = [{"name": name, "value": value} for name, value in properties.items()]
        return record
|
|
|
|
|
|
@dataclass
class BlocksRetriever(SimpleRetriever):
    """
    Retriever for Notion block children.

    Docs: https://developers.notion.com/reference/get-block-children

    A block may have child blocks that the stream also needs to retrieve. Whenever a retrieved
    record has ``has_children`` set, this retriever recursively calls ``read_records`` with a
    child slice keyed by that block's ``id``. Recursion stops once ``current_block_depth``
    exceeds ``MAX_BLOCK_DEPTH``. A block's children are emitted before the block itself.
    """

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        super().__post_init__(parameters)
        # Current recursion depth; incremented while reading a block's children.
        self.current_block_depth = 0

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Read the blocks for ``stream_slice`` and, recursively, the children of every block
        whose ``has_children`` flag is set.

        :param records_schema: schema forwarded unchanged to ``SimpleRetriever.read_records``.
        :param stream_slice: the slice to read. Child slices reuse its ``cursor_slice``, or an
            empty one when no slice is given.
        :return: an iterable of records. Each record containing a ``parent`` entry gets
            ``parent.sequence_number`` set to its position in this (non-recursive) read.
        """
        # if reached recursive limit, don't read anymore
        if self.current_block_depth > MAX_BLOCK_DEPTH:
            logger.info("Reached max block depth limit. Exiting.")
            return

        parent_records = super().read_records(records_schema, stream_slice)
        for sequence_number, stream_data in enumerate(parent_records):
            if stream_data.data.get("has_children"):
                # `stream_slice` defaults to None, so don't dereference it unconditionally.
                cursor_slice = stream_slice.cursor_slice if stream_slice is not None else {}
                child_stream_slice = StreamSlice(
                    partition={"block_id": stream_data.data["id"], "parent_slice": {}},
                    cursor_slice=cursor_slice,
                )
                self.current_block_depth += 1
                try:
                    yield from self.read_records(records_schema, child_stream_slice)
                finally:
                    # Restore the depth even if the child read raises or the consumer stops
                    # iterating early (GeneratorExit). Otherwise the counter leaks into later reads.
                    self.current_block_depth -= 1

            if "parent" in stream_data:
                stream_data["parent"]["sequence_number"] = sequence_number

            yield stream_data
|