1
0
mirror of synced 2025-12-22 19:38:29 -05:00
Files
airbyte/airbyte-integrations/connectors/source-avni/source_avni/components.py
2024-12-18 14:05:43 -08:00

37 lines
1.1 KiB
Python

# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
from dataclasses import dataclass
import boto3
import requests
from airbyte_cdk.sources.declarative.auth.token import BasicHttpAuthenticator
@dataclass
class CustomAuthenticator(BasicHttpAuthenticator):
    """Authenticator that sends an AWS Cognito ID token in the ``auth-token`` header.

    The configured username and password are exchanged for an ID token using
    Cognito's ``USER_PASSWORD_AUTH`` flow. The Cognito app client id needed
    for that exchange is fetched from the Avni ``idp-details`` endpoint.
    """

    # Deliberately un-annotated: @dataclass only turns *annotated* class
    # attributes into fields, so these stay plain class-level constants.
    _IDP_DETAILS_URL = "https://app.avniproject.org/idp-details"
    _COGNITO_REGION = "ap-south-1"
    # Seconds to wait for the idp-details endpoint. Requests never times out
    # by default, which would let a stalled connection hang the sync forever.
    _HTTP_TIMEOUT_SECONDS = 30

    @property
    def token(self) -> str:
        """Authenticate against Cognito and return the resulting ID token.

        Every access performs a fresh lookup of the client id and a fresh
        ``initiate_auth`` call; nothing is cached here.

        Raises:
            requests.RequestException: if the client id lookup fails or times out.
            KeyError: if the Cognito response has no
                ``AuthenticationResult``/``IdToken`` entry.
        """
        # NOTE(review): _username/_password are presumably interpolated
        # strings set up by the base class -- not visible in this file.
        username = self._username.eval(self.config)
        password = self._password.eval(self.config)

        app_client_id = self.get_client_id()

        client = boto3.client("cognito-idp", region_name=self._COGNITO_REGION)
        response = client.initiate_auth(
            ClientId=app_client_id,
            AuthFlow="USER_PASSWORD_AUTH",
            AuthParameters={"USERNAME": username, "PASSWORD": password},
        )
        return response["AuthenticationResult"]["IdToken"]

    @property
    def auth_header(self) -> str:
        """Return the name of the HTTP header that carries the token."""
        return "auth-token"

    def get_client_id(self) -> str:
        """Fetch the Cognito app client id from the Avni idp-details endpoint.

        Returns:
            The value found at ``["cognito"]["clientId"]`` in the JSON response.

        Raises:
            requests.HTTPError: if the endpoint answers with an error status.
            requests.Timeout: if the endpoint does not respond within
                ``_HTTP_TIMEOUT_SECONDS`` seconds.
            KeyError: if the response JSON lacks the expected keys.
        """
        response = requests.get(self._IDP_DETAILS_URL, timeout=self._HTTP_TIMEOUT_SECONDS)
        response.raise_for_status()
        idp_details = response.json()
        return idp_details["cognito"]["clientId"]