1
0
mirror of synced 2025-12-21 11:01:41 -05:00
Files
airbyte/airbyte-integrations/connectors/source-qualaroo/components.py
2024-12-18 14:05:43 -08:00

34 lines
1.1 KiB
Python

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from base64 import b64encode
from dataclasses import dataclass
from typing import Any, List, Mapping
import requests
from airbyte_cdk.sources.declarative.auth.token import BasicHttpAuthenticator
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
@dataclass
class CustomAuthenticator(BasicHttpAuthenticator):
@property
def token(self):
key = str(self._username.eval(self.config)).encode("latin1")
token = self._password.eval(self.config).encode("latin1")
encoded_credentials = b64encode(b":".join((key, token))).strip()
token = "Basic " + encoded_credentials.decode("ascii")
return token
class CustomExtractor(RecordExtractor):
def extract_records(self, response: requests.Response, **kwargs) -> List[Mapping[str, Any]]:
extracted = []
for record in response.json():
if "answered_questions" in record:
record["answered_questions"] = list(record["answered_questions"].values())
extracted.append(record)
return extracted