1
0
mirror of synced 2025-12-19 18:14:56 -05:00
Files
Eytan Avisror e7ce8a0a60 Fix: source-notion depth leak (#69780)
Co-authored-by: Alfredo Garcia <alfredo.garcia@hallmark.edu>
2025-11-24 12:27:33 -06:00

94 lines
3.7 KiB
Python

# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
import logging
from dataclasses import dataclass
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.declarative.types import StreamSlice
from airbyte_cdk.sources.streams.core import StreamData
# Recursion limit for BlocksRetriever.read_records: once the number of nested
# child-block reads in progress exceeds this value, no further children are requested.
MAX_BLOCK_DEPTH: int = 30

# Shared connector logger (the "airbyte" logger namespace).
logger = logging.getLogger(name="airbyte")
@dataclass
class NotionUserTransformation(RecordTransformation):
    """
    Custom transformation that conditionally transforms Notion User records of type "bot",
    only when the record contains additional nested "owner" info.

    This transformation moves the data in the `owner.{owner_type}` field into a new
    `owner.info` field for clarity, e.g.
    ``{"bot": {"owner": {"type": "user", "user": {...}}}}`` becomes
    ``{"bot": {"owner": {"type": "user", "info": {...}}}}``.

    Records without a ``bot`` object, with a null ``bot`` value, or without a truthy
    ``owner`` are returned unchanged.
    """

    def transform(self, record: MutableMapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
        """
        Move ``record["bot"]["owner"][<type>]`` to ``record["bot"]["owner"]["info"]`` in place.

        :param record: A Notion user record; it is mutated in place.
        :param kwargs: Ignored; accepted for compatibility with the RecordTransformation interface.
        :return: The same record object, possibly modified.
        """
        # `or {}` also covers an explicit null value; `.get("bot", {})` would return None
        # for {"bot": None} and the following `.get` would raise AttributeError.
        bot = record.get("bot") or {}
        owner = bot.get("owner")
        if not owner:
            return record

        owner_type = owner.get("type")
        owner_info = owner.get(owner_type)
        # Only rewrite when both the discriminator and the value it points at are present.
        if owner_type and owner_info:
            # `owner` is the same mapping object stored in record["bot"]["owner"].
            owner["info"] = owner_info
            del owner[owner_type]
        return record
@dataclass
class NotionPropertiesTransformation(RecordTransformation):
    """
    Transforms the nested 'properties' object within a Notion Page/Database record into a more
    normalized form. In Notion's API response, 'properties' is a dictionary where each key
    represents the name of a property and its value contains various metadata and the property's
    actual value.

    The transformed 'properties' will consist of an array where each element is a dictionary
    with two keys: 'name', holding the original property name, and 'value', containing the
    property's content. A missing or null 'properties' field becomes an empty array.
    """

    def transform(self, record: MutableMapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
        """
        Replace ``record["properties"]`` (a name -> value mapping) with a list of
        ``{"name": ..., "value": ...}`` entries, preserving the mapping's iteration order.

        :param record: A Notion page/database record; it is mutated in place.
        :param kwargs: Ignored; accepted for compatibility with the RecordTransformation interface.
        :return: The same record object with 'properties' rewritten.
        """
        # `or {}` also covers an explicit null; `.get("properties", {})` would return None
        # and `.items()` would raise AttributeError.
        properties = record.get("properties") or {}
        record["properties"] = [{"name": name, "value": value} for name, value in properties.items()]
        return record
@dataclass
class BlocksRetriever(SimpleRetriever):
    """
    Docs: https://developers.notion.com/reference/get-block-children

    A block may have child blocks that the stream also needs to retrieve, so
    BlocksRetriever recursively calls read_records for every record whose
    ``has_children`` flag is true. Recursion stops once ``current_block_depth``
    exceeds ``MAX_BLOCK_DEPTH``. A block's descendants are yielded before the block itself.
    """

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        super().__post_init__(parameters)
        # Number of nested child-block reads currently in progress on this instance;
        # shared by all recursive read_records calls.
        self.current_block_depth = 0

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Read the records for ``stream_slice`` and, recursively, the children of every
        record flagged with ``has_children``.

        :param records_schema: Schema forwarded unchanged to every (recursive) read.
        :param stream_slice: Slice to read. Child slices are partitioned by the parent's
            ``id`` (as ``block_id``) and inherit this slice's ``cursor_slice``
            (an empty mapping when no slice is given).
        :return: Iterable of records; each record that has a "parent" key gets
            ``parent.sequence_number`` set to its position within this slice.
        """
        # If the recursion limit has been reached, don't read anymore.
        if self.current_block_depth > MAX_BLOCK_DEPTH:
            logger.info("Reached max block depth limit. Exiting.")
            return

        # Every child slice inherits the parent's cursor. Guard against the default
        # stream_slice=None, which previously raised AttributeError on the first child.
        cursor_slice = stream_slice.cursor_slice if stream_slice is not None else {}

        for sequence_number, stream_data in enumerate(super().read_records(records_schema, stream_slice)):
            if stream_data.data.get("has_children"):
                # Build the child slice before touching the counter so a failure here
                # (e.g. a record without "id") cannot leave the depth incremented.
                child_stream_slice = StreamSlice(
                    partition={"block_id": stream_data.data["id"], "parent_slice": {}},
                    cursor_slice=cursor_slice,
                )
                self.current_block_depth += 1
                try:
                    yield from self.read_records(records_schema, child_stream_slice)
                finally:
                    # Always restore the depth, even when the consumer stops iterating early
                    # (GeneratorExit raised at the yield) or a nested read raises; otherwise
                    # the counter leaks and later reads on this instance are truncated.
                    self.current_block_depth -= 1
            if "parent" in stream_data:
                stream_data["parent"]["sequence_number"] = sequence_number
            yield stream_data