# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

from typing import Any, List, Mapping

import requests
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor


class ZendeskSupportExtractorEvents(RecordExtractor):
    """Extract "Comment" child events from a ``ticket_events`` response."""

    def extract_records(self, response: requests.Response) -> List[Mapping[str, Any]]:
        """Return the flattened list of comment events found in *response*.

        Each entry of the top-level ``ticket_events`` list may carry a
        ``child_events`` list. Only child events whose ``event_type`` is
        ``"Comment"`` are returned; each one is enriched in place with the
        parent's ``via_reference_id``, ``ticket_id`` and ``timestamp``.

        Args:
            response: HTTP response whose body is expected to be a JSON object.

        Returns:
            The matching child events, or an empty list when the body is not
            valid JSON or contains no ticket events.
        """
        try:
            ticket_events = response.json().get("ticket_events") or []
        except requests.exceptions.JSONDecodeError:
            return []

        inherited_fields = ("via_reference_id", "ticket_id", "timestamp")
        events = []
        for ticket_event in ticket_events:
            # ``or []`` also covers an explicit JSON null, which a plain
            # ``.get(key, [])`` default would pass through and fail to iterate.
            for child in ticket_event.get("child_events") or []:
                if child.get("event_type") != "Comment":
                    continue

                # Copy identifying fields from the parent ticket event.
                for field in inherited_fields:
                    child[field] = ticket_event.get(field)

                # https://github.com/airbytehq/oncall/issues/1001
                # Normalise any non-object "via" value to None.
                if not isinstance(child.get("via"), dict):
                    child["via"] = None

                events.append(child)
        return events


class ZendeskSupportAttributeDefinitionsExtractor(RecordExtractor):
    """Extract condition definitions from a ``definitions`` response."""

    def extract_records(self, response: requests.Response) -> List[Mapping[str, Any]]:
        """Return all condition definitions, each tagged with its group.

        Entries from ``definitions.conditions_all`` are returned first with
        ``condition`` set to ``"all"``, followed by entries from
        ``definitions.conditions_any`` with ``condition`` set to ``"any"``.
        The definitions are modified in place.

        Args:
            response: HTTP response whose body is expected to be a JSON object.

        Returns:
            The tagged definitions, or an empty list when the body is not
            valid JSON.

        Raises:
            KeyError: If ``definitions``, ``conditions_all`` or
                ``conditions_any`` is missing from the payload.
        """
        try:
            # Parse the body once and reuse it for both condition groups.
            definitions = response.json()["definitions"]
        except requests.exceptions.JSONDecodeError:
            return []

        records = []
        for condition, key in (("all", "conditions_all"), ("any", "conditions_any")):
            for definition in definitions[key]:
                definition["condition"] = condition
                records.append(definition)
        return records