feat(airbyte-cdk): Have better fallback error message on HTTP error (#43399)
This commit is contained in:
committed by
GitHub
parent
f2b2a638ff
commit
61c07e8bf6
@@ -7,7 +7,11 @@ from typing import Any, List, Mapping, Optional, Union
|
||||
|
||||
import requests
|
||||
from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler
|
||||
from airbyte_cdk.sources.streams.http.error_handlers.response_models import DEFAULT_ERROR_RESOLUTION, ErrorResolution, ResponseAction
|
||||
from airbyte_cdk.sources.streams.http.error_handlers.response_models import (
|
||||
ErrorResolution,
|
||||
ResponseAction,
|
||||
create_fallback_error_resolution,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -69,4 +73,4 @@ class CompositeErrorHandler(ErrorHandler):
|
||||
if matched_error_resolution:
|
||||
return matched_error_resolution
|
||||
|
||||
return DEFAULT_ERROR_RESOLUTION
|
||||
return create_fallback_error_resolution(response_or_exception)
|
||||
|
||||
@@ -9,7 +9,11 @@ import requests
|
||||
from airbyte_cdk.sources.declarative.requesters.error_handlers.default_http_response_filter import DefaultHttpResponseFilter
|
||||
from airbyte_cdk.sources.declarative.requesters.error_handlers.http_response_filter import HttpResponseFilter
|
||||
from airbyte_cdk.sources.streams.http.error_handlers import BackoffStrategy, ErrorHandler
|
||||
from airbyte_cdk.sources.streams.http.error_handlers.response_models import DEFAULT_ERROR_RESOLUTION, SUCCESS_RESOLUTION, ErrorResolution
|
||||
from airbyte_cdk.sources.streams.http.error_handlers.response_models import (
|
||||
SUCCESS_RESOLUTION,
|
||||
ErrorResolution,
|
||||
create_fallback_error_resolution,
|
||||
)
|
||||
from airbyte_cdk.sources.types import Config
|
||||
|
||||
|
||||
@@ -114,7 +118,11 @@ class DefaultErrorHandler(ErrorHandler):
|
||||
default_reponse_filter = DefaultHttpResponseFilter(parameters={}, config=self.config)
|
||||
default_response_filter_resolution = default_reponse_filter.matches(response_or_exception)
|
||||
|
||||
return default_response_filter_resolution if default_response_filter_resolution else DEFAULT_ERROR_RESOLUTION
|
||||
return (
|
||||
default_response_filter_resolution
|
||||
if default_response_filter_resolution
|
||||
else create_fallback_error_resolution(response_or_exception)
|
||||
)
|
||||
|
||||
def backoff_time(
|
||||
self, response_or_exception: Optional[Union[requests.Response, requests.RequestException]], attempt_count: int = 0
|
||||
|
||||
@@ -7,7 +7,7 @@ from typing import Optional, Union
|
||||
import requests
|
||||
from airbyte_cdk.sources.declarative.requesters.error_handlers.http_response_filter import HttpResponseFilter
|
||||
from airbyte_cdk.sources.streams.http.error_handlers.default_error_mapping import DEFAULT_ERROR_MAPPING
|
||||
from airbyte_cdk.sources.streams.http.error_handlers.response_models import DEFAULT_ERROR_RESOLUTION, ErrorResolution
|
||||
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ErrorResolution, create_fallback_error_resolution
|
||||
|
||||
|
||||
class DefaultHttpResponseFilter(HttpResponseFilter):
|
||||
@@ -25,4 +25,6 @@ class DefaultHttpResponseFilter(HttpResponseFilter):
|
||||
|
||||
default_mapped_error_resolution = DEFAULT_ERROR_MAPPING.get(mapped_key)
|
||||
|
||||
return default_mapped_error_resolution if default_mapped_error_resolution else DEFAULT_ERROR_RESOLUTION
|
||||
return (
|
||||
default_mapped_error_resolution if default_mapped_error_resolution else create_fallback_error_resolution(response_or_exception)
|
||||
)
|
||||
|
||||
@@ -2,9 +2,12 @@
|
||||
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
from typing import Optional, Union
|
||||
|
||||
import requests
|
||||
from airbyte_cdk.models import FailureType
|
||||
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
|
||||
from requests import HTTPError
|
||||
|
||||
|
||||
class ResponseAction(Enum):
|
||||
@@ -22,10 +25,34 @@ class ErrorResolution:
|
||||
error_message: Optional[str] = None
|
||||
|
||||
|
||||
DEFAULT_ERROR_RESOLUTION = ErrorResolution(
|
||||
response_action=ResponseAction.RETRY,
|
||||
failure_type=FailureType.system_error,
|
||||
error_message="The request failed due to an unknown error.",
|
||||
)
|
||||
def _format_exception_error_message(exception: Exception) -> str:
|
||||
return f"{type(exception).__name__}: {str(exception)}"
|
||||
|
||||
|
||||
def _format_response_error_message(response: requests.Response) -> str:
|
||||
try:
|
||||
response.raise_for_status()
|
||||
except HTTPError as exception:
|
||||
return filter_secrets(f"Response was not ok: `{str(exception)}`. Response content is: {response.text}")
|
||||
# We purposefully do not add the response.content because the response is "ok" so there might be sensitive information in the payload.
|
||||
# Feel free the
|
||||
return f"Unexpected response with HTTP status {response.status_code}"
|
||||
|
||||
|
||||
def create_fallback_error_resolution(response_or_exception: Optional[Union[requests.Response, Exception]]) -> ErrorResolution:
|
||||
if response_or_exception is None:
|
||||
# We do not expect this case to happen but if it does, it would be good to understand the cause and improve the error message
|
||||
error_message = "Error handler did not receive a valid response or exception. This is unexpected please contact Airbyte Support"
|
||||
elif isinstance(response_or_exception, Exception):
|
||||
error_message = _format_exception_error_message(response_or_exception)
|
||||
else:
|
||||
error_message = _format_response_error_message(response_or_exception)
|
||||
|
||||
return ErrorResolution(
|
||||
response_action=ResponseAction.RETRY,
|
||||
failure_type=FailureType.system_error,
|
||||
error_message=error_message,
|
||||
)
|
||||
|
||||
|
||||
SUCCESS_RESOLUTION = ErrorResolution(response_action=ResponseAction.SUCCESS, failure_type=None, error_message=None)
|
||||
|
||||
@@ -10,6 +10,7 @@ from airbyte_cdk.sources.declarative.requesters.error_handlers import HttpRespon
|
||||
from airbyte_cdk.sources.declarative.requesters.error_handlers.composite_error_handler import CompositeErrorHandler
|
||||
from airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import DefaultErrorHandler
|
||||
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ErrorResolution, ResponseAction
|
||||
from airbyte_protocol.models import FailureType
|
||||
|
||||
SOME_BACKOFF_TIME = 60
|
||||
|
||||
@@ -97,6 +98,24 @@ def test_composite_error_handler(test_name, first_handler_behavior, second_handl
|
||||
assert retrier.interpret_response(response_mock) == expected_behavior
|
||||
|
||||
|
||||
def test_given_unmatched_response_or_exception_then_return_default_error_resolution():
|
||||
composite_error_handler = CompositeErrorHandler(
|
||||
error_handlers=[
|
||||
DefaultErrorHandler(
|
||||
response_filters=[],
|
||||
parameters={},
|
||||
config={},
|
||||
)
|
||||
],
|
||||
parameters={},
|
||||
)
|
||||
|
||||
error_resolution = composite_error_handler.interpret_response(ValueError("Any error"))
|
||||
|
||||
assert error_resolution.response_action == ResponseAction.RETRY
|
||||
assert error_resolution.failure_type == FailureType.system_error
|
||||
|
||||
|
||||
def test_composite_error_handler_no_handlers():
|
||||
try:
|
||||
CompositeErrorHandler(error_handlers=[], parameters={})
|
||||
|
||||
@@ -12,12 +12,7 @@ from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategie
|
||||
)
|
||||
from airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import DefaultErrorHandler, HttpResponseFilter
|
||||
from airbyte_cdk.sources.streams.http.error_handlers.default_error_mapping import DEFAULT_ERROR_MAPPING
|
||||
from airbyte_cdk.sources.streams.http.error_handlers.response_models import (
|
||||
DEFAULT_ERROR_RESOLUTION,
|
||||
ErrorResolution,
|
||||
FailureType,
|
||||
ResponseAction,
|
||||
)
|
||||
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ErrorResolution, FailureType, ResponseAction
|
||||
|
||||
SOME_BACKOFF_TIME = 60
|
||||
|
||||
@@ -55,7 +50,7 @@ SOME_BACKOFF_TIME = 60
|
||||
ErrorResolution(
|
||||
response_action=ResponseAction.RETRY,
|
||||
failure_type=FailureType.system_error,
|
||||
error_message="The request failed due to an unknown error.",
|
||||
error_message="Unexpected response with HTTP status 418",
|
||||
),
|
||||
)
|
||||
],
|
||||
@@ -246,7 +241,9 @@ def test_default_error_handler_with_unmapped_http_code():
|
||||
response_mock.ok = False
|
||||
response_mock.headers = {}
|
||||
actual_error_resolution = error_handler.interpret_response(response_mock)
|
||||
assert actual_error_resolution == DEFAULT_ERROR_RESOLUTION
|
||||
assert actual_error_resolution
|
||||
assert actual_error_resolution.failure_type == FailureType.system_error
|
||||
assert actual_error_resolution.response_action == ResponseAction.RETRY
|
||||
|
||||
|
||||
def test_predicate_takes_precedent_over_default_mapped_error():
|
||||
|
||||
@@ -7,7 +7,8 @@ from unittest.mock import MagicMock
|
||||
import pytest
|
||||
from airbyte_cdk.sources.declarative.requesters.error_handlers.default_http_response_filter import DefaultHttpResponseFilter
|
||||
from airbyte_cdk.sources.streams.http.error_handlers.default_error_mapping import DEFAULT_ERROR_MAPPING
|
||||
from airbyte_cdk.sources.streams.http.error_handlers.response_models import DEFAULT_ERROR_RESOLUTION
|
||||
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
|
||||
from airbyte_protocol.models import FailureType
|
||||
from requests import RequestException, Response
|
||||
|
||||
|
||||
@@ -69,4 +70,6 @@ def test_unmapped_http_status_code_returns_default_error_resolution():
|
||||
)
|
||||
|
||||
actual_error_resolution = response_filter.matches(response)
|
||||
assert actual_error_resolution == DEFAULT_ERROR_RESOLUTION
|
||||
assert actual_error_resolution
|
||||
assert actual_error_resolution.failure_type == FailureType.system_error
|
||||
assert actual_error_resolution.response_action == ResponseAction.RETRY
|
||||
|
||||
@@ -0,0 +1,63 @@
|
||||
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
|
||||
|
||||
from unittest import TestCase
|
||||
|
||||
import requests
|
||||
import requests_mock
|
||||
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction, create_fallback_error_resolution
|
||||
from airbyte_cdk.utils.airbyte_secrets_utils import update_secrets
|
||||
from airbyte_protocol.models import FailureType
|
||||
|
||||
_A_SECRET = "a-secret"
|
||||
_A_URL = "https://a-url.com"
|
||||
|
||||
|
||||
class DefaultErrorResolutionTest(TestCase):
|
||||
|
||||
def setUp(self) -> None:
|
||||
update_secrets([_A_SECRET])
|
||||
|
||||
def tearDown(self) -> None:
|
||||
# to avoid other tests being impacted by added secrets
|
||||
update_secrets([])
|
||||
|
||||
def test_given_none_when_create_fallback_error_resolution_then_return_error_resolution(self) -> None:
|
||||
error_resolution = create_fallback_error_resolution(None)
|
||||
|
||||
assert error_resolution.failure_type == FailureType.system_error
|
||||
assert error_resolution.response_action == ResponseAction.RETRY
|
||||
assert error_resolution.error_message == "Error handler did not receive a valid response or exception. This is unexpected please contact Airbyte Support"
|
||||
|
||||
def test_given_exception_when_create_fallback_error_resolution_then_return_error_resolution(self) -> None:
|
||||
exception = ValueError("This is an exception")
|
||||
|
||||
error_resolution = create_fallback_error_resolution(exception)
|
||||
|
||||
assert error_resolution.failure_type == FailureType.system_error
|
||||
assert error_resolution.response_action == ResponseAction.RETRY
|
||||
assert error_resolution.error_message
|
||||
assert "ValueError" in error_resolution.error_message
|
||||
assert str(exception) in error_resolution.error_message
|
||||
|
||||
def test_given_response_can_raise_for_status_when_create_fallback_error_resolution_then_error_resolution(self) -> None:
|
||||
response = self._create_response(512)
|
||||
|
||||
error_resolution = create_fallback_error_resolution(response)
|
||||
|
||||
assert error_resolution.failure_type == FailureType.system_error
|
||||
assert error_resolution.response_action == ResponseAction.RETRY
|
||||
assert error_resolution.error_message and "512 Server Error: None for url: https://a-url.com/" in error_resolution.error_message
|
||||
|
||||
def test_given_response_is_ok_when_create_fallback_error_resolution_then_error_resolution(self) -> None:
|
||||
response = self._create_response(205)
|
||||
|
||||
error_resolution = create_fallback_error_resolution(response)
|
||||
|
||||
assert error_resolution.failure_type == FailureType.system_error
|
||||
assert error_resolution.response_action == ResponseAction.RETRY
|
||||
assert error_resolution.error_message and str(response.status_code) in error_resolution.error_message
|
||||
|
||||
def _create_response(self, status_code: int) -> requests.Response:
|
||||
with requests_mock.Mocker() as http_mocker:
|
||||
http_mocker.get(_A_URL, status_code=status_code)
|
||||
return requests.get(_A_URL)
|
||||
Reference in New Issue
Block a user