1
0
mirror of synced 2026-01-20 21:06:36 -05:00
Files
airbyte/airbyte-integrations/connectors/source-auth0/source_auth0/utils.py
Yiyang Li 4fdd09b79c 🎉 New Source: Auth0 [python cdk] (#18338)
* 🎉 New Source: Auth0 [python cdk]

- Full import and incremental are supported
- Only users stream is added
- Added unit tests and ensured all acceptance tests pass locally
- It's free to create a new account in Auth0, so it's free to set up an integration account.

* fix: remove unused import in source.py

* auto-bump connector version

Co-authored-by: sajarin <sajarindider@gmail.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
2022-10-26 13:55:16 -04:00

45 lines
1.3 KiB
Python

#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
import datetime
import logging
from typing import Dict
from urllib import parse
from airbyte_cdk.sources.streams.http.requests_native_auth.token import TokenAuthenticator
from requests.auth import AuthBase
from .authenticator import Auth0Oauth2Authenticator
# Module-level logger obtained under the "airbyte" logger name.
logger = logging.getLogger("airbyte")
def get_api_endpoint(url_base: str, version: str) -> str:
    """Return the versioned API root for the given base URL.

    The API path is absolute (it starts with ``/``), so joining it replaces
    any path component already present in ``url_base``.

    Args:
        url_base: Base URL to resolve the API path against.
        version: API version segment, e.g. ``"v2"``.

    Returns:
        ``url_base`` joined with ``/api/<version>/``.
    """
    api_path = "/api/{}/".format(version)
    endpoint = parse.urljoin(url_base, api_path)
    return endpoint
def initialize_authenticator(config: Dict) -> AuthBase:
    """Build the request authenticator described by the connector config.

    Args:
        config: Connector configuration. Must contain a ``credentials``
            mapping with an ``auth_type`` key. ``base_url`` is read from the
            top level of ``config`` for the confidential-application flow.

    Returns:
        A ``TokenAuthenticator`` when ``auth_type`` is
        ``"oauth2_access_token"``, or an ``Auth0Oauth2Authenticator`` when it
        is ``"oauth2_confidential_application"``.

    Raises:
        Exception: If ``credentials`` or ``auth_type`` is missing or empty,
            or if ``auth_type`` is not one of the supported values.
    """
    credentials = config.get("credentials")
    if not credentials:
        raise Exception("Config validation error. `credentials` not specified.")

    auth_type = credentials.get("auth_type")
    if not auth_type:
        raise Exception("Config validation error. `auth_type` not specified.")

    if auth_type == "oauth2_access_token":
        return TokenAuthenticator(credentials.get("access_token"))

    if auth_type == "oauth2_confidential_application":
        return Auth0Oauth2Authenticator(
            base_url=config.get("base_url"),
            audience=credentials.get("audience"),
            client_secret=credentials.get("client_secret"),
            client_id=credentials.get("client_id"),
        )

    # Fail loudly instead of implicitly returning None: a None authenticator
    # would only surface later as an unrelated error far from the bad config.
    raise Exception(f"Config validation error. Unsupported `auth_type`: {auth_type!r}.")
def datetime_to_string(date: datetime.datetime) -> str:
    """Format a datetime as ``YYYY-MM-DDTHH:MM:SS.000Z``.

    The output always carries a literal ``Z`` (UTC) suffix, so timezone-aware
    values are converted to UTC before formatting. Naive values are formatted
    as-is, i.e. they are taken to already be in UTC. Sub-second precision is
    not emitted; the fractional part is always ``.000``.

    Args:
        date: The datetime to format.

    Returns:
        The formatted timestamp string.
    """
    # Only aware datetimes have a usable offset; without this conversion a
    # non-UTC aware value would be emitted with its local wall-clock time
    # while still being labeled "Z".
    if date.tzinfo is not None and date.utcoffset() is not None:
        date = date.astimezone(datetime.timezone.utc)
    return date.strftime("%Y-%m-%dT%H:%M:%S.000Z")