1
0
mirror of synced 2025-12-22 03:21:25 -05:00
Files
airbyte/airbyte-integrations/connectors/source-pinterest/components.py
2025-09-30 16:54:36 +03:00

54 lines
1.8 KiB
Python

#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#
import re
from typing import List
import requests
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.types import Record
from airbyte_cdk.sources.streams.http.error_handlers import BackoffStrategy
class AdAccountRecordExtractor(RecordExtractor):
    """
    Custom extractor for handling different response formats from the Ad Accounts endpoint.

    This extractor is necessary to handle cases where an `account_id` is present in the request.
    - When querying all ad accounts, the response contains an "items" key with a list of accounts.
    - When querying a specific ad account, the response returns a single dictionary representing that account.
    """

    def extract_records(self, response: requests.Response) -> List[Record]:
        """
        Return the ad account records contained in ``response``.

        Args:
            response: HTTP response whose body is parsed as JSON.

        Returns:
            - the value stored under "items" when the payload is a dict holding that key
              (an empty list if that value is null or otherwise empty);
            - a one-element list wrapping the payload when it is any other non-empty dict;
            - an empty list for empty or non-dict payloads.
        """
        data = response.json()

        # Empty payloads and anything that is not a JSON object carry no records.
        if not data or not isinstance(data, dict):
            return []

        if "items" in data:
            # Guard against `"items": null` so callers always receive a list
            # they can iterate, as the return annotation promises.
            return data["items"] or []

        # A single ad account object: wrap it so the result is always a list.
        return [data]
class PinterestAnalyticsBackoffStrategy(BackoffStrategy):
    """
    Backoff strategy that prefers the wait time announced in the response body.

    When the JSON body of a response has a "message" containing
    "Retry after <N> seconds" (case-insensitive), N is used as the delay.
    Otherwise the delay grows exponentially with the attempt count and is
    capped at 120 seconds.
    """

    # Captures the number of seconds from a "Retry after <N> seconds" message.
    _re = re.compile(r"Retry after\s+(\d+)\s+seconds", re.IGNORECASE)

    def backoff_time(self, response_or_exception, attempt_count: int) -> float:
        """
        Compute how many seconds to wait before the next attempt.

        Args:
            response_or_exception: The failed attempt's outcome. Only
                ``requests.Response`` instances are inspected for a retry
                hint; any other value goes straight to the fallback.
            attempt_count: Attempt number used as the exponent of the
                exponential fallback.

        Returns:
            The number of seconds found in the response message, or
            ``min(2**attempt_count, 120)`` as a float when no hint is found.
        """
        if isinstance(response_or_exception, requests.Response):
            try:
                data = response_or_exception.json()
            except Exception:
                # Deliberately best-effort: a body that cannot be read or
                # decoded must never break the retry loop, so fall through
                # to the exponential fallback instead of raising.
                data = None

            # Only JSON objects can carry a "message"; lists, strings or
            # numbers are ignored explicitly rather than via a swallowed
            # AttributeError.
            if isinstance(data, dict):
                match = self._re.search(str(data.get("message", "")))
                if match:
                    return float(match.group(1))

        # Integer exponentiation is kept on purpose: 2.0 ** n overflows for
        # large n, whereas the int result is simply clamped by min(). The
        # outer float() keeps the return type consistent with the annotation.
        return float(min(2**attempt_count, 120.0))