1
0
mirror of synced 2025-12-25 02:09:19 -05:00

CDK: add SelectiveAuthenticator (#33526)

Co-authored-by: Eugene Kulak <kulak.eugene@gmail.com>
Co-authored-by: Yevhenii Kurochkin <ykurochkin@flyaps.com>
This commit is contained in:
Eugene Kulak
2023-12-20 01:59:50 +02:00
committed by GitHub
parent ca33d3099c
commit 25bdd30fd5
6 changed files with 222 additions and 1 deletions

View File

@@ -0,0 +1,37 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from dataclasses import dataclass
from typing import Any, List, Mapping
import dpath
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator
@dataclass
class SelectiveAuthenticator(DeclarativeAuthenticator):
"""Authenticator that selects concrete implementation based on specific config value."""
config: Mapping[str, Any]
authenticators: Mapping[str, DeclarativeAuthenticator]
authenticator_selection_path: List[str]
# returns "DeclarativeAuthenticator", but must return a subtype of "SelectiveAuthenticator"
def __new__( # type: ignore[misc]
cls,
config: Mapping[str, Any],
authenticators: Mapping[str, DeclarativeAuthenticator],
authenticator_selection_path: List[str],
*arg: Any,
**kwargs: Any,
) -> DeclarativeAuthenticator:
try:
selected_key = str(dpath.util.get(config, authenticator_selection_path))
except KeyError as err:
raise ValueError("The path from `authenticator_selection_path` is not found in the config.") from err
try:
return authenticators[selected_key]
except KeyError as err:
raise ValueError(f"The authenticator `{selected_key}` is not found.") from err

View File

@@ -224,6 +224,49 @@ definitions:
$parameters:
type: object
additionalProperties: true
SelectiveAuthenticator:
title: Selective Authenticator
description: Authenticator that selects concrete authenticator based on config property.
type: object
additionalProperties: true
required:
- type
- authenticators
- authenticator_selection_path
properties:
type:
type: string
enum: [SelectiveAuthenticator]
authenticator_selection_path:
title: Authenticator Selection Path
description: Path of the field in config with selected authenticator name
type: array
items:
type: string
examples:
- ["auth"]
- ["auth", "type"]
authenticators:
title: Authenticators
description: Authenticators to select from.
type: object
additionalProperties:
anyOf:
- "$ref": "#/definitions/ApiKeyAuthenticator"
- "$ref": "#/definitions/BasicHttpAuthenticator"
- "$ref": "#/definitions/BearerAuthenticator"
- "$ref": "#/definitions/CustomAuthenticator"
- "$ref": "#/definitions/OAuthAuthenticator"
- "$ref": "#/definitions/NoAuth"
- "$ref": "#/definitions/SessionTokenAuthenticator"
- "$ref": "#/definitions/LegacySessionTokenAuthenticator"
examples:
- authenticators:
token: "#/definitions/ApiKeyAuthenticator"
oauth: "#/definitions/OAuthAuthenticator"
$parameters:
type: object
additionalProperties: true
CheckStream:
title: Streams to Check
description: Defines the streams to try reading when running a check operation.
@@ -1149,6 +1192,7 @@ definitions:
- "$ref": "#/definitions/NoAuth"
- "$ref": "#/definitions/SessionTokenAuthenticator"
- "$ref": "#/definitions/LegacySessionTokenAuthenticator"
- "$ref": "#/definitions/SelectiveAuthenticator"
error_handler:
title: Error Handler
description: Error handler component that defines how to handle errors.

View File

@@ -1069,6 +1069,45 @@ class DeclarativeSource(BaseModel):
)
class SelectiveAuthenticator(BaseModel):
class Config:
extra = Extra.allow
type: Literal['SelectiveAuthenticator']
authenticator_selection_path: List[str] = Field(
...,
description='Path of the field in config with selected authenticator name',
examples=[['auth'], ['auth', 'type']],
title='Authenticator Selection Path',
)
authenticators: Dict[
str,
Union[
ApiKeyAuthenticator,
BasicHttpAuthenticator,
BearerAuthenticator,
CustomAuthenticator,
OAuthAuthenticator,
NoAuth,
SessionTokenAuthenticator,
LegacySessionTokenAuthenticator,
],
] = Field(
...,
description='Authenticators to select from.',
examples=[
{
'authenticators': {
'token': '#/definitions/ApiKeyAuthenticator',
'oauth': '#/definitions/OAuthAuthenticator',
}
}
],
title='Authenticators',
)
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')
class DeclarativeStream(BaseModel):
class Config:
extra = Extra.allow
@@ -1179,6 +1218,7 @@ class HttpRequester(BaseModel):
NoAuth,
SessionTokenAuthenticator,
LegacySessionTokenAuthenticator,
SelectiveAuthenticator,
]
] = Field(
None,
@@ -1313,6 +1353,7 @@ class SubstreamPartitionRouter(BaseModel):
CompositeErrorHandler.update_forward_refs()
DeclarativeSource.update_forward_refs()
SelectiveAuthenticator.update_forward_refs()
DeclarativeStream.update_forward_refs()
SessionTokenAuthenticator.update_forward_refs()
SimpleRetriever.update_forward_refs()

View File

@@ -11,8 +11,9 @@ from typing import Any, Callable, List, Mapping, Optional, Type, Union, get_args
from airbyte_cdk.models import Level
from airbyte_cdk.sources.declarative.auth import DeclarativeOauth2Authenticator
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import NoAuth
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator, NoAuth
from airbyte_cdk.sources.declarative.auth.oauth import DeclarativeSingleUseRefreshTokenOauth2Authenticator
from airbyte_cdk.sources.declarative.auth.selective_authenticator import SelectiveAuthenticator
from airbyte_cdk.sources.declarative.auth.token import (
ApiKeyAuthenticator,
BasicHttpAuthenticator,
@@ -76,6 +77,7 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import
from airbyte_cdk.sources.declarative.models.declarative_component_schema import RemoveFields as RemoveFieldsModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import RequestOption as RequestOptionModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import RequestPath as RequestPathModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import SelectiveAuthenticator as SelectiveAuthenticatorModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import SessionTokenAuthenticator as SessionTokenAuthenticatorModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import SimpleRetriever as SimpleRetrieverModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel
@@ -187,6 +189,7 @@ class ModelToComponentFactory:
RequestPathModel: self.create_request_path,
RequestOptionModel: self.create_request_option,
LegacySessionTokenAuthenticatorModel: self.create_legacy_session_token_authenticator,
SelectiveAuthenticatorModel: self.create_selective_authenticator,
SimpleRetrieverModel: self.create_simple_retriever,
SpecModel: self.create_spec,
SubstreamPartitionRouterModel: self.create_substream_partition_router,
@@ -711,6 +714,8 @@ class ModelToComponentFactory:
model.http_method if isinstance(model.http_method, str) else model.http_method.value if model.http_method is not None else "GET"
)
assert model.use_cache is not None # for mypy
return HttpRequester(
name=name,
url_base=model.url_base,
@@ -896,6 +901,16 @@ class ModelToComponentFactory:
def create_remove_fields(model: RemoveFieldsModel, config: Config, **kwargs: Any) -> RemoveFields:
return RemoveFields(field_pointers=model.field_pointers, parameters={})
def create_selective_authenticator(self, model: SelectiveAuthenticatorModel, config: Config, **kwargs: Any) -> DeclarativeAuthenticator:
authenticators = {name: self._create_component_from_model(model=auth, config=config) for name, auth in model.authenticators.items()}
# SelectiveAuthenticator will return instance of DeclarativeAuthenticator or raise ValueError error
return SelectiveAuthenticator( # type: ignore[abstract]
config=config,
authenticators=authenticators,
authenticator_selection_path=model.authenticator_selection_path,
**kwargs,
)
@staticmethod
def create_legacy_session_token_authenticator(
model: LegacySessionTokenAuthenticatorModel, config: Config, *, url_base: str, **kwargs: Any

View File

@@ -0,0 +1,39 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import pytest
from airbyte_cdk.sources.declarative.auth.selective_authenticator import SelectiveAuthenticator
def test_authenticator_selected(mocker):
authenticators = {"one": mocker.Mock(), "two": mocker.Mock()}
auth = SelectiveAuthenticator(
config={"auth": {"type": "one"}},
authenticators=authenticators,
authenticator_selection_path=["auth", "type"],
)
assert auth is authenticators["one"]
def test_selection_path_not_found(mocker):
authenticators = {"one": mocker.Mock(), "two": mocker.Mock()}
with pytest.raises(ValueError, match="The path from `authenticator_selection_path` is not found in the config"):
_ = SelectiveAuthenticator(
config={"auth": {"type": "one"}},
authenticators=authenticators,
authenticator_selection_path=["auth_type"],
)
def test_selected_auth_not_found(mocker):
authenticators = {"one": mocker.Mock(), "two": mocker.Mock()}
with pytest.raises(ValueError, match="The authenticator `unknown` is not found"):
_ = SelectiveAuthenticator(
config={"auth": {"type": "unknown"}},
authenticators=authenticators,
authenticator_selection_path=["auth", "type"],
)

View File

@@ -39,6 +39,7 @@ from airbyte_cdk.sources.declarative.models import Spec as SpecModel
from airbyte_cdk.sources.declarative.models import SubstreamPartitionRouter as SubstreamPartitionRouterModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import OffsetIncrement as OffsetIncrementModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import PageIncrement as PageIncrementModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import SelectiveAuthenticator
from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import ManifestComponentTransformer
from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ManifestReferenceResolver
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ModelToComponentFactory
@@ -936,6 +937,50 @@ requester:
}
@pytest.mark.parametrize("input_config, expected_authenticator_class", [
pytest.param(
{"auth": {"type": "token"}, "credentials": {"api_key": "some_key"}},
ApiKeyAuthenticator,
id="test_create_requester_with_selective_authenticator_and_token_selected",
),
pytest.param(
{"auth": {"type": "oauth"}, "credentials": {"client_id": "ABC"}},
DeclarativeOauth2Authenticator,
id="test_create_requester_with_selective_authenticator_and_oauth_selected",
),
]
)
def test_create_requester_with_selective_authenticator(input_config, expected_authenticator_class):
content = """
authenticator:
type: SelectiveAuthenticator
authenticator_selection_path:
- auth
- type
authenticators:
token:
type: ApiKeyAuthenticator
header: "Authorization"
api_token: "api_key={{ config['credentials']['api_key'] }}"
oauth:
type: OAuthAuthenticator
token_refresh_endpoint: https://api.url.com
client_id: "{{ config['credentials']['client_id'] }}"
client_secret: some_secret
refresh_token: some_token
"""
name = "name"
parsed_manifest = YamlDeclarativeSource._parse(content)
resolved_manifest = resolver.preprocess_manifest(parsed_manifest)
authenticator_manifest = transformer.propagate_types_and_parameters("", resolved_manifest["authenticator"], {})
authenticator = factory.create_component(
model_type=SelectiveAuthenticator, component_definition=authenticator_manifest, config=input_config, name=name
)
assert isinstance(authenticator, expected_authenticator_class)
def test_create_composite_error_handler():
content = """
error_handler: