1
0
mirror of synced 2025-12-23 21:03:15 -05:00

feat(airbyte-cdk): add failure_type to HttpResponseFilter (raise config error in low-code) (#40676)

This commit is contained in:
Daryna Ishchenko
2024-07-16 16:50:29 +03:00
committed by GitHub
parent 7e5644d53a
commit a49e779c59
5 changed files with 110 additions and 5 deletions

View File

@@ -1570,11 +1570,25 @@ definitions:
- FAIL
- RETRY
- IGNORE
- RATE_LIMITED
examples:
- SUCCESS
- FAIL
- RETRY
- IGNORE
- RATE_LIMITED
failure_type:
title: Failure Type
description: Failure type of traced exception if a response matches the filter.
type: string
enum:
- system_error
- config_error
- transient_error
examples:
- system_error
- config_error
- transient_error
error_message:
title: Error Message
description: Error Message to display if the response matches the filter.

View File

@@ -532,6 +532,13 @@ class Action(Enum):
FAIL = 'FAIL'
RETRY = 'RETRY'
IGNORE = 'IGNORE'
RATE_LIMITED = 'RATE_LIMITED'
class FailureType(Enum):
system_error = 'system_error'
config_error = 'config_error'
transient_error = 'transient_error'
class HttpResponseFilter(BaseModel):
@@ -539,9 +546,15 @@ class HttpResponseFilter(BaseModel):
action: Optional[Action] = Field(
None,
description='Action to execute if a response matches the filter.',
examples=['SUCCESS', 'FAIL', 'RETRY', 'IGNORE'],
examples=['SUCCESS', 'FAIL', 'RETRY', 'IGNORE', 'RATE_LIMITED'],
title='Action',
)
failure_type: Optional[FailureType] = Field(
None,
description='Failure type of traced exception if a response matches the filter.',
examples=['system_error', 'config_error', 'transient_error'],
title='Failure Type',
)
error_message: Optional[str] = Field(
None,
description='Error Message to display if the response matches the filter.',

View File

@@ -9,7 +9,7 @@ import inspect
import re
from typing import Any, Callable, Dict, List, Mapping, Optional, Type, Union, get_args, get_origin, get_type_hints
from airbyte_cdk.models import Level
from airbyte_cdk.models import FailureType, Level
from airbyte_cdk.sources.declarative.auth import DeclarativeOauth2Authenticator, JwtAuthenticator
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator, NoAuth
from airbyte_cdk.sources.declarative.auth.jwt import JwtAlgorithm
@@ -818,12 +818,16 @@ class ModelToComponentFactory:
action = ResponseAction(model.action.value)
else:
action = None
failure_type = FailureType(model.failure_type.value) if model.failure_type else None
http_codes = (
set(model.http_codes) if model.http_codes else set()
) # JSON schema notation has no set data type. The schema enforces an array of unique elements
return HttpResponseFilter(
action=action,
failure_type=failure_type,
error_message=model.error_message or "",
error_message_contains=model.error_message_contains or "",
http_codes=http_codes,

View File

@@ -21,9 +21,11 @@ class HttpResponseFilter:
Filter to select a response based on its HTTP status code, error message or a predicate.
If a response matches the filter, the response action, failure_type, and error message are returned as an ErrorResolution object.
For http_codes declared in the filter, the failure_type will default to `system_error`.
To override default failure_type use configured failure_type with ResponseAction.FAIL.
Attributes:
action (Union[ResponseAction, str]): action to execute if a request matches
failure_type (Union[ResponseAction, str]): failure type of traced exception if a response matches the filter
http_codes (Set[int]): http code of matching requests
error_message_contains (str): error substring of matching requests
predicate (str): predicate to apply to determine if a request is matching
@@ -33,6 +35,7 @@ class HttpResponseFilter:
config: Config
parameters: InitVar[Mapping[str, Any]]
action: Optional[Union[ResponseAction, str]] = None
failure_type: Optional[Union[FailureType, str]] = None
http_codes: Optional[Set[int]] = None
error_message_contains: Optional[str] = None
predicate: Union[InterpolatedBoolean, str] = ""
@@ -50,6 +53,8 @@ class HttpResponseFilter:
self.predicate = InterpolatedBoolean(condition=self.predicate, parameters=parameters)
self.error_message = InterpolatedString.create(string_or_interpolated=self.error_message, parameters=parameters)
self._error_message_parser = JsonErrorMessageParser()
if self.failure_type and isinstance(self.failure_type, str):
self.failure_type = FailureType[self.failure_type]
def matches(self, response_or_exception: Optional[Union[requests.Response, Exception]]) -> Optional[ErrorResolution]:
filter_action = self._matches_filter(response_or_exception)
@@ -68,7 +73,13 @@ class HttpResponseFilter:
if isinstance(response_or_exception, requests.Response):
error_message = self._create_error_message(response_or_exception)
error_message = error_message or default_error_message
failure_type = default_mapped_error_resolution.failure_type if default_mapped_error_resolution else FailureType.system_error
if self.failure_type and filter_action == ResponseAction.FAIL:
failure_type = self.failure_type
elif default_mapped_error_resolution:
failure_type = default_mapped_error_resolution.failure_type
else:
failure_type = FailureType.system_error
return ErrorResolution(
response_action=filter_action,

View File

@@ -12,10 +12,11 @@ from airbyte_cdk.sources.streams.http.error_handlers.response_models import Erro
@pytest.mark.parametrize(
"action, http_codes, predicate, error_contains, error_message, response, expected_error_resolution",
"action, failure_type, http_codes, predicate, error_contains, error_message, response, expected_error_resolution",
[
pytest.param(
ResponseAction.FAIL,
None,
{501, 503},
"",
"",
@@ -26,6 +27,7 @@ from airbyte_cdk.sources.streams.http.error_handlers.response_models import Erro
),
pytest.param(
ResponseAction.IGNORE,
None,
{403},
"",
"",
@@ -36,6 +38,7 @@ from airbyte_cdk.sources.streams.http.error_handlers.response_models import Erro
),
pytest.param(
ResponseAction.RETRY,
None,
{429},
"",
"",
@@ -46,6 +49,7 @@ from airbyte_cdk.sources.streams.http.error_handlers.response_models import Erro
),
pytest.param(
ResponseAction.FAIL,
None,
{},
'{{ response.the_body == "do_i_match" }}',
"",
@@ -56,6 +60,7 @@ from airbyte_cdk.sources.streams.http.error_handlers.response_models import Erro
),
pytest.param(
ResponseAction.FAIL,
None,
{},
'{{ headers.the_key == "header_match" }}',
"",
@@ -66,6 +71,7 @@ from airbyte_cdk.sources.streams.http.error_handlers.response_models import Erro
),
pytest.param(
ResponseAction.FAIL,
None,
{},
None,
"DENIED",
@@ -80,6 +86,7 @@ from airbyte_cdk.sources.streams.http.error_handlers.response_models import Erro
),
pytest.param(
ResponseAction.FAIL,
None,
{400, 404},
'{{ headers.error == "invalid_input" or response.reason == "bad request"}}',
"",
@@ -88,9 +95,64 @@ from airbyte_cdk.sources.streams.http.error_handlers.response_models import Erro
None,
id="test_response_does_not_match_filter",
),
pytest.param(
ResponseAction.FAIL,
FailureType.config_error,
{403, 404},
"",
"",
"check permissions",
{"status_code": 403},
ErrorResolution(response_action=ResponseAction.FAIL, failure_type=FailureType.config_error, error_message="check permissions"),
id="test_http_code_matches_failure_type_config_error",
),
pytest.param(
ResponseAction.FAIL,
FailureType.system_error,
{403, 404},
"",
"",
"check permissions",
{"status_code": 403},
ErrorResolution(response_action=ResponseAction.FAIL, failure_type=FailureType.system_error, error_message="check permissions"),
id="test_http_code_matches_failure_type_system_error",
),
pytest.param(
ResponseAction.FAIL,
FailureType.transient_error,
{500},
"",
"",
"rate limits",
{"status_code": 500},
ErrorResolution(response_action=ResponseAction.FAIL, failure_type=FailureType.transient_error, error_message="rate limits"),
id="test_http_code_matches_failure_type_transient_error",
),
pytest.param(
ResponseAction.RETRY,
FailureType.config_error,
{500},
"",
"",
"rate limits",
{"status_code": 500},
ErrorResolution(response_action=ResponseAction.RETRY, failure_type=FailureType.transient_error, error_message="rate limits"),
id="test_http_code_matches_failure_type_config_error_action_retry_uses_default_failure_type",
),
pytest.param(
ResponseAction.RATE_LIMITED,
None,
{500},
"",
"",
"rate limits",
{"status_code": 500},
ErrorResolution(response_action=ResponseAction.RATE_LIMITED, failure_type=FailureType.transient_error, error_message="rate limits"),
id="test_http_code_matches_response_action_rate_limited",
),
],
)
def test_matches(requests_mock, action, http_codes, predicate, error_contains, error_message, response, expected_error_resolution):
def test_matches(requests_mock, action, failure_type, http_codes, predicate, error_contains, error_message, response, expected_error_resolution):
requests_mock.register_uri(
"GET",
"https://airbyte.io/",
@@ -101,6 +163,7 @@ def test_matches(requests_mock, action, http_codes, predicate, error_contains, e
response = requests.get("https://airbyte.io/")
response_filter = HttpResponseFilter(
action=action,
failure_type=failure_type,
config={},
parameters={},
http_codes=http_codes,