
[low code connectors] perform schema validation of the input config against the declarative language schema (#15543)

* draft: first pass at complete schema language generation and factory validator

* a working validator, plus fixes for schema errors that previously went uncaught

* remove extra spike file

* fix file formatting

* address PR feedback and a bit of refactoring

* fix some types that were erroneously marked as invalid schema

* some comments

* add JsonSchemaMixin to interfaces

* update changelog

* bump version
Brian Lai authored on 2022-08-18 15:29:26 -04:00, committed by GitHub
parent 1a641f6e11
commit ca80d3782a
31 changed files with 670 additions and 93 deletions
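In brief, the mechanism this commit introduces: every declarative component is a dataclass mixing in `JsonSchemaMixin` from `dataclasses_jsonschema`, so a JSON schema can be generated directly from its field annotations, and a raw component definition can then be checked with `jsonschema.validate` before anything is instantiated. A minimal sketch of that flow, using a hypothetical `ExampleOption` component rather than a real CDK class:

```python
# Minimal sketch of the validation approach introduced here: declare a component
# as a dataclass with JsonSchemaMixin, generate its JSON schema, and validate a
# raw definition against it. ExampleOption is hypothetical, not a CDK class.
from dataclasses import dataclass

from dataclasses_jsonschema import JsonSchemaMixin
from jsonschema import ValidationError
from jsonschema.validators import validate


@dataclass
class ExampleOption(JsonSchemaMixin):
    """Hypothetical component: one required field, one with a default."""

    field_name: str
    inject_into: str = "request_parameter"


schema = ExampleOption.json_schema()

# Passes: both fields are strings
validate({"field_name": "page_size", "inject_into": "path"}, schema)

# Rejected: field_name must be a string
try:
    validate({"field_name": 408}, schema)
except ValidationError as err:
    print(f"invalid definition: {err.message}")
```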

View File

@@ -1,5 +1,8 @@
# Changelog
## 0.1.77
- Add schema validation for declarative YAML connector configs
## 0.1.76
- Bugfix: Correctly set parent slice stream for sub-resource streams

View File

@@ -0,0 +1,26 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from dataclasses import dataclass
from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_token import AbstractHeaderAuthenticator
from dataclasses_jsonschema import JsonSchemaMixin
@dataclass
class DeclarativeAuthenticator(JsonSchemaMixin):
"""
Interface marking which authenticators can be used as part of the declarative framework
"""
@dataclass
class NoAuth(AbstractHeaderAuthenticator, DeclarativeAuthenticator, JsonSchemaMixin):
@property
def auth_header(self) -> str:
return ""
@property
def token(self) -> str:
return ""

View File

@@ -6,6 +6,7 @@ from dataclasses import InitVar, dataclass, field
from typing import Any, List, Mapping, Optional, Union
import pendulum
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_oauth import AbstractOauth2Authenticator
@@ -13,7 +14,7 @@ from dataclasses_jsonschema import JsonSchemaMixin
@dataclass
class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, JsonSchemaMixin):
class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, DeclarativeAuthenticator, JsonSchemaMixin):
"""
Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials based on
a declarative connector configuration file. Credentials can be defined explicitly or via interpolation
@@ -40,7 +41,7 @@ class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, JsonSchemaMixi
options: InitVar[Mapping[str, Any]]
scopes: Optional[List[str]] = None
token_expiry_date: Optional[Union[InterpolatedString, str]] = None
_token_expiry_date: pendulum.DateTime = field(init=False, repr=False)
_token_expiry_date: pendulum.DateTime = field(init=False, repr=False, default=None)
access_token_name: Union[InterpolatedString, str] = "access_token"
expires_in_name: Union[InterpolatedString, str] = "expires_in"
refresh_request_body: Optional[Mapping[str, Any]] = None

View File

@@ -6,6 +6,7 @@ import base64
from dataclasses import InitVar, dataclass
from typing import Any, Mapping, Union
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.types import Config
from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_token import AbstractHeaderAuthenticator
@@ -13,7 +14,7 @@ from dataclasses_jsonschema import JsonSchemaMixin
@dataclass
class ApiKeyAuthenticator(AbstractHeaderAuthenticator, JsonSchemaMixin):
class ApiKeyAuthenticator(AbstractHeaderAuthenticator, DeclarativeAuthenticator, JsonSchemaMixin):
"""
ApiKeyAuth sets a request header on the HTTP requests sent.
@@ -51,7 +52,7 @@ class ApiKeyAuthenticator(AbstractHeaderAuthenticator, JsonSchemaMixin):
@dataclass
class BearerAuthenticator(AbstractHeaderAuthenticator, JsonSchemaMixin):
class BearerAuthenticator(AbstractHeaderAuthenticator, DeclarativeAuthenticator, JsonSchemaMixin):
"""
Authenticator that sets the Authorization header on the HTTP requests sent.
@@ -81,7 +82,7 @@ class BearerAuthenticator(AbstractHeaderAuthenticator, JsonSchemaMixin):
@dataclass
class BasicHttpAuthenticator(AbstractHeaderAuthenticator):
class BasicHttpAuthenticator(AbstractHeaderAuthenticator, DeclarativeAuthenticator, JsonSchemaMixin):
"""
Builds auth based off the basic authentication scheme as defined by RFC 7617, which transmits credentials as USER ID/password pairs, encoded using base64
https://developer.mozilla.org/en-US/docs/Web/HTTP/Authentication#basic_authentication_scheme
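As a point of reference, the basic scheme named in this docstring is easy to reproduce by hand. A self-contained sketch (the `basic_auth_header` helper is illustrative, not CDK API):

```python
# RFC 7617 basic scheme: the Authorization header carries base64("user:password").
import base64


def basic_auth_header(username: str, password: str) -> str:
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("utf-8")
    return f"Basic {token}"


# The username/password pair from RFC 7617's own example
assert basic_auth_header("user", "passwd") == "Basic dXNlcjpwYXNzd2Q="
```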

View File

@@ -36,10 +36,10 @@ class DeclarativeStream(Stream, JsonSchemaMixin):
config: Config
options: InitVar[Mapping[str, Any]]
name: str
_name: str = field(init=False, repr=False)
_name: str = field(init=False, repr=False, default="")
primary_key: Optional[Union[str, List[str], List[List[str]]]]
_primary_key: str = field(init=False, repr=False)
stream_cursor_field: Optional[List[str]] = None
_primary_key: str = field(init=False, repr=False, default="")
stream_cursor_field: Optional[Union[List[str], str]] = None
transformations: List[RecordTransformation] = None
checkpoint_interval: Optional[int] = None

View File

@@ -2,15 +2,16 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from abc import ABC, abstractmethod
from abc import abstractmethod
from dataclasses import dataclass
from typing import Any, List, Mapping, Union
import requests
from dataclasses_jsonschema import JsonSchemaMixin
@dataclass
class Decoder(ABC):
class Decoder(JsonSchemaMixin):
"""
Decoder strategy to transform a requests.Response into a Mapping[str, Any]
"""

View File

@@ -7,10 +7,11 @@ from typing import Any, List, Mapping, Union
import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from dataclasses_jsonschema import JsonSchemaMixin
@dataclass
class JsonDecoder(Decoder):
class JsonDecoder(Decoder, JsonSchemaMixin):
"""
Decoder strategy that returns the json-encoded content of a response, if any.
"""

View File

@@ -2,16 +2,17 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from abc import ABC, abstractmethod
from abc import abstractmethod
from dataclasses import dataclass
from typing import Any, List, Mapping, Optional
import requests
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
from dataclasses_jsonschema import JsonSchemaMixin
@dataclass
class HttpSelector(ABC):
class HttpSelector(JsonSchemaMixin):
"""
Responsible for translating an HTTP response into a list of records by extracting records from the response and optionally filtering
records based on a heuristic.

View File

@@ -2,16 +2,17 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from abc import ABC, abstractmethod
from abc import abstractmethod
from dataclasses import dataclass
from typing import List
import requests
from airbyte_cdk.sources.declarative.types import Record
from dataclasses_jsonschema import JsonSchemaMixin
@dataclass
class RecordExtractor(ABC):
class RecordExtractor(JsonSchemaMixin):
"""
Responsible for translating an HTTP response into a list of records by extracting records from the response.
"""

View File

@@ -2,7 +2,7 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from dataclasses import InitVar, dataclass, field
from dataclasses import InitVar, dataclass
from typing import Any, List, Mapping, Optional
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
@@ -20,7 +20,7 @@ class RecordFilter(JsonSchemaMixin):
"""
options: InitVar[Mapping[str, Any]]
config: Config = field(default=dict)
config: Config
condition: str = ""
def __post_init__(self, options: Mapping[str, Any]):

View File

@@ -7,6 +7,9 @@ from __future__ import annotations
import copy
import enum
import importlib
import inspect
import typing
from dataclasses import fields
from typing import Any, List, Literal, Mapping, Type, Union, get_args, get_origin, get_type_hints
from airbyte_cdk.sources.declarative.create_partial import OPTIONS_STR, create
@@ -14,6 +17,8 @@ from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolati
from airbyte_cdk.sources.declarative.parsers.class_types_registry import CLASS_TYPES_REGISTRY
from airbyte_cdk.sources.declarative.parsers.default_implementation_registry import DEFAULT_IMPLEMENTATIONS_REGISTRY
from airbyte_cdk.sources.declarative.types import Config
from dataclasses_jsonschema import JsonSchemaMixin
from jsonschema.validators import validate
ComponentDefinition: Union[Literal, Mapping, List]
@@ -99,13 +104,14 @@ class DeclarativeComponentFactory:
def __init__(self):
self._interpolator = JinjaInterpolation()
def create_component(self, component_definition: ComponentDefinition, config: Config):
def create_component(self, component_definition: ComponentDefinition, config: Config, instantiate: bool = True):
"""
Create a component defined by `component_definition`.
This method will also traverse and instantiate its subcomponents if needed.
:param component_definition: The definition of the object to create.
:param config: Connector's config
:param instantiate: The factory should create the component when True or instead perform schema validation when False
:return: The object to create
"""
kwargs = copy.deepcopy(component_definition)
@@ -115,9 +121,18 @@ class DeclarativeComponentFactory:
class_name = CLASS_TYPES_REGISTRY[kwargs.pop("type")]
else:
raise ValueError(f"Failed to create component because it has no class_name or type. Definition: {component_definition}")
return self.build(class_name, config, **kwargs)
def build(self, class_or_class_name: Union[str, Type], config, **kwargs):
# Because configs are sometimes stored on a component's parent definition, we should remove it and rely on the config
# that is passed down through the factory instead
kwargs.pop("config", None)
return self.build(
class_name,
config,
instantiate,
**kwargs,
)
def build(self, class_or_class_name: Union[str, Type], config, instantiate: bool = True, **kwargs):
if isinstance(class_or_class_name, str):
class_ = self._get_class_from_fully_qualified_class_name(class_or_class_name)
else:
@@ -125,10 +140,28 @@ class DeclarativeComponentFactory:
# create components in options before propagating them
if OPTIONS_STR in kwargs:
kwargs[OPTIONS_STR] = {k: self._create_subcomponent(k, v, kwargs, config, class_) for k, v in kwargs[OPTIONS_STR].items()}
kwargs[OPTIONS_STR] = {
k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs[OPTIONS_STR].items()
}
updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_) for k, v in kwargs.items()}
return create(class_, config=config, **updated_kwargs)
updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}
if instantiate:
return create(class_, config=config, **updated_kwargs)
else:
# Because the component's data fields definitions use interfaces, we need to resolve the underlying types into the
# concrete classes that implement the interface before generating the schema
class_copy = copy.deepcopy(class_)
DeclarativeComponentFactory._transform_interface_to_union(class_copy)
schema = class_copy.json_schema()
component_definition = {
**updated_kwargs,
**{k: v for k, v in updated_kwargs.get(OPTIONS_STR, {}).items() if k not in updated_kwargs},
"config": config,
}
validate(component_definition, schema)
return lambda: component_definition
@staticmethod
def _get_class_from_fully_qualified_class_name(class_name: str):
@@ -141,7 +174,7 @@ class DeclarativeComponentFactory:
def _merge_dicts(d1, d2):
return {**d1, **d2}
def _create_subcomponent(self, key, definition, kwargs, config, parent_class):
def _create_subcomponent(self, key, definition, kwargs, config, parent_class, instantiate: bool = True):
"""
There are 5 ways to define a component.
1. dict with "class_name" field -> create an object of type "class_name"
@@ -153,14 +186,14 @@ class DeclarativeComponentFactory:
if self.is_object_definition_with_class_name(definition):
# propagate kwargs to inner objects
definition[OPTIONS_STR] = self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), definition.get(OPTIONS_STR, dict()))
return self.create_component(definition, config)()
return self.create_component(definition, config, instantiate)()
elif self.is_object_definition_with_type(definition):
# If type is set instead of class_name, get the class_name from the CLASS_TYPES_REGISTRY
definition[OPTIONS_STR] = self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), definition.get(OPTIONS_STR, dict()))
object_type = definition.pop("type")
class_name = CLASS_TYPES_REGISTRY[object_type]
definition["class_name"] = class_name
return self.create_component(definition, config)()
return self.create_component(definition, config, instantiate)()
elif isinstance(definition, dict):
# Try to infer object type
expected_type = self.get_default_type(key, parent_class)
@@ -169,17 +202,22 @@ class DeclarativeComponentFactory:
if expected_type and not self._is_builtin_type(expected_type):
definition["class_name"] = expected_type
definition[OPTIONS_STR] = self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), definition.get(OPTIONS_STR, dict()))
return self.create_component(definition, config)()
return self.create_component(definition, config, instantiate)()
else:
return definition
elif isinstance(definition, list):
return [
self._create_subcomponent(
key, sub, self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), self._get_subcomponent_options(sub)), config, parent_class
key,
sub,
self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), self._get_subcomponent_options(sub)),
config,
parent_class,
instantiate,
)
for sub in definition
]
else:
elif instantiate:
expected_type = self.get_default_type(key, parent_class)
if expected_type and not isinstance(definition, expected_type):
# call __init__(definition) if definition is not a dict and is not of the expected type
@@ -193,8 +231,7 @@ class DeclarativeComponentFactory:
return expected_type(definition, options=options)
except Exception as e:
raise Exception(f"failed to instantiate type {expected_type}. {e}")
else:
return definition
return definition
@staticmethod
def is_object_definition_with_class_name(definition):
@@ -238,3 +275,39 @@ class DeclarativeComponentFactory:
if not cls:
return False
return cls.__module__ == "builtins"
@staticmethod
def _transform_interface_to_union(expand_class: type):
class_fields = fields(expand_class)
for field in class_fields:
unpacked_field_types = DeclarativeComponentFactory.unpack(field.type)
expand_class.__annotations__[field.name] = unpacked_field_types
return expand_class
@staticmethod
def unpack(field_type: type):
"""
Recursive function that takes in a field type and unpacks the underlying fields (if it is a generic) or
returns the field type if it is not in a generic container
:param field_type: The current set of field types to unpack
:return: A list of unpacked types
"""
generic_type = typing.get_origin(field_type)
if generic_type is None:
# Functions as the base case since the origin is None for non-typing classes. If it is an interface then we derive
# and return the union of its subclasses or return the original type if it is a concrete class or a primitive type
if inspect.isclass(field_type) and issubclass(field_type, JsonSchemaMixin):
subclasses = field_type.__subclasses__()
if subclasses:
return Union[tuple(subclasses)]
return field_type
elif generic_type is list or generic_type is Union:
unpacked_types = [DeclarativeComponentFactory.unpack(underlying_type) for underlying_type in typing.get_args(field_type)]
if generic_type is list:
# For lists we extract the underlying list type and attempt to unpack it again since it could be another container
return List[Union[tuple(unpacked_types)]]
elif generic_type is Union:
# For Unions (and Optionals, which evaluate to a Union of types and NoneType) we unpack the underlying type since it could
# be another container
return Union[tuple(unpacked_types)]
return field_type
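The `unpack` recursion above is the heart of the validate-only path: annotations that point at an interface are rewritten into the Union of that interface's concrete subclasses, so the generated schema accepts any valid implementation. A standalone sketch of the same idea with hypothetical stand-in classes instead of CDK components:

```python
# Stand-in model of _transform_interface_to_union/unpack. Interface, ImplA and
# ImplB are hypothetical; the real code keys off JsonSchemaMixin instead.
import typing
from typing import List, Optional, Union


class Interface:
    pass


class ImplA(Interface):
    pass


class ImplB(Interface):
    pass


def unpack(field_type):
    origin = typing.get_origin(field_type)
    if origin is None:
        # Base case: expand an interface into the Union of its known subclasses
        if isinstance(field_type, type) and issubclass(field_type, Interface):
            subclasses = field_type.__subclasses__()
            if subclasses:
                return Union[tuple(subclasses)]
        return field_type
    # Generic containers: unpack the underlying types recursively
    args = tuple(unpack(arg) for arg in typing.get_args(field_type))
    if origin is list:
        return List[Union[args]]
    if origin is Union:
        return Union[args]
    return field_type


assert unpack(Interface) == Union[ImplA, ImplB]
assert unpack(List[Interface]) == List[Union[ImplA, ImplB]]
assert unpack(Optional[Interface]) == Union[ImplA, ImplB, type(None)]
```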

View File

@@ -7,10 +7,11 @@ from dataclasses import dataclass
from typing import Optional
import requests
from dataclasses_jsonschema import JsonSchemaMixin
@dataclass
class BackoffStrategy:
class BackoffStrategy(JsonSchemaMixin):
"""
Backoff strategy defining how long to wait before retrying a request that resulted in an error.
"""

View File

@@ -2,16 +2,17 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from abc import ABC, abstractmethod
from abc import abstractmethod
from dataclasses import dataclass
from typing import Union
import requests
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus
from dataclasses_jsonschema import JsonSchemaMixin
@dataclass
class ErrorHandler(ABC):
class ErrorHandler(JsonSchemaMixin):
"""
Defines whether a request was successful and how to handle a failure.
"""

View File

@@ -7,6 +7,7 @@ from functools import lru_cache
from typing import Any, Mapping, MutableMapping, Optional, Union
import requests
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator, NoAuth
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import DefaultErrorHandler
from airbyte_cdk.sources.declarative.requesters.error_handlers.error_handler import ErrorHandler
@@ -14,10 +15,8 @@ from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status i
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import (
InterpolatedRequestOptionsProvider,
)
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod, Requester
from airbyte_cdk.sources.declarative.types import Config, StreamSlice, StreamState
from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator, NoAuth
from dataclasses_jsonschema import JsonSchemaMixin
@@ -28,26 +27,28 @@ class HttpRequester(Requester, JsonSchemaMixin):
Attributes:
name (str): Name of the stream. Only used for request/response caching
url_base (InterpolatedString): Base url to send requests to
path (InterpolatedString): Path to send requests to
url_base (Union[InterpolatedString, str]): Base url to send requests to
path (Union[InterpolatedString, str]): Path to send requests to
http_method (Union[str, HttpMethod]): HTTP method to use when sending requests
request_options_provider (Optional[RequestOptionsProvider]): request option provider defining the options to set on outgoing requests
authenticator (HttpAuthenticator): Authenticator defining how to authenticate to the source
request_options_provider (Optional[InterpolatedRequestOptionsProvider]): request option provider defining the options to set on outgoing requests
authenticator (DeclarativeAuthenticator): Authenticator defining how to authenticate to the source
error_handler (Optional[ErrorHandler]): Error handler defining how to detect and handle errors
config (Config): The user-provided configuration as specified by the source's spec
"""
name: str
url_base: InterpolatedString
path: InterpolatedString
url_base: Union[InterpolatedString, str]
path: Union[InterpolatedString, str]
config: Config
options: InitVar[Mapping[str, Any]]
http_method: Union[str, HttpMethod] = HttpMethod.GET
request_options_provider: Optional[RequestOptionsProvider] = None
authenticator: HttpAuthenticator = None
request_options_provider: Optional[InterpolatedRequestOptionsProvider] = None
authenticator: DeclarativeAuthenticator = None
error_handler: Optional[ErrorHandler] = None
def __post_init__(self, options: Mapping[str, Any]):
self.url_base = InterpolatedString.create(self.url_base, options=options)
self.path = InterpolatedString.create(self.path, options=options)
if self.request_options_provider is None:
self._request_options_provider = InterpolatedRequestOptionsProvider(config=self.config, options=options)
elif isinstance(self.request_options_provider, dict):

View File

@@ -8,10 +8,11 @@ from typing import Any, List, Mapping, Optional, Union
import requests
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState
from dataclasses_jsonschema import JsonSchemaMixin
@dataclass
class NoPagination(Paginator):
class NoPagination(Paginator, JsonSchemaMixin):
"""
Pagination implementation that never returns a next page.
"""

View File

@@ -8,10 +8,11 @@ from typing import Any, List, Mapping, Optional
import requests
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider
from dataclasses_jsonschema import JsonSchemaMixin
@dataclass
class Paginator(RequestOptionsProvider):
class Paginator(RequestOptionsProvider, JsonSchemaMixin):
"""
Defines the token to use to fetch the next page of records from the API.

View File

@@ -30,12 +30,14 @@ class CursorPaginationStrategy(PaginationStrategy, JsonSchemaMixin):
cursor_value: Union[InterpolatedString, str]
config: Config
options: InitVar[Mapping[str, Any]]
stop_condition: Optional[InterpolatedBoolean] = None
stop_condition: Optional[Union[InterpolatedBoolean, str]] = None
decoder: Decoder = JsonDecoder(options={})
def __post_init__(self, options: Mapping[str, Any]):
if isinstance(self.cursor_value, str):
self.cursor_value = InterpolatedString.create(self.cursor_value, options=options)
if isinstance(self.stop_condition, str):
self.stop_condition = InterpolatedBoolean(condition=self.stop_condition, options=options)
def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Any]:
decoded_response = self.decoder.decode(response)

View File

@@ -2,15 +2,16 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from abc import ABC, abstractmethod
from abc import abstractmethod
from dataclasses import dataclass
from typing import Any, Mapping, MutableMapping, Optional, Union
from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState
from dataclasses_jsonschema import JsonSchemaMixin
@dataclass
class RequestOptionsProvider(ABC):
class RequestOptionsProvider(JsonSchemaMixin):
"""
Defines the request options to set on an outgoing HTTP request

View File

@@ -10,6 +10,7 @@ import requests
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider
from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState
from dataclasses_jsonschema import JsonSchemaMixin
from requests.auth import AuthBase
@@ -22,7 +23,7 @@ class HttpMethod(Enum):
POST = "POST"
class Requester(RequestOptionsProvider):
class Requester(RequestOptionsProvider, JsonSchemaMixin):
@abstractmethod
def get_authenticator(self) -> AuthBase:
"""

View File

@@ -2,16 +2,17 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from abc import ABC, abstractmethod
from abc import abstractmethod
from dataclasses import dataclass
from typing import Iterable, List, Optional
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
from dataclasses_jsonschema import JsonSchemaMixin
@dataclass
class Retriever(ABC):
class Retriever(JsonSchemaMixin):
"""
Responsible for fetching a stream's records from an HTTP API source.
"""

View File

@@ -48,9 +48,9 @@ class SimpleRetriever(Retriever, HttpStream, JsonSchemaMixin):
record_selector: HttpSelector
options: InitVar[Mapping[str, Any]]
name: str
_name: str = field(init=False, repr=False)
_name: str = field(init=False, repr=False, default="")
primary_key: Optional[Union[str, List[str], List[List[str]]]]
_primary_key: str = field(init=False, repr=False)
_primary_key: str = field(init=False, repr=False, default="")
paginator: Optional[Paginator] = None
stream_slicer: Optional[StreamSlicer] = SingleSlice(options={})

View File

@@ -2,13 +2,15 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from abc import ABC, abstractmethod
from abc import abstractmethod
from dataclasses import dataclass
from typing import Any, Mapping
from dataclasses_jsonschema import JsonSchemaMixin
@dataclass
class SchemaLoader(ABC):
class SchemaLoader(JsonSchemaMixin):
"""Describes a stream's schema"""
@abstractmethod

View File

@@ -5,7 +5,7 @@
import datetime
import re
from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, Mapping, Optional
from typing import Any, Iterable, Mapping, Optional, Union
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser
@@ -40,10 +40,10 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin):
Full list of accepted format codes: https://man7.org/linux/man-pages/man3/strftime.3.html
Attributes:
start_datetime (MinMaxDatetime): the datetime that determines the earliest record that should be synced
end_datetime (MinMaxDatetime): the datetime that determines the last record that should be synced
start_datetime (Union[MinMaxDatetime, str]): the datetime that determines the earliest record that should be synced
end_datetime (Union[MinMaxDatetime, str]): the datetime that determines the last record that should be synced
step (str): size of the timewindow
cursor_field (InterpolatedString): record's cursor field
cursor_field (Union[InterpolatedString, str]): record's cursor field
datetime_format (str): format of the datetime
config (Config): connection config
start_time_option (Optional[RequestOption]): request option for start time
@@ -53,10 +53,10 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin):
lookback_window (Optional[InterpolatedString]): how many days before start_datetime to read data for
"""
start_datetime: MinMaxDatetime
end_datetime: MinMaxDatetime
start_datetime: Union[MinMaxDatetime, str]
end_datetime: Union[MinMaxDatetime, str]
step: str
cursor_field: InterpolatedString
cursor_field: Union[InterpolatedString, str]
datetime_format: str
config: Config
options: InitVar[Mapping[str, Any]]
@@ -66,11 +66,16 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin):
end_time_option: Optional[RequestOption] = None
stream_state_field_start: Optional[str] = None
stream_state_field_end: Optional[str] = None
lookback_window: Optional[InterpolatedString] = None
lookback_window: Optional[Union[InterpolatedString, str]] = None
timedelta_regex = re.compile(r"((?P<weeks>[\.\d]+?)w)?" r"((?P<days>[\.\d]+?)d)?$")
def __post_init__(self, options: Mapping[str, Any]):
if not isinstance(self.start_datetime, MinMaxDatetime):
self.start_datetime = MinMaxDatetime(self.start_datetime, options)
if not isinstance(self.end_datetime, MinMaxDatetime):
self.end_datetime = MinMaxDatetime(self.end_datetime, options)
self._timezone = datetime.timezone.utc
self._interpolation = JinjaInterpolation()
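The `timedelta_regex` above encodes the step/lookback format ("2w", "5d", "1w3d": optional weeks, then optional days). A sketch of how such a regex can be turned into a `datetime.timedelta`; the `parse_timedelta` helper is illustrative, not necessarily the slicer's exact implementation:

```python
import datetime
import re

# Same pattern as the slicer's timedelta_regex above
timedelta_regex = re.compile(r"((?P<weeks>[\.\d]+?)w)?" r"((?P<days>[\.\d]+?)d)?$")


def parse_timedelta(value: str) -> datetime.timedelta:
    parts = timedelta_regex.match(value)
    if not parts:
        return datetime.timedelta()
    # Keep only the groups that actually matched, e.g. {"days": 5.0}
    time_params = {name: float(val) for name, val in parts.groupdict().items() if val}
    return datetime.timedelta(**time_params)


assert parse_timedelta("5d") == datetime.timedelta(days=5)
assert parse_timedelta("1w3d") == datetime.timedelta(weeks=1, days=3)
```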

View File

@@ -9,10 +9,11 @@ from typing import Iterable, Optional
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
from dataclasses_jsonschema import JsonSchemaMixin
@dataclass
class StreamSlicer(RequestOptionsProvider):
class StreamSlicer(RequestOptionsProvider, JsonSchemaMixin):
"""
Slices the stream into a subset of records.
Slices enable state checkpointing and data retrieval parallelization.

View File

@@ -14,7 +14,7 @@ from dataclasses_jsonschema import JsonSchemaMixin
@dataclass
class ParentStreamConfig:
class ParentStreamConfig(JsonSchemaMixin):
"""
Describes how to create a stream slice from a parent stream

View File

@@ -13,7 +13,7 @@ from dataclasses_jsonschema import JsonSchemaMixin
@dataclass(frozen=True)
class AddedFieldDefinition:
class AddedFieldDefinition(JsonSchemaMixin):
"""Defines the field to add on a record"""
path: FieldPointer
@@ -22,7 +22,7 @@ class AddedFieldDefinition:
@dataclass(frozen=True)
class ParsedAddFieldDefinition:
class ParsedAddFieldDefinition(JsonSchemaMixin):
"""Defines the field to add on a record"""
path: FieldPointer

View File

@@ -2,15 +2,16 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from abc import ABC, abstractmethod
from abc import abstractmethod
from dataclasses import dataclass
from typing import Optional
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState
from dataclasses_jsonschema import JsonSchemaMixin
@dataclass
class RecordTransformation(ABC):
class RecordTransformation(JsonSchemaMixin):
"""
Implementations of this class define transformations that can be applied to records of a stream.
"""

View File

@@ -4,14 +4,26 @@
import json
import logging
from dataclasses import dataclass
from typing import Any, List, Mapping
from airbyte_cdk.sources.declarative.checks import CheckStream
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.exceptions import InvalidConnectorDefinitionException
from airbyte_cdk.sources.declarative.parsers.factory import DeclarativeComponentFactory
from airbyte_cdk.sources.declarative.parsers.yaml_parser import YamlParser
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.core import Stream
from dataclasses_jsonschema import JsonSchemaMixin
from jsonschema.validators import validate
@dataclass
class ConcreteDeclarativeSource(JsonSchemaMixin):
version: str
checker: CheckStream
streams: List[DeclarativeStream]
class YamlDeclarativeSource(DeclarativeSource):
@@ -28,6 +40,8 @@ class YamlDeclarativeSource(DeclarativeSource):
self._path_to_yaml = path_to_yaml
self._source_config = self._read_and_parse_yaml_file(path_to_yaml)
self._validate_source()
# Stopgap to protect the top-level namespace until it's validated through the schema
unknown_fields = [key for key in self._source_config.keys() if key not in self.VALID_TOP_LEVEL_FIELDS]
if unknown_fields:
@@ -45,14 +59,28 @@ class YamlDeclarativeSource(DeclarativeSource):
"parsed YAML into declarative source",
extra={"path_to_yaml_file": self._path_to_yaml, "source_name": self.name, "parsed_config": json.dumps(self._source_config)},
)
stream_configs = self._source_config["streams"]
for s in stream_configs:
if "class_name" not in s:
s["class_name"] = "airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream"
return [self._factory.create_component(stream_config, config)() for stream_config in self._source_config["streams"]]
return [self._factory.create_component(stream_config, config, True)() for stream_config in self._stream_configs()]
def _read_and_parse_yaml_file(self, path_to_yaml_file):
with open(path_to_yaml_file, "r") as f:
config_content = f.read()
return YamlParser().parse(config_content)
def _validate_source(self):
full_config = {}
if "version" in self._source_config:
full_config["version"] = self._source_config["version"]
if "check" in self._source_config:
full_config["checker"] = self._source_config["check"]
streams = [self._factory.create_component(stream_config, {}, False)() for stream_config in self._stream_configs()]
if len(streams) > 0:
full_config["streams"] = streams
declarative_source_schema = ConcreteDeclarativeSource.json_schema()
validate(full_config, declarative_source_schema)
def _stream_configs(self):
stream_configs = self._source_config.get("streams", [])
for s in stream_configs:
if "class_name" not in s:
s["class_name"] = "airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream"
return stream_configs
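`_validate_source` derives the top-level schema from the `ConcreteDeclarativeSource` dataclass, which makes `version`, `checker`, and `streams` required properties. A minimal sketch of that check, assuming the CDK version introduced by this commit:

```python
# Top-level validation sketch: generate the source schema and validate a
# partial config against it, as _validate_source does above.
from airbyte_cdk.sources.declarative.yaml_declarative_source import ConcreteDeclarativeSource
from jsonschema import ValidationError
from jsonschema.validators import validate

schema = ConcreteDeclarativeSource.json_schema()

try:
    validate({"version": "0.1.77"}, schema)  # missing "checker" and "streams"
except ValidationError as err:
    print(f"invalid source config: {err.message}")
```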

View File

@@ -15,7 +15,7 @@ README = (HERE / "README.md").read_text()
setup(
name="airbyte-cdk",
version="0.1.76",
version="0.1.77",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",

View File

@@ -3,7 +3,9 @@
#
import datetime
from typing import List, Optional, Union
import pytest
from airbyte_cdk.sources.declarative.auth.token import BasicHttpAuthenticator
from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
@@ -14,6 +16,13 @@ from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSel
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.parsers.factory import DeclarativeComponentFactory
from airbyte_cdk.sources.declarative.parsers.yaml_parser import YamlParser
from airbyte_cdk.sources.declarative.requesters.error_handlers import BackoffStrategy
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies import (
ConstantBackoffStrategy,
ExponentialBackoffStrategy,
WaitTimeFromHeaderBackoffStrategy,
WaitUntilTimeFromHeaderBackoffStrategy,
)
from airbyte_cdk.sources.declarative.requesters.error_handlers.composite_error_handler import CompositeErrorHandler
from airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import DefaultErrorHandler
from airbyte_cdk.sources.declarative.requesters.error_handlers.http_response_filter import HttpResponseFilter
@@ -30,6 +39,7 @@ from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer impor
from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer
from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
from jsonschema import ValidationError
factory = DeclarativeComponentFactory()
@@ -40,7 +50,7 @@ input_config = {"apikey": "verysecrettoken", "repos": ["airbyte", "airbyte-cloud
def test_factory():
content = """
limit: 50
limit: "50"
offset_request_parameters:
offset: "{{ next_page_token['offset'] }}"
limit: "*ref(limit)"
@@ -53,6 +63,9 @@ def test_factory():
body_offset: "{{ next_page_token['offset'] }}"
"""
config = parser.parse(content)
factory.create_component(config["request_options"], input_config, False)
request_options_provider = factory.create_component(config["request_options"], input_config)()
assert type(request_options_provider) == InterpolatedRequestOptionsProvider
@@ -75,6 +88,9 @@ def test_interpolate_config():
interpolated_body_field: "{{ config['apikey'] }}"
"""
config = parser.parse(content)
factory.create_component(config["authenticator"], input_config, False)
authenticator = factory.create_component(config["authenticator"], input_config)()
assert authenticator.client_id.eval(input_config) == "some_client_id"
assert authenticator.client_secret.string == "some_client_secret"
@@ -94,6 +110,9 @@ def test_list_based_stream_slicer_with_values_refd():
cursor_field: repository
"""
config = parser.parse(content)
factory.create_component(config["stream_slicer"], input_config, False)
stream_slicer = factory.create_component(config["stream_slicer"], input_config)()
assert ["airbyte", "airbyte-cloud"] == stream_slicer.slice_values
@@ -109,6 +128,9 @@ def test_list_based_stream_slicer_with_values_defined_in_config():
field_name: repository
"""
config = parser.parse(content)
factory.create_component(config["stream_slicer"], input_config, False)
stream_slicer = factory.create_component(config["stream_slicer"], input_config)()
assert ["airbyte", "airbyte-cloud"] == stream_slicer.slice_values
assert stream_slicer.request_option.inject_into == RequestOptionType.header
@@ -118,28 +140,29 @@ def test_list_based_stream_slicer_with_values_defined_in_config():
def test_create_substream_slicer():
content = """
schema_loader:
file_path: "./source_sendgrid/schemas/{{ options['stream_name'] }}.yaml"
file_path: "./source_sendgrid/schemas/{{ options['name'] }}.yaml"
name: "{{ options['stream_name'] }}"
retriever:
requester:
name: "{{ options['stream_name'] }}"
path: "/v3"
name: "{{ options['name'] }}"
type: "HttpRequester"
path: "kek"
record_selector:
extractor:
field_pointer: []
stream_A:
type: DeclarativeStream
$options:
stream_name: "A"
stream_primary_key: "id"
name: "A"
primary_key: "id"
retriever: "*ref(retriever)"
url_base: "https://airbyte.io"
schema_loader: "*ref(schema_loader)"
stream_B:
type: DeclarativeStream
$options:
stream_name: "B"
stream_primary_key: "id"
name: "B"
primary_key: "id"
retriever: "*ref(retriever)"
url_base: "https://airbyte.io"
schema_loader: "*ref(schema_loader)"
@@ -157,6 +180,7 @@ def test_create_substream_slicer():
stream_slice_field: word_id
"""
config = parser.parse(content)
stream_slicer = factory.create_component(config["stream_slicer"], input_config)()
parent_stream_configs = stream_slicer.parent_stream_configs
assert len(parent_stream_configs) == 2
@@ -191,6 +215,9 @@ def test_create_cartesian_stream_slicer():
- "*ref(stream_slicer_B)"
"""
config = parser.parse(content)
factory.create_component(config["stream_slicer"], input_config, False)
stream_slicer = factory.create_component(config["stream_slicer"], input_config)()
underlying_slicers = stream_slicer.stream_slicers
assert len(underlying_slicers) == 2
@@ -220,6 +247,9 @@ def test_datetime_stream_slicer():
"""
config = parser.parse(content)
factory.create_component(config["stream_slicer"], input_config, False)
stream_slicer = factory.create_component(config["stream_slicer"], input_config)()
assert type(stream_slicer) == DatetimeStreamSlicer
assert stream_slicer._timezone == datetime.timezone.utc
@@ -276,7 +306,7 @@ requester:
api_token: "{{ config['apikey'] }}"
request_parameters_provider: "*ref(request_options_provider)"
error_handler:
type: DefaultErrorHandler
type: NoPagination
retriever:
class_name: "airbyte_cdk.sources.declarative.retrievers.simple_retriever.SimpleRetriever"
name: "{{ options['name'] }}"
@@ -316,6 +346,8 @@ check:
"""
config = parser.parse(content)
factory.create_component(config["list_stream"], input_config, False)
stream_config = config["list_stream"]
assert stream_config["class_name"] == "airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream"
assert stream_config["cursor_field"] == []
@@ -360,6 +392,9 @@ def test_create_record_selector():
field_pointer: ["result"]
"""
config = parser.parse(content)
factory.create_component(config["selector"], input_config, False)
selector = factory.create_component(config["selector"], input_config)()
assert isinstance(selector, RecordSelector)
assert isinstance(selector.extractor, DpathExtractor)
@@ -381,11 +416,14 @@ def test_create_requester():
password: "{{ config.apikey }}"
request_options_provider:
request_parameters:
page_size: 10
a_parameter: "something_here"
request_headers:
header: header_value
"""
config = parser.parse(content)
factory.create_component(config["requester"], input_config, False)
component = factory.create_component(config["requester"], input_config)()
assert isinstance(component, HttpRequester)
assert isinstance(component.error_handler, DefaultErrorHandler)
@@ -395,7 +433,7 @@ def test_create_requester():
assert component.authenticator._username.eval(input_config) == "lists"
assert component.authenticator._password.eval(input_config) == "verysecrettoken"
assert component._method == HttpMethod.GET
assert component._request_options_provider._parameter_interpolator._interpolator.mapping["page_size"] == 10
assert component._request_options_provider._parameter_interpolator._interpolator.mapping["a_parameter"] == "something_here"
assert component._request_options_provider._headers_interpolator._interpolator.mapping["header"] == "header_value"
assert component.name == "lists"
@@ -413,6 +451,9 @@ def test_create_composite_error_handler():
action: RETRY
"""
config = parser.parse(content)
factory.create_component(config["error_handler"], input_config, False)
component = factory.create_component(config["error_handler"], input_config)()
assert len(component.error_handlers) == 2
assert isinstance(component.error_handlers[0], DefaultErrorHandler)
@@ -460,6 +501,8 @@ def test_config_with_defaults():
"""
config = parser.parse(content)
factory.create_component(config["lists_stream"], input_config, False)
stream_config = config["lists_stream"]
stream = factory.create_component(stream_config, input_config)()
assert type(stream) == DeclarativeStream
@@ -495,6 +538,8 @@ def test_create_limit_paginator():
"""
config = parser.parse(content)
factory.create_component(config["paginator"], input_config, False)
paginator_config = config["paginator"]
paginator = factory.create_component(paginator_config, input_config)()
assert isinstance(paginator, LimitPaginator)
@@ -531,6 +576,9 @@ class TestCreateTransformations:
{self.base_options}
"""
config = parser.parse(content)
factory.create_component(config["the_stream"], input_config, False)
component = factory.create_component(config["the_stream"], input_config)()
assert isinstance(component, DeclarativeStream)
assert [] == component.transformations
@@ -548,6 +596,9 @@ class TestCreateTransformations:
- ["path2"]
"""
config = parser.parse(content)
factory.create_component(config["the_stream"], input_config, False)
component = factory.create_component(config["the_stream"], input_config)()
assert isinstance(component, DeclarativeStream)
expected = [RemoveFields(field_pointers=[["path", "to", "field1"], ["path2"]], options={})]
@@ -566,6 +617,9 @@ class TestCreateTransformations:
value: "static_value"
"""
config = parser.parse(content)
factory.create_component(config["the_stream"], input_config, False)
component = factory.create_component(config["the_stream"], input_config)()
assert isinstance(component, DeclarativeStream)
expected = [
@@ -579,3 +633,177 @@ class TestCreateTransformations:
)
]
assert expected == component.transformations
def test_validation_wrong_input_type():
content = """
extractor:
type: DpathExtractor
selector:
class_name: airbyte_cdk.sources.declarative.extractors.record_selector.RecordSelector
record_filter:
class_name: airbyte_cdk.sources.declarative.extractors.record_filter.RecordFilter
condition: "{{ record['id'] > stream_state['id'] }}"
extractor:
$ref: "*ref(extractor)"
field_pointer: 408
"""
config = parser.parse(content)
with pytest.raises(ValidationError):
factory.create_component(config["selector"], input_config, False)
def test_validation_type_missing_required_fields():
content = """
stream_slicer:
type: DatetimeStreamSlicer
$options:
datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z"
start_datetime:
type: MinMaxDatetime
datetime: "{{ config['start_time'] }}"
min_datetime: "{{ config['start_time'] + day_delta(2) }}"
end_datetime: "{{ config['end_time'] }}"
cursor_field: "created"
lookback_window: "5d"
start_time_option:
inject_into: request_parameter
field_name: created[gte]
"""
config = parser.parse(content)
with pytest.raises(ValidationError):
factory.create_component(config["stream_slicer"], input_config, False)
def test_validation_wrong_interface_type():
content = """
paginator:
type: "LimitPaginator"
page_size: 10
url_base: "https://airbyte.io"
limit_option:
inject_into: request_parameter
field_name: page_size
page_token_option:
inject_into: path
pagination_strategy:
type: "MinMaxDatetime"
datetime: "{{ response._metadata.next }}"
"""
config = parser.parse(content)
with pytest.raises(ValidationError):
factory.create_component(config["paginator"], input_config, False)
def test_validation_create_composite_error_handler():
content = """
error_handler:
type: "CompositeErrorHandler"
error_handlers:
- response_filters:
- predicate: "{{ 'code' in response }}"
action: RETRY
- response_filters:
- http_codes: [ 403 ]
"""
config = parser.parse(content)
with pytest.raises(ValidationError):
factory.create_component(config["error_handler"], input_config, False)
# Leaving this test here to document a limitation of the validator. Decoder has no meaningful fields to validate on, so it accepts
# the MinMaxDatetime even though it is the wrong type
def test_validation_wrong_object_type():
content = """
paginator:
type: "LimitPaginator"
page_size: 10
url_base: "https://airbyte.io"
limit_option:
inject_into: request_parameter
field_name: page_size
page_token_option:
inject_into: path
pagination_strategy:
type: "CursorPagination"
cursor_value: "{{ response._metadata.next }}"
decoder:
type: "MinMaxDatetime"
datetime: "{{ response._metadata.next }}"
"""
config = parser.parse(content)
factory.create_component(config["paginator"], input_config, False)
# This test should fail because the extractor doesn't match the array of resolved classes. However, despite the schema being correct,
# validation passes. Leaving this here to document it and revisit at another time. This is another validator limitation.
def test_validate_types_nested_in_list():
content = """
error_handler:
type: DefaultErrorHandler
backoff_strategies:
- type: DpathExtractor
field_pointer: ["result"]
"""
config = parser.parse(content)
factory.create_component(config["error_handler"], input_config, False)
@pytest.mark.parametrize(
"test_name, input_type, expected_unpacked_types",
[
(
"test_unpacking_component_in_list",
List[BackoffStrategy],
List[
Union[
ConstantBackoffStrategy,
ExponentialBackoffStrategy,
WaitTimeFromHeaderBackoffStrategy,
WaitUntilTimeFromHeaderBackoffStrategy,
]
],
),
(
"test_unpacking_component_in_union",
Union[BackoffStrategy, RequestOption],
Union[
ConstantBackoffStrategy,
ExponentialBackoffStrategy,
WaitTimeFromHeaderBackoffStrategy,
WaitUntilTimeFromHeaderBackoffStrategy,
RequestOption,
],
),
(
"test_unpacking_component_in_optional",
Optional[BackoffStrategy],
Union[
ConstantBackoffStrategy,
ExponentialBackoffStrategy,
WaitTimeFromHeaderBackoffStrategy,
WaitUntilTimeFromHeaderBackoffStrategy,
type(None),
],
),
(
"test_unpacking_component_nested_in_multiple_types",
Optional[List[BackoffStrategy]],
Union[
List[
Union[
ConstantBackoffStrategy,
ExponentialBackoffStrategy,
WaitTimeFromHeaderBackoffStrategy,
WaitUntilTimeFromHeaderBackoffStrategy,
]
],
type(None),
],
),
],
)
def test_unpack(test_name, input_type, expected_unpacked_types):
actual_unpacked_types = DeclarativeComponentFactory.unpack(input_type)
assert actual_unpacked_types == expected_unpacked_types

View File

@@ -6,16 +6,53 @@ import os
import tempfile
import unittest
import pytest
from airbyte_cdk.sources.declarative.exceptions import InvalidConnectorDefinitionException
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from jsonschema import ValidationError
class TestYamlDeclarativeSource(unittest.TestCase):
def test_source_is_created_if_toplevel_fields_are_known(self):
content = """
version: "version"
streams: "streams"
check: "check"
definitions:
schema_loader:
name: "{{ options.stream_name }}"
file_path: "./source_sendgrid/schemas/{{ options.name }}.yaml"
retriever:
paginator:
type: "LimitPaginator"
page_size: 10
limit_option:
inject_into: request_parameter
field_name: page_size
page_token_option:
inject_into: path
pagination_strategy:
type: "CursorPagination"
cursor_value: "{{ response._metadata.next }}"
requester:
path: "/v3/marketing/lists"
authenticator:
type: "BearerAuthenticator"
api_token: "{{ config.apikey }}"
request_parameters:
page_size: 10
record_selector:
extractor:
field_pointer: ["result"]
streams:
- type: DeclarativeStream
$options:
name: "lists"
primary_key: id
url_base: "https://api.sendgrid.com"
schema_loader: "*ref(definitions.schema_loader)"
retriever: "*ref(definitions.retriever)"
check:
type: CheckStream
stream_names: ["lists"]
"""
temporary_file = TestFileContent(content)
YamlDeclarativeSource(temporary_file.filename)
@@ -23,14 +60,171 @@ class TestYamlDeclarativeSource(unittest.TestCase):
def test_source_is_not_created_if_toplevel_fields_are_unknown(self):
content = """
version: "version"
streams: "streams"
check: "check"
definitions:
schema_loader:
name: "{{ options.stream_name }}"
file_path: "./source_sendgrid/schemas/{{ options.name }}.yaml"
retriever:
paginator:
type: "LimitPaginator"
page_size: 10
limit_option:
inject_into: request_parameter
field_name: page_size
page_token_option:
inject_into: path
pagination_strategy:
type: "CursorPagination"
cursor_value: "{{ response._metadata.next }}"
requester:
path: "/v3/marketing/lists"
authenticator:
type: "BearerAuthenticator"
api_token: "{{ config.apikey }}"
request_parameters:
page_size: 10
record_selector:
extractor:
field_pointer: ["result"]
streams:
- type: DeclarativeStream
$options:
name: "lists"
primary_key: id
url_base: "https://api.sendgrid.com"
schema_loader: "*ref(definitions.schema_loader)"
retriever: "*ref(definitions.retriever)"
check:
type: CheckStream
stream_names: ["lists"]
not_a_valid_field: "error"
"""
temporary_file = TestFileContent(content)
with self.assertRaises(InvalidConnectorDefinitionException):
YamlDeclarativeSource(temporary_file.filename)
def test_source_missing_checker_fails_validation(self):
content = """
version: "version"
definitions:
schema_loader:
name: "{{ options.stream_name }}"
file_path: "./source_sendgrid/schemas/{{ options.name }}.yaml"
retriever:
paginator:
type: "LimitPaginator"
page_size: 10
limit_option:
inject_into: request_parameter
field_name: page_size
page_token_option:
inject_into: path
pagination_strategy:
type: "CursorPagination"
cursor_value: "{{ response._metadata.next }}"
requester:
path: "/v3/marketing/lists"
authenticator:
type: "BearerAuthenticator"
api_token: "{{ config.apikey }}"
request_parameters:
page_size: 10
record_selector:
extractor:
field_pointer: ["result"]
streams:
- type: DeclarativeStream
$options:
name: "lists"
primary_key: id
url_base: "https://api.sendgrid.com"
schema_loader: "*ref(definitions.schema_loader)"
retriever: "*ref(definitions.retriever)"
check:
type: CheckStream
"""
temporary_file = TestFileContent(content)
with pytest.raises(ValidationError):
YamlDeclarativeSource(temporary_file.filename)
def test_source_with_missing_streams_fails(self):
content = """
version: "version"
definitions:
check:
type: CheckStream
stream_names: ["lists"]
"""
temporary_file = TestFileContent(content)
with pytest.raises(ValidationError):
YamlDeclarativeSource(temporary_file.filename)
def test_source_with_missing_version_fails(self):
content = """
definitions:
schema_loader:
name: "{{ options.stream_name }}"
file_path: "./source_sendgrid/schemas/{{ options.name }}.yaml"
retriever:
paginator:
type: "LimitPaginator"
page_size: 10
limit_option:
inject_into: request_parameter
field_name: page_size
page_token_option:
inject_into: path
pagination_strategy:
type: "CursorPagination"
cursor_value: "{{ response._metadata.next }}"
requester:
path: "/v3/marketing/lists"
authenticator:
type: "BearerAuthenticator"
api_token: "{{ config.apikey }}"
request_parameters:
page_size: 10
record_selector:
extractor:
field_pointer: ["result"]
streams:
- type: DeclarativeStream
$options:
name: "lists"
primary_key: id
url_base: "https://api.sendgrid.com"
schema_loader: "*ref(definitions.schema_loader)"
retriever: "*ref(definitions.retriever)"
check:
type: CheckStream
stream_names: ["lists"]
"""
temporary_file = TestFileContent(content)
with pytest.raises(ValidationError):
YamlDeclarativeSource(temporary_file.filename)
def test_source_with_invalid_stream_config_fails_validation(self):
content = """
version: "version"
definitions:
schema_loader:
name: "{{ options.stream_name }}"
file_path: "./source_sendgrid/schemas/{{ options.name }}.yaml"
streams:
- type: DeclarativeStream
$options:
name: "lists"
primary_key: id
url_base: "https://api.sendgrid.com"
schema_loader: "*ref(definitions.schema_loader)"
check:
type: CheckStream
stream_names: ["lists"]
"""
temporary_file = TestFileContent(content)
with pytest.raises(ValidationError):
YamlDeclarativeSource(temporary_file.filename)
class TestFileContent:
def __init__(self, content):