1
0
mirror of synced 2025-12-25 02:09:19 -05:00

[low-code]: Evaluate backoff strategies at runtime (#18053)

* pass options to wait time from header

* fix constant backoff

* parameterize test

* fix tests

* missing unit tests

* eval header at runtime

* eval regex at runtime

* evaluate min_wait at runtime

* eval factor at runtime

* missing unit tests

* remove debug print

* rename

* Add tests

* Add tests

* Update docs
This commit is contained in:
Alexandre Girard
2022-11-03 18:50:47 -07:00
committed by GitHub
parent 838aebe322
commit 605fb921c4
13 changed files with 211 additions and 106 deletions

View File

@@ -229,7 +229,7 @@ class DeclarativeComponentFactory:
options = kwargs.get(OPTIONS_STR, {})
try:
# enums can't accept options
if issubclass(expected_type, enum.Enum):
if issubclass(expected_type, enum.Enum) or self.is_primitive(definition):
return expected_type(definition)
else:
return expected_type(definition, options=options)
@@ -237,6 +237,9 @@ class DeclarativeComponentFactory:
raise Exception(f"failed to instantiate type {expected_type}. {e}")
return definition
# Return True when `obj` is an int, float or bool instance (str is not part of the check).
# The instantiation code in this hunk uses it to build the expected type from the bare value,
# without passing the `options` keyword.
def is_primitive(self, obj):
return isinstance(obj, (int, float, bool))
# Return True when `definition` is a dict that contains a "class_name" key.
@staticmethod
def is_object_definition_with_class_name(definition):
return isinstance(definition, dict) and "class_name" in definition

View File

@@ -2,11 +2,13 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from dataclasses import dataclass
from typing import Optional
from dataclasses import InitVar, dataclass
from typing import Any, Mapping, Optional, Union
import requests
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategy import BackoffStrategy
from airbyte_cdk.sources.declarative.types import Config
from dataclasses_jsonschema import JsonSchemaMixin
@@ -19,7 +21,14 @@ class ConstantBackoffStrategy(BackoffStrategy, JsonSchemaMixin):
backoff_time_in_seconds (float): time to backoff before retrying a retryable request.
"""
backoff_time_in_seconds: float
backoff_time_in_seconds: Union[float, InterpolatedString, str]
options: InitVar[Mapping[str, Any]]
config: Config
# Dataclass post-init hook receiving the `options` InitVar.
# A backoff_time_in_seconds value that is not already an InterpolatedString is converted with str()
# and handed to InterpolatedString.create together with `options`, so that backoff() can call
# .eval(config) on the stored attribute.
def __post_init__(self, options: Mapping[str, Any]):
if not isinstance(self.backoff_time_in_seconds, InterpolatedString):
self.backoff_time_in_seconds = str(self.backoff_time_in_seconds)
self.backoff_time_in_seconds = InterpolatedString.create(self.backoff_time_in_seconds, options=options)
def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]:
return self.backoff_time_in_seconds
return self.backoff_time_in_seconds.eval(self.config)

View File

@@ -2,11 +2,13 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from dataclasses import dataclass
from typing import Optional
from dataclasses import InitVar, dataclass
from typing import Any, Mapping, Optional, Union
import requests
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategy import BackoffStrategy
from airbyte_cdk.sources.declarative.types import Config
from dataclasses_jsonschema import JsonSchemaMixin
@@ -19,7 +21,14 @@ class ExponentialBackoffStrategy(BackoffStrategy, JsonSchemaMixin):
factor (float): multiplicative factor
"""
factor: float = 5
options: InitVar[Mapping[str, Any]]
config: Config
factor: Union[float, InterpolatedString, str] = 5
# Dataclass post-init hook receiving the `options` InitVar.
# A factor value that is not already an InterpolatedString is converted with str() and handed to
# InterpolatedString.create together with `options`, so that backoff() can call .eval(config) on it
# before multiplying by 2**attempt_count.
def __post_init__(self, options: Mapping[str, Any]):
if not isinstance(self.factor, InterpolatedString):
self.factor = str(self.factor)
self.factor = InterpolatedString.create(self.factor, options=options)
def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]:
return self.factor * 2**attempt_count
return self.factor.eval(self.config) * 2**attempt_count

