#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
|
|
|
|
from typing import Optional, Union
|
|
|
|
import requests
|
|
|
|
from airbyte_cdk.models import FailureType
|
|
from airbyte_cdk.sources.streams.http import HttpStream
|
|
from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler, ErrorResolution, HttpStatusErrorHandler, ResponseAction
|
|
from airbyte_cdk.sources.streams.http.error_handlers.default_error_mapping import DEFAULT_ERROR_MAPPING
|
|
|
|
from . import constants
|
|
|
|
|
|
# Status-code -> ErrorResolution mapping for GitHub streams.
# Built with the dict union operator, so the entries below take precedence over any
# same-keyed entries in DEFAULT_ERROR_MAPPING. Every override here asks for a RETRY
# and is classified as a config error.
# NOTE(review): 401/403/404 previously carried the copy-pasted message "Conflict.",
# which only describes 409; each message now matches its status code.
GITHUB_DEFAULT_ERROR_MAPPING = DEFAULT_ERROR_MAPPING | {
    401: ErrorResolution(
        response_action=ResponseAction.RETRY,
        failure_type=FailureType.config_error,
        error_message="Unauthorized. Please ensure you are authenticated correctly.",
    ),
    403: ErrorResolution(
        response_action=ResponseAction.RETRY,
        failure_type=FailureType.config_error,
        error_message="Forbidden. You don't have permission to access this resource.",
    ),
    404: ErrorResolution(
        response_action=ResponseAction.RETRY,
        failure_type=FailureType.config_error,
        error_message="Not found. The requested resource was not found on the server.",
    ),
    409: ErrorResolution(
        response_action=ResponseAction.RETRY,
        failure_type=FailureType.config_error,
        error_message="Conflict.",
    ),
    410: ErrorResolution(
        response_action=ResponseAction.RETRY,
        failure_type=FailureType.config_error,
        error_message="Gone. Please ensure the url is valid.",
    ),
}
|
|
|
|
|
|
def is_conflict_with_empty_repository(response_or_exception: Optional[Union[requests.Response, Exception]] = None) -> bool:
    """
    Tell whether the argument is GitHub's "empty repository" conflict response.

    :param response_or_exception: the HTTP response, an exception, or None.
    :return: True only for a `requests.Response` with status 409 whose JSON body is an object
        with `message` equal to "Git Repository is empty."; False for everything else, including
        409 responses whose body is not valid JSON or is not a JSON object.
    """
    if not isinstance(response_or_exception, requests.Response):
        return False
    if response_or_exception.status_code != requests.codes.CONFLICT:
        return False

    try:
        response_data = response_or_exception.json()
    except ValueError:
        # `Response.json()` raises a ValueError subclass when the body is not valid JSON
        # (e.g. an HTML error page); such a response cannot be the empty-repository conflict.
        return False

    # Guard against non-object JSON (list, string, null), which has no `.get`.
    return isinstance(response_data, dict) and response_data.get("message") == "Git Repository is empty."
|
|
|
|
|
|
class GithubStreamABCErrorHandler(HttpStatusErrorHandler):
    """
    Error handler for GitHub streams that adds rate-limit and empty-repository handling in front of
    the parent `HttpStatusErrorHandler`.

    For a `requests.Response`, `interpret_response` checks, in order:
      1. rate limiting -> `ResponseAction.RATE_LIMITED`;
      2. a conflict response reporting an empty repository -> `ResponseAction.IGNORE`;
      3. anything else -> delegated to the parent handler.
    Exceptions and `None` are delegated straight to the parent handler.
    """

    def __init__(self, stream: HttpStream, **kwargs):  # type: ignore # noqa
        """
        :param stream: stream this handler serves; its `name` is used in log messages and its
            `check_graphql_rate_limited` method is used to inspect GraphQL payloads.
        :param kwargs: forwarded unchanged to the parent constructor.
        """
        self.stream = stream
        super().__init__(**kwargs)

    def _github_rate_limited(self, response: requests.Response) -> bool:
        """Return True when `response` carries any of the rate-limit signals this handler recognises."""
        response_headers = response.headers

        # The GitHub GraphQL API has limitations
        # https://docs.github.com/en/graphql/overview/resource-limitations
        if response_headers.get("X-RateLimit-Resource") == "graphql":
            try:
                payload = response.json()
            except ValueError:
                # Body is not valid JSON (e.g. an HTML error page), so there is no GraphQL
                # payload to inspect; fall through to the header-based checks instead of crashing.
                payload = None
            if payload is not None and self.stream.check_graphql_rate_limited(payload):
                return True

        # Rate limit HTTP headers
        # https://docs.github.com/en/rest/overview/resources-in-the-rest-api#rate-limit-http-headers
        if response.status_code != 200 and response_headers.get("X-RateLimit-Remaining") == "0":
            return True

        # Secondary rate limits
        # https://docs.github.com/en/rest/overview/resources-in-the-rest-api#secondary-rate-limits
        return "Retry-After" in response_headers

    def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]] = None) -> ErrorResolution:
        """
        Map a response (or exception) to an `ErrorResolution`.

        :param response_or_exception: the HTTP response, an exception, or None.
        :return: a RATE_LIMITED resolution for rate-limited responses, an IGNORE resolution for the
            empty-repository conflict, otherwise whatever the parent handler returns.
        """
        if not isinstance(response_or_exception, requests.Response):
            return super().interpret_response(response_or_exception)

        response = response_or_exception

        if self._github_rate_limited(response):
            # Only the rate-limit headers actually present on the response are logged.
            logged_headers = (
                "X-RateLimit-Resource",
                "X-RateLimit-Remaining",
                "X-RateLimit-Reset",
                "X-RateLimit-Limit",
                "X-RateLimit-Used",
                "Retry-After",
            )
            string_headers = ", ".join(f"{h}: {response.headers[h]}" for h in logged_headers if h in response.headers)
            if string_headers:
                string_headers = f"HTTP headers: {string_headers},"

            self._logger.info(
                f"Rate limit handling for stream `{self.stream.name}` for the response with {response.status_code} status code, {string_headers} with message: {response.text}"
            )
            return ErrorResolution(
                response_action=ResponseAction.RATE_LIMITED,
                failure_type=FailureType.transient_error,
                error_message=f"Response status code: {response.status_code}. Retrying...",
            )

        if is_conflict_with_empty_repository(response_or_exception=response):
            log_message = f"Ignoring response for '{response.request.method}' request to '{response.url}' with response code '{response.status_code}' as the repository is empty."
            return ErrorResolution(
                response_action=ResponseAction.IGNORE,
                failure_type=FailureType.config_error,
                error_message=log_message,
            )

        return super().interpret_response(response)
