1
0
mirror of synced 2025-12-19 18:14:56 -05:00
Files
2025-10-02 13:29:00 +03:00

153 lines
7.1 KiB
Python

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from typing import Optional, Union
import requests
from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler, ErrorResolution, HttpStatusErrorHandler, ResponseAction
from airbyte_cdk.sources.streams.http.error_handlers.default_error_mapping import DEFAULT_ERROR_MAPPING
from . import constants
# GitHub-specific overrides merged over DEFAULT_ERROR_MAPPING (entries here win on key clashes).
# Each override pairs ResponseAction.RETRY with FailureType.config_error; the message names
# the actual HTTP status so that a surfaced error is not mislabelled as a conflict.
GITHUB_DEFAULT_ERROR_MAPPING = DEFAULT_ERROR_MAPPING | {
    401: ErrorResolution(
        response_action=ResponseAction.RETRY,
        failure_type=FailureType.config_error,
        error_message="Unauthorized. Please ensure you are authenticated correctly.",
    ),
    403: ErrorResolution(
        response_action=ResponseAction.RETRY,
        failure_type=FailureType.config_error,
        error_message="Forbidden. You don't have permission to access this resource.",
    ),
    404: ErrorResolution(
        response_action=ResponseAction.RETRY,
        failure_type=FailureType.config_error,
        error_message="Not found. The requested resource was not found on the server.",
    ),
    409: ErrorResolution(
        response_action=ResponseAction.RETRY,
        failure_type=FailureType.config_error,
        error_message="Conflict.",
    ),
    410: ErrorResolution(
        response_action=ResponseAction.RETRY,
        failure_type=FailureType.config_error,
        error_message="Gone. Please ensure the url is valid.",
    ),
}
def is_conflict_with_empty_repository(response_or_exception: Optional[Union[requests.Response, Exception]] = None) -> bool:
    """
    Tell whether the given object is a 409 Conflict response that reports an empty repository.

    :param response_or_exception: response (or exception) produced by an HTTP request; may be None.
    :return: True only for a requests.Response with status 409 whose JSON body is an object whose
        "message" equals "Git Repository is empty."; False otherwise, including when the body is
        not valid JSON or is not a JSON object.
    """
    if not isinstance(response_or_exception, requests.Response):
        return False
    if response_or_exception.status_code != requests.codes.CONFLICT:
        return False

    try:
        response_data = response_or_exception.json()
    except ValueError:
        # Body is not parseable as JSON, so it cannot carry the empty-repository message.
        return False

    # A JSON array or scalar has no "message" key to inspect.
    return isinstance(response_data, dict) and response_data.get("message") == "Git Repository is empty."
class GithubStreamABCErrorHandler(HttpStatusErrorHandler):
    """
    Error handler that recognises GitHub rate limiting and empty-repository conflicts
    before falling back to the parent handler's interpretation.
    """

    def __init__(self, stream: HttpStream, **kwargs):  # type: ignore # noqa
        """
        :param stream: stream whose `name` and `check_graphql_rate_limited` are used by this handler.
        :param kwargs: forwarded unchanged to the parent constructor.
        """
        self.stream = stream
        super().__init__(**kwargs)

    def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]] = None) -> ErrorResolution:
        """
        Map a response (or exception) to an ErrorResolution.

        :param response_or_exception: outcome of the HTTP request; may be None.
        :return: RATE_LIMITED when a rate limit is detected, IGNORE for the empty-repository
            conflict, otherwise whatever the parent handler returns.
        """
        if not isinstance(response_or_exception, requests.Response):
            return super().interpret_response(response_or_exception)

        response_headers = response_or_exception.headers

        # The GitHub GraphQL API has limitations
        # https://docs.github.com/en/graphql/overview/resource-limitations
        rate_limited = response_headers.get("X-RateLimit-Resource") == "graphql" and self.stream.check_graphql_rate_limited(
            response_or_exception.json()
        )
        if not rate_limited:
            # Rate limit HTTP headers
            # https://docs.github.com/en/rest/overview/resources-in-the-rest-api#rate-limit-http-headers
            rate_limited = response_or_exception.status_code != 200 and response_headers.get("X-RateLimit-Remaining") == "0"
        if not rate_limited:
            # Secondary rate limits
            # https://docs.github.com/en/rest/overview/resources-in-the-rest-api#secondary-rate-limits
            rate_limited = "Retry-After" in response_headers

        if rate_limited:
            # Headers echoed into the log line, in this order, when present on the response.
            reported_headers = (
                "X-RateLimit-Resource",
                "X-RateLimit-Remaining",
                "X-RateLimit-Reset",
                "X-RateLimit-Limit",
                "X-RateLimit-Used",
                "Retry-After",
            )
            string_headers = ", ".join(f"{name}: {response_headers[name]}" for name in reported_headers if name in response_headers)
            if string_headers:
                string_headers = f"HTTP headers: {string_headers},"

            self._logger.info(
                f"Rate limit handling for stream `{self.stream.name}` for the response with {response_or_exception.status_code} status code, {string_headers} with message: {response_or_exception.text}"
            )
            return ErrorResolution(
                response_action=ResponseAction.RATE_LIMITED,
                failure_type=FailureType.transient_error,
                error_message=f"Response status code: {response_or_exception.status_code}. Retrying...",
            )

        if is_conflict_with_empty_repository(response_or_exception=response_or_exception):
            return ErrorResolution(
                response_action=ResponseAction.IGNORE,
                failure_type=FailureType.config_error,
                error_message=f"Ignoring response for '{response_or_exception.request.method}' request to '{response_or_exception.url}' with response code '{response_or_exception.status_code}' as the repository is empty.",
            )

        return super().interpret_response(response_or_exception)
