1
0
mirror of synced 2025-12-25 02:09:19 -05:00

Source Freshdesk: handle rate limiting #1972 (#2060)

Co-authored-by: Sherif Nada <snadalive@gmail.com>
This commit is contained in:
Eugene K
2021-02-17 14:00:28 -03:00
committed by GitHub
parent 94dc7ae711
commit a7761e792e
13 changed files with 366 additions and 121 deletions

View File

@@ -2,6 +2,6 @@
"sourceDefinitionId": "ec4b9503-13cb-48ab-a4ab-6ade4be46567",
"name": "Freshdesk",
"dockerRepository": "airbyte/source-freshdesk",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.2",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-freshdesk"
}

View File

@@ -106,7 +106,7 @@
- sourceDefinitionId: ec4b9503-13cb-48ab-a4ab-6ade4be46567
name: Freshdesk
dockerRepository: airbyte/source-freshdesk
dockerImageTag: 0.1.1
dockerImageTag: 0.1.2
documentationUrl: https://hub.docker.com/r/airbyte/source-freshdesk
- sourceDefinitionId: 396e4ca3-8a97-4b85-aa4e-c9d8c2d5f992
name: Braintree

View File

@@ -22,6 +22,8 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import traceback
from airbyte_protocol import AirbyteLogMessage, AirbyteMessage
@@ -48,6 +50,10 @@ class AirbyteLogger:
def fatal(self, message):
    """Send ``message`` to ``self.log`` with the ``FATAL`` level."""
    level = "FATAL"
    self.log(level, message)
def exception(self, message):
    """Report ``message`` as an error with the current traceback appended.

    The text returned by ``traceback.format_exc()`` is placed on a new line
    after ``message`` and the combined text is handed to ``self.error``.
    """
    trace_text = traceback.format_exc()
    self.error(f"{message}\n{trace_text}")
def error(self, message):
    """Send ``message`` to ``self.log`` with the ``ERROR`` level."""
    level = "ERROR"
    self.log(level, message)

View File

