1
0
mirror of synced 2025-12-23 21:03:15 -05:00

[concurrent low-code] Add concurrency_level to manifest and allow it to be parsed into a runtime object (#45943)

This commit is contained in:
Brian Lai
2024-10-08 17:04:11 -04:00
committed by GitHub
parent 0567c798f9
commit c2923bd095
8 changed files with 197 additions and 0 deletions

View File

@@ -0,0 +1,7 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from airbyte_cdk.sources.declarative.concurrency_level.concurrency_level import ConcurrencyLevel
# Public API of this package: re-export ConcurrencyLevel so it can be imported from the package root.
__all__ = ["ConcurrencyLevel"]

View File

@@ -0,0 +1,42 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from dataclasses import InitVar, dataclass
from typing import Any, Mapping, Optional, Union
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.types import Config
@dataclass
class ConcurrencyLevel:
    """
    Returns the number of worker threads that should be used when syncing concurrent streams in parallel

    Attributes:
        default_concurrency (Union[int, str]): The hardcoded integer or interpolation of how many worker threads to use during a sync
        max_concurrency (Optional[int]): Upper bound applied to the value produced by an interpolated default_concurrency.
            A hardcoded integer default_concurrency is returned as-is and is not capped by this value.
        config (Config): The connector configuration made available to the interpolation as `config`
        parameters (InitVar[Mapping[str, Any]]): Parameters forwarded to the interpolated string when it is created

    Raises:
        ValueError: At construction time, when default_concurrency is a string that references `config` and no
            max_concurrency was provided.
    """

    default_concurrency: Union[int, str]
    max_concurrency: Optional[int]
    config: Config
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        if isinstance(self.default_concurrency, int):
            # A hardcoded integer needs no interpolation; store it untouched.
            self._default_concurrency: Union[int, InterpolatedString] = self.default_concurrency
        elif "config" in self.default_concurrency and not self.max_concurrency:
            # A user-controlled value must always be bounded, otherwise a config could request unlimited workers.
            raise ValueError("ConcurrencyLevel requires that max_concurrency be defined if the default_concurrency can be user-specified")
        else:
            self._default_concurrency = InterpolatedString.create(self.default_concurrency, parameters=parameters)

    def get_concurrency_level(self) -> int:
        """
        Resolve the concurrency level to use for the sync.

        Returns:
            The hardcoded integer when default_concurrency was an int; otherwise the interpolated value evaluated
            against the config, capped at max_concurrency when max_concurrency is set.

        Raises:
            ValueError: When the interpolated default_concurrency does not evaluate to an integer.
        """
        if isinstance(self._default_concurrency, InterpolatedString):
            evaluated_default_concurrency = self._default_concurrency.eval(config=self.config)
            if not isinstance(evaluated_default_concurrency, int):
                raise ValueError("default_concurrency did not evaluate to an integer")
            return min(evaluated_default_concurrency, self.max_concurrency) if self.max_concurrency else evaluated_default_concurrency
        else:
            return self._default_concurrency

View File

@@ -28,6 +28,8 @@ properties:
type: object
spec:
"$ref": "#/definitions/Spec"
concurrency_level:
"$ref": "#/definitions/ConcurrencyLevel"
metadata:
type: object
description: For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.
@@ -315,6 +317,37 @@ definitions:
$parameters:
type: object
additionalProperties: true
ConcurrencyLevel:
title: Concurrency Level
description: Defines the amount of parallelization for the streams that are being synced. The factor of parallelization is how many partitions or streams are synced at the same time. For example, with a concurrency_level of 10, ten streams or partitions of data will be processed at the same time.
type: object
required:
- default_concurrency
properties:
type:
type: string
enum: [ConcurrencyLevel]
default_concurrency:
title: Default Concurrency
description: The amount of concurrency that will be applied during a sync. This value can be hardcoded or user-defined in the config if different users have varying volume thresholds in the target API.
anyOf:
- type: integer
- type: string
interpolation_context:
- config
examples:
- 10
- "{{ config['num_workers'] or 10 }}"
max_concurrency:
title: Max Concurrency
description: The maximum level of concurrency that will be used during a sync. This becomes a required field when the default_concurrency derives from the config, because it serves as a safeguard against a user-defined threshold that is too high.
type: integer
examples:
- 20
- 100
$parameters:
type: object
additionalProperties: true
ConstantBackoffStrategy:
title: Constant Backoff
description: Backoff strategy with a constant backoff interval.

View File

@@ -53,6 +53,23 @@ class CheckStream(BaseModel):
)
class ConcurrencyLevel(BaseModel):
    # Manifest-side model for the `concurrency_level` component.
    # NOTE(review): this looks like it mirrors the ConcurrencyLevel definition of the YAML component schema - keep both in sync.

    # Component discriminator; optional in the manifest.
    type: Optional[Literal['ConcurrencyLevel']] = None

    # Required: either a hardcoded integer or an interpolated string.
    default_concurrency: Union[int, str] = Field(
        default=...,
        title='Default Concurrency',
        description='The amount of concurrency that will applied during a sync. This value can be hardcoded or user-defined in the config if different users have varying volume thresholds in the target API.',
        examples=[10, "{{ config['num_workers'] or 10 }}"],
    )

    # Optional cap on the level of concurrency.
    max_concurrency: Optional[int] = Field(
        default=None,
        title='Max Concurrency',
        description='The maximum level of concurrency that will be used during a sync. This becomes a required field when the default_concurrency derives from the config, because it serves as a safeguard against a user-defined threshold that is too high.',
        examples=[20, 100],
    )

    # Populated from the `$parameters` key of the manifest.
    parameters: Optional[Dict[str, Any]] = Field(default=None, alias='$parameters')