|
|
|
|
|
|
class ContributorActivityErrorHandler(HttpStatusErrorHandler):
    """
    Error handler for streams built on the repository statistics endpoints (e.g. ContributorActivity).

    GitHub answers with a 202 when the requested statistics have not been cached yet at the time of
    the request, so such requests need to be retried to get the actual results.

    See the docs for more info:
    https://docs.github.com/en/rest/metrics/statistics?apiVersion=2022-11-28#a-word-about-caching
    """

    def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]] = None) -> ErrorResolution:
        """Return a transient RETRY resolution for a 202 response; defer everything else to the parent handler."""
        got_response = isinstance(response_or_exception, requests.Response)
        if not got_response or response_or_exception.status_code != requests.codes.ACCEPTED:
            return super().interpret_response(response_or_exception)

        retry_message = f"Response status code: {response_or_exception.status_code}. Retrying..."
        return ErrorResolution(
            response_action=ResponseAction.RETRY,
            failure_type=FailureType.transient_error,
            error_message=retry_message,
        )
|
|
|
|
|
|
class GitHubGraphQLErrorHandler(GithubStreamABCErrorHandler):
    """
    Error handler for GraphQL-backed GitHub streams.

    On top of the parent handler it:
      * retries 502/504 responses after halving the stream's `page_size`;
      * resets `page_size` to its default on any other response;
      * retries responses whose JSON body contains a non-empty `errors` entry.
    """

    def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]] = None) -> ErrorResolution:
        """
        Map a response (or exception) to an `ErrorResolution`.

        :param response_or_exception: the HTTP response, an exception, or None.
        :return: a transient RETRY resolution for 502/504 responses and for responses reporting
            GraphQL `errors`; otherwise whatever the parent handler returns.
        """
        if isinstance(response_or_exception, requests.Response):
            if response_or_exception.status_code in (requests.codes.BAD_GATEWAY, requests.codes.GATEWAY_TIMEOUT):
                # Halve the page size for the retry (presumably to make the retried query lighter).
                # Clamp at 1: repeated halving would otherwise reach 0 and make every retry
                # ask for an empty page.
                self.stream.page_size = max(1, int(self.stream.page_size / 2))
                return ErrorResolution(
                    response_action=ResponseAction.RETRY,
                    failure_type=FailureType.transient_error,
                    error_message=f"Response status code: {response_or_exception.status_code}. Retrying...",
                )

            # Any response other than 502/504 restores the default page size for the stream.
            self.stream.page_size = (
                constants.DEFAULT_PAGE_SIZE_FOR_LARGE_STREAM if self.stream.large_stream else constants.DEFAULT_PAGE_SIZE
            )

            try:
                payload = response_or_exception.json()
            except ValueError:
                # Body is not valid JSON (e.g. an HTML error page): there are no GraphQL errors to
                # inspect, so let the parent handler decide based on status code and headers.
                payload = None

            # Only a JSON object can carry an "errors" entry; lists/strings/null have no `.get`.
            if isinstance(payload, dict) and payload.get("errors"):
                return ErrorResolution(
                    response_action=ResponseAction.RETRY,
                    failure_type=FailureType.transient_error,
                    error_message=f"Response status code: {response_or_exception.status_code}. Retrying...",
                )

        return super().interpret_response(response_or_exception)
|