# # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # import base64 import time from dataclasses import dataclass from http import HTTPStatus from typing import Any, Mapping, Optional, Union import requests from requests import HTTPError from airbyte_cdk.sources.declarative.auth.declarative_authenticator import NoAuth from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.types import Config # https://developers.zoom.us/docs/internal-apps/s2s-oauth/#successful-response # The Bearer token generated by server-to-server token will expire in one hour BEARER_TOKEN_EXPIRES_IN = 3590 class SingletonMeta(type): _instances = {} def __call__(cls, *args, **kwargs): """ Possible changes to the value of the `__init__` argument do not affect the returned instance. """ if cls not in cls._instances: instance = super().__call__(*args, **kwargs) cls._instances[cls] = instance return cls._instances[cls] @dataclass class ServerToServerOauthAuthenticator(NoAuth): config: Config account_id: Union[InterpolatedString, str] client_id: Union[InterpolatedString, str] client_secret: Union[InterpolatedString, str] authorization_endpoint: Union[InterpolatedString, str] _instance = None _generate_token_time = 0 _access_token = None _grant_type = "account_credentials" def __post_init__(self, parameters: Mapping[str, Any]): self._account_id = InterpolatedString.create(self.account_id, parameters=parameters).eval(self.config) self._client_id = InterpolatedString.create(self.client_id, parameters=parameters).eval(self.config) self._client_secret = InterpolatedString.create(self.client_secret, parameters=parameters).eval(self.config) self._authorization_endpoint = InterpolatedString.create(self.authorization_endpoint, parameters=parameters).eval(self.config) def __call__(self, request: requests.PreparedRequest) -> requests.PreparedRequest: """Attach the page access token to params to authenticate on the HTTP request""" if self._access_token is None or ((time.time() - self._generate_token_time) > BEARER_TOKEN_EXPIRES_IN): self._generate_token_time = time.time() self._access_token = self.generate_access_token() headers = {"Authorization": f"Bearer {self._access_token}", "Content-type": "application/json"} request.headers.update(headers) return request @property def auth_header(self) -> str: return "Authorization" @property def token(self) -> Optional[str]: return self._access_token if self._access_token else None def generate_access_token(self) -> str: self._generate_token_time = time.time() try: token = base64.b64encode(f"{self._client_id}:{self._client_secret}".encode("ascii")).decode("utf-8") headers = {"Authorization": f"Basic {token}", "Content-type": "application/json"} rest = requests.post( url=f"{self._authorization_endpoint}?grant_type={self._grant_type}&account_id={self._account_id}", headers=headers ) if rest.status_code != HTTPStatus.OK: raise HTTPError(rest.text) return rest.json().get("access_token") except Exception as e: raise Exception(f"Error while generating access token: {e}") from e