88 lines
3.4 KiB
Python
88 lines
3.4 KiB
Python
#
|
|
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
import base64
|
|
import time
|
|
from dataclasses import dataclass
|
|
from http import HTTPStatus
|
|
from typing import Any, Mapping, Optional, Union
|
|
|
|
import requests
|
|
from requests import HTTPError
|
|
|
|
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import NoAuth
|
|
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
|
|
from airbyte_cdk.sources.declarative.types import Config
|
|
|
|
|
|
# https://developers.zoom.us/docs/internal-apps/s2s-oauth/#successful-response
|
|
# The Bearer token generated by server-to-server token will expire in one hour
|
|
BEARER_TOKEN_EXPIRES_IN = 3590
|
|
|
|
|
|
class SingletonMeta(type):
|
|
_instances = {}
|
|
|
|
def __call__(cls, *args, **kwargs):
|
|
"""
|
|
Possible changes to the value of the `__init__` argument do not affect
|
|
the returned instance.
|
|
"""
|
|
if cls not in cls._instances:
|
|
instance = super().__call__(*args, **kwargs)
|
|
cls._instances[cls] = instance
|
|
return cls._instances[cls]
|
|
|
|
|
|
@dataclass
|
|
class ServerToServerOauthAuthenticator(NoAuth):
|
|
config: Config
|
|
account_id: Union[InterpolatedString, str]
|
|
client_id: Union[InterpolatedString, str]
|
|
client_secret: Union[InterpolatedString, str]
|
|
authorization_endpoint: Union[InterpolatedString, str]
|
|
|
|
_instance = None
|
|
_generate_token_time = 0
|
|
_access_token = None
|
|
_grant_type = "account_credentials"
|
|
|
|
def __post_init__(self, parameters: Mapping[str, Any]):
|
|
self._account_id = InterpolatedString.create(self.account_id, parameters=parameters).eval(self.config)
|
|
self._client_id = InterpolatedString.create(self.client_id, parameters=parameters).eval(self.config)
|
|
self._client_secret = InterpolatedString.create(self.client_secret, parameters=parameters).eval(self.config)
|
|
self._authorization_endpoint = InterpolatedString.create(self.authorization_endpoint, parameters=parameters).eval(self.config)
|
|
|
|
def __call__(self, request: requests.PreparedRequest) -> requests.PreparedRequest:
|
|
"""Attach the page access token to params to authenticate on the HTTP request"""
|
|
if self._access_token is None or ((time.time() - self._generate_token_time) > BEARER_TOKEN_EXPIRES_IN):
|
|
self._generate_token_time = time.time()
|
|
self._access_token = self.generate_access_token()
|
|
headers = {"Authorization": f"Bearer {self._access_token}", "Content-type": "application/json"}
|
|
request.headers.update(headers)
|
|
|
|
return request
|
|
|
|
@property
|
|
def auth_header(self) -> str:
|
|
return "Authorization"
|
|
|
|
@property
|
|
def token(self) -> Optional[str]:
|
|
return self._access_token if self._access_token else None
|
|
|
|
def generate_access_token(self) -> str:
|
|
self._generate_token_time = time.time()
|
|
try:
|
|
token = base64.b64encode(f"{self._client_id}:{self._client_secret}".encode("ascii")).decode("utf-8")
|
|
headers = {"Authorization": f"Basic {token}", "Content-type": "application/json"}
|
|
rest = requests.post(
|
|
url=f"{self._authorization_endpoint}?grant_type={self._grant_type}&account_id={self._account_id}", headers=headers
|
|
)
|
|
if rest.status_code != HTTPStatus.OK:
|
|
raise HTTPError(rest.text)
|
|
return rest.json().get("access_token")
|
|
except Exception as e:
|
|
raise Exception(f"Error while generating access token: {e}") from e
|