View File

@@ -3,12 +3,14 @@
#
import re
from dataclasses import dataclass
from typing import Optional
from dataclasses import InitVar, dataclass
from typing import Any, Mapping, Optional, Union
import requests
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.header_helper import get_numeric_value_from_header
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategy import BackoffStrategy
from airbyte_cdk.sources.declarative.types import Config
from dataclasses_jsonschema import JsonSchemaMixin
@@ -22,12 +24,16 @@ class WaitTimeFromHeaderBackoffStrategy(BackoffStrategy, JsonSchemaMixin):
regex (Optional[str]): optional regex to apply on the header to extract its value
"""
header: str
header: Union[InterpolatedString, str]
options: InitVar[Mapping[str, Any]]
config: Config
regex: Optional[str] = None
def __post_init__(self):
def __post_init__(self, options: Mapping[str, Any]):
self.regex = re.compile(self.regex) if self.regex else None
self.header = InterpolatedString.create(self.header, options=options)
def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]:
header_value = get_numeric_value_from_header(response, self.header, self.regex)
header = self.header.eval(config=self.config)
header_value = get_numeric_value_from_header(response, header, self.regex)
return header_value

View File

@@ -5,12 +5,14 @@
import numbers
import re
import time
from dataclasses import dataclass
from typing import Optional
from dataclasses import InitVar, dataclass
from typing import Any, Mapping, Optional, Union
import requests
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.header_helper import get_numeric_value_from_header
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategy import BackoffStrategy
from airbyte_cdk.sources.declarative.types import Config
from dataclasses_jsonschema import JsonSchemaMixin
@@ -26,24 +28,36 @@ class WaitUntilTimeFromHeaderBackoffStrategy(BackoffStrategy, JsonSchemaMixin):
regex (Optional[str]): optional regex to apply on the header to extract its value
"""
header: str
min_wait: Optional[float] = None
regex: Optional[str] = None
header: Union[InterpolatedString, str]
options: InitVar[Mapping[str, Any]]
config: Config
min_wait: Optional[Union[float, InterpolatedString, str]] = None
regex: Optional[Union[InterpolatedString, str]] = None
def __post_init__(self):
self.regex = re.compile(self.regex) if self.regex else None
def __post_init__(self, options: Mapping[str, Any]):
self.header = InterpolatedString.create(self.header, options=options)
self.regex = InterpolatedString.create(self.regex, options=options) if self.regex else None
if not isinstance(self.min_wait, InterpolatedString):
self.min_wait = InterpolatedString.create(str(self.min_wait), options=options)
def backoff(self, response: requests.Response, attempt_count: int) -> Optional[float]:
    """
    Derive the wait time from a response header holding the time until which the client should wait.

    The header name, the optional regex and min_wait are interpolated against the config on every call,
    so values coming from the config or the options are resolved at runtime.

    :param response: response whose headers are inspected
    :param attempt_count: number of attempts so far (not used by this strategy)
    :return: the header value minus time.time(); the evaluated min_wait when the header is missing,
        falsy or not numeric, or when min_wait is larger than the computed wait; None when the computed
        wait is negative and no min_wait is set
    """
    now = time.time()
    header = self.header.eval(self.config)
    # The regex is itself interpolated, so it can only be compiled once its runtime value is known.
    regex = re.compile(self.regex.eval(self.config)) if self.regex else None
    wait_until = get_numeric_value_from_header(response, header, regex)
    min_wait = self.min_wait.eval(self.config)

    # `not wait_until` already covers None as well as empty/zero values.
    if not wait_until:
        return min_wait

    is_numeric_string = isinstance(wait_until, str) and wait_until.isnumeric()
    if not (is_numeric_string or isinstance(wait_until, numbers.Number)):
        # Return the evaluated value: self.min_wait is an InterpolatedString after __post_init__,
        # not a number, so returning it here would hand the caller an unusable object.
        return min_wait

    wait_time = float(wait_until) - now
    if min_wait:
        return max(wait_time, min_wait)
    if wait_time < 0:
        return None
    return wait_time

View File

