1
0
mirror of synced 2025-12-21 11:01:41 -05:00
Files
airbyte/airbyte-integrations/connectors/source-surveymonkey/source_surveymonkey/source.py
2024-12-18 14:05:43 -08:00

69 lines
2.3 KiB
Python

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
from typing import Any, List, Mapping, Tuple
import pendulum
import requests
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
from .streams import Surveys
"""
This file provides the necessary constructs to interpret a provided declarative YAML configuration file into
source connector.
WARNING: Do not modify this file.
"""
class SourceSurveymonkey(YamlDeclarativeSource):
SCOPES = {"responses_read_detail", "surveys_read", "users_read"}
def __init__(self):
super().__init__(**{"path_to_yaml": "manifest.yaml"})
@classmethod
def _check_scopes(cls, response_json):
granted_scopes = response_json["scopes"]["granted"]
missed_scopes = cls.SCOPES - set(granted_scopes)
if missed_scopes:
return False, "missed required scopes: " + ", ".join(missed_scopes)
return True, None
@staticmethod
def get_authenticator(config: Mapping[str, Any]):
token = config.get("credentials", {}).get("access_token")
if not token:
token = config["access_token"]
return TokenAuthenticator(token=token)
def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
# Check scopes
try:
authenticator = self.get_authenticator(config)
response = requests.get(url="https://api.surveymonkey.com/v3/users/me", headers=authenticator.get_auth_header())
response.raise_for_status()
return self._check_scopes(response.json())
except Exception as e:
return False, repr(e)
return super().check_connection(logger, config)
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
streams = super().streams(config=config)
authenticator = self.get_authenticator(config)
start_date = pendulum.parse(config["start_date"])
survey_ids = config.get("survey_ids", [])
args = {"authenticator": authenticator, "start_date": start_date, "survey_ids": survey_ids}
streams.append(Surveys(**args))
return streams