# airbyte/airbyte-integrations/connectors/source-monday/components.py

#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#
import json
import logging
from dataclasses import InitVar, dataclass, field
from datetime import datetime
from functools import partial
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Type, Union
import dpath
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
from airbyte_cdk.sources.declarative.requesters.http_requester import HttpRequester
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.page_increment import PageIncrement
from airbyte_cdk.sources.declarative.schema.inline_schema_loader import InlineSchemaLoader
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.declarative.types import Config, StreamSlice, StreamState
from airbyte_cdk.sources.types import Record
logger = logging.getLogger("airbyte")
@dataclass
class MondayActivityExtractor(RecordExtractor):
"""
Record extractor that extracts record of the form from activity logs stream:
{ "list": { "ID_1": record_1, "ID_2": record_2, ... } }
Attributes:
parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
decoder (Decoder): The decoder responsible to transfom the response in a Mapping
"""
parameters: InitVar[Mapping[str, Any]]
decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))
def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]:
response_body_generator = self.decoder.decode(response)
for response_body in response_body_generator:
if not response_body["data"]["boards"]:
continue
for board_data in response_body["data"]["boards"]:
if not isinstance(board_data, dict) or not board_data.get("activity_logs"):
continue
                for record in board_data.get("activity_logs", []):
                    # the `data` field is a JSON-encoded string describing the logged event
                    json_data = json.loads(record["data"])
                    new_record = record  # alias: the record is updated in place
                    if record.get("created_at"):
                        # `created_at` arrives as a 17-digit timestamp (10^-7 second units); floor-divide to get epoch seconds
                        new_record.update({"created_at_int": int(record.get("created_at", 0)) // 10_000_000})
                    else:
                        # records without a timestamp cannot drive the incremental cursor, so skip them
                        continue
                    if record.get("entity") == "pulse" and json_data.get("pulse_id"):
                        new_record.update({"pulse_id": json_data.get("pulse_id")})
                    if record.get("entity") == "board" and json_data.get("board_id"):
                        new_record.update({"board_id": json_data.get("board_id")})
                    yield new_record
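
# Illustrative example (not part of the connector): given a raw activity log entry such as
#   {"entity": "pulse", "created_at": "16367386880000000", "data": "{\"pulse_id\": 123}"}
# MondayActivityExtractor.extract_records() would emit
#   {..., "created_at_int": 1636738688, "pulse_id": 123}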
@dataclass
class MondayIncrementalItemsExtractor(RecordExtractor):
"""
Record extractor that searches a decoded response over a path defined as an array of fields.
"""
field_path: List[Union[InterpolatedString, str]]
config: Config
parameters: InitVar[Mapping[str, Any]]
field_path_pagination: List[Union[InterpolatedString, str]] = field(default_factory=list)
decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))
def __post_init__(self, parameters: Mapping[str, Any]):
# Convert string paths to InterpolatedString for both field_path and field_path_pagination
self._field_path = [InterpolatedString.create(p, parameters=parameters) if isinstance(p, str) else p for p in self.field_path]
self._field_path_pagination = [
InterpolatedString.create(p, parameters=parameters) if isinstance(p, str) else p for p in self.field_path_pagination
]
def _try_extract_records(
self, response: requests.Response, field_path: List[Union[InterpolatedString, str]]
) -> Iterable[Mapping[str, Any]]:
for body in self.decoder.decode(response):
if len(field_path) == 0:
extracted = body
else:
                path = [p.eval(self.config) for p in field_path]
                if "*" in path:
                    # wildcard paths collect matches across every branch of the response
                    extracted = dpath.values(body, path)
                else:
                    extracted = dpath.get(body, path, default=[])
if extracted:
if isinstance(extracted, list) and None in extracted:
logger.warning(f"Record with null value received; errors: {body.get('errors')}")
yield from (x for x in extracted if x)
else:
yield from extracted if isinstance(extracted, list) else [extracted]
def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]:
# Try primary field path
has_records = False
for record in self._try_extract_records(response, self._field_path):
has_records = True
yield record
# Fallback to pagination path if no records and path exists
if not has_records and self._field_path_pagination:
yield from self._try_extract_records(response, self._field_path_pagination)
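
# Illustrative sketch (hypothetical field path, not taken from the manifest): with
#   field_path = ["data", "boards", "*", "items_page", "items", "*"]
# a response body {"data": {"boards": [{"items_page": {"items": [{"id": 1}, {"id": 2}]}}]}}
# yields the records {"id": 1} and {"id": 2} through dpath.values().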
@dataclass(kw_only=True)
class MondayGraphqlRequester(HttpRequester):
NEXT_PAGE_TOKEN_FIELD_NAME = "next_page_token"
schema_loader: InlineSchemaLoader
    limit: Optional[Union[InterpolatedString, str, int]] = None
    nested_limit: Optional[Union[InterpolatedString, str, int]] = None
def __post_init__(self, parameters: Mapping[str, Any]):
super(MondayGraphqlRequester, self).__post_init__(parameters)
self.limit = InterpolatedString.create(self.limit, parameters=parameters)
self.nested_limit = InterpolatedString.create(self.nested_limit, parameters=parameters)
self.name = parameters.get("name", "").lower()
self.stream_sync_mode = (
SyncMode.full_refresh if parameters.get("stream_sync_mode", "full_refresh") == "full_refresh" else SyncMode.incremental
)
def _ensure_type(self, t: Type, o: Any):
"""
Ensure given object `o` is of type `t`
"""
if not isinstance(o, t):
raise TypeError(f"{type(o)} {o} is not of type {t}")
def _get_schema_root_properties(self):
schema = self.schema_loader.get_json_schema()[self.name]["properties"]
# delete fields that will be created by extractor
delete_fields = ["updated_at_int", "created_at_int", "pulse_id"]
if self.name == "activity_logs":
delete_fields.append("board_id")
for field in delete_fields:
if field in schema:
schema.pop(field)
return schema
    def _get_object_arguments(self, **object_arguments) -> str:
        # `fromt` stands in for the GraphQL `from` argument, which is a reserved word in Python
        # and cannot be used as a keyword argument; it is also the only value that must be quoted
        return ",".join(
            [
                f"{argument}:{value}" if argument != "fromt" else f'from:"{value}"'
                for argument, value in object_arguments.items()
                if value is not None
            ]
        )
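
    # For example (illustrative values): _get_object_arguments(limit=100, page=2, fromt="2023-01-01T00:00:00Z", ids=None)
    # returns 'limit:100,page:2,from:"2023-01-01T00:00:00Z"' -- None-valued arguments are dropped.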
    def _build_query(self, object_name: str, field_schema: dict, **object_arguments) -> str:
        """
        Recursive function that builds a GraphQL query string by traversing the given stream schema properties.

        Arguments:
            object_name (str): the name of the root object
            field_schema (dict): configured catalog schema for the current stream
            object_arguments (dict): arguments such as limit, page, ids, etc. to be passed to the given object
        """
fields = []
for field, nested_schema in field_schema.items():
nested_fields = nested_schema.get("properties", nested_schema.get("items", {}).get("properties"))
if nested_fields:
# preconfigured_arguments = get properties from schema or any other source ...
# fields.append(self._build_query(field, nested_fields, **preconfigured_arguments))
fields.append(self._build_query(field, nested_fields))
else:
fields.append(field)
        # when querying the boards stream (object_name == "boards"), filter by board_ids if they are provided in the config
if object_name == "boards" and "board_ids" in self.config:
# if we are building a query for incremental syncs, board ids are already present under 'ids' key in object_arguments (as a result of fetching the activity_logs stream first)
# These ids are already an intersection of the board_ids provided in the config and the ones that must be fetched for the incremental sync and need not be overridden
if "ids" not in object_arguments:
object_arguments["ids"] = self.config.get("board_ids")
arguments = self._get_object_arguments(**object_arguments)
arguments = f"({arguments})" if arguments else ""
if object_name == "column_values":
fields.remove("display_value")
fields.extend(
["... on MirrorValue{display_value}", "... on BoardRelationValue{display_value}", "... on DependencyValue{display_value}"]
)
fields = ",".join(fields)
if object_name in ["items_page", "next_items_page"]:
query = f"{object_name}{arguments}{{cursor,items{{{fields}}}}}"
else:
query = f"{object_name}{arguments}{{{fields}}}"
return query
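
    # Illustrative output (hypothetical two-field schema, no board_ids in the config):
    #   _build_query("boards", {"id": {"type": "integer"}, "name": {"type": "string"}}, limit=10)
    # produces the query string
    #   boards(limit:10){id,name}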
def _build_items_query(self, object_name: str, field_schema: dict, sub_page: Optional[int], **object_arguments) -> str:
"""
Special optimization needed for items stream. Starting October 3rd, 2022 items can only be reached through boards.
See https://developer.monday.com/api-reference/docs/items-queries#items-queries
        Comparison of queries across API versions:
        2023-07:
            boards(limit: 1) { items(limit: 20) { field1, field2, ... }}
            boards(limit: 1, page: 2) { items(limit: 20, page: 2) { field1, field2, ... }} - boards and items pagination
        2024-01:
            boards(limit: 1) { items_page(limit: 20) {cursor, items{field1, field2, ...} }}
            boards(limit: 1, page: 2) { items_page(limit: 20) {cursor, items{field1, field2, ...} }} - boards pagination
            next_items_page(limit: 20, cursor: "blaa") {cursor, items{field1, field2, ...} } - items pagination
"""
        nested_limit = self.nested_limit.eval(self.config)

        if sub_page:
            # cursor-based sub-pagination: next_items_page is a root-level query
            query = self._build_query("next_items_page", field_schema, limit=nested_limit, cursor=f'"{sub_page}"')
        else:
            query = self._build_query("items_page", field_schema, limit=nested_limit)
            # since items are a subresource of boards, when querying items, filter by board_ids if provided in the config
            if "board_ids" in self.config and "ids" not in object_arguments:
                object_arguments["ids"] = self.config.get("board_ids")
            arguments = self._get_object_arguments(**object_arguments)
            query = f"boards({arguments}){{{query}}}"
        return query
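
    # Illustrative queries this method can emit (field list shortened, cursor value hypothetical):
    #   first page: boards(limit:1){items_page(limit:100){cursor,items{id,name}}}
    #   sub-pages:  next_items_page(limit:100,cursor:"abc"){cursor,items{id,name}}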
def _build_items_incremental_query(self, object_name: str, field_schema: dict, stream_slice: dict, **object_arguments) -> str:
"""
Special optimization needed for items stream. Starting October 3rd, 2022 items can only be reached through boards.
See https://developer.monday.com/api-reference/docs/items-queries#items-queries
"""
nested_limit = self.nested_limit.eval(self.config)
object_arguments["limit"] = nested_limit
object_arguments["ids"] = stream_slice["ids"]
return self._build_query("items", field_schema, **object_arguments)
def _build_teams_query(self, object_name: str, field_schema: dict, **object_arguments) -> str:
"""
Special optimization needed for tests to pass successfully because of rate limits.
It makes a query cost less points, but it is never used in production
"""
teams_limit = self.config.get("teams_limit")
if teams_limit:
self._ensure_type(int, teams_limit)
arguments = self._get_object_arguments(**object_arguments)
query = f"{{id,name,picture_url,users(limit:{teams_limit}){{id}}}}"
if not arguments:
# when providing empty arguments in () API returns error
return f"{object_name}{query}"
return f"{object_name}({arguments}){query}"
return self._build_query(object_name=object_name, field_schema=field_schema, **object_arguments)
def _build_activity_query(self, object_name: str, field_schema: dict, sub_page: Optional[int], **object_arguments) -> str:
"""
Special optimization needed for items stream. Starting October 3rd, 2022 items can only be reached through boards.
See https://developer.monday.com/api-reference/docs/items-queries#items-queries
"""
nested_limit = self.nested_limit.eval(self.config)
created_at = (object_arguments.get("stream_slice", dict()) or dict()).get("start_time")
if "stream_slice" in object_arguments:
object_arguments.pop("stream_slice")
# 1 is default start time, so we can skip it to get all the data
if created_at == "1":
created_at = None
else:
created_at = datetime.fromtimestamp(int(created_at)).strftime("%Y-%m-%dT%H:%M:%SZ")
query = self._build_query(object_name, field_schema, limit=nested_limit, page=sub_page, fromt=created_at)
if "board_ids" in self.config and "ids" not in object_arguments:
object_arguments["ids"] = self.config.get("board_ids")
arguments = self._get_object_arguments(**object_arguments)
return f"boards({arguments}){{{query}}}"
def get_request_headers(
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
headers = super().get_request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
headers["API-Version"] = "2024-10"
return headers
def get_request_body_json( # type: ignore
self,
*,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Mapping[str, Any]]:
"""
Combines queries to a single GraphQL query.
"""
limit = self.limit.eval(self.config)
page = next_page_token and next_page_token[self.NEXT_PAGE_TOKEN_FIELD_NAME]
if self.name == "boards" and stream_slice:
if self.stream_sync_mode == SyncMode.full_refresh:
# incremental sync parameters are not needed for full refresh
stream_slice = {}
else:
stream_slice = {"ids": stream_slice.get("ids")}
query_builder = partial(self._build_query, **stream_slice)
elif self.name == "items":
            # the `items` stream uses a separate pagination strategy where first-level pages iterate over `boards` and sub-pages over `items`
page, sub_page = page if page else (None, None)
if self.stream_sync_mode == SyncMode.full_refresh:
query_builder = partial(self._build_items_query, sub_page=sub_page)
else:
query_builder = partial(self._build_items_incremental_query, stream_slice=stream_slice)
elif self.name == "teams":
query_builder = self._build_teams_query
elif self.name == "activity_logs":
page, sub_page = page if page else (None, None)
query_builder = partial(self._build_activity_query, sub_page=sub_page, stream_slice=stream_slice)
else:
query_builder = self._build_query
query = query_builder(
object_name=self.name,
field_schema=self._get_schema_root_properties(),
limit=limit or None,
page=page,
)
return {"query": f"{{{query}}}"}
    # We are using an LRU cache in the should_retry() method, which requires all incoming arguments (including self) to be hashable.
    # Dataclasses are not hashable by default, so we need to define __hash__(). Alternatively, we could set @dataclass(frozen=True),
    # but this has a cascading effect where all dataclass fields must also be set to frozen.
def __hash__(self):
return hash(tuple(self.__dict__))
class ItemPaginationStrategy(PageIncrement):
"""
Page increment strategy with subpages for the `items` stream.
From the `items` documentation https://developer.monday.com/api-reference/docs/items:
Please note that you cannot return more than 100 items per query when using items at the root.
To adjust your query, try only returning items on a specific board, nesting items inside a boards query,
looping through the boards on your account, or querying less than 100 items at a time.
This pagination strategy supports nested loop through `boards` on the top level and `items` on the second.
See boards documentation for more details: https://developer.monday.com/api-reference/docs/boards#queries.
"""
def __post_init__(self, parameters: Mapping[str, Any]):
# `self._page` corresponds to board page number
# `self._sub_page` corresponds to item page number within its board
self.start_from_page = 1
self._page: Optional[int] = self.start_from_page
self._sub_page: Optional[int] = self.start_from_page
def next_page_token(
self, response: requests.Response, last_page_size: int, last_record: Optional[Record], last_page_token_value: Optional[Any]
) -> Optional[Tuple[Optional[int], Optional[int]]]:
"""
Determines page and subpage numbers for the `items` stream
Attributes:
response: Contains `boards` and corresponding lists of `items` for each `board`
last_records: Parsed `items` from the response
"""
        if last_page_size >= self.page_size:
            # a full page means the current board may still have more items
            self._sub_page += 1
        else:
            # board exhausted: reset the item sub-page and advance to the next board, if any
            self._sub_page = self.start_from_page
            if response.json()["data"].get("boards"):
                self._page += 1
            else:
                return None

return self._page, self._sub_page
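
# Illustrative token sequence (hypothetical, page_size=100): a full page of items keeps the board
# page and advances the sub-page, (1, 1) -> (1, 2); a short page resets the sub-page and moves to
# the next board, (1, 2) -> (2, 1); the strategy returns None once no boards remain in the response.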
class ItemCursorPaginationStrategy(PageIncrement):
"""
Page increment strategy with subpages for the `items` stream.
From the `items` documentation https://developer.monday.com/api-reference/docs/items:
Please note that you cannot return more than 100 items per query when using items at the root.
To adjust your query, try only returning items on a specific board, nesting items inside a boards query,
looping through the boards on your account, or querying less than 100 items at a time.
This pagination strategy supports nested loop through `boards` on the top level and `items` on the second.
See boards documentation for more details: https://developer.monday.com/api-reference/docs/boards#queries.
"""
def __post_init__(self, parameters: Mapping[str, Any]):
# `self._page` corresponds to board page number
# `self._sub_page` corresponds to item page number within its board
self.start_from_page = 1
self._page: Optional[int] = self.start_from_page
self._sub_page: Optional[int] = self.start_from_page
def next_page_token(
self, response: requests.Response, last_page_size: int, last_record: Optional[Record], last_page_token_value: Optional[Any]
) -> Optional[Tuple[Optional[int], Optional[int]]]:
"""
`items` stream use a separate 2 level pagination strategy where:
1st level `boards` - incremental pagination
2nd level `items_page` - cursor pagination
Attributes:
response: Contains `boards` and corresponding lists of `items` for each `board`
last_records: Parsed `items` from the response
"""
data = response.json()["data"]
boards = data.get("boards", [])
next_items_page = data.get("next_items_page", {})
if boards:
# there is always only one board due to limit=1, so in one request we extract all 'items_page' for one board only
board = boards[0]
cursor = board.get("items_page", {}).get("cursor", None)
elif next_items_page:
cursor = next_items_page.get("cursor", None)
else:
# Finish pagination if there is no more data
return None
if cursor:
return self._page, cursor
else:
self._page += 1
return self._page, None
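
# Illustrative token sequence (hypothetical cursor values): while a board still returns a cursor the
# board page is kept, e.g. (1, "abc") -> (1, "def"); once the cursor is exhausted the strategy moves
# to the next board page with (2, None), and stops when the response has neither boards nor a cursor.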
class MondayStateMigration(StateMigration):
def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
del stream_state["activity_logs"]
return stream_state
def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
return "activity_logs" in stream_state
@dataclass
class MondayTransformation(RecordTransformation):
def transform(self, record: MutableMapping[str, Any], config: Optional[Config] = None, **kwargs) -> MutableMapping[str, Any]:
# Oncall issue: https://github.com/airbytehq/oncall/issues/4337
column_values = record.get("column_values", [])
for values in column_values:
display_value, text = values.get("display_value"), values.get("text")
if display_value and not text:
values["text"] = display_value
return record
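
# Illustrative transform (hypothetical record): a column value such as
#   {"id": "mirror", "display_value": "Mirrored text", "text": None}
# has its empty `text` backfilled in place:
#   {"id": "mirror", "display_value": "Mirrored text", "text": "Mirrored text"}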