@@ -94,6 +94,7 @@ class DefaultErrorHandler(ErrorHandler, JsonSchemaMixin):
config: Config
options: InitVar[Mapping[str, Any]]
config: Config
response_filters: Optional[List[HttpResponseFilter]] = None
max_retries: Optional[int] = 5
_max_retries: int = field(init=False, repr=False, default=5)
@@ -111,7 +112,7 @@ class DefaultErrorHandler(ErrorHandler, JsonSchemaMixin):
self.response_filters.append(HttpResponseFilter(ResponseAction.IGNORE, config={}, options={}))
if not self.backoff_strategies:
self.backoff_strategies = [DefaultErrorHandler.DEFAULT_BACKOFF_STRATEGY()]
self.backoff_strategies = [DefaultErrorHandler.DEFAULT_BACKOFF_STRATEGY(options=options, config=self.config)]
self._last_request_to_attempt_count: MutableMapping[requests.PreparedRequest, int] = {}

View File

@@ -8,17 +8,27 @@ import pytest
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.constant_backoff_strategy import ConstantBackoffStrategy
BACKOFF_TIME = 10
OPTIONS_BACKOFF_TIME = 20
CONFIG_BACKOFF_TIME = 30
@pytest.mark.parametrize(
"test_name, attempt_count, expected_backoff_time",
"test_name, attempt_count, backofftime, expected_backoff_time",
[
("test_exponential_backoff", 1, BACKOFF_TIME),
("test_exponential_backoff", 2, BACKOFF_TIME),
("test_constant_backoff_first_attempt", 1, BACKOFF_TIME, BACKOFF_TIME),
("test_constant_backoff_first_attempt_float", 1, 6.7, 6.7),
("test_constant_backoff_attempt_round_float", 1.0, 6.7, 6.7),
("test_constant_backoff_attempt_round_float", 1.5, 6.7, 6.7),
("test_constant_backoff_first_attempt_round_float", 1, 10.0, BACKOFF_TIME),
("test_constant_backoff_second_attempt_round_float", 2, 10.0, BACKOFF_TIME),
("test_constant_backoff_from_options", 1, "{{ options['backoff'] }}", OPTIONS_BACKOFF_TIME),
("test_constant_backoff_from_config", 1, "{{ config['backoff'] }}", CONFIG_BACKOFF_TIME),
],
)
def test_exponential_backoff(test_name, attempt_count, expected_backoff_time):
def test_constant_backoff(test_name, attempt_count, backofftime, expected_backoff_time):
response_mock = MagicMock()
backoff_strategy = ConstantBackoffStrategy(backoff_time_in_seconds=BACKOFF_TIME)
backoff_strategy = ConstantBackoffStrategy(
options={"backoff": OPTIONS_BACKOFF_TIME}, backoff_time_in_seconds=backofftime, config={"backoff": CONFIG_BACKOFF_TIME}
)
backoff = backoff_strategy.backoff(response_mock, attempt_count)
assert backoff == expected_backoff_time

View File

@@ -9,23 +9,28 @@ from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategie
ExponentialBackoffStrategy,
)
options = {"backoff": 5}
config = {"backoff": 5}
@pytest.mark.parametrize(
"test_name, attempt_count, expected_backoff_time",
"test_name, attempt_count, factor, expected_backoff_time",
[
("test_exponential_backoff", 1, 10),
("test_exponential_backoff", 2, 20),
("test_exponential_backoff_first_attempt", 1, 5, 10),
("test_exponential_backoff_second_attempt", 2, 5, 20),
("test_exponential_backoff_from_options", 2, "{{options['backoff']}}", 20),
("test_exponential_backoff_from_config", 2, "{{config['backoff']}}", 20),
],
)
def test_exponential_backoff(test_name, attempt_count, expected_backoff_time):
def test_exponential_backoff(test_name, attempt_count, factor, expected_backoff_time):
response_mock = MagicMock()
backoff_strategy = ExponentialBackoffStrategy(factor=5)
backoff_strategy = ExponentialBackoffStrategy(factor=factor, options=options, config=config)
backoff = backoff_strategy.backoff(response_mock, attempt_count)
assert backoff == expected_backoff_time
def test_exponential_backoff_default():
response_mock = MagicMock()
backoff_strategy = ExponentialBackoffStrategy()
backoff_strategy = ExponentialBackoffStrategy(options=options, config=config)
backoff = backoff_strategy.backoff(response_mock, 3)
assert backoff == 40

