# # Copyright (c) 2025 Airbyte, Inc., all rights reserved. # import json import logging from dataclasses import InitVar, dataclass, field from datetime import datetime from functools import partial from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Type, Union import dpath import requests from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.declarative.decoders.decoder import Decoder from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration from airbyte_cdk.sources.declarative.requesters.http_requester import HttpRequester from airbyte_cdk.sources.declarative.requesters.paginators.strategies.page_increment import PageIncrement from airbyte_cdk.sources.declarative.schema.inline_schema_loader import InlineSchemaLoader from airbyte_cdk.sources.declarative.transformations import RecordTransformation from airbyte_cdk.sources.declarative.types import Config, StreamSlice, StreamState from airbyte_cdk.sources.types import Record logger = logging.getLogger("airbyte") @dataclass class MondayActivityExtractor(RecordExtractor): """ Record extractor that extracts record of the form from activity logs stream: { "list": { "ID_1": record_1, "ID_2": record_2, ... } } Attributes: parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation decoder (Decoder): The decoder responsible to transfom the response in a Mapping """ parameters: InitVar[Mapping[str, Any]] decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={})) def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]: response_body_generator = self.decoder.decode(response) for response_body in response_body_generator: if not response_body["data"]["boards"]: continue for board_data in response_body["data"]["boards"]: if not isinstance(board_data, dict) or not board_data.get("activity_logs"): continue for record in board_data.get("activity_logs", []): json_data = json.loads(record["data"]) new_record = record if record.get("created_at"): new_record.update({"created_at_int": int(record.get("created_at", 0)) // 10_000_000}) else: continue if record.get("entity") == "pulse" and json_data.get("pulse_id"): new_record.update({"pulse_id": json_data.get("pulse_id")}) if record.get("entity") == "board" and json_data.get("board_id"): new_record.update({"board_id": json_data.get("board_id")}) yield new_record @dataclass class MondayIncrementalItemsExtractor(RecordExtractor): """ Record extractor that searches a decoded response over a path defined as an array of fields. """ field_path: List[Union[InterpolatedString, str]] config: Config parameters: InitVar[Mapping[str, Any]] field_path_pagination: List[Union[InterpolatedString, str]] = field(default_factory=list) decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={})) def __post_init__(self, parameters: Mapping[str, Any]): # Convert string paths to InterpolatedString for both field_path and field_path_pagination self._field_path = [InterpolatedString.create(p, parameters=parameters) if isinstance(p, str) else p for p in self.field_path] self._field_path_pagination = [ InterpolatedString.create(p, parameters=parameters) if isinstance(p, str) else p for p in self.field_path_pagination ] def _try_extract_records( self, response: requests.Response, field_path: List[Union[InterpolatedString, str]] ) -> Iterable[Mapping[str, Any]]: for body in self.decoder.decode(response): if len(field_path) == 0: extracted = body else: path = [p.eval(self.config) for p in field_path] if "*" in path: extracted = dpath.values(body, path) else: extracted = dpath.get(body, path, default=[]) if extracted: if isinstance(extracted, list) and None in extracted: logger.warning(f"Record with null value received; errors: {body.get('errors')}") yield from (x for x in extracted if x) else: yield from extracted if isinstance(extracted, list) else [extracted] def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]: # Try primary field path has_records = False for record in self._try_extract_records(response, self._field_path): has_records = True yield record # Fallback to pagination path if no records and path exists if not has_records and self._field_path_pagination: yield from self._try_extract_records(response, self._field_path_pagination) @dataclass(kw_only=True) class MondayGraphqlRequester(HttpRequester): NEXT_PAGE_TOKEN_FIELD_NAME = "next_page_token" schema_loader: InlineSchemaLoader limit: Union[InterpolatedString, str, int] = None nested_limit: Union[InterpolatedString, str, int] = None def __post_init__(self, parameters: Mapping[str, Any]): super(MondayGraphqlRequester, self).__post_init__(parameters) self.limit = InterpolatedString.create(self.limit, parameters=parameters) self.nested_limit = InterpolatedString.create(self.nested_limit, parameters=parameters) self.name = parameters.get("name", "").lower() self.stream_sync_mode = ( SyncMode.full_refresh if parameters.get("stream_sync_mode", "full_refresh") == "full_refresh" else SyncMode.incremental ) def _ensure_type(self, t: Type, o: Any): """ Ensure given object `o` is of type `t` """ if not isinstance(o, t): raise TypeError(f"{type(o)} {o} is not of type {t}") def _get_schema_root_properties(self): schema = self.schema_loader.get_json_schema()[self.name]["properties"] # delete fields that will be created by extractor delete_fields = ["updated_at_int", "created_at_int", "pulse_id"] if self.name == "activity_logs": delete_fields.append("board_id") for field in delete_fields: if field in schema: schema.pop(field) return schema def _get_object_arguments(self, **object_arguments) -> str: return ",".join( [ f"{argument}:{value}" if argument != "fromt" else f'from:"{value}"' for argument, value in object_arguments.items() if value is not None ] ) def _build_query(self, object_name: str, field_schema: dict, **object_arguments) -> str: """ Recursive function that builds a GraphQL query string by traversing given stream schema properties. Attributes object_name (str): the name of root object field_schema (dict): configured catalog schema for current stream object_arguments (dict): arguments such as limit, page, ids, ... etc to be passed for given object """ fields = [] for field, nested_schema in field_schema.items(): nested_fields = nested_schema.get("properties", nested_schema.get("items", {}).get("properties")) if nested_fields: # preconfigured_arguments = get properties from schema or any other source ... # fields.append(self._build_query(field, nested_fields, **preconfigured_arguments)) fields.append(self._build_query(field, nested_fields)) else: fields.append(field) # when querying the boards stream (object_name == "boards"), filter by board_ids if they provided in the config if object_name == "boards" and "board_ids" in self.config: # if we are building a query for incremental syncs, board ids are already present under 'ids' key in object_arguments (as a result of fetching the activity_logs stream first) # These ids are already an intersection of the board_ids provided in the config and the ones that must be fetched for the incremental sync and need not be overridden if "ids" not in object_arguments: object_arguments["ids"] = self.config.get("board_ids") arguments = self._get_object_arguments(**object_arguments) arguments = f"({arguments})" if arguments else "" if object_name == "column_values": fields.remove("display_value") fields.extend( ["... on MirrorValue{display_value}", "... on BoardRelationValue{display_value}", "... on DependencyValue{display_value}"] ) fields = ",".join(fields) if object_name in ["items_page", "next_items_page"]: query = f"{object_name}{arguments}{{cursor,items{{{fields}}}}}" else: query = f"{object_name}{arguments}{{{fields}}}" return query def _build_items_query(self, object_name: str, field_schema: dict, sub_page: Optional[int], **object_arguments) -> str: """ Special optimization needed for items stream. Starting October 3rd, 2022 items can only be reached through boards. See https://developer.monday.com/api-reference/docs/items-queries#items-queries Comparison of different APIs queries: 2023-07: boards(limit: 1) { items(limit: 20) { field1, field2, ... }} boards(limit: 1, page:2) { items(limit: 20, page:2) { field1, field2, ... }} boards and items paginations 2024_01: boards(limit: 1) { items_page(limit: 20) {cursor, items{field1, field2, ...} }} boards(limit: 1, page:2) { items_page(limit: 20) {cursor, items{field1, field2, ...} }} - boards pagination next_items_page(limit: 20, cursor: "blaa") {cursor, items{field1, field2, ...} } - items pagination """ nested_limit = self.nested_limit.eval(self.config) if sub_page: query = self._build_query("next_items_page", field_schema, limit=nested_limit, cursor=f'"{sub_page}"') else: query = self._build_query("items_page", field_schema, limit=nested_limit) # since items are a subresource of boards, when querying items, filter by board_ids if provided in the config if "board_ids" in self.config and "ids" not in object_arguments: object_arguments["ids"] = self.config.get("board_ids") arguments = self._get_object_arguments(**object_arguments) query = f"boards({arguments}){{{query}}}" return query def _build_items_incremental_query(self, object_name: str, field_schema: dict, stream_slice: dict, **object_arguments) -> str: """ Special optimization needed for items stream. Starting October 3rd, 2022 items can only be reached through boards. See https://developer.monday.com/api-reference/docs/items-queries#items-queries """ nested_limit = self.nested_limit.eval(self.config) object_arguments["limit"] = nested_limit object_arguments["ids"] = stream_slice["ids"] return self._build_query("items", field_schema, **object_arguments) def _build_teams_query(self, object_name: str, field_schema: dict, **object_arguments) -> str: """ Special optimization needed for tests to pass successfully because of rate limits. It makes a query cost less points, but it is never used in production """ teams_limit = self.config.get("teams_limit") if teams_limit: self._ensure_type(int, teams_limit) arguments = self._get_object_arguments(**object_arguments) query = f"{{id,name,picture_url,users(limit:{teams_limit}){{id}}}}" if not arguments: # when providing empty arguments in () API returns error return f"{object_name}{query}" return f"{object_name}({arguments}){query}" return self._build_query(object_name=object_name, field_schema=field_schema, **object_arguments) def _build_activity_query(self, object_name: str, field_schema: dict, sub_page: Optional[int], **object_arguments) -> str: """ Special optimization needed for items stream. Starting October 3rd, 2022 items can only be reached through boards. See https://developer.monday.com/api-reference/docs/items-queries#items-queries """ nested_limit = self.nested_limit.eval(self.config) created_at = (object_arguments.get("stream_slice", dict()) or dict()).get("start_time") if "stream_slice" in object_arguments: object_arguments.pop("stream_slice") # 1 is default start time, so we can skip it to get all the data if created_at == "1": created_at = None else: created_at = datetime.fromtimestamp(int(created_at)).strftime("%Y-%m-%dT%H:%M:%SZ") query = self._build_query(object_name, field_schema, limit=nested_limit, page=sub_page, fromt=created_at) if "board_ids" in self.config and "ids" not in object_arguments: object_arguments["ids"] = self.config.get("board_ids") arguments = self._get_object_arguments(**object_arguments) return f"boards({arguments}){{{query}}}" def get_request_headers( self, *, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: headers = super().get_request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) headers["API-Version"] = "2024-10" return headers def get_request_body_json( # type: ignore self, *, stream_state: Optional[StreamState] = None, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[Mapping[str, Any]]: """ Combines queries to a single GraphQL query. """ limit = self.limit.eval(self.config) page = next_page_token and next_page_token[self.NEXT_PAGE_TOKEN_FIELD_NAME] if self.name == "boards" and stream_slice: if self.stream_sync_mode == SyncMode.full_refresh: # incremental sync parameters are not needed for full refresh stream_slice = {} else: stream_slice = {"ids": stream_slice.get("ids")} query_builder = partial(self._build_query, **stream_slice) elif self.name == "items": # `items` stream use a separate pagination strategy where first level pages are across `boards` and sub-pages are across `items` page, sub_page = page if page else (None, None) if self.stream_sync_mode == SyncMode.full_refresh: query_builder = partial(self._build_items_query, sub_page=sub_page) else: query_builder = partial(self._build_items_incremental_query, stream_slice=stream_slice) elif self.name == "teams": query_builder = self._build_teams_query elif self.name == "activity_logs": page, sub_page = page if page else (None, None) query_builder = partial(self._build_activity_query, sub_page=sub_page, stream_slice=stream_slice) else: query_builder = self._build_query query = query_builder( object_name=self.name, field_schema=self._get_schema_root_properties(), limit=limit or None, page=page, ) return {"query": f"{{{query}}}"} # We are using an LRU cache in should_retry() method which requires all incoming arguments (including self) to be hashable. # Dataclasses by default are not hashable, so we need to define __hash__(). Alternatively, we can set @dataclass(frozen=True), # but this has a cascading effect where all dataclass fields must also be set to frozen. def __hash__(self): return hash(tuple(self.__dict__)) class ItemPaginationStrategy(PageIncrement): """ Page increment strategy with subpages for the `items` stream. From the `items` documentation https://developer.monday.com/api-reference/docs/items: Please note that you cannot return more than 100 items per query when using items at the root. To adjust your query, try only returning items on a specific board, nesting items inside a boards query, looping through the boards on your account, or querying less than 100 items at a time. This pagination strategy supports nested loop through `boards` on the top level and `items` on the second. See boards documentation for more details: https://developer.monday.com/api-reference/docs/boards#queries. """ def __post_init__(self, parameters: Mapping[str, Any]): # `self._page` corresponds to board page number # `self._sub_page` corresponds to item page number within its board self.start_from_page = 1 self._page: Optional[int] = self.start_from_page self._sub_page: Optional[int] = self.start_from_page def next_page_token( self, response: requests.Response, last_page_size: int, last_record: Optional[Record], last_page_token_value: Optional[Any] ) -> Optional[Tuple[Optional[int], Optional[int]]]: """ Determines page and subpage numbers for the `items` stream Attributes: response: Contains `boards` and corresponding lists of `items` for each `board` last_records: Parsed `items` from the response """ if last_page_size >= self.page_size: self._sub_page += 1 else: self._sub_page = self.start_from_page if response.json()["data"].get("boards"): self._page += 1 else: return None return self._page, self._sub_page class ItemCursorPaginationStrategy(PageIncrement): """ Page increment strategy with subpages for the `items` stream. From the `items` documentation https://developer.monday.com/api-reference/docs/items: Please note that you cannot return more than 100 items per query when using items at the root. To adjust your query, try only returning items on a specific board, nesting items inside a boards query, looping through the boards on your account, or querying less than 100 items at a time. This pagination strategy supports nested loop through `boards` on the top level and `items` on the second. See boards documentation for more details: https://developer.monday.com/api-reference/docs/boards#queries. """ def __post_init__(self, parameters: Mapping[str, Any]): # `self._page` corresponds to board page number # `self._sub_page` corresponds to item page number within its board self.start_from_page = 1 self._page: Optional[int] = self.start_from_page self._sub_page: Optional[int] = self.start_from_page def next_page_token( self, response: requests.Response, last_page_size: int, last_record: Optional[Record], last_page_token_value: Optional[Any] ) -> Optional[Tuple[Optional[int], Optional[int]]]: """ `items` stream use a separate 2 level pagination strategy where: 1st level `boards` - incremental pagination 2nd level `items_page` - cursor pagination Attributes: response: Contains `boards` and corresponding lists of `items` for each `board` last_records: Parsed `items` from the response """ data = response.json()["data"] boards = data.get("boards", []) next_items_page = data.get("next_items_page", {}) if boards: # there is always only one board due to limit=1, so in one request we extract all 'items_page' for one board only board = boards[0] cursor = board.get("items_page", {}).get("cursor", None) elif next_items_page: cursor = next_items_page.get("cursor", None) else: # Finish pagination if there is no more data return None if cursor: return self._page, cursor else: self._page += 1 return self._page, None class MondayStateMigration(StateMigration): def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]: del stream_state["activity_logs"] return stream_state def should_migrate(self, stream_state: Mapping[str, Any]) -> bool: return "activity_logs" in stream_state @dataclass class MondayTransformation(RecordTransformation): def transform(self, record: MutableMapping[str, Any], config: Optional[Config] = None, **kwargs) -> MutableMapping[str, Any]: # Oncall issue: https://github.com/airbytehq/oncall/issues/4337 column_values = record.get("column_values", []) for values in column_values: display_value, text = values.get("display_value"), values.get("text") if display_value and not text: values["text"] = display_value return record