54 lines
1.8 KiB
Python
54 lines
1.8 KiB
Python
#
|
|
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
import re
|
|
from typing import List
|
|
|
|
import requests
|
|
|
|
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
|
|
from airbyte_cdk.sources.declarative.types import Record
|
|
from airbyte_cdk.sources.streams.http.error_handlers import BackoffStrategy
|
|
|
|
|
|
class AdAccountRecordExtractor(RecordExtractor):
|
|
"""
|
|
Custom extractor for handling different response formats from the Ad Accounts endpoint.
|
|
|
|
This extractor is necessary to handle cases where an `account_id` is present in the request.
|
|
- When querying all ad accounts, the response contains an "items" key with a list of accounts.
|
|
- When querying a specific ad account, the response returns a single dictionary representing that account.
|
|
"""
|
|
|
|
def extract_records(self, response: requests.Response) -> List[Record]:
|
|
data = response.json()
|
|
|
|
if not data:
|
|
return []
|
|
|
|
# Extract records from "items" if present
|
|
if isinstance(data, dict) and "items" in data:
|
|
return data["items"]
|
|
|
|
# If the response is a single object, wrap it in a list
|
|
if isinstance(data, dict):
|
|
return [data]
|
|
return []
|
|
|
|
|
|
class PinterestAnalyticsBackoffStrategy(BackoffStrategy):
|
|
_re = re.compile(r"Retry after\s+(\d+)\s+seconds", re.IGNORECASE)
|
|
|
|
def backoff_time(self, response_or_exception, attempt_count: int) -> float:
|
|
try:
|
|
if isinstance(response_or_exception, requests.Response):
|
|
data = response_or_exception.json()
|
|
msg = str(data.get("message", ""))
|
|
m = self._re.search(msg)
|
|
if m:
|
|
return float(m.group(1))
|
|
except Exception:
|
|
pass
|
|
return min(2**attempt_count, 120.0)
|