View File

@@ -17,6 +17,8 @@ SOME_BACKOFF_TIME = 60
[
("test_wait_time_from_header", "wait_time", SOME_BACKOFF_TIME, None, SOME_BACKOFF_TIME),
("test_wait_time_from_header_string", "wait_time", "60", None, SOME_BACKOFF_TIME),
("test_wait_time_from_header_options", "{{ options['wait_time'] }}", "60", None, SOME_BACKOFF_TIME),
("test_wait_time_from_header_config", "{{ config['wait_time'] }}", "60", None, SOME_BACKOFF_TIME),
("test_wait_time_from_header_not_a_number", "wait_time", "61,60", None, None),
("test_wait_time_from_header_with_regex", "wait_time", "61,60", "([-+]?\d+)", 61), # noqa
("test_wait_time_fœrom_header_with_regex_no_match", "wait_time", "...", "[-+]?\d+", None), # noqa
@@ -26,6 +28,8 @@ SOME_BACKOFF_TIME = 60
def test_wait_time_from_header(test_name, header, header_value, regex, expected_backoff_time):
response_mock = MagicMock()
response_mock.headers = {"wait_time": header_value}
backoff_stratery = WaitTimeFromHeaderBackoffStrategy(header, regex)
backoff_stratery = WaitTimeFromHeaderBackoffStrategy(
header=header, regex=regex, options={"wait_time": "wait_time"}, config={"wait_time": "wait_time"}
)
backoff = backoff_stratery.backoff(response_mock, 1)
assert backoff == expected_backoff_time

View File

@@ -11,26 +11,54 @@ from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategie
)
SOME_BACKOFF_TIME = 60
REGEX = "[-+]?\\d+"
@pytest.mark.parametrize(
"test_name, header, wait_until, min_wait, regex, expected_backoff_time",
[
("test_wait_until_time_from_header", "wait_until", 1600000060.0, None, None, 60),
("test_wait_until_time_from_header_options", "{{options['wait_until']}}", 1600000060.0, None, None, 60),
("test_wait_until_time_from_header_config", "{{config['wait_until']}}", 1600000060.0, None, None, 60),
("test_wait_until_negative_time", "wait_until", 1500000000.0, None, None, None),
("test_wait_until_time_less_than_min", "wait_until", 1600000060.0, 120, None, 120),
("test_wait_until_no_header", "absent_header", 1600000000.0, None, None, None),
("test_wait_until_time_from_header_not_numeric", "wait_until", "1600000000,1600000000", None, None, None),
("test_wait_until_time_from_header_is_numeric", "wait_until", "1600000060", None, None, 60),
("test_wait_until_time_from_header_with_regex", "wait_until", "1600000060,60", None, "[-+]?\d+", 60), # noqa
("test_wait_until_time_from_header_with_regex_from_options", "wait_until", "1600000060,60", None, "{{options['regex']}}", 60),
# noqa
("test_wait_until_time_from_header_with_regex_from_config", "wait_until", "1600000060,60", None, "{{config['regex']}}", 60), # noqa
("test_wait_until_time_from_header_with_regex_no_match", "wait_time", "...", None, "[-+]?\d+", None), # noqa
("test_wait_until_no_header_with_min", "absent_header", "1600000000.0", SOME_BACKOFF_TIME, None, SOME_BACKOFF_TIME),
(
"test_wait_until_no_header_with_min_from_options",
"absent_header",
"1600000000.0",
"{{options['min_wait']}}",
None,
SOME_BACKOFF_TIME,
),
(
"test_wait_until_no_header_with_min_from_config",
"absent_header",
"1600000000.0",
"{{config['min_wait']}}",
None,
SOME_BACKOFF_TIME,
),
],
)
@patch("time.time", return_value=1600000000.0)
def test_wait_untiltime_from_header(time_mock, test_name, header, wait_until, min_wait, regex, expected_backoff_time):
response_mock = MagicMock()
response_mock.headers = {"wait_until": wait_until}
backoff_stratery = WaitUntilTimeFromHeaderBackoffStrategy(header, min_wait, regex)
backoff_stratery = WaitUntilTimeFromHeaderBackoffStrategy(
header=header,
min_wait=min_wait,
regex=regex,
options={"wait_until": "wait_until", "regex": REGEX, "min_wait": SOME_BACKOFF_TIME},
config={"wait_until": "wait_until", "regex": REGEX, "min_wait": SOME_BACKOFF_TIME},
)
backoff = backoff_stratery.backoff(response_mock, 1)
assert backoff == expected_backoff_time