@@ -22,9 +22,9 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import json
import copy
from datetime import datetime
from typing import Dict, Generator, Mapping, Type
from typing import Any, Iterator, Mapping, MutableMapping, Type
from airbyte_protocol import (
AirbyteCatalog,
@@ -33,7 +33,9 @@ from airbyte_protocol import (
AirbyteRecordMessage,
AirbyteStateMessage,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
Status,
SyncMode,
)
from airbyte_protocol import Type as MessageType
@@ -47,19 +49,24 @@ class BaseSource(Source):
client_class: Type[BaseClient] = None
@property
def name(self) -> str:
    """Name of the source, taken from the name of the instance's class."""
    source_class = self.__class__
    return source_class.__name__
def _get_client(self, config: Mapping):
    """Build a client by passing the entries of ``config`` as keyword arguments to ``client_class``."""
    return self.client_class(**config)
def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog:
def discover(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteCatalog:
"""Discover streams"""
client = self._get_client(config)
return AirbyteCatalog(streams=[stream for stream in client.streams])
def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus:
def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
"""Check connection"""
client = self._get_client(config)
alive, error = client.health_check()
@@ -69,28 +76,39 @@ class BaseSource(Source):
return AirbyteConnectionStatus(status=Status.SUCCEEDED)
def read(
self, logger: AirbyteLogger, config: json, catalog: ConfiguredAirbyteCatalog, state: Dict[str, any] = {}
) -> Generator[AirbyteMessage, None, None]:
self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None
) -> Iterator[AirbyteMessage]:
state = state or {}
client = self._get_client(config)
logger.info(f"Starting syncing {self.__class__.__name__}")
total_state = {**state}
logger.info(f"Starting syncing {self.name}")
total_state = copy.deepcopy(state)
for configured_stream in catalog.streams:
stream_name = configured_stream.stream.name
try:
yield from self._read_stream(logger=logger, client=client, configured_stream=configured_stream, state=total_state)
# TODO: test stream fail
except Exception:
logger.exception(f"Encountered an exception while reading stream {self.name}")
if client.stream_has_state(stream_name) and state.get(stream_name):
logger.info(f"Set state of {stream_name} stream to {state.get(stream_name)}")
client.set_stream_state(stream_name, state.get(stream_name))
logger.info(f"Finished syncing {self.name}")
logger.info(f"Syncing {stream_name} stream")
for record in client.read_stream(configured_stream.stream):
now = int(datetime.now().timestamp()) * 1000
message = AirbyteRecordMessage(stream=stream_name, data=record, emitted_at=now)
yield AirbyteMessage(type=MessageType.RECORD, record=message)
def _read_stream(
self, logger: AirbyteLogger, client: BaseClient, configured_stream: ConfiguredAirbyteStream, state: MutableMapping[str, Any]
):
stream_name = configured_stream.stream.name
use_incremental = configured_stream.sync_mode == SyncMode.incremental and client.stream_has_state(stream_name)
if client.stream_has_state(stream_name):
total_state[stream_name] = client.get_stream_state(stream_name)
# output state object only together with other stream states
yield AirbyteMessage(type=MessageType.STATE, state=AirbyteStateMessage(data=total_state))
if use_incremental and state.get(stream_name):
logger.info(f"Set state of {stream_name} stream to {state.get(stream_name)}")
client.set_stream_state(stream_name, state.get(stream_name))
logger.info(f"Finished syncing {self.__class__.__name__}")
logger.info(f"Syncing {stream_name} stream")
for record in client.read_stream(configured_stream.stream):
now = int(datetime.now().timestamp()) * 1000
message = AirbyteRecordMessage(stream=stream_name, data=record, emitted_at=now)
yield AirbyteMessage(type=MessageType.RECORD, record=message)
if use_incremental and client.get_stream_state(stream_name):
state[stream_name] = client.get_stream_state(stream_name)
# output state object only together with other stream states
yield AirbyteMessage(type=MessageType.STATE, state=AirbyteStateMessage(data=state))

View File

@@ -12,5 +12,5 @@ COPY $CODE_PATH ./$CODE_PATH
COPY setup.py ./
RUN pip install .
LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/source-freshdesk

View File

@@ -19,3 +19,9 @@ airbyteStandardSourceTestFile {
dependencies {
implementation files(project(':airbyte-integrations:bases:base-python').airbyteDocker.outputs)
}
task("pythonIntegrationTests", type: PythonTask, dependsOn: installTestReqs) {
module = "pytest"
command = "-s integration_tests"
}
integrationTest.dependsOn("pythonIntegrationTests")

View File

@@ -0,0 +1,83 @@
"""
MIT License
Copyright (c) 2020 Airbyte
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import json
from pathlib import Path
from typing import Mapping
import pytest
from source_freshdesk.client import Client
HERE = Path(__file__).parent.absolute()
@pytest.fixture(scope="session")
def account_creds() -> Mapping[str, str]:
    """Session-scoped fixture returning the parsed content of ``secrets/config.json``.

    The file is looked up in the ``secrets`` directory located beside the
    directory that contains this test module.

    Raises:
        RuntimeError: if the config file does not exist.
    """
    config_filename = HERE.parent / "secrets" / "config.json"
    if config_filename.exists():
        return json.loads(config_filename.read_text())
    raise RuntimeError(f"Please provide credentials in {config_filename}")
@pytest.fixture
def unknown_account() -> str:
    """Fixture providing a freshdesk.com domain used by the wrong-account test."""
    domain = "unknownaccount.freshdesk.com"
    return domain
@pytest.fixture
def non_freshdesk_account() -> str:
    """Fixture providing a domain that is not under freshdesk.com."""
    domain = "unknownaccount.somedomain.com"
    return domain
def test_client_wrong_domain(non_freshdesk_account):
    """Creating a client for a non-Freshdesk domain must raise ``AttributeError``."""
    with pytest.raises(
        AttributeError,
        match="Freshdesk v2 API works only via Freshdesk domains and not via custom CNAMEs",
    ):
        Client(domain=non_freshdesk_account, api_key="wrong_key")
def test_client_wrong_account(unknown_account):
    """The health check for the ``unknown_account`` domain fails with "Invalid credentials"."""
    health = Client(domain=unknown_account, api_key="wrong_key").health_check()
    alive, error = health

    assert not alive
    assert error == "Invalid credentials"
def test_client_wrong_cred(account_creds):
    """The health check with a wrong API key on the configured domain fails with "Invalid credentials"."""
    domain = account_creds["domain"]
    health = Client(domain=domain, api_key="wrong_key").health_check()
    alive, error = health

    assert not alive
    assert error == "Invalid credentials"
def test_client_ok(account_creds):
    """The health check with the configured domain and API key succeeds without an error."""
    domain, api_key = account_creds["domain"], account_creds["api_key"]
    health = Client(domain=domain, api_key=api_key).health_check()
    alive, error = health

    assert alive
    assert not error

View File

@@ -24,20 +24,29 @@ SOFTWARE.
from setuptools import find_packages, setup
MAIN_REQUIREMENTS = [
"airbyte-protocol",
"base-python",
"backoff==1.10.0",
"requests==2.25.1",
"pendulum==1.2.0",
]
TEST_REQUIREMENTS = [
"airbyte_python_test",
"pytest==6.1.2",
"requests_mock==1.8.0",
]
setup(
name="source_freshdesk",
description="Source implementation for Freshdesk.",
author="Airbyte",
author_email="contact@airbyte.io",
packages=find_packages(),
install_requires=[
"airbyte-protocol",
"base-python",
"requests==2.25.1",
"pendulum==1.2.0",
],
install_requires=MAIN_REQUIREMENTS,
package_data={"": ["*.json", "schemas/*.json"]},
extras_require={
"tests": ["airbyte_python_test", "pytest"],
"tests": TEST_REQUIREMENTS,
},
)

View File

@@ -22,7 +22,6 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import time
from abc import ABC, abstractmethod
from functools import partial
from typing import Any, Callable, Iterator, Mapping, MutableMapping, Optional, Sequence
@@ -31,103 +30,82 @@ import pendulum
import requests
from base_python.entrypoint import logger # FIXME (Eugene K): use standard logger
from requests import HTTPError
class FreshdeskError(HTTPError):
"""
Base error class.
Subclassing HTTPError to avoid breaking existing code that expects only HTTPErrors.
"""
class FreshdeskBadRequest(FreshdeskError):
"""Most 40X and 501 status codes"""
class FreshdeskUnauthorized(FreshdeskError):
"""401 Unauthorized"""
class FreshdeskAccessDenied(FreshdeskError):
"""403 Forbidden"""
class FreshdeskNotFound(FreshdeskError):
"""404"""
class FreshdeskRateLimited(FreshdeskError):
"""429 Rate Limit Reached"""
class FreshdeskServerError(FreshdeskError):
"""50X errors"""
from source_freshdesk.errors import (
FreshdeskAccessDenied,
FreshdeskBadRequest,
FreshdeskError,
FreshdeskNotFound,
FreshdeskRateLimited,
FreshdeskServerError,
FreshdeskUnauthorized,
)
from source_freshdesk.utils import retry_after_handler, retry_connection_handler
class API:
def __init__(self, domain: str, api_key: str, verify: bool = True, proxies: MutableMapping[str, str] = None):
def __init__(self, domain: str, api_key: str, verify: bool = True, proxies: MutableMapping[str, Any] = None):
"""Basic HTTP interface to read from endpoints"""
self._api_prefix = f"https://{domain.rstrip('/')}/api/v2/"
self._session = requests.Session()
self._session.auth = (api_key, "unused_with_api_key")
self._session.verify = verify
self._session.proxies = proxies
self._session.headers = {"Content-Type": "application/json"}
self._session.headers = {
"Content-Type": "application/json",
"User-Agent": "Airbyte",
}
if domain.find("freshdesk.com") < 0:
raise AttributeError("Freshdesk v2 API works only via Freshdesk domains and not via custom CNAMEs")
@staticmethod
def _parse_and_handle_errors(response):
    """Return the decoded JSON body of ``response`` or raise the matching Freshdesk error.

    The error text is built from the body's ``description``/``errors`` fields
    or, failing that, from its ``code``/``message`` fields. When the body
    carries neither, a status-specific default text is used.

    :param response: the HTTP response to inspect
    :return: the decoded JSON body, or ``{}`` when the body is not valid JSON
    :raises FreshdeskBadRequest: on status 400
    :raises FreshdeskUnauthorized: on status 401
    :raises FreshdeskAccessDenied: on status 403
    :raises FreshdeskNotFound: on status 404
    :raises FreshdeskRateLimited: on status 429
    :raises FreshdeskServerError: on any 5xx status
    :raises FreshdeskError: on any other status rejected by ``raise_for_status()``
    """
    try:
        body = response.json()
    except ValueError:
        body = {}

    # Only JSON objects can carry error details; lists or ``null`` bodies would break the lookups below.
    details = body if isinstance(body, dict) else {}

    # Start empty so that the status-specific defaults below are actually used when the body
    # has no details (a non-empty generic default would always win the ``or``).
    error_message = ""
    if "errors" in details:
        error_message = f"{details.get('description')}: {details['errors']}"
    # API docs don't mention this clearly, but in the case of bad credentials the returned JSON will have a
    # "message" field at the top level
    elif "message" in details:
        error_message = f"{details.get('code')}: {details['message']}"

    if response.status_code == 400:
        raise FreshdeskBadRequest(error_message or "Wrong input, check your data", response=response)
    elif response.status_code == 401:
        raise FreshdeskUnauthorized(error_message or "Invalid credentials", response=response)
    elif response.status_code == 403:
        raise FreshdeskAccessDenied(error_message or "You don't have enough permissions", response=response)
    elif response.status_code == 404:
        raise FreshdeskNotFound(error_message or "Resource not found", response=response)
    elif response.status_code == 429:
        retry_after = response.headers.get("Retry-After")
        raise FreshdeskRateLimited(
            f"429 Rate Limit Exceeded: API rate-limit has been reached until {retry_after} seconds."
            " See http://freshdesk.com/api#ratelimit",
            response=response,
        )
    elif 500 <= response.status_code < 600:
        raise FreshdeskServerError(f"{response.status_code}: Server Error", response=response)

    # Catch any other errors
    try:
        response.raise_for_status()
    except HTTPError as err:
        raise FreshdeskError(f"{err}: {body}", response=response) from err

    return body
@retry_connection_handler(max_tries=5, factor=5)
@retry_after_handler(max_tries=3)
def get(self, url: str, params: Mapping = None):
"""Wrapper around request.get() to use the API prefix. Returns a JSON response."""
for _ in range(10):
params = params or {}
response = self._session.get(self._api_prefix + url, params=params)
try:
return self._parse_and_handle_errors(response)
except FreshdeskRateLimited:
retry_after = int(response.headers["Retry-After"])
logger.info(f"Rate limit reached. Sleeping for {retry_after} seconds")
time.sleep(retry_after + 1) # extra second to cover any fractions of second
raise Exception("Max retry limit reached")
params = params or {}
response = self._session.get(self._api_prefix + url, params=params)
return self._parse_and_handle_errors(response)
class StreamAPI(ABC):

View File

@@ -33,6 +33,8 @@ from .api import (
ContactsAPI,
ConversationsAPI,
FreshdeskError,
FreshdeskNotFound,
FreshdeskUnauthorized,
GroupsAPI,
RolesAPI,
SatisfactionRatingsAPI,
@@ -86,8 +88,11 @@ class Client(BaseClient):
try:
self.settings()
except (FreshdeskUnauthorized, FreshdeskNotFound):
alive = False
error_msg = "Invalid credentials"
except FreshdeskError as error:
alive = False
error_msg = str(error)
error_msg = repr(error)
return alive, error_msg

View File

@@ -0,0 +1,56 @@
"""
MIT License
Copyright (c) 2020 Airbyte
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
from requests import HTTPError
class FreshdeskError(HTTPError):
    """
    Root of the Freshdesk exception hierarchy.

    Derives from HTTPError so that existing code that only expects HTTPErrors keeps working.
    """
class FreshdeskBadRequest(FreshdeskError):
    """400 Bad Request"""
class FreshdeskUnauthorized(FreshdeskError):
    """Error for HTTP 401 (Unauthorized) responses"""
class FreshdeskAccessDenied(FreshdeskError):
    """Error for HTTP 403 (Forbidden) responses"""
class FreshdeskNotFound(FreshdeskError):
    """Error for HTTP 404 (Not Found) responses"""
class FreshdeskRateLimited(FreshdeskError):
    """Error for HTTP 429 responses (rate limit reached)"""
class FreshdeskServerError(FreshdeskError):
    """Error for HTTP 5xx (server error) responses"""

View File

@@ -0,0 +1,76 @@
"""
MIT License
Copyright (c) 2020 Airbyte
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import sys
import time
import backoff
import requests
from base_python.entrypoint import logger
from source_freshdesk.errors import FreshdeskRateLimited
def retry_connection_handler(**kwargs):
    """Build a ``backoff.on_exception`` decorator that retries failed requests.

    Retries are triggered by any ``requests.exceptions.RequestException`` and
    use ``backoff.expo`` waits without jitter. Errors whose response has a
    4xx status code are not retried. Every retry attempt is logged.

    :param kwargs: passed through to ``backoff.on_exception`` (for example ``max_tries`` or ``factor``)
    :return: the configured decorator
    """

    def is_client_error(exc):
        # A 4xx answer is not retried; errors without a response are retried.
        response = exc.response
        if response is None:
            return False
        return 400 <= response.status_code < 500

    def report_retry(details):
        # Relies on backoff invoking this handler while the error is still being handled.
        current_error = sys.exc_info()[1]
        logger.info(str(current_error))
        logger.info(f"Caught retryable error after {details['tries']} tries. Waiting {details['wait']} more seconds then retrying...")

    return backoff.on_exception(
        backoff.expo,
        requests.exceptions.RequestException,
        jitter=None,
        on_backoff=report_retry,
        giveup=is_client_error,
        **kwargs,
    )
def retry_after_handler(**kwargs):
    """Build a ``backoff.on_exception`` decorator that waits out rate limits.

    The decorated call is retried when it raises ``FreshdeskRateLimited``.
    Before each retry the handler sleeps for the number of seconds given in
    the response's ``Retry-After`` header plus one extra second. When the
    header is missing or is not a plain number of seconds, a fallback delay is
    used instead of failing inside the handler.

    :param kwargs: passed through to ``backoff.on_exception`` (for example ``max_tries``)
    :return: the configured decorator
    """
    # Seconds to wait when the response does not say how long.
    # NOTE(review): 60 is an assumed default - adjust to the account's rate-limit window if needed.
    fallback_delay = 60

    def seconds_to_wait(exc):
        """Return the delay requested by ``Retry-After``, or the fallback when it is unusable."""
        response = getattr(exc, "response", None)
        raw_value = None if response is None else response.headers.get("Retry-After")
        try:
            # a negative value would make time.sleep() raise, so clamp at zero
            return max(int(raw_value), 0)
        except (TypeError, ValueError):
            # header missing, or not an integer number of seconds (e.g. an HTTP date)
            logger.info(f"Unusable Retry-After value {raw_value!r}, using {fallback_delay} seconds instead")
            return fallback_delay

    def sleep_on_ratelimit(_details):
        # Relies on backoff invoking this handler while the error is still being handled.
        _, exc, _ = sys.exc_info()
        if isinstance(exc, FreshdeskRateLimited):
            retry_after = seconds_to_wait(exc)
            logger.info(f"Rate limit reached. Sleeping for {retry_after} seconds")
            time.sleep(retry_after + 1)  # extra second to cover any fractions of second

    def log_giveup(_details):
        logger.error("Max retry limit reached")

    return backoff.on_exception(
        backoff.constant,
        FreshdeskRateLimited,
        jitter=None,
        on_backoff=sleep_on_ratelimit,
        on_giveup=log_giveup,
        interval=0,  # skip waiting part, we will wait in on_backoff handler
        **kwargs,
    )

View File

@@ -22,28 +22,36 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import json
from pathlib import Path
# TODO uncomment once this issue https://github.com/airbytehq/airbyte/issues/1134 to allow depending on local python packages without inheriting
# Docker images. For now this is covered by standard tests.
# from source_freshdesk.client import Client
from source_freshdesk.client import Client
HERE = Path(__file__).parent.absolute()
def test_fake():
assert json.loads("{}") == {}
def test_client_backoff_on_limit_reached(requests_mock):
    """A single 429 answer is retried and the following successful answer is returned."""
    rate_limited = {"json": {"error": "limit reached"}, "status_code": 429, "headers": {"Retry-After": "0"}}
    success = {"json": {"status": "ok"}, "status_code": 200}
    requests_mock.register_uri("GET", "/api/v2/settings/helpdesk", [rate_limited, success])

    client = Client(domain="someaccount.freshdesk.com", api_key="somekey")

    assert client.settings() == {"status": "ok"}
# def test_client_wrong_domain():
# not_freshdesk_domain = "unknownaccount"
# expected_error = "Freshdesk v2 API works only via Freshdesk" "domains and not via custom CNAMEs"
# with pytest.raises(AttributeError, match=expected_error):
# Client(domain=not_freshdesk_domain, api_key="wrong_key")
#
#
# def test_client_wrong_account():
# unknown_domain = "unknownaccount.freshdesk.com"
# client = Client(domain=unknown_domain, api_key="wrong_key")
# alive, error = client.health_check()
#
# assert not alive
# assert error == "Freshdesk Request Failed"
def test_client_backoff_on_server_error(requests_mock):
    """A single 500 answer is retried and the following successful answer is returned."""
    server_error = {"json": {"error": "something bad"}, "status_code": 500}
    success = {"json": {"status": "ok"}, "status_code": 200}
    requests_mock.register_uri("GET", "/api/v2/settings/helpdesk", [server_error, success])

    client = Client(domain="someaccount.freshdesk.com", api_key="somekey")

    assert client.settings() == {"status": "ok"}