1
0
mirror of synced 2026-01-16 09:06:29 -05:00
Files
airbyte/airbyte-integrations/bases/base-python/base_python/cdk/streams/rate_limiting.py
2021-05-13 17:46:34 -07:00

87 lines
3.3 KiB
Python

#
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
import sys
import time
import backoff
from base_python.cdk.streams.exceptions import DefaultBackoffException, UserDefinedBackoffException
from base_python.logger import AirbyteLogger
from requests import codes, exceptions
# Exception types that default_backoff_handler treats as retryable.
TRANSIENT_EXCEPTIONS = (
    DefaultBackoffException,
    exceptions.ConnectTimeout,
    exceptions.ReadTimeout,
    exceptions.ConnectionError,
)

# TODO inject singleton logger?
# Module-level logger shared by the backoff handlers defined in this file.
logger = AirbyteLogger()
def default_backoff_handler(max_tries: int, factor: int, **kwargs):
    """Build a retry decorator for TRANSIENT_EXCEPTIONS using exponential waits.

    The returned object is the result of ``backoff.on_exception`` configured
    with ``backoff.expo``, no jitter, a retry logger and a give-up predicate.

    :param max_tries: forwarded to ``backoff.on_exception`` as ``max_tries``.
    :param factor: forwarded to ``backoff.on_exception`` as ``factor``.
    :param kwargs: any additional keyword arguments, forwarded unchanged to
        ``backoff.on_exception``.
    :return: whatever ``backoff.on_exception`` returns (used as a decorator).
    """

    def log_retry_attempt(details):
        # Relies on this handler being invoked while the caught exception is
        # still active, so it can be read from sys.exc_info().
        active_error = sys.exc_info()[1]
        logger.info(str(active_error))
        logger.info(f"Caught retryable error after {details['tries']} tries. Waiting {details['wait']} seconds then retrying...")

    def should_give_up(exc):
        # No response attached (e.g. nothing came back): keep retrying.
        if exc.response is None:
            return False

        status = exc.response.status_code
        # Rate limiting (429) and anything outside the 4XX range stay retryable.
        if status == codes.too_many_requests or not 400 <= status < 500:
            return False

        # A non-rate-limiting 4XX that made it this far was unexpected and is
        # probably consistent, so backing off would not help.
        logger.info(f"Giving up for returned HTTP status: {status}")
        return True

    retry_decorator = backoff.on_exception(
        backoff.expo,
        TRANSIENT_EXCEPTIONS,
        max_tries=max_tries,
        factor=factor,
        jitter=None,
        giveup=should_give_up,
        on_backoff=log_retry_attempt,
        **kwargs,
    )
    return retry_decorator
def user_defined_backoff_handler(max_tries: int, **kwargs):
    """Build a retry decorator for UserDefinedBackoffException.

    The wait between attempts is taken from the exception's ``backoff``
    attribute rather than from the backoff library: the library interval is
    set to 0 and the actual sleep happens inside the ``on_backoff`` handler.

    :param max_tries: forwarded to ``backoff.on_exception`` as ``max_tries``.
    :param kwargs: any additional keyword arguments, forwarded unchanged to
        ``backoff.on_exception``.
    :return: whatever ``backoff.on_exception`` returns (used as a decorator).
    """

    def sleep_on_ratelimit(details):
        # Relies on the handler running while the caught exception is active.
        active_error = sys.exc_info()[1]
        if not isinstance(active_error, UserDefinedBackoffException):
            return

        delay = active_error.backoff
        logger.info(f"Retrying. Sleeping for {delay} seconds")
        # One extra second to cover any fractions of a second.
        time.sleep(delay + 1)

    def log_give_up(details):
        active_error = sys.exc_info()[1]
        failed_request = active_error.request
        last_response = active_error.response
        logger.error(f"Max retry limit reached. Request: {failed_request}, Response: {last_response}")

    retry_decorator = backoff.on_exception(
        backoff.constant,
        UserDefinedBackoffException,
        max_tries=max_tries,
        jitter=None,
        interval=0,  # skip waiting here; the wait is done in sleep_on_ratelimit
        on_giveup=log_give_up,
        on_backoff=sleep_on_ratelimit,
        **kwargs,
    )
    return retry_decorator