View File

@@ -30,7 +30,7 @@ SOME_BACKOFF_TIME = 60
None,
{},
ResponseStatus.retry(SOME_BACKOFF_TIME),
[ConstantBackoffStrategy(SOME_BACKOFF_TIME)],
[ConstantBackoffStrategy(options={}, backoff_time_in_seconds=SOME_BACKOFF_TIME, config={})],
),
("test_exponential_backoff", HTTPStatus.BAD_GATEWAY, None, None, {}, ResponseStatus.retry(10), None),
(
@@ -40,7 +40,7 @@ SOME_BACKOFF_TIME = 60
None,
{},
ResponseStatus.retry(10),
[DefaultErrorHandler.DEFAULT_BACKOFF_STRATEGY()],
[DefaultErrorHandler.DEFAULT_BACKOFF_STRATEGY(options={}, config={})],
),
("test_chain_backoff_strategy", HTTPStatus.BAD_GATEWAY, None, None, {}, ResponseStatus.retry(10), None),
(
@@ -50,7 +50,10 @@ SOME_BACKOFF_TIME = 60
None,
{},
ResponseStatus.retry(10),
[DefaultErrorHandler.DEFAULT_BACKOFF_STRATEGY(), ConstantBackoffStrategy(SOME_BACKOFF_TIME)],
[
DefaultErrorHandler.DEFAULT_BACKOFF_STRATEGY(options={}, config={}),
ConstantBackoffStrategy(options={}, backoff_time_in_seconds=SOME_BACKOFF_TIME, config={}),
],
),
("test_200", HTTPStatus.OK, None, None, {}, response_status.SUCCESS, None),
("test_3XX", HTTPStatus.PERMANENT_REDIRECT, None, None, {}, response_status.SUCCESS, None),

View File