class ConstantBackoffStrategy(BaseModel):
type: Literal['ConstantBackoffStrategy']
backoff_time_in_seconds: Union[float, str] = Field(
@@ -1304,6 +1321,7 @@ class DeclarativeSource(BaseModel):
schemas: Optional[Schemas] = None
definitions: Optional[Dict[str, Any]] = None
spec: Optional[Spec] = None
concurrency_level: Optional[ConcurrencyLevel] = None
metadata: Optional[Dict[str, Any]] = Field(
None,
description='For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.',

View File

@@ -28,6 +28,7 @@ from airbyte_cdk.sources.declarative.auth.token import (
)
from airbyte_cdk.sources.declarative.auth.token_provider import InterpolatedStringTokenProvider, SessionTokenProvider, TokenProvider
from airbyte_cdk.sources.declarative.checks import CheckStream
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
from airbyte_cdk.sources.declarative.datetime import MinMaxDatetime
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.decoders import Decoder, IterableDecoder, JsonDecoder, JsonlDecoder
@@ -56,6 +57,7 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import
from airbyte_cdk.sources.declarative.models.declarative_component_schema import BearerAuthenticator as BearerAuthenticatorModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import CheckStream as CheckStreamModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import CompositeErrorHandler as CompositeErrorHandlerModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ConcurrencyLevel as ConcurrencyLevelModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ConstantBackoffStrategy as ConstantBackoffStrategyModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import CursorPagination as CursorPaginationModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import CustomAuthenticator as CustomAuthenticatorModel
@@ -195,6 +197,7 @@ class ModelToComponentFactory:
BearerAuthenticatorModel: self.create_bearer_authenticator,
CheckStreamModel: self.create_check_stream,
CompositeErrorHandlerModel: self.create_composite_error_handler,
ConcurrencyLevelModel: self.create_concurrency_level,
ConstantBackoffStrategyModel: self.create_constant_backoff_strategy,
CursorPaginationModel: self.create_cursor_pagination,
CustomAuthenticatorModel: self.create_custom_component,
@@ -438,6 +441,15 @@ class ModelToComponentFactory:
]
return CompositeErrorHandler(error_handlers=error_handlers, parameters=model.parameters or {})
@staticmethod
def create_concurrency_level(model: ConcurrencyLevelModel, config: Config, **kwargs: Any) -> ConcurrencyLevel:
    """
    Build the runtime ConcurrencyLevel component from its parsed manifest model.

    :param model: The parsed `ConcurrencyLevel` manifest definition
    :param config: The connector configuration, made available to the interpolated default_concurrency
    :param kwargs: Accepted for signature parity with the other factory methods; not used here
    :return: The runtime ConcurrencyLevel object
    """
    return ConcurrencyLevel(
        default_concurrency=model.default_concurrency,
        max_concurrency=model.max_concurrency,
        config=config,
        # Forward `$parameters` declared on the component instead of silently dropping them;
        # falls back to an empty mapping when none are defined, which matches the previous behavior.
        parameters=model.parameters or {},
    )
@staticmethod
def create_constant_backoff_strategy(model: ConstantBackoffStrategyModel, config: Config, **kwargs: Any) -> ConstantBackoffStrategy:
return ConstantBackoffStrategy(

View File

@@ -0,0 +1,71 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
from typing import Any, Mapping, Optional, Type, Union
import pytest
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
@pytest.mark.parametrize(
    "default_concurrency, max_concurrency, expected_concurrency",
    [
        pytest.param(20, 75, 20, id="test_default_concurrency_as_int"),
        # max_concurrency is deliberately lower than the hardcoded default so that this case
        # really verifies that an integer default_concurrency is not capped.
        pytest.param(20, 10, 20, id="test_default_concurrency_as_int_ignores_max_concurrency"),
        pytest.param("{{ config['num_workers'] or 40 }}", 75, 50, id="test_default_concurrency_using_interpolation"),
        pytest.param("{{ config['missing'] or 40 }}", 75, 40, id="test_default_concurrency_using_interpolation_no_value"),
        pytest.param("{{ config['num_workers'] or 40 }}", 10, 10, id="test_use_max_concurrency_if_default_is_too_high"),
    ],
)
def test_stream_slices(default_concurrency: Union[int, str], max_concurrency: int, expected_concurrency: int) -> None:
    """
    Verify the concurrency level resolved for hardcoded and interpolated default_concurrency values.

    Despite its name, this test exercises ConcurrencyLevel.get_concurrency_level(), not stream slicing.
    """
    config = {"num_workers": 50}
    concurrency_level = ConcurrencyLevel(
        default_concurrency=default_concurrency,
        max_concurrency=max_concurrency,
        config=config,
        parameters={},
    )

    actual_concurrency = concurrency_level.get_concurrency_level()

    assert actual_concurrency == expected_concurrency
@pytest.mark.parametrize(
    "config, expected_concurrency, expected_error",
    [
        pytest.param({"num_workers": "fifty five"}, None, ValueError, id="test_invalid_default_concurrency_as_string"),
        pytest.param({"num_workers": "55"}, 55, None, id="test_default_concurrency_as_string_int"),
        pytest.param({"num_workers": 60}, 60, None, id="test_default_concurrency_as_int"),
    ],
)
def test_default_concurrency_input_types_and_errors(
    config: Mapping[str, Any],
    expected_concurrency: Optional[int],
    expected_error: Optional[Type[Exception]],
) -> None:
    """
    Check how an interpolated default_concurrency behaves for different config value types.

    Each case either expects get_concurrency_level() to raise expected_error or to return expected_concurrency.
    """
    level = ConcurrencyLevel(
        default_concurrency="{{ config['num_workers'] or 30 }}",
        max_concurrency=65,
        config=config,
        parameters={},
    )

    # Success path first: no error expected, so the resolved value must match.
    if not expected_error:
        resolved = level.get_concurrency_level()
        assert resolved == expected_concurrency
        return

    # Failure path: resolving the level must raise the expected exception type.
    with pytest.raises(expected_error):
        level.get_concurrency_level()
def test_max_concurrency_is_required_for_default_concurrency_using_config() -> None:
    """Constructing a ConcurrencyLevel whose default_concurrency reads from the config without max_concurrency must raise."""
    user_config = {"num_workers": "50"}
    init_kwargs = {
        "default_concurrency": "{{ config['num_workers'] or 40 }}",
        "max_concurrency": None,
        "config": user_config,
        "parameters": {},
    }

    # The error is raised at construction time, before any concurrency level is resolved.
    with pytest.raises(ValueError):
        ConcurrencyLevel(**init_kwargs)

View File

@@ -19,6 +19,7 @@ from airbyte_cdk.sources.declarative.auth.token import (
)
from airbyte_cdk.sources.declarative.auth.token_provider import SessionTokenProvider
from airbyte_cdk.sources.declarative.checks import CheckStream
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
from airbyte_cdk.sources.declarative.datetime import MinMaxDatetime
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.decoders import JsonDecoder
@@ -28,6 +29,7 @@ from airbyte_cdk.sources.declarative.incremental import CursorFactory, DatetimeB
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.models import CheckStream as CheckStreamModel
from airbyte_cdk.sources.declarative.models import CompositeErrorHandler as CompositeErrorHandlerModel
from airbyte_cdk.sources.declarative.models import ConcurrencyLevel as ConcurrencyLevelModel
from airbyte_cdk.sources.declarative.models import CustomErrorHandler as CustomErrorHandlerModel
from airbyte_cdk.sources.declarative.models import CustomPartitionRouter as CustomPartitionRouterModel
from airbyte_cdk.sources.declarative.models import CustomSchemaLoader as CustomSchemaLoaderModel
@@ -197,6 +199,10 @@ list_stream:
check:
type: CheckStream
stream_names: ["list_stream"]
concurrency_level:
type: ConcurrencyLevel
default_concurrency: "{{ config['num_workers'] or 10 }}"
max_concurrency: 25
spec:
type: Spec
documentation_url: https://airbyte.com/#yaml-from-manifest
@@ -311,6 +317,14 @@ spec:
advanced_auth = spec.advanced_auth
assert advanced_auth.auth_flow_type.value == "oauth2.0"
concurrency_level = factory.create_component(
model_type=ConcurrencyLevelModel, component_definition=manifest["concurrency_level"], config=input_config
)
assert isinstance(concurrency_level, ConcurrencyLevel)
assert isinstance(concurrency_level._default_concurrency, InterpolatedString)
assert concurrency_level._default_concurrency.string == "{{ config['num_workers'] or 10 }}"
assert concurrency_level.max_concurrency == 25
def test_interpolate_config():
content = """