1
0
mirror of synced 2025-12-25 11:06:55 -05:00

[low-code cdk] decouple parsing the yaml manifest from the declarative source implementation (#19095)

* decouple parsing the yaml manifest from the declarative source implementation

* bump version and changelog
This commit is contained in:
Brian Lai
2022-11-08 15:45:01 -05:00
committed by GitHub
parent b3643269ba
commit 5c9e5d9934
6 changed files with 795 additions and 623 deletions

View File

@@ -1,5 +1,8 @@
# Changelog
## 0.7.1
Low-code: Decouple yaml manifest parsing from the declarative source implementation
## 0.7.0
Low-code: Allow connector specifications to be defined in the manifest

View File

@@ -0,0 +1,173 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
import inspect
import json
import logging
import typing
from dataclasses import dataclass, fields
from enum import Enum, EnumMeta
from typing import Any, List, Mapping, Union
from airbyte_cdk.models import ConnectorSpecification
from airbyte_cdk.sources.declarative.checks import CheckStream
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.exceptions import InvalidConnectorDefinitionException
from airbyte_cdk.sources.declarative.parsers.factory import DeclarativeComponentFactory
from airbyte_cdk.sources.declarative.types import ConnectionDefinition
from airbyte_cdk.sources.streams.core import Stream
from dataclasses_jsonschema import JsonSchemaMixin
from jsonschema.validators import validate
# Dataclass whose generated JSON schema (ConcreteDeclarativeSource.json_schema()) is what
# ManifestDeclarativeSource._validate_source validates the manifest against.
# Documented with comments rather than a docstring so the class' __doc__ is left untouched.
@dataclass
class ConcreteDeclarativeSource(JsonSchemaMixin):
    # Filled from the manifest's top-level "version" key
    version: str
    # Filled from the manifest's top-level "check" key (note the different field name)
    checker: CheckStream
    # Filled from the components built out of the manifest's top-level "streams" list
    streams: List[DeclarativeStream]
class ManifestDeclarativeSource(DeclarativeSource):
    """Declarative source defined by a manifest of low-code components that define source connector behavior"""

    # Only these keys may appear at the top level of a manifest
    VALID_TOP_LEVEL_FIELDS = {"check", "definitions", "spec", "streams", "version"}

    def __init__(self, source_config: ConnectionDefinition):
        """
        Store the manifest, validate it against the declarative source schema and reject unknown top-level keys.

        :param source_config(Mapping[str, Any]): The manifest of low-code components that describe the source connector
        :raises jsonschema.exceptions.ValidationError: if the manifest does not satisfy the ConcreteDeclarativeSource schema
        :raises InvalidConnectorDefinitionException: if the manifest has top-level keys outside VALID_TOP_LEVEL_FIELDS
        """
        self.logger = logging.getLogger(f"airbyte.{self.name}")
        self._source_config = source_config
        self._factory = DeclarativeComponentFactory()

        # Schema validation runs first, so a schema error takes precedence over an unknown-field error
        self._validate_source()

        # Stopgap to protect the top-level namespace until it's validated through the schema
        unknown_fields = [key for key in self._source_config.keys() if key not in self.VALID_TOP_LEVEL_FIELDS]
        if unknown_fields:
            raise InvalidConnectorDefinitionException(f"Found unknown top-level fields: {unknown_fields}")

    @property
    def connection_checker(self) -> ConnectionChecker:
        """
        Build the connection checker from the manifest's "check" block.

        The "check" mapping is updated in place with a default "class_name" when it does not declare one.
        """
        check = self._source_config["check"]
        check.setdefault("class_name", "airbyte_cdk.sources.declarative.checks.check_stream.CheckStream")
        return self._factory.create_component(check, dict())(source=self)

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        Build one stream per entry of the manifest's "streams" list.

        :param config: The connector configuration, forwarded to the component factory
        :return: The streams created from the manifest
        """
        self._emit_manifest_debug_message(extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)})

        source_streams = [self._factory.create_component(stream_config, config, True)() for stream_config in self._stream_configs()]
        for stream in source_streams:
            # make sure the log level is always applied to the stream's logger
            self._apply_log_level_to_stream_logger(self.logger, stream)
        return source_streams

    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
        """
        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
        in the project root.
        """
        self._emit_manifest_debug_message(extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)})

        spec = self._source_config.get("spec")
        if not spec:
            # No (or empty) spec block in the manifest: defer to the parent implementation
            return super().spec(logger)

        spec.setdefault("class_name", "airbyte_cdk.sources.declarative.spec.Spec")
        spec_component = self._factory.create_component(spec, dict())()
        return spec_component.generate_spec()

    def _validate_source(self) -> None:
        """
        Validate the manifest against the JSON schema generated from ConcreteDeclarativeSource.

        Only keys that are present are copied over, so that a missing "version", "check" or "streams" entry is reported
        by the schema validation itself.

        :raises jsonschema.exceptions.ValidationError: if the assembled configuration does not match the schema
        """
        full_config = {}
        if "version" in self._source_config:
            full_config["version"] = self._source_config["version"]
        if "check" in self._source_config:
            # The manifest key is "check" while the schema field is named "checker"
            full_config["checker"] = self._source_config["check"]

        streams = [self._factory.create_component(stream_config, {}, False)() for stream_config in self._stream_configs()]
        if streams:
            full_config["streams"] = streams

        declarative_source_schema = ConcreteDeclarativeSource.json_schema()
        validate(full_config, declarative_source_schema)

    def _stream_configs(self):
        """
        Return the manifest's "streams" entries (an empty list when the key is absent).

        Each entry is updated in place with a default "class_name" when it does not declare one.
        """
        stream_configs = self._source_config.get("streams", [])
        for stream_config in stream_configs:
            stream_config.setdefault("class_name", "airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream")
        return stream_configs

    @staticmethod
    def generate_schema() -> str:
        """
        Generate the JSON schema of a declarative source with its interface fields expanded.

        :return: The schema serialized as a JSON string
        """
        expanded_source_definition = ManifestDeclarativeSource.expand_schema_interfaces(ConcreteDeclarativeSource, {})
        expanded_schema = expanded_source_definition.json_schema()
        return json.dumps(expanded_schema, cls=SchemaEncoder)

    @staticmethod
    def expand_schema_interfaces(expand_class: type, visited: dict) -> type:
        """
        Recursive function that takes in a class type that will have its interface fields unpacked and expanded and then recursively
        attempts the same expansion on all the class' underlying fields that are declarative components. It also performs expansion
        with respect to interfaces that are contained within generic data types.

        Note that the expansion rewrites ``expand_class.__annotations__`` in place.

        :param expand_class: The declarative component class that will have its interface fields expanded
        :param visited: cache, keyed by class name, of the declarative classes that have already been seen
        :return: The expanded declarative component
        """
        # Recursive base case to stop recursion if we have already expanded an interface in case of cyclical components
        # like CompositeErrorHandler
        if expand_class.__name__ in visited:
            return visited[expand_class.__name__]
        visited[expand_class.__name__] = expand_class

        next_classes = []
        for field in fields(expand_class):
            unpacked_field_types = DeclarativeComponentFactory.unpack(field.type)
            expand_class.__annotations__[field.name] = unpacked_field_types
            next_classes.extend(ManifestDeclarativeSource._get_next_expand_classes(field.type))

        for next_class in next_classes:
            ManifestDeclarativeSource.expand_schema_interfaces(next_class, visited)
        return expand_class

    @staticmethod
    def _get_next_expand_classes(field_type) -> List[type]:
        """
        Parses through a given field type and assembles a list of all underlying declarative components. For a concrete declarative class
        it will return itself. For a declarative interface it will return its subclasses. For declarative components in a generic type
        it will return the unpacked classes. Any non-declarative types will be skipped.

        :param field_type: A field type, either a plain type or a generic such as List[...] or Union[...]
        :return: The declarative component classes found in field_type; empty when there are none
        """
        generic_type = typing.get_origin(field_type)
        if generic_type is None:
            # We can only continue parsing declarative components that inherit from the JsonSchemaMixin class because it is used
            # to generate the final json schema
            if inspect.isclass(field_type) and issubclass(field_type, JsonSchemaMixin) and not isinstance(field_type, EnumMeta):
                subclasses = field_type.__subclasses__()
                return subclasses if subclasses else [field_type]
        elif generic_type in (list, Union):
            next_classes = []
            for underlying_type in typing.get_args(field_type):
                next_classes.extend(ManifestDeclarativeSource._get_next_expand_classes(underlying_type))
            return next_classes
        # Non-declarative plain types and any other generic (e.g. mappings) contribute nothing
        return []

    def _emit_manifest_debug_message(self, extra_args: dict) -> None:
        """
        Log, at debug level, that the source was created from a manifest.

        :param extra_args: Passed to the logger as the ``extra`` mapping
        """
        self.logger.debug("declarative source created from manifest", extra=extra_args)
class SchemaEncoder(json.JSONEncoder):
    """JSON encoder that serializes ``property`` objects and ``Enum`` members as their ``str()`` form."""

    def default(self, obj):
        """
        Fallback hook invoked by the encoder for objects it cannot serialize natively.

        :param obj: The object the encoder could not serialize on its own
        :return: ``str(obj)`` for properties and enum members
        :raises TypeError: raised by the base encoder for any other unsupported object
        """
        stringified_types = (property, Enum)
        if not isinstance(obj, stringified_types):
            # Anything else is left to the base class, which reports it as not serializable
            return super().default(obj)
        return str(obj)

View File

@@ -2,184 +2,31 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
import inspect
import json
import logging
import pkgutil
import typing
from dataclasses import dataclass, fields
from enum import Enum, EnumMeta
from typing import Any, List, Mapping, Union
from airbyte_cdk.models import ConnectorSpecification
from airbyte_cdk.sources.declarative.checks import CheckStream
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.exceptions import InvalidConnectorDefinitionException
from airbyte_cdk.sources.declarative.parsers.factory import DeclarativeComponentFactory
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.parsers.yaml_parser import YamlParser
from airbyte_cdk.sources.streams.core import Stream
from dataclasses_jsonschema import JsonSchemaMixin
from jsonschema.validators import validate
from airbyte_cdk.sources.declarative.types import ConnectionDefinition
@dataclass
class ConcreteDeclarativeSource(JsonSchemaMixin):
version: str
checker: CheckStream
streams: List[DeclarativeStream]
class YamlDeclarativeSource(DeclarativeSource):
class YamlDeclarativeSource(ManifestDeclarativeSource):
"""Declarative source defined by a yaml file"""
VALID_TOP_LEVEL_FIELDS = {"check", "definitions", "spec", "streams", "version"}
def __init__(self, path_to_yaml):
"""
:param path_to_yaml: Path to the yaml file describing the source
"""
self.logger = logging.getLogger(f"airbyte.{self.name}")
self._factory = DeclarativeComponentFactory()
self._path_to_yaml = path_to_yaml
self._source_config = self._read_and_parse_yaml_file(path_to_yaml)
source_config = self._read_and_parse_yaml_file(path_to_yaml)
super().__init__(source_config)
self._validate_source()
# Stopgap to protect the top-level namespace until it's validated through the schema
unknown_fields = [key for key in self._source_config.keys() if key not in self.VALID_TOP_LEVEL_FIELDS]
if unknown_fields:
raise InvalidConnectorDefinitionException(f"Found unknown top-level fields: {unknown_fields}")
@property
def connection_checker(self) -> ConnectionChecker:
check = self._source_config["check"]
if "class_name" not in check:
check["class_name"] = "airbyte_cdk.sources.declarative.checks.check_stream.CheckStream"
return self._factory.create_component(check, dict())(source=self)
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
self.logger.debug(
"parsed YAML into declarative source",
extra={"path_to_yaml_file": self._path_to_yaml, "source_name": self.name, "parsed_config": json.dumps(self._source_config)},
)
source_streams = [self._factory.create_component(stream_config, config, True)() for stream_config in self._stream_configs()]
for stream in source_streams:
# make sure the log level is always appied to the stream's logger
self._apply_log_level_to_stream_logger(self.logger, stream)
return source_streams
def spec(self, logger: logging.Logger) -> ConnectorSpecification:
"""
Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
in the project root.
"""
self.logger.debug(
"parsed YAML into declarative source",
extra={"path_to_yaml_file": self._path_to_yaml, "source_name": self.name, "parsed_config": json.dumps(self._source_config)},
)
spec = self._source_config.get("spec")
if spec:
if "class_name" not in spec:
spec["class_name"] = "airbyte_cdk.sources.declarative.spec.Spec"
spec_component = self._factory.create_component(spec, dict())()
return spec_component.generate_spec()
else:
return super().spec(logger)
def _read_and_parse_yaml_file(self, path_to_yaml_file):
def _read_and_parse_yaml_file(self, path_to_yaml_file) -> ConnectionDefinition:
package = self.__class__.__module__.split(".")[0]
yaml_config = pkgutil.get_data(package, path_to_yaml_file)
decoded_yaml = yaml_config.decode()
return YamlParser().parse(decoded_yaml)
def _validate_source(self):
full_config = {}
if "version" in self._source_config:
full_config["version"] = self._source_config["version"]
if "check" in self._source_config:
full_config["checker"] = self._source_config["check"]
streams = [self._factory.create_component(stream_config, {}, False)() for stream_config in self._stream_configs()]
if len(streams) > 0:
full_config["streams"] = streams
declarative_source_schema = ConcreteDeclarativeSource.json_schema()
validate(full_config, declarative_source_schema)
def _stream_configs(self):
stream_configs = self._source_config.get("streams", [])
for s in stream_configs:
if "class_name" not in s:
s["class_name"] = "airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream"
return stream_configs
@staticmethod
def generate_schema() -> str:
expanded_source_definition = YamlDeclarativeSource.expand_schema_interfaces(ConcreteDeclarativeSource, {})
expanded_schema = expanded_source_definition.json_schema()
return json.dumps(expanded_schema, cls=SchemaEncoder)
@staticmethod
def expand_schema_interfaces(expand_class: type, visited: dict) -> type:
"""
Recursive function that takes in class type that will have its interface fields unpacked and expended and then recursively
attempt the same expansion on all the class' underlying fields that are declarative component. It also performs expansion
with respect to interfaces that are contained within generic data types.
:param expand_class: The declarative component class that will have its interface fields expanded
:param visited: cache used to store a record of already visited declarative classes that have already been seen
:return: The expanded declarative component
"""
# Recursive base case to stop recursion if we have already expanded an interface in case of cyclical components
# like CompositeErrorHandler
if expand_class.__name__ in visited:
return visited[expand_class.__name__]
visited[expand_class.__name__] = expand_class
next_classes = []
class_fields = fields(expand_class)
for field in class_fields:
unpacked_field_types = DeclarativeComponentFactory.unpack(field.type)
expand_class.__annotations__[field.name] = unpacked_field_types
next_classes.extend(YamlDeclarativeSource._get_next_expand_classes(field.type))
for next_class in next_classes:
YamlDeclarativeSource.expand_schema_interfaces(next_class, visited)
return expand_class
@staticmethod
def _get_next_expand_classes(field_type) -> list[type]:
"""
Parses through a given field type and assembles a list of all underlying declarative components. For a concrete declarative class
it will return itself. For a declarative interface it will return its subclasses. For declarative components in a generic type
it will return the unpacked classes. Any non-declarative types will be skipped.
:param field_type: A field type that
:return:
"""
generic_type = typing.get_origin(field_type)
if generic_type is None:
# We can only continue parsing declarative that inherit from the JsonSchemaMixin class because it is used
# to generate the final json schema
if inspect.isclass(field_type) and issubclass(field_type, JsonSchemaMixin) and not isinstance(field_type, EnumMeta):
subclasses = field_type.__subclasses__()
if subclasses:
return subclasses
else:
return [field_type]
elif generic_type == list or generic_type == Union:
next_classes = []
for underlying_type in typing.get_args(field_type):
next_classes.extend(YamlDeclarativeSource._get_next_expand_classes(underlying_type))
return next_classes
return []
class SchemaEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, property) or isinstance(obj, Enum):
return str(obj)
return json.JSONEncoder.default(self, obj)
def _emit_manifest_debug_message(self, extra_args: dict):
extra_args["path_to_yaml"] = self._path_to_yaml
self.logger.debug("declarative source created from parsed YAML manifest", extra=extra_args)

View File

@@ -15,7 +15,7 @@ README = (HERE / "README.md").read_text()
setup(
name="airbyte-cdk",
version="0.7.0",
version="0.7.1",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",

View File

@@ -0,0 +1,601 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
import json
import logging
import os
import sys
import pytest
import yaml
from airbyte_cdk.sources.declarative.exceptions import InvalidConnectorDefinitionException
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from jsonschema.exceptions import ValidationError
# Logger handed to source.spec(...) by the tests below
logger = logging.getLogger("airbyte")

# connectionSpecification written to the temporary spec.yaml by the use_external_yaml_spec fixture and
# compared against the spec returned by the source in test_manifest_with_external_spec
EXTERNAL_CONNECTION_SPECIFICATION = {
    "type": "object",
    "required": ["api_token"],
    "additionalProperties": False,
    "properties": {"api_token": {"type": "string"}},
}
class MockManifestDeclarativeSource(ManifestDeclarativeSource):
    """
    Test subclass of ManifestDeclarativeSource used when the spec has to be read from a spec.yaml file that a test
    writes temporarily (see the use_external_yaml_spec fixture and test_manifest_with_external_spec).

    It is meant to properly namespace where such files are looked up, i.e. the unit_tests package, which is the directory
    they are written to and read from during the tests.

    NOTE(review): this class defines no overrides of its own, so the effect presumably comes only from the subclass being
    declared in the test module - confirm against the parent's spec() lookup.
    """
class TestYamlDeclarativeSource:
    """
    Tests that build a ManifestDeclarativeSource from an in-memory manifest.

    The manifest shared by most tests is produced by the private builder helpers below. Every call returns brand new
    dictionaries because the source under test adds default "class_name" entries to the manifest it is given, so
    manifests must never be shared between tests (or between the definitions and streams sections).
    """

    @staticmethod
    def _schema_loader() -> dict:
        """Return a fresh schema loader definition."""
        return {"name": "{{ options.stream_name }}", "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml"}

    @staticmethod
    def _retriever() -> dict:
        """Return a fresh retriever definition (paginator, requester and record selector)."""
        return {
            "paginator": {
                "type": "DefaultPaginator",
                "page_size": 10,
                "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"},
                "page_token_option": {"inject_into": "path"},
                "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"},
            },
            "requester": {
                "path": "/v3/marketing/lists",
                "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"},
                "request_parameters": {"page_size": 10},
            },
            "record_selector": {"extractor": {"field_pointer": ["result"]}},
        }

    @classmethod
    def _manifest(cls) -> dict:
        """Return a fresh manifest with a version, definitions, a single "lists" stream and a check block."""
        return {
            "version": "version",
            "definitions": {
                "schema_loader": cls._schema_loader(),
                "retriever": cls._retriever(),
            },
            "streams": [
                {
                    "type": "DeclarativeStream",
                    "$options": {"name": "lists", "primary_key": "id", "url_base": "https://api.sendgrid.com"},
                    "schema_loader": cls._schema_loader(),
                    "retriever": cls._retriever(),
                }
            ],
            "check": {"type": "CheckStream", "stream_names": ["lists"]},
        }

    @pytest.fixture
    def use_external_yaml_spec(self):
        """Write a temporary spec.yaml for the duration of a test and remove it afterwards."""
        # Our way of resolving the absolute path to root of the airbyte-cdk unit test directory where spec.yaml files should
        # be written to (i.e. ~/airbyte/airbyte-cdk/python/unit-tests) because that is where they are read from during testing.
        module = sys.modules[__name__]
        module_path = os.path.abspath(module.__file__)
        test_path = os.path.dirname(module_path)
        spec_root = test_path.split("/sources/declarative")[0]

        spec = {"documentationUrl": "https://airbyte.com/#yaml-from-external", "connectionSpecification": EXTERNAL_CONNECTION_SPECIFICATION}

        yaml_path = os.path.join(spec_root, "spec.yaml")
        with open(yaml_path, "w") as f:
            f.write(yaml.dump(spec))
        yield
        os.remove(yaml_path)

    def test_valid_manifest(self):
        manifest = self._manifest()
        ManifestDeclarativeSource(source_config=manifest)

    def test_manifest_with_spec(self):
        manifest = self._manifest()
        manifest["spec"] = {
            "type": "Spec",
            "documentation_url": "https://airbyte.com/#yaml-from-manifest",
            "connection_specification": {
                "title": "Test Spec",
                "type": "object",
                "required": ["api_key"],
                "additionalProperties": False,
                "properties": {
                    "api_key": {"type": "string", "airbyte_secret": True, "title": "API Key", "description": "Test API Key", "order": 0}
                },
            },
        }
        source = ManifestDeclarativeSource(source_config=manifest)
        connector_specification = source.spec(logger)
        assert connector_specification is not None
        assert connector_specification.documentationUrl == "https://airbyte.com/#yaml-from-manifest"
        assert connector_specification.connectionSpecification["title"] == "Test Spec"
        assert connector_specification.connectionSpecification["required"][0] == "api_key"
        assert connector_specification.connectionSpecification["additionalProperties"] is False
        assert connector_specification.connectionSpecification["properties"]["api_key"] == {
            "type": "string",
            "airbyte_secret": True,
            "title": "API Key",
            "description": "Test API Key",
            "order": 0,
        }

    def test_manifest_with_external_spec(self, use_external_yaml_spec):
        manifest = self._manifest()
        source = MockManifestDeclarativeSource(source_config=manifest)

        connector_specification = source.spec(logger)

        assert connector_specification.documentationUrl == "https://airbyte.com/#yaml-from-external"
        assert connector_specification.connectionSpecification == EXTERNAL_CONNECTION_SPECIFICATION

    def test_source_is_not_created_if_toplevel_fields_are_unknown(self):
        manifest = self._manifest()
        manifest["not_a_valid_field"] = "error"
        with pytest.raises(InvalidConnectorDefinitionException):
            ManifestDeclarativeSource(manifest)

    def test_source_missing_checker_fails_validation(self):
        manifest = self._manifest()
        # The check block is present but lacks its stream_names
        manifest["check"] = {"type": "CheckStream"}
        with pytest.raises(ValidationError):
            ManifestDeclarativeSource(source_config=manifest)

    def test_source_with_missing_streams_fails(self):
        manifest = {"version": "version", "definitions": None, "check": {"type": "CheckStream", "stream_names": ["lists"]}}
        with pytest.raises(ValidationError):
            ManifestDeclarativeSource(manifest)

    def test_source_with_missing_version_fails(self):
        manifest = self._manifest()
        del manifest["version"]
        with pytest.raises(ValidationError):
            ManifestDeclarativeSource(manifest)

    def test_source_with_invalid_stream_config_fails_validation(self):
        manifest = self._manifest()
        # A stream without a retriever is not a valid DeclarativeStream
        del manifest["definitions"]["retriever"]
        del manifest["streams"][0]["retriever"]
        with pytest.raises(ValidationError):
            ManifestDeclarativeSource(manifest)

    def test_source_with_no_external_spec_and_no_in_yaml_spec_fails(self):
        manifest = self._manifest()
        source = ManifestDeclarativeSource(source_config=manifest)

        # We expect to fail here because we have not created a temporary spec.yaml file
        with pytest.raises(FileNotFoundError):
            source.spec(logger)
def test_generate_schema():
    """Spot-check the JSON schema returned by ManifestDeclarativeSource.generate_schema().

    The returned string is parsed as JSON, then a sample of component definitions is inspected for
    required fields, property types, "$ref" targets and the alternatives listed under "anyOf".
    """

    def assert_requires(definition, *field_names):
        # All of the given field names must appear in the definition's "required" list.
        assert set(field_names).issubset(definition["required"])

    def assert_any_of(property_schema, *alternatives):
        # Each expected alternative must be one of the entries of the property's "anyOf" list.
        for alternative in alternatives:
            assert alternative in property_schema["anyOf"]

    schema = json.loads(ManifestDeclarativeSource.generate_schema())
    definitions = schema["definitions"]

    # --- top-level source schema ---
    for field_name in ("version", "checker", "streams"):
        assert field_name in schema["required"]
    source_props = schema["properties"]
    assert source_props["checker"]["$ref"] == "#/definitions/CheckStream"
    assert source_props["streams"]["items"]["$ref"] == "#/definitions/DeclarativeStream"

    # --- CheckStream ---
    check_stream = definitions["CheckStream"]
    assert_requires(check_stream, "stream_names")
    stream_names = check_stream["properties"]["stream_names"]
    assert stream_names["type"] == "array"
    assert stream_names["items"]["type"] == "string"

    # --- DeclarativeStream ---
    declarative_stream = definitions["DeclarativeStream"]
    assert_requires(declarative_stream, "retriever", "config")
    stream_props = declarative_stream["properties"]
    assert_any_of(
        stream_props["schema_loader"],
        {"$ref": "#/definitions/DefaultSchemaLoader"},
        {"$ref": "#/definitions/JsonFileSchemaLoader"},
    )
    assert stream_props["retriever"]["$ref"] == "#/definitions/SimpleRetriever"
    assert stream_props["name"]["type"] == "string"
    primary_key_alternatives = (
        {"type": "array", "items": {"type": "string"}},
        {"type": "array", "items": {"type": "array", "items": {"type": "string"}}},
        {"type": "string"},
    )
    assert_any_of(stream_props["primary_key"], *primary_key_alternatives)
    assert_any_of(
        stream_props["stream_cursor_field"],
        {"type": "array", "items": {"type": "string"}},
        {"type": "string"},
    )
    assert stream_props["transformations"]["type"] == "array"
    assert_any_of(
        stream_props["transformations"]["items"],
        {"$ref": "#/definitions/AddFields"},
        {"$ref": "#/definitions/RemoveFields"},
    )
    assert stream_props["checkpoint_interval"]["type"] == "integer"

    # --- SimpleRetriever (the entry at index 1 of "allOf" is the one inspected) ---
    simple_retriever = definitions["SimpleRetriever"]["allOf"][1]
    assert_requires(simple_retriever, "requester", "record_selector")
    retriever_props = simple_retriever["properties"]
    assert retriever_props["requester"]["$ref"] == "#/definitions/HttpRequester"
    assert retriever_props["record_selector"]["$ref"] == "#/definitions/RecordSelector"
    assert retriever_props["name"]["type"] == "string"
    # NOTE(review): this re-checks DeclarativeStream's primary_key rather than SimpleRetriever's —
    # possibly a copy-paste slip; confirm the intended target.
    assert_any_of(stream_props["primary_key"], *primary_key_alternatives)
    assert_any_of(
        retriever_props["paginator"],
        {"$ref": "#/definitions/DefaultPaginator"},
        {"$ref": "#/definitions/NoPagination"},
    )
    assert_any_of(
        retriever_props["stream_slicer"],
        {"$ref": "#/definitions/CartesianProductStreamSlicer"},
        {"$ref": "#/definitions/DatetimeStreamSlicer"},
        {"$ref": "#/definitions/ListStreamSlicer"},
        {"$ref": "#/definitions/SingleSlice"},
        {"$ref": "#/definitions/SubstreamSlicer"},
    )

    # --- HttpRequester ---
    http_requester = definitions["HttpRequester"]["allOf"][1]
    assert_requires(http_requester, "name", "url_base", "path", "config")
    requester_props = http_requester["properties"]
    assert requester_props["name"]["type"] == "string"
    assert_any_of(requester_props["url_base"], {"$ref": "#/definitions/InterpolatedString"})
    assert_any_of(requester_props["path"], {"type": "string"})
    # NOTE(review): the same two checks as just above, repeated verbatim — the second pair may have been
    # meant to cover the other alternative of each property; confirm.
    assert_any_of(requester_props["url_base"], {"$ref": "#/definitions/InterpolatedString"})
    assert_any_of(requester_props["path"], {"type": "string"})
    assert_any_of(
        requester_props["http_method"],
        {"type": "string"},
        {"type": "string", "enum": ["GET", "POST"]},
    )
    assert requester_props["request_options_provider"]["$ref"] == "#/definitions/InterpolatedRequestOptionsProvider"
    assert_any_of(
        requester_props["authenticator"],
        {"$ref": "#/definitions/DeclarativeOauth2Authenticator"},
        {"$ref": "#/definitions/ApiKeyAuthenticator"},
        {"$ref": "#/definitions/BearerAuthenticator"},
        {"$ref": "#/definitions/BasicHttpAuthenticator"},
    )
    assert_any_of(
        requester_props["error_handler"],
        {"$ref": "#/definitions/CompositeErrorHandler"},
        {"$ref": "#/definitions/DefaultErrorHandler"},
    )

    # --- ApiKeyAuthenticator ---
    api_key_authenticator = definitions["ApiKeyAuthenticator"]["allOf"][1]
    assert_requires(api_key_authenticator, "header", "api_token", "config")
    for token_field in ("header", "api_token"):
        assert_any_of(
            api_key_authenticator["properties"][token_field],
            {"$ref": "#/definitions/InterpolatedString"},
            {"type": "string"},
        )

    # --- DefaultErrorHandler ---
    default_error_handler = definitions["DefaultErrorHandler"]["allOf"][1]
    handler_props = default_error_handler["properties"]
    assert handler_props["response_filters"]["type"] == "array"
    assert handler_props["response_filters"]["items"]["$ref"] == "#/definitions/HttpResponseFilter"
    assert handler_props["max_retries"]["type"] == "integer"
    assert handler_props["backoff_strategies"]["type"] == "array"

    # --- DefaultPaginator ---
    default_paginator = definitions["DefaultPaginator"]["allOf"][1]
    assert_requires(default_paginator, "page_token_option", "pagination_strategy", "config", "url_base")
    paginator_props = default_paginator["properties"]
    assert paginator_props["page_size_option"]["$ref"] == "#/definitions/RequestOption"
    assert paginator_props["page_token_option"]["$ref"] == "#/definitions/RequestOption"
    assert_any_of(
        paginator_props["pagination_strategy"],
        {"$ref": "#/definitions/CursorPaginationStrategy"},
        {"$ref": "#/definitions/OffsetIncrement"},
        {"$ref": "#/definitions/PageIncrement"},
    )
    assert paginator_props["decoder"]["$ref"] == "#/definitions/JsonDecoder"
    # NOTE(review): these two look at HttpRequester again, not DefaultPaginator — confirm the intended target.
    assert_any_of(requester_props["url_base"], {"$ref": "#/definitions/InterpolatedString"})
    assert_any_of(requester_props["path"], {"type": "string"})

    # --- CursorPaginationStrategy ---
    cursor_pagination_strategy = definitions["CursorPaginationStrategy"]["allOf"][1]
    assert_requires(cursor_pagination_strategy, "cursor_value", "config")
    cursor_props = cursor_pagination_strategy["properties"]
    assert_any_of(cursor_props["cursor_value"], {"$ref": "#/definitions/InterpolatedString"}, {"type": "string"})
    assert_any_of(cursor_props["stop_condition"], {"$ref": "#/definitions/InterpolatedBoolean"}, {"type": "string"})
    assert cursor_props["decoder"]["$ref"] == "#/definitions/JsonDecoder"

    # --- ListStreamSlicer ---
    list_stream_slicer = definitions["ListStreamSlicer"]["allOf"][1]
    assert_requires(list_stream_slicer, "slice_values", "cursor_field", "config")
    slicer_props = list_stream_slicer["properties"]
    assert_any_of(slicer_props["slice_values"], {"type": "array", "items": {"type": "string"}}, {"type": "string"})
    assert_any_of(slicer_props["cursor_field"], {"$ref": "#/definitions/InterpolatedString"}, {"type": "string"})
    assert slicer_props["request_option"]["$ref"] == "#/definitions/RequestOption"

    # --- AddedFieldDefinition ---
    added_field_definition = definitions["AddedFieldDefinition"]
    assert_requires(added_field_definition, "path", "value")
    added_field_props = added_field_definition["properties"]
    assert added_field_props["path"]["type"] == "array"
    assert added_field_props["path"]["items"]["type"] == "string"
    assert_any_of(added_field_props["value"], {"$ref": "#/definitions/InterpolatedString"}, {"type": "string"})

    # JsonSchemaMixin.json_schema() behaves oddly here: run on its own, this test passes, but when the whole
    # test file is run, certain components are not generated in the schema. generate_schema() is normally
    # invoked independently, so this does not happen when the complete schema is generated; it only shows up
    # when the tests are all run together.
    # One way to reproduce it is to call DefaultErrorHandler.json_schema() at the start of this test and
    # uncomment the assertions below.
    # assert {"$ref": "#/definitions/ConstantBackoffStrategy"} in default_error_handler["properties"]["backoff_strategies"]["items"]["anyOf"]
    # assert {"$ref": "#/definitions/ExponentialBackoffStrategy"} in default_error_handler["properties"]["backoff_strategies"]["items"][
    #     "anyOf"
    # ]
    # assert {"$ref": "#/definitions/WaitTimeFromHeaderBackoffStrategy"} in default_error_handler["properties"]["backoff_strategies"][
    #     "items"
    # ]["anyOf"]
    # assert {"$ref": "#/definitions/WaitUntilTimeFromHeaderBackoffStrategy"} in default_error_handler["properties"]["backoff_strategies"][
    #     "items"
    # ]["anyOf"]
    #
    # exponential_backoff_strategy = schema["definitions"]["ExponentialBackoffStrategy"]["allOf"][1]
    # assert exponential_backoff_strategy["properties"]["factor"]["type"] == "number"

View File

@@ -2,21 +2,19 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
import json
import logging
import os
import sys
import tempfile
import pytest
import yaml
from airbyte_cdk.sources.declarative.exceptions import InvalidConnectorDefinitionException
from airbyte_cdk.sources.declarative.parsers.undefined_reference_exception import UndefinedReferenceException
from airbyte_cdk.sources.declarative.parsers.yaml_parser import YamlParser
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from jsonschema import ValidationError
from yaml.parser import ParserError
logger = logging.getLogger("airbyte")
EXTERNAL_CONNECTION_SPECIFICATION = {
"type": "object",
"required": ["api_token"],
@@ -46,23 +44,6 @@ class MockYamlDeclarativeSource(YamlDeclarativeSource):
class TestYamlDeclarativeSource:
@pytest.fixture
def use_external_yaml_spec(self):
    """Create a spec.yaml file before the test runs and remove it once the test has finished.

    The file is written to the part of this module's directory path that precedes
    "/sources/declarative" — the root of the unit test directory, which is where spec.yaml
    files are read from during testing.
    """
    this_file = os.path.abspath(sys.modules[__name__].__file__)
    spec_root = os.path.dirname(this_file).split("/sources/declarative")[0]
    yaml_path = os.path.join(spec_root, "spec.yaml")

    external_spec = {
        "documentationUrl": "https://airbyte.com/#yaml-from-external",
        "connectionSpecification": EXTERNAL_CONNECTION_SPECIFICATION,
    }
    with open(yaml_path, "w") as spec_file:
        spec_file.write(yaml.dump(external_spec))

    # Hand control to the test; everything after the yield is teardown.
    yield
    os.remove(yaml_path)
def test_source_is_created_if_toplevel_fields_are_known(self):
content = """
version: "version"
@@ -107,259 +88,32 @@ class TestYamlDeclarativeSource:
temporary_file = TestFileContent(content)
MockYamlDeclarativeSource(temporary_file.filename)
def test_source_with_spec_in_yaml(self):
def test_source_fails_for_invalid_yaml(self):
content = """
version: "version"
definitions:
schema_loader:
name: "{{ options.stream_name }}"
file_path: "./source_sendgrid/schemas/{{ options.name }}.yaml"
retriever:
paginator:
type: "DefaultPaginator"
page_size: 10
page_size_option:
inject_into: request_parameter
field_name: page_size
page_token_option:
inject_into: path
pagination_strategy:
type: "CursorPagination"
cursor_value: "{{ response._metadata.next }}"
requester:
path: "/v3/marketing/lists"
authenticator:
type: "BearerAuthenticator"
api_token: "{{ config.apikey }}"
request_parameters:
page_size: 10
record_selector:
extractor:
field_pointer: ["result"]
this is not parsable yaml: " at all
streams:
- type: DeclarativeStream
$options:
name: "lists"
primary_key: id
url_base: "https://api.sendgrid.com"
schema_loader: "*ref(definitions.schema_loader)"
retriever: "*ref(definitions.retriever)"
check:
type: CheckStream
stream_names: ["lists"]
spec:
type: Spec
documentation_url: https://airbyte.com/#yaml-from-manifest
connection_specification:
title: Test Spec
type: object
required:
- api_key
additionalProperties: false
properties:
api_key:
type: string
airbyte_secret: true
title: API Key
description: Test API Key
order: 0
"""
temporary_file = TestFileContent(content)
source = MockYamlDeclarativeSource(temporary_file.filename)
connector_specification = source.spec(logger)
assert connector_specification is not None
assert connector_specification.documentationUrl == "https://airbyte.com/#yaml-from-manifest"
assert connector_specification.connectionSpecification["title"] == "Test Spec"
assert connector_specification.connectionSpecification["required"][0] == "api_key"
assert connector_specification.connectionSpecification["additionalProperties"] is False
assert connector_specification.connectionSpecification["properties"]["api_key"] == {
"type": "string",
"airbyte_secret": True,
"title": "API Key",
"description": "Test API Key",
"order": 0,
}
def test_source_with_external_spec(self, use_external_yaml_spec):
    """Check that spec() returns the spec supplied through the use_external_yaml_spec fixture.

    The manifest below has no "spec" section; the assertions expect the documentation URL and the
    connection specification that the fixture writes to spec.yaml.
    """
    # NOTE(review): the manifest text carries no nesting indentation as written here, so YAML would
    # read it as a single flat mapping — confirm this matches the intended manifest.
    content = """
version: "version"
definitions:
schema_loader:
name: "{{ options.stream_name }}"
file_path: "./source_sendgrid/schemas/{{ options.name }}.yaml"
retriever:
paginator:
type: "DefaultPaginator"
page_size: 10
page_size_option:
inject_into: request_parameter
field_name: page_size
page_token_option:
inject_into: path
pagination_strategy:
type: "CursorPagination"
cursor_value: "{{ response._metadata.next }}"
requester:
path: "/v3/marketing/lists"
authenticator:
type: "BearerAuthenticator"
api_token: "{{ config.apikey }}"
request_parameters:
page_size: 10
record_selector:
extractor:
field_pointer: ["result"]
streams:
- type: DeclarativeStream
$options:
name: "lists"
primary_key: id
url_base: "https://api.sendgrid.com"
schema_loader: "*ref(definitions.schema_loader)"
retriever: "*ref(definitions.retriever)"
check:
type: CheckStream
stream_names: ["lists"]
"""
    # TestFileContent presumably writes the manifest to a temporary file; the source is built from its path.
    temporary_file = TestFileContent(content)
    source = MockYamlDeclarativeSource(temporary_file.filename)
    connector_specification = source.spec(logger)
    # Both values must match what the fixture wrote, i.e. the spec came from the external file.
    assert connector_specification.documentationUrl == "https://airbyte.com/#yaml-from-external"
    assert connector_specification.connectionSpecification == EXTERNAL_CONNECTION_SPECIFICATION
def test_source_is_not_created_if_toplevel_fields_are_unknown(self):
content = """
version: "version"
definitions:
schema_loader:
name: "{{ options.stream_name }}"
file_path: "./source_sendgrid/schemas/{{ options.name }}.yaml"
retriever:
paginator:
type: "DefaultPaginator"
page_size: 10
page_size_option:
inject_into: request_parameter
field_name: page_size
page_token_option:
inject_into: path
pagination_strategy:
type: "CursorPagination"
cursor_value: "{{ response._metadata.next }}"
requester:
path: "/v3/marketing/lists"
authenticator:
type: "BearerAuthenticator"
api_token: "{{ config.apikey }}"
request_parameters:
page_size: 10
record_selector:
extractor:
field_pointer: ["result"]
streams:
- type: DeclarativeStream
$options:
name: "lists"
primary_key: id
url_base: "https://api.sendgrid.com"
schema_loader: "*ref(definitions.schema_loader)"
retriever: "*ref(definitions.retriever)"
check:
type: CheckStream
stream_names: ["lists"]
not_a_valid_field: "error"
"""
temporary_file = TestFileContent(content)
with pytest.raises(InvalidConnectorDefinitionException):
with pytest.raises(ParserError):
MockYamlDeclarativeSource(temporary_file.filename)
def test_source_missing_checker_fails_validation(self):
def test_source_with_missing_reference_fails(self):
content = """
version: "version"
definitions:
schema_loader:
name: "{{ options.stream_name }}"
file_path: "./source_sendgrid/schemas/{{ options.name }}.yaml"
retriever:
paginator:
type: "DefaultPaginator"
page_size: 10
page_size_option:
inject_into: request_parameter
field_name: page_size
page_token_option:
inject_into: path
pagination_strategy:
type: "CursorPagination"
cursor_value: "{{ response._metadata.next }}"
requester:
path: "/v3/marketing/lists"
authenticator:
type: "BearerAuthenticator"
api_token: "{{ config.apikey }}"
request_parameters:
page_size: 10
record_selector:
extractor:
field_pointer: ["result"]
streams:
- type: DeclarativeStream
$options:
name: "lists"
primary_key: id
url_base: "https://api.sendgrid.com"
schema_loader: "*ref(definitions.schema_loader)"
retriever: "*ref(definitions.retriever)"
check:
type: CheckStream
"""
temporary_file = TestFileContent(content)
with pytest.raises(ValidationError):
MockYamlDeclarativeSource(temporary_file.filename)
def test_source_with_missing_streams_fails(self):
    """Check that building the source raises ValidationError when the manifest has no "streams" entry."""
    # NOTE(review): the manifest text carries no nesting indentation as written here, so YAML would
    # read it as a single flat mapping — confirm this matches the intended manifest.
    content = """
version: "version"
definitions:
check:
type: CheckStream
stream_names: ["lists"]
"""
    # TestFileContent presumably writes the manifest to a temporary file; the source is built from its path.
    temporary_file = TestFileContent(content)
    with pytest.raises(ValidationError):
        MockYamlDeclarativeSource(temporary_file.filename)
def test_source_with_missing_version_fails(self):
content = """
definitions:
schema_loader:
name: "{{ options.stream_name }}"
file_path: "./source_sendgrid/schemas/{{ options.name }}.yaml"
retriever:
paginator:
type: "DefaultPaginator"
page_size: 10
page_size_option:
inject_into: request_parameter
field_name: page_size
page_token_option:
inject_into: path
pagination_strategy:
type: "CursorPagination"
cursor_value: "{{ response._metadata.next }}"
requester:
path: "/v3/marketing/lists"
authenticator:
type: "BearerAuthenticator"
api_token: "{{ config.apikey }}"
request_parameters:
page_size: 10
record_selector:
extractor:
field_pointer: ["result"]
streams:
- type: DeclarativeStream
$options:
@@ -373,79 +127,9 @@ class TestYamlDeclarativeSource:
stream_names: ["lists"]
"""
temporary_file = TestFileContent(content)
with pytest.raises(ValidationError):
with pytest.raises(UndefinedReferenceException):
MockYamlDeclarativeSource(temporary_file.filename)
def test_source_with_invalid_stream_config_fails_validation(self):
    """Check that building the source raises ValidationError for an incomplete stream definition.

    The only stream in the manifest below declares a schema_loader but no retriever.
    """
    # NOTE(review): the manifest text carries no nesting indentation as written here, so YAML would
    # read it as a single flat mapping — confirm this matches the intended manifest.
    content = """
version: "version"
definitions:
schema_loader:
name: "{{ options.stream_name }}"
file_path: "./source_sendgrid/schemas/{{ options.name }}.yaml"
streams:
- type: DeclarativeStream
$options:
name: "lists"
primary_key: id
url_base: "https://api.sendgrid.com"
schema_loader: "*ref(definitions.schema_loader)"
check:
type: CheckStream
stream_names: ["lists"]
"""
    # TestFileContent presumably writes the manifest to a temporary file; the source is built from its path.
    temporary_file = TestFileContent(content)
    with pytest.raises(ValidationError):
        MockYamlDeclarativeSource(temporary_file.filename)
def test_source_with_no_external_spec_and_no_in_yaml_spec_fails(self):
    """Check that spec() raises FileNotFoundError when no spec is available.

    The manifest below has no "spec" section and this test does not request the fixture that
    creates a spec.yaml file.
    """
    # NOTE(review): the manifest text carries no nesting indentation as written here, so YAML would
    # read it as a single flat mapping — confirm this matches the intended manifest.
    content = """
version: "version"
definitions:
schema_loader:
name: "{{ options.stream_name }}"
file_path: "./source_sendgrid/schemas/{{ options.name }}.yaml"
retriever:
paginator:
type: "DefaultPaginator"
page_size: 10
page_size_option:
inject_into: request_parameter
field_name: page_size
page_token_option:
inject_into: path
pagination_strategy:
type: "CursorPagination"
cursor_value: "{{ response._metadata.next }}"
requester:
path: "/v3/marketing/lists"
authenticator:
type: "BearerAuthenticator"
api_token: "{{ config.apikey }}"
request_parameters:
page_size: 10
record_selector:
extractor:
field_pointer: ["result"]
streams:
- type: DeclarativeStream
$options:
name: "lists"
primary_key: id
url_base: "https://api.sendgrid.com"
schema_loader: "*ref(definitions.schema_loader)"
retriever: "*ref(definitions.retriever)"
check:
type: CheckStream
stream_names: ["lists"]
"""
    # TestFileContent presumably writes the manifest to a temporary file; the source is built from its path.
    temporary_file = TestFileContent(content)
    source = MockYamlDeclarativeSource(temporary_file.filename)
    # We expect to fail here because we have not created a temporary spec.yaml file
    with pytest.raises(FileNotFoundError):
        source.spec(logger)
class TestFileContent:
def __init__(self, content):
@@ -463,139 +147,3 @@ class TestFileContent:
def __exit__(self, type, value, traceback):
os.unlink(self.filename)
def test_generate_schema():
    """Spot-check the JSON schema returned by YamlDeclarativeSource.generate_schema().

    The returned string is parsed as JSON, then a sample of component definitions is inspected for
    required fields, property types, "$ref" targets and the alternatives listed under "anyOf".
    """

    def assert_requires(definition, *field_names):
        # All of the given field names must appear in the definition's "required" list.
        assert set(field_names).issubset(definition["required"])

    def assert_any_of(property_schema, *alternatives):
        # Each expected alternative must be one of the entries of the property's "anyOf" list.
        for alternative in alternatives:
            assert alternative in property_schema["anyOf"]

    schema = json.loads(YamlDeclarativeSource.generate_schema())
    definitions = schema["definitions"]

    # --- top-level source schema ---
    for field_name in ("version", "checker", "streams"):
        assert field_name in schema["required"]
    source_props = schema["properties"]
    assert source_props["checker"]["$ref"] == "#/definitions/CheckStream"
    assert source_props["streams"]["items"]["$ref"] == "#/definitions/DeclarativeStream"

    # --- CheckStream ---
    check_stream = definitions["CheckStream"]
    assert_requires(check_stream, "stream_names")
    stream_names = check_stream["properties"]["stream_names"]
    assert stream_names["type"] == "array"
    assert stream_names["items"]["type"] == "string"

    # --- DeclarativeStream ---
    declarative_stream = definitions["DeclarativeStream"]
    assert_requires(declarative_stream, "retriever", "config")
    stream_props = declarative_stream["properties"]
    assert_any_of(
        stream_props["schema_loader"],
        {"$ref": "#/definitions/DefaultSchemaLoader"},
        {"$ref": "#/definitions/JsonFileSchemaLoader"},
    )
    assert stream_props["retriever"]["$ref"] == "#/definitions/SimpleRetriever"
    assert stream_props["name"]["type"] == "string"
    primary_key_alternatives = (
        {"type": "array", "items": {"type": "string"}},
        {"type": "array", "items": {"type": "array", "items": {"type": "string"}}},
        {"type": "string"},
    )
    assert_any_of(stream_props["primary_key"], *primary_key_alternatives)
    assert_any_of(
        stream_props["stream_cursor_field"],
        {"type": "array", "items": {"type": "string"}},
        {"type": "string"},
    )
    assert stream_props["transformations"]["type"] == "array"
    assert_any_of(
        stream_props["transformations"]["items"],
        {"$ref": "#/definitions/AddFields"},
        {"$ref": "#/definitions/RemoveFields"},
    )
    assert stream_props["checkpoint_interval"]["type"] == "integer"

    # --- SimpleRetriever (the entry at index 1 of "allOf" is the one inspected) ---
    simple_retriever = definitions["SimpleRetriever"]["allOf"][1]
    assert_requires(simple_retriever, "requester", "record_selector")
    retriever_props = simple_retriever["properties"]
    assert retriever_props["requester"]["$ref"] == "#/definitions/HttpRequester"
    assert retriever_props["record_selector"]["$ref"] == "#/definitions/RecordSelector"
    assert retriever_props["name"]["type"] == "string"
    # NOTE(review): this re-checks DeclarativeStream's primary_key rather than SimpleRetriever's —
    # possibly a copy-paste slip; confirm the intended target.
    assert_any_of(stream_props["primary_key"], *primary_key_alternatives)
    assert_any_of(
        retriever_props["paginator"],
        {"$ref": "#/definitions/DefaultPaginator"},
        {"$ref": "#/definitions/NoPagination"},
    )
    assert_any_of(
        retriever_props["stream_slicer"],
        {"$ref": "#/definitions/CartesianProductStreamSlicer"},
        {"$ref": "#/definitions/DatetimeStreamSlicer"},
        {"$ref": "#/definitions/ListStreamSlicer"},
        {"$ref": "#/definitions/SingleSlice"},
        {"$ref": "#/definitions/SubstreamSlicer"},
    )

    # --- HttpRequester ---
    http_requester = definitions["HttpRequester"]["allOf"][1]
    assert_requires(http_requester, "name", "url_base", "path", "config")
    requester_props = http_requester["properties"]
    assert requester_props["name"]["type"] == "string"
    assert_any_of(requester_props["url_base"], {"$ref": "#/definitions/InterpolatedString"})
    assert_any_of(requester_props["path"], {"type": "string"})
    # NOTE(review): the same two checks as just above, repeated verbatim — the second pair may have been
    # meant to cover the other alternative of each property; confirm.
    assert_any_of(requester_props["url_base"], {"$ref": "#/definitions/InterpolatedString"})
    assert_any_of(requester_props["path"], {"type": "string"})
    assert_any_of(
        requester_props["http_method"],
        {"type": "string"},
        {"type": "string", "enum": ["GET", "POST"]},
    )
    assert requester_props["request_options_provider"]["$ref"] == "#/definitions/InterpolatedRequestOptionsProvider"
    assert_any_of(
        requester_props["authenticator"],
        {"$ref": "#/definitions/DeclarativeOauth2Authenticator"},
        {"$ref": "#/definitions/ApiKeyAuthenticator"},
        {"$ref": "#/definitions/BearerAuthenticator"},
        {"$ref": "#/definitions/BasicHttpAuthenticator"},
    )
    assert_any_of(
        requester_props["error_handler"],
        {"$ref": "#/definitions/CompositeErrorHandler"},
        {"$ref": "#/definitions/DefaultErrorHandler"},
    )

    # --- ApiKeyAuthenticator ---
    api_key_authenticator = definitions["ApiKeyAuthenticator"]["allOf"][1]
    assert_requires(api_key_authenticator, "header", "api_token", "config")
    for token_field in ("header", "api_token"):
        assert_any_of(
            api_key_authenticator["properties"][token_field],
            {"$ref": "#/definitions/InterpolatedString"},
            {"type": "string"},
        )

    # --- DefaultErrorHandler ---
    default_error_handler = definitions["DefaultErrorHandler"]["allOf"][1]
    handler_props = default_error_handler["properties"]
    assert handler_props["response_filters"]["type"] == "array"
    assert handler_props["response_filters"]["items"]["$ref"] == "#/definitions/HttpResponseFilter"
    assert handler_props["max_retries"]["type"] == "integer"
    assert handler_props["backoff_strategies"]["type"] == "array"

    # --- DefaultPaginator ---
    default_paginator = definitions["DefaultPaginator"]["allOf"][1]
    assert_requires(default_paginator, "page_token_option", "pagination_strategy", "config", "url_base")
    paginator_props = default_paginator["properties"]
    assert paginator_props["page_size_option"]["$ref"] == "#/definitions/RequestOption"
    assert paginator_props["page_token_option"]["$ref"] == "#/definitions/RequestOption"
    assert_any_of(
        paginator_props["pagination_strategy"],
        {"$ref": "#/definitions/CursorPaginationStrategy"},
        {"$ref": "#/definitions/OffsetIncrement"},
        {"$ref": "#/definitions/PageIncrement"},
    )
    assert paginator_props["decoder"]["$ref"] == "#/definitions/JsonDecoder"
    # NOTE(review): these two look at HttpRequester again, not DefaultPaginator — confirm the intended target.
    assert_any_of(requester_props["url_base"], {"$ref": "#/definitions/InterpolatedString"})
    assert_any_of(requester_props["path"], {"type": "string"})

    # --- CursorPaginationStrategy ---
    cursor_pagination_strategy = definitions["CursorPaginationStrategy"]["allOf"][1]
    assert_requires(cursor_pagination_strategy, "cursor_value", "config")
    cursor_props = cursor_pagination_strategy["properties"]
    assert_any_of(cursor_props["cursor_value"], {"$ref": "#/definitions/InterpolatedString"}, {"type": "string"})
    assert_any_of(cursor_props["stop_condition"], {"$ref": "#/definitions/InterpolatedBoolean"}, {"type": "string"})
    assert cursor_props["decoder"]["$ref"] == "#/definitions/JsonDecoder"

    # --- ListStreamSlicer ---
    list_stream_slicer = definitions["ListStreamSlicer"]["allOf"][1]
    assert_requires(list_stream_slicer, "slice_values", "cursor_field", "config")
    slicer_props = list_stream_slicer["properties"]
    assert_any_of(slicer_props["slice_values"], {"type": "array", "items": {"type": "string"}}, {"type": "string"})
    assert_any_of(slicer_props["cursor_field"], {"$ref": "#/definitions/InterpolatedString"}, {"type": "string"})
    assert slicer_props["request_option"]["$ref"] == "#/definitions/RequestOption"

    # --- AddedFieldDefinition ---
    added_field_definition = definitions["AddedFieldDefinition"]
    assert_requires(added_field_definition, "path", "value")
    added_field_props = added_field_definition["properties"]
    assert added_field_props["path"]["type"] == "array"
    assert added_field_props["path"]["items"]["type"] == "string"
    assert_any_of(added_field_props["value"], {"$ref": "#/definitions/InterpolatedString"}, {"type": "string"})

    # JsonSchemaMixin.json_schema() behaves oddly here: run on its own, this test passes, but when the whole
    # test file is run, certain components are not generated in the schema. generate_schema() is normally
    # invoked independently, so this does not happen when the complete schema is generated; it only shows up
    # when the tests are all run together.
    # One way to reproduce it is to call DefaultErrorHandler.json_schema() at the start of this test and
    # uncomment the assertions below.
    # assert {"$ref": "#/definitions/ConstantBackoffStrategy"} in default_error_handler["properties"]["backoff_strategies"]["items"]["anyOf"]
    # assert {"$ref": "#/definitions/ExponentialBackoffStrategy"} in default_error_handler["properties"]["backoff_strategies"]["items"][
    #     "anyOf"
    # ]
    # assert {"$ref": "#/definitions/WaitTimeFromHeaderBackoffStrategy"} in default_error_handler["properties"]["backoff_strategies"][
    #     "items"
    # ]["anyOf"]
    # assert {"$ref": "#/definitions/WaitUntilTimeFromHeaderBackoffStrategy"} in default_error_handler["properties"]["backoff_strategies"][
    #     "items"
    # ]["anyOf"]
    #
    # exponential_backoff_strategy = schema["definitions"]["ExponentialBackoffStrategy"]["allOf"][1]
    # assert exponential_backoff_strategy["properties"]["factor"]["type"] == "number"