@@ -415,31 +415,31 @@ def test_create_record_selector(test_name, record_selector, expected_runtime_sel
(
"test_option_in_selector",
"""
extractor:
type: DpathExtractor
field_pointer: ["{{ options['name'] }}"]
selector:
class_name: airbyte_cdk.sources.declarative.extractors.record_selector.RecordSelector
$options:
name: "selector"
extractor: "*ref(extractor)"
""",
extractor:
type: DpathExtractor
field_pointer: ["{{ options['name'] }}"]
selector:
class_name: airbyte_cdk.sources.declarative.extractors.record_selector.RecordSelector
$options:
name: "selector"
extractor: "*ref(extractor)"
""",
"selector",
),
(
"test_option_in_extractor",
"""
extractor:
type: DpathExtractor
$options:
name: "extractor"
field_pointer: ["{{ options['name'] }}"]
selector:
class_name: airbyte_cdk.sources.declarative.extractors.record_selector.RecordSelector
$options:
name: "selector"
extractor: "*ref(extractor)"
""",
extractor:
type: DpathExtractor
$options:
name: "extractor"
field_pointer: ["{{ options['name'] }}"]
selector:
class_name: airbyte_cdk.sources.declarative.extractors.record_selector.RecordSelector
$options:
name: "selector"
extractor: "*ref(extractor)"
""",
"extractor",
),
],
@@ -451,8 +451,53 @@ def test_options_propagation(test_name, content, expected_field_pointer_value):
assert selector.extractor.field_pointer[0].eval(input_config) == expected_field_pointer_value
def test_create_requester():
content = """
@pytest.mark.parametrize(
"test_name, error_handler",
[
(
"test_create_requester_constant_error_handler",
"""
error_handler:
backoff_strategies:
- type: "ConstantBackoffStrategy"
backoff_time_in_seconds: 5
""",
),
(
"test_create_requester_exponential_error_handler",
"""
error_handler:
backoff_strategies:
- type: "ExponentialBackoffStrategy"
factor: 5
""",
),
(
"test_create_requester_wait_time_from_header_error_handler",
"""
error_handler:
backoff_strategies:
- type: "WaitTimeFromHeader"
header: "a_header"
""",
),
(
"test_create_requester_wait_time_until_from_header_error_handler",
"""
error_handler:
backoff_strategies:
- type: "WaitUntilTimeFromHeader"
header: "a_header"
""",
),
(
"test_create_requester_no_error_handler",
"""""",
),
],
)
def test_create_requester(test_name, error_handler):
content = f"""
requester:
type: HttpRequester
path: "/v3/marketing/lists"
@@ -461,13 +506,14 @@ def test_create_requester():
url_base: "https://api.sendgrid.com"
authenticator:
type: "BasicHttpAuthenticator"
username: "{{ options.name }}"
password: "{{ config.apikey }}"
username: "{{{{ options.name}}}}"
password: "{{{{ config.apikey }}}}"
request_options_provider:
request_parameters:
a_parameter: "something_here"
request_headers:
header: header_value
{error_handler}
"""
config = parser.parse(content)
@@ -498,7 +544,6 @@ def test_create_composite_error_handler():
- response_filters:
- http_codes: [ 403 ]
action: RETRY
error_message: "Retryable error received: {{ response.message }}"
"""
config = parser.parse(content)
@@ -510,7 +555,6 @@ def test_create_composite_error_handler():
assert isinstance(component.error_handlers[0].response_filters[0], HttpResponseFilter)
assert component.error_handlers[0].response_filters[0].predicate.condition == "{{ 'code' in response }}"
assert component.error_handlers[1].response_filters[0].http_codes == [403]
assert component.error_handlers[1].response_filters[0].error_message.string == "Retryable error received: {{ response.message }}"
assert isinstance(component, CompositeErrorHandler)
@@ -685,37 +729,6 @@ class TestCreateTransformations:
]
assert expected == component.transformations
def test_add_fields_path_in_options(self):
content = f"""
the_stream:
class_name: airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream
$options:
{self.base_options}
path: "/wrong_path"
transformations:
- type: AddFields
fields:
- path: ["field1"]
value: "static_value"
"""
config = parser.parse(content)
factory.create_component(config["the_stream"], input_config, False)
component = factory.create_component(config["the_stream"], input_config)()
assert isinstance(component, DeclarativeStream)
expected = [
AddFields(
fields=[
AddedFieldDefinition(
path=["field1"], value=InterpolatedString(string="static_value", default="static_value", options={}), options={}
)
],
options={},
)
]
assert expected == component.transformations
def test_validation_wrong_input_type():
content = """

View File

@@ -157,8 +157,8 @@ Schema:
BackoffStrategy:
type: object
anyOf:
- "$ref": "#/definitions/ExponentialBackoff"
- "$ref": "#/definitions/ConstantBackoff"
- "$ref": "#/definitions/ExponentialBackoffStrategy"
- "$ref": "#/definitions/ConstantBackoffStrategy"
- "$ref": "#/definitions/WaitTimeFromHeader"
- "$ref": "#/definitions/WaitUntilTimeFromHeader"
```
@@ -170,7 +170,7 @@ This is the default backoff strategy. The requester will backoff with an exponen
Schema:
```yaml
ExponentialBackoff:
ExponentialBackoffStrategy:
type: object
additionalProperties: true
properties:
@@ -183,12 +183,12 @@ Schema:
### Constant Backoff
When using the `ConstantBackoff` strategy, the requester will backoff with a constant interval.
When using the `ConstantBackoffStrategy`, the requester will back off with a constant interval.
Schema:
```yaml
ConstantBackoff:
ConstantBackoffStrategy:
type: object
additionalProperties: true
required:
@@ -340,13 +340,13 @@ requester:
- predicate: "{{ 'code' in response }}"
action: RETRY
backoff_strategies:
- type: "ConstantBackoff"
- type: "ConstantBackoffStrategy"
backoff_time_in_seconds: 5
- response_filters:
- http_codes: [ 403 ]
action: RETRY
backoff_strategies:
- type: "ExponentialBackoff"
- type: "ExponentialBackoffStrategy"
```
## More readings