35 lines
1.1 KiB
Python
35 lines
1.1 KiB
Python
#
|
|
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
from typing import Any, Dict, List, Mapping
|
|
|
|
import requests
|
|
|
|
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
|
|
|
|
|
|
class CustomExtractor(RecordExtractor):
|
|
def extract_records(self, response: requests.Response, **kwargs) -> List[Mapping[str, Any]]:
|
|
data = response.json().get("data")
|
|
extracted_records = []
|
|
self.primary_key = "id"
|
|
if not data:
|
|
return extracted_records
|
|
|
|
for element in data:
|
|
relationships: Dict[str, List[int]] = dict()
|
|
for r_type, relations in element.get("relationships", {}).items():
|
|
if relations.get("data"):
|
|
data = relations.get("data", [])
|
|
|
|
if isinstance(data, dict):
|
|
data = [data]
|
|
|
|
relationships[f"{r_type}"] = [e.get("id") for e in data]
|
|
|
|
extracted_record = {**element.get("attributes"), **{self.primary_key: element[self.primary_key], **relationships}}
|
|
extracted_records.append(extracted_record)
|
|
|
|
return extracted_records
|