69 lines
2.3 KiB
Python
69 lines
2.3 KiB
Python
#
|
|
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
|
#
|
|
|
|
import logging
|
|
from typing import Any, List, Mapping, Tuple
|
|
|
|
import pendulum
|
|
import requests
|
|
|
|
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
|
|
from airbyte_cdk.sources.streams import Stream
|
|
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
|
|
|
|
from .streams import Surveys
|
|
|
|
|
|
"""
|
|
This file provides the necessary constructs to interpret a provided declarative YAML configuration file into
|
|
a source connector.
|
|
|
|
WARNING: Do not modify this file.
|
|
"""
|
|
|
|
|
|
class SourceSurveymonkey(YamlDeclarativeSource):
    """SurveyMonkey source built from ``manifest.yaml`` plus the Python ``Surveys`` stream.

    The declarative streams come from the YAML manifest handed to the parent
    class; ``streams`` appends the ``Surveys`` stream imported from
    ``.streams`` on top of them.
    """

    # Scopes that must all appear in the "granted" list returned by the
    # /v3/users/me endpoint for the connection check to pass.
    SCOPES = {"responses_read_detail", "surveys_read", "users_read"}

    # Upper bound (seconds) for the scope-check HTTP call, so that a stalled
    # connection fails the check instead of hanging it indefinitely.
    _CHECK_TIMEOUT_SECONDS = 60

    def __init__(self):
        """Initialise the declarative source from ``manifest.yaml``."""
        super().__init__(**{"path_to_yaml": "manifest.yaml"})

    @classmethod
    def _check_scopes(cls, response_json: Mapping[str, Any]) -> Tuple[bool, Any]:
        """Verify that every scope in ``SCOPES`` has been granted.

        Args:
            response_json: Decoded response body; ``["scopes"]["granted"]`` is
                read as an iterable of scope names.

        Returns:
            ``(True, None)`` when no required scope is missing, otherwise
            ``(False, message)`` where ``message`` lists the missing scopes.

        Raises:
            KeyError: If ``response_json`` has no ``scopes``/``granted`` entry.
        """
        granted_scopes = response_json["scopes"]["granted"]
        missed_scopes = cls.SCOPES - set(granted_scopes)
        if missed_scopes:
            # Sort so the message is deterministic; set iteration order is not.
            return False, "missed required scopes: " + ", ".join(sorted(missed_scopes))
        return True, None

    @staticmethod
    def get_authenticator(config: Mapping[str, Any]):
        """Build a ``TokenAuthenticator`` from the connector configuration.

        The token is taken from ``config["credentials"]["access_token"]`` when
        present and non-empty, otherwise from the top-level
        ``config["access_token"]``.

        Raises:
            KeyError: If neither location provides an access token.
        """
        token = config.get("credentials", {}).get("access_token")
        if not token:
            token = config["access_token"]
        return TokenAuthenticator(token=token)

    def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
        """Check the token's scopes, then run the parent class's connection check.

        Args:
            logger: Logger forwarded to the parent check.
            config: Connector configuration holding the access token.

        Returns:
            ``(False, reason)`` when the scope request fails or a required
            scope is missing; otherwise whatever the parent class's
            ``check_connection`` returns.
        """
        # Check scopes
        try:
            authenticator = self.get_authenticator(config)
            response = requests.get(
                url="https://api.surveymonkey.com/v3/users/me",
                headers=authenticator.get_auth_header(),
                timeout=self._CHECK_TIMEOUT_SECONDS,
            )
            response.raise_for_status()
            scopes_ok, scopes_error = self._check_scopes(response.json())
        except Exception as e:
            # Boundary of the check: report any failure (missing token, HTTP
            # error, timeout, unexpected payload) as a failed connection.
            return False, repr(e)

        if not scopes_ok:
            return False, scopes_error

        # A passing scope check is deliberately not returned directly: doing so
        # would leave the parent check below unreachable.
        return super().check_connection(logger, config)

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Return the parent class's streams followed by the ``Surveys`` stream.

        Args:
            config: Connector configuration; ``start_date`` is required and
                parsed with ``pendulum.parse``, ``survey_ids`` is optional and
                defaults to an empty list.
        """
        streams = super().streams(config=config)

        authenticator = self.get_authenticator(config)
        start_date = pendulum.parse(config["start_date"])
        survey_ids = config.get("survey_ids", [])
        args = {"authenticator": authenticator, "start_date": start_date, "survey_ids": survey_ids}

        streams.append(Surveys(**args))
        return streams
|