1
0
mirror of synced 2025-12-22 11:31:02 -05:00
Files
airbyte/airbyte-integrations/connectors/source-outreach/components.py
2025-09-04 14:53:42 -04:00

35 lines
1.1 KiB
Python

#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from typing import Any, Dict, List, Mapping
import requests
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
class CustomExtractor(RecordExtractor):
def extract_records(self, response: requests.Response, **kwargs) -> List[Mapping[str, Any]]:
data = response.json().get("data")
extracted_records = []
self.primary_key = "id"
if not data:
return extracted_records
for element in data:
relationships: Dict[str, List[int]] = dict()
for r_type, relations in element.get("relationships", {}).items():
if relations.get("data"):
data = relations.get("data", [])
if isinstance(data, dict):
data = [data]
relationships[f"{r_type}"] = [e.get("id") for e in data]
extracted_record = {**element.get("attributes"), **{self.primary_key: element[self.primary_key], **relationships}}
extracted_records.append(extracted_record)
return extracted_records