# airbyte/airbyte-integrations/connectors/source-surveymonkey/source_surveymonkey/streams.py
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import tempfile
import urllib.parse
from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional
import pendulum
import requests
import vcr
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.core import CheckpointMixin
from airbyte_cdk.sources.streams.http import HttpStream
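# A single shared temporary file used as the VCR cassette, so cached responses
# can be reused across every stream and slice within one sync.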
cache_file = tempfile.NamedTemporaryFile()
class SurveymonkeyStream(HttpStream, ABC):
url_base = "https://api.surveymonkey.com/v3/"
primary_key = "id"
data_field = "data"
default_backoff_time: int = 60 # secs
    def __init__(self, start_date: pendulum.DateTime, survey_ids: List[str], **kwargs):
super().__init__(**kwargs)
self._start_date = start_date
self._survey_ids = survey_ids
@property
def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
return None
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
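        # Illustrative pagination payload:
        #   {"links": {"next": "https://api.surveymonkey.com/v3/surveys?page=2&per_page=1000"}}
        # The query string of the "next" URL is parsed into a dict such as
        # {"page": "2", "per_page": "1000"} and reused as the next request's params.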
resp_json = response.json()
links = resp_json.get("links", {})
if links.get("next"):
next_query_string = urllib.parse.urlsplit(links["next"]).query
params = dict(urllib.parse.parse_qsl(next_query_string))
return params
def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
return next_page_token or {}
def request_headers(self, **kwargs) -> Mapping[str, Any]:
return {"Content-Type": "application/json"}
def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
"""
        All requests to all endpoints are cached during iteration.
        The API is heavily rate limited, so every response that can be reused must be.
        The "new_episodes" VCR record mode records new requests and replays previously seen ones across slices, details, etc.
"""
with vcr.use_cassette(cache_file.name, record_mode="new_episodes", serializer="json", decode_compressed_response=True):
try:
yield from super().read_records(
sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
)
except requests.exceptions.HTTPError as e:
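                # A 404 (e.g. a deleted or no longer accessible survey) is skipped
                # rather than failing the whole sync.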
if e.response.status_code == 404:
yield from []
else:
raise e
def backoff_time(self, response: requests.Response) -> Optional[float]:
"""
https://developer.surveymonkey.com/api/v3/#headers
X-Ratelimit-App-Global-Minute-Remaining - Number of remaining requests app has before hitting per minute limit
X-Ratelimit-App-Global-Minute-Reset - Number of seconds until the rate limit remaining resets
Limits: https://developer.surveymonkey.com/api/v3/#request-and-response-limits
Max Requests Per Day - 500
Max Requests Per Minute - 120
Real limits from API response headers:
"X-Ratelimit-App-Global-Minute-Limit": "720"
"X-Ratelimit-App-Global-Day-Limit": "500000"
"""
        # Back off for 60 secs if 1 or fewer requests remain before the per-minute limit is hit
minute_limit_remaining = response.headers.get("X-Ratelimit-App-Global-Minute-Remaining", "100")
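        # A missing header falls back to "100", i.e. no backoff is applied.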
if int(minute_limit_remaining) <= 1:
return self.default_backoff_time
def raise_error_from_response(self, response_json):
"""
        This method is used by every parse_response implementation, including those
        that do not call super() because they need the raw response body
        instead of accessing `data_field`.
"""
if response_json.get("error"):
raise Exception(repr(response_json.get("error")))
def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
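        # Streams that define a `data_field` yield each element of that array;
        # streams that set `data_field = None` (e.g. Surveys) yield the whole response object.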
response_json = response.json()
self.raise_error_from_response(response_json=response_json)
if self.data_field:
yield from response_json.get(self.data_field, [])
else:
yield response_json
class IncrementalSurveymonkeyStream(SurveymonkeyStream, CheckpointMixin, ABC):
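    # State is checkpointed (a STATE message emitted) roughly every 1000 records read from this stream.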
state_checkpoint_interval = 1000
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._state = None
@property
@abstractmethod
def cursor_field(self) -> str:
pass
@property
def state(self) -> MutableMapping[str, Any]:
return self._state
@state.setter
def state(self, value: MutableMapping[str, Any]):
self._state = value
def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
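        # Advance the stream state as each record is emitted so checkpoints reflect real progress.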
for record in super().read_records(sync_mode, cursor_field, stream_slice, stream_state):
self.state = self._get_updated_state(self.state, record)
yield record
def _get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
"""
Return the latest state by comparing the cursor value in the latest record with the stream's most recent state object
and returning an updated state object.
"""
current_stream_state = current_stream_state or {}
state_value = max(current_stream_state.get(self.cursor_field, ""), latest_record.get(self.cursor_field, ""))
return {self.cursor_field: state_value}
class SurveyIds(IncrementalSurveymonkeyStream):
cursor_field = "date_modified"
def path(self, **kwargs) -> str:
return "surveys"
def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
params = super().request_params(stream_state=stream_state, **kwargs)
params["sort_order"] = "ASC"
params["sort_by"] = self.cursor_field
params["per_page"] = 1000 # maybe as user input or bigger value
since_value = pendulum.parse(stream_state.get(self.cursor_field)) if stream_state.get(self.cursor_field) else self._start_date
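        # Never go earlier than the configured start_date, even if the saved state is older.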
since_value = max(since_value, self._start_date)
params["start_modified_at"] = since_value.strftime("%Y-%m-%dT%H:%M:%S")
return params
class SurveyIDSliceMixin:
def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
return f"surveys/{stream_slice['survey_id']}/details"
def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs):
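        # If explicit survey IDs were configured, slice over those; otherwise discover
        # surveys through the SurveyIds stream and build one slice per survey.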
if self._survey_ids:
yield from [{"survey_id": id} for id in self._survey_ids]
else:
survey_stream = SurveyIds(start_date=self._start_date, survey_ids=self._survey_ids, authenticator=self._session.auth)
for survey in survey_stream.read_records(sync_mode=SyncMode.full_refresh, stream_state=stream_state):
yield {"survey_id": survey["id"]}
class Surveys(SurveyIDSliceMixin, IncrementalSurveymonkeyStream):
"""
    Docs: https://developer.surveymonkey.com/api/v3/#surveys
    A source for stream slices; the bare survey list does not contain useful info by itself.
    The `surveys/id/details` endpoint contains the full data about pages and questions, already collected
    into a `pages` array and a nested array of questions, where each inner array holds the questions of one page.
    For example, [[q1, q2, q3], [q4, q5]] means the survey has 2 pages; the first page contains the three
    questions q1, q2, q3 and the second page contains the rest.
    With the "normal" approach we would have to query surveys/id/pages to enumerate pages,
    then query every page_id in a separate request for its details (because `pages` does not contain
    full info and is only valid for enumeration), then enumerate the questions of each page and query every
    question_id for its details (since `/surveys/id/pages/id/questions` without a trailing /id is likewise incomplete).
    In other words, we would need triple-nested stream slices (and the API is heavily rate limited,
    with details needed for every survey), only to end up reassembling a response equivalent to what the
    `/id/details` endpoint already returns. We would also have to gather the results into an array ourselves,
    whereas `details` has already done that; all that remains is light filtering or a small transformation of the array.
    The `details` endpoint is therefore far better in terms of API limits.
"""
data_field = None
cursor_field = "date_modified"
def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
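        # The `details` payload nests the full page/question tree under "pages"; it is dropped here
        # to keep survey records lightweight (presumably pages and questions are served by
        # dedicated streams elsewhere in the connector).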
data = super().parse_response(response=response, stream_state=stream_state, **kwargs)
for record in data:
record.pop("pages", None) # remove pages data
yield record