82 lines
3.0 KiB
Python
82 lines
3.0 KiB
Python
#
|
|
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
import base64
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta
|
|
from typing import Any, Iterable, Mapping, MutableMapping, Optional
|
|
|
|
import backoff
|
|
import requests
|
|
|
|
from airbyte_cdk.sources.declarative.auth import DeclarativeOauth2Authenticator
|
|
from airbyte_cdk.sources.declarative.requesters.http_requester import HttpRequester
|
|
from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState
|
|
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException
|
|
|
|
|
|
logger = logging.getLogger("airbyte")
|
|
|
|
|
|
@dataclass
|
|
class PayPalOauth2Authenticator(DeclarativeOauth2Authenticator):
|
|
"""Request example for API token extraction:
|
|
For `old_config` scenario:
|
|
curl -v POST https://api-m.sandbox.paypal.com/v1/oauth2/token \
|
|
-H "Accept: application/json" \
|
|
-H "Accept-Language: en_US" \
|
|
-u "CLIENT_ID:SECRET" \
|
|
-d "grant_type=client_credentials"
|
|
"""
|
|
|
|
# config: Mapping[str, Any]
|
|
# client_id: Union[InterpolatedString, str]
|
|
# client_secret: Union[InterpolatedString, str]
|
|
# refresh_request_body: Optional[Mapping[str, Any]] = None
|
|
# token_refresh_endpoint: Union[InterpolatedString, str]
|
|
# grant_type: Union[InterpolatedString, str] = "refresh_token"
|
|
# expires_in_name: Union[InterpolatedString, str] = "expires_in"
|
|
# access_token_name: Union[InterpolatedString, str] = "access_token"
|
|
# parameters: InitVar[Mapping[str, Any]]
|
|
|
|
def get_refresh_request_headers(self):
|
|
basic_auth = base64.b64encode(bytes(f"{self.get_client_id()}:{self.get_client_secret()}", "utf-8")).decode("utf-8")
|
|
return {"Authorization": f"Basic {basic_auth}"}
|
|
|
|
@backoff.on_exception(
|
|
backoff.expo,
|
|
DefaultBackoffException,
|
|
max_tries=2,
|
|
on_backoff=lambda details: logger.info(
|
|
f"Caught retryable error after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..."
|
|
),
|
|
max_time=300,
|
|
)
|
|
def _get_refresh_access_token_response(self):
|
|
try:
|
|
request_url = self.get_token_refresh_endpoint()
|
|
request_headers = self.get_refresh_request_headers()
|
|
request_body = self.build_refresh_request_body()
|
|
|
|
logger.info(f"Sending request to URL: {request_url}")
|
|
|
|
response = requests.request(method="POST", url=request_url, data=request_body, headers=request_headers)
|
|
|
|
self._log_response(response)
|
|
response.raise_for_status()
|
|
|
|
response_json = response.json()
|
|
|
|
self.access_token = response_json.get("access_token")
|
|
|
|
return response.json()
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
if e.response and (e.response.status_code == 429 or e.response.status_code >= 500):
|
|
raise DefaultBackoffException(request=e.response.request, response=e.response)
|
|
raise
|
|
except Exception as e:
|
|
raise Exception(f"Error while refreshing access token: {e}") from e
|