class ContributorActivityErrorHandler(HttpStatusErrorHandler):
    """
    Custom error handler for streams built on repository statistics endpoints such as ContributorActivity.
    When the requested data has not been cached yet at the time of the request, the API answers with a 202
    response, and such requests need to be retried to get the actual results.
    See the docs for more info:
    https://docs.github.com/en/rest/metrics/statistics?apiVersion=2022-11-28#a-word-about-caching
    """

    def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]] = None) -> ErrorResolution:
        """
        Retry on a 202 Accepted response; delegate everything else to the parent handler.

        :param response_or_exception: outcome of the HTTP request; may be None.
        :return: a RETRY / transient_error resolution for a 202 response, otherwise the parent's resolution.
        """
        got_accepted = (
            isinstance(response_or_exception, requests.Response) and response_or_exception.status_code == requests.codes.ACCEPTED
        )
        if not got_accepted:
            return super().interpret_response(response_or_exception)

        return ErrorResolution(
            response_action=ResponseAction.RETRY,
            failure_type=FailureType.transient_error,
            error_message=f"Response status code: {response_or_exception.status_code}. Retrying...",
        )
class GitHubGraphQLErrorHandler(GithubStreamABCErrorHandler):
    """
    Error handler for GraphQL-backed streams.

    On a 502/504 response it halves the stream's page size and retries. On any other response it
    restores the default page size and retries when the JSON payload carries a non-empty "errors"
    entry. Everything else is delegated to GithubStreamABCErrorHandler.
    """

    def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]] = None) -> ErrorResolution:
        """
        Map a response (or exception) to an ErrorResolution, adjusting `self.stream.page_size` as a side effect.

        :param response_or_exception: outcome of the HTTP request; may be None.
        :return: a RETRY / transient_error resolution for 502/504 responses and for payloads with
            "errors"; otherwise the parent handler's resolution.
        """
        if isinstance(response_or_exception, requests.Response):
            if response_or_exception.status_code in (requests.codes.BAD_GATEWAY, requests.codes.GATEWAY_TIMEOUT):
                # Shrink the page on gateway errors, but never below 1: repeated halving would
                # otherwise drive the page size to 0.
                self.stream.page_size = max(1, int(self.stream.page_size / 2))
                return ErrorResolution(
                    response_action=ResponseAction.RETRY,
                    failure_type=FailureType.transient_error,
                    error_message=f"Response status code: {response_or_exception.status_code}. Retrying...",
                )

            # Any non-gateway-error response restores the default page size.
            self.stream.page_size = (
                constants.DEFAULT_PAGE_SIZE_FOR_LARGE_STREAM if self.stream.large_stream else constants.DEFAULT_PAGE_SIZE
            )

            try:
                payload = response_or_exception.json()
            except ValueError:
                # Non-JSON body: there is no "errors" entry to inspect, let the parent handler decide.
                payload = None

            if isinstance(payload, dict) and payload.get("errors"):
                return ErrorResolution(
                    response_action=ResponseAction.RETRY,
                    failure_type=FailureType.transient_error,
                    error_message=f"Response status code: {response_or_exception.status_code}. Retrying...",
                )

        return super().interpret_response(response_or_exception)