1
0
mirror of synced 2025-12-20 18:39:31 -05:00
Files
airbyte/airbyte-integrations/connectors/source-zendesk-support/components.py

44 lines
1.6 KiB
Python

# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
from typing import Any, List, Mapping
import requests
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
class ZendeskSupportExtractorEvents(RecordExtractor):
    """Record extractor that flattens ticket events into their "Comment" child events."""

    def extract_records(self, response: requests.Response) -> List[Mapping[str, Any]]:
        """Return the "Comment" child events found in the response body.

        The body's ``ticket_events`` entries are walked, and every item of an
        entry's ``child_events`` whose ``event_type`` equals ``"Comment"`` is
        emitted. Each emitted child is updated in place with the parent's
        ``via_reference_id``, ``ticket_id`` and ``timestamp`` (``None`` when the
        parent lacks the key), and its ``via`` value is replaced by ``None``
        unless it is a dict.

        :param response: HTTP response whose body is decoded with ``response.json()``.
        :return: list of comment events; empty when the body is not valid JSON
            or ``ticket_events`` is missing/empty.
        """
        try:
            ticket_events = response.json().get("ticket_events") or []
        except requests.exceptions.JSONDecodeError:
            # An undecodable body is treated as "no records".
            ticket_events = []

        # Keys copied from the parent ticket event onto each comment.
        inherited_keys = ("via_reference_id", "ticket_id", "timestamp")

        comments = []
        for ticket_event in ticket_events:
            for child in ticket_event.get("child_events", []):
                if child.get("event_type") != "Comment":
                    continue

                child.update({key: ticket_event.get(key) for key in inherited_keys})

                # https://github.com/airbytehq/oncall/issues/1001
                if not isinstance(child.get("via"), dict):
                    child["via"] = None

                comments.append(child)
        return comments
class ZendeskSupportAttributeDefinitionsExtractor(RecordExtractor):
    """Record extractor that merges the "all" and "any" condition definitions into one list."""

    def extract_records(self, response: requests.Response) -> List[Mapping[str, Any]]:
        """Return every definition from the response, tagged with its condition group.

        Items under ``definitions.conditions_all`` come first and get
        ``condition = "all"``; items under ``definitions.conditions_any`` follow
        and get ``condition = "any"``. The definition dicts are updated in place.

        :param response: HTTP response whose body is decoded with ``response.json()``.
        :return: list of definitions; empty when the body is not valid JSON.
        :raises KeyError: if ``definitions``, ``conditions_all`` or
            ``conditions_any`` is absent from the decoded body.
        """
        try:
            # Decode the body once and reuse it for both condition groups.
            definitions = response.json()["definitions"]
        except requests.exceptions.JSONDecodeError:
            return []

        records = []
        for condition, key in (("all", "conditions_all"), ("any", "conditions_any")):
            for definition in definitions[key]:
                definition["condition"] = condition
                records.append(definition)
        return records