1
0
mirror of synced 2025-12-25 02:09:19 -05:00

[low code connectors] generate complete json schema from classes (#15647)

* draft: first pass at complete schema language generation and factory validator

* actually a working validator and fixes to the schema that went uncaught

* remove extra spike file

* fix formatting file

* Add method to generate the complete JSON schema of the low code declarative language

* add testing of a few components during schema gen

* pr feedback and a little bit of refactoring

* test for schema version

* fix some types that were erroneously marked as invalid schema

* some comments

* add jsonschemamixin to interfaces

* update tests now that interfaces are jsonschemamixin

* accidentally removed a mixin

* remove unneeded test

* make comment a little more clear

* update changelog

* bump version

* generic enum not enum class

* Add method to generate the complete JSON schema of the low code declarative language

* add testing of a few components during schema gen

* test for schema version

* update tests now that interfaces are jsonschemamixin

* accidentally removed a mixin

* remove unneeded test

* make comment a little more clear

* generic enum not enum class

* add generated json file and update docs to reference it

* verbage
This commit is contained in:
Brian Lai
2022-08-18 18:53:42 -04:00
committed by GitHub
parent 410d5cd3c4
commit 7e158ef9af
4 changed files with 1512 additions and 3 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -2,10 +2,13 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
import inspect
import json
import logging
from dataclasses import dataclass
from typing import Any, List, Mapping
import typing
from dataclasses import dataclass, fields
from enum import Enum, EnumMeta
from typing import Any, List, Mapping, Union
from airbyte_cdk.sources.declarative.checks import CheckStream
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
@@ -84,3 +87,69 @@ class YamlDeclarativeSource(DeclarativeSource):
if "class_name" not in s:
s["class_name"] = "airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream"
return stream_configs
@staticmethod
def generate_schema() -> str:
expanded_source_definition = YamlDeclarativeSource.expand_schema_interfaces(ConcreteDeclarativeSource, {})
expanded_schema = expanded_source_definition.json_schema()
return json.dumps(expanded_schema, cls=SchemaEncoder)
@staticmethod
def expand_schema_interfaces(expand_class: type, visited: dict) -> type:
"""
Recursive function that takes in class type that will have its interface fields unpacked and expended and then recursively
attempt the same expansion on all the class' underlying fields that are declarative component. It also performs expansion
with respect to interfaces that are contained within generic data types.
:param expand_class: The declarative component class that will have its interface fields expanded
:param visited: cache used to store a record of already visited declarative classes that have already been seen
:return: The expanded declarative component
"""
# Recursive base case to stop recursion if we have already expanded an interface in case of cyclical components
# like CompositeErrorHandler
if expand_class.__name__ in visited:
return visited[expand_class.__name__]
visited[expand_class.__name__] = expand_class
next_classes = []
class_fields = fields(expand_class)
for field in class_fields:
unpacked_field_types = DeclarativeComponentFactory.unpack(field.type)
expand_class.__annotations__[field.name] = unpacked_field_types
next_classes.extend(YamlDeclarativeSource._get_next_expand_classes(field.type))
for next_class in next_classes:
YamlDeclarativeSource.expand_schema_interfaces(next_class, visited)
return expand_class
@staticmethod
def _get_next_expand_classes(field_type) -> list[type]:
"""
Parses through a given field type and assembles a list of all underlying declarative components. For a concrete declarative class
it will return itself. For a declarative interface it will return its subclasses. For declarative components in a generic type
it will return the unpacked classes. Any non-declarative types will be skipped.
:param field_type: A field type that
:return:
"""
generic_type = typing.get_origin(field_type)
if generic_type is None:
# We can only continue parsing declarative that inherit from the JsonSchemaMixin class because it is used
# to generate the final json schema
if inspect.isclass(field_type) and issubclass(field_type, JsonSchemaMixin) and not isinstance(field_type, EnumMeta):
subclasses = field_type.__subclasses__()
if subclasses:
return subclasses
else:
return [field_type]
elif generic_type == list or generic_type == Union:
next_classes = []
for underlying_type in typing.get_args(field_type):
next_classes.extend(YamlDeclarativeSource._get_next_expand_classes(underlying_type))
return next_classes
return []
class SchemaEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, property) or isinstance(obj, Enum):
return str(obj)
return json.JSONEncoder.default(self, obj)

View File

@@ -2,6 +2,7 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
import json
import os
import tempfile
import unittest
@@ -242,3 +243,141 @@ class TestFileContent:
def __exit__(self, type, value, traceback):
os.unlink(self.filename)
def test_generate_schema():
schema_str = YamlDeclarativeSource.generate_schema()
schema = json.loads(schema_str)
assert "version" in schema["required"]
assert "checker" in schema["required"]
assert "streams" in schema["required"]
assert schema["properties"]["checker"]["$ref"] == "#/definitions/CheckStream"
assert schema["properties"]["streams"]["items"]["$ref"] == "#/definitions/DeclarativeStream"
check_stream = schema["definitions"]["CheckStream"]
assert {"stream_names"}.issubset(check_stream["required"])
assert check_stream["properties"]["stream_names"]["type"] == "array"
assert check_stream["properties"]["stream_names"]["items"]["type"] == "string"
declarative_stream = schema["definitions"]["DeclarativeStream"]
assert {"schema_loader", "retriever", "config"}.issubset(declarative_stream["required"])
assert declarative_stream["properties"]["schema_loader"]["$ref"] == "#/definitions/JsonSchema"
assert declarative_stream["properties"]["retriever"]["$ref"] == "#/definitions/SimpleRetriever"
assert declarative_stream["properties"]["name"]["type"] == "string"
assert {"type": "array", "items": {"type": "string"}} in declarative_stream["properties"]["primary_key"]["anyOf"]
assert {"type": "array", "items": {"type": "array", "items": {"type": "string"}}} in declarative_stream["properties"]["primary_key"][
"anyOf"
]
assert {"type": "string"} in declarative_stream["properties"]["primary_key"]["anyOf"]
assert {"type": "array", "items": {"type": "string"}} in declarative_stream["properties"]["stream_cursor_field"]["anyOf"]
assert {"type": "string"} in declarative_stream["properties"]["stream_cursor_field"]["anyOf"]
assert declarative_stream["properties"]["transformations"]["type"] == "array"
assert {"$ref": "#/definitions/AddFields"} in declarative_stream["properties"]["transformations"]["items"]["anyOf"]
assert {"$ref": "#/definitions/RemoveFields"} in declarative_stream["properties"]["transformations"]["items"]["anyOf"]
assert declarative_stream["properties"]["checkpoint_interval"]["type"] == "integer"
simple_retriever = schema["definitions"]["SimpleRetriever"]["allOf"][1]
assert {"requester", "record_selector"}.issubset(simple_retriever["required"])
assert simple_retriever["properties"]["requester"]["$ref"] == "#/definitions/HttpRequester"
assert simple_retriever["properties"]["record_selector"]["$ref"] == "#/definitions/RecordSelector"
assert simple_retriever["properties"]["name"]["type"] == "string"
assert {"type": "array", "items": {"type": "string"}} in declarative_stream["properties"]["primary_key"]["anyOf"]
assert {"type": "array", "items": {"type": "array", "items": {"type": "string"}}} in declarative_stream["properties"]["primary_key"][
"anyOf"
]
assert {"type": "string"} in declarative_stream["properties"]["primary_key"]["anyOf"]
assert {"$ref": "#/definitions/LimitPaginator"} in simple_retriever["properties"]["paginator"]["anyOf"]
assert {"$ref": "#/definitions/NoPagination"} in simple_retriever["properties"]["paginator"]["anyOf"]
assert {"$ref": "#/definitions/CartesianProductStreamSlicer"} in simple_retriever["properties"]["stream_slicer"]["anyOf"]
assert {"$ref": "#/definitions/DatetimeStreamSlicer"} in simple_retriever["properties"]["stream_slicer"]["anyOf"]
assert {"$ref": "#/definitions/ListStreamSlicer"} in simple_retriever["properties"]["stream_slicer"]["anyOf"]
assert {"$ref": "#/definitions/SingleSlice"} in simple_retriever["properties"]["stream_slicer"]["anyOf"]
assert {"$ref": "#/definitions/SubstreamSlicer"} in simple_retriever["properties"]["stream_slicer"]["anyOf"]
http_requester = schema["definitions"]["HttpRequester"]["allOf"][1]
assert {"name", "url_base", "path", "config"}.issubset(http_requester["required"])
assert http_requester["properties"]["name"]["type"] == "string"
assert {"$ref": "#/definitions/InterpolatedString"} in http_requester["properties"]["url_base"]["anyOf"]
assert {"type": "string"} in http_requester["properties"]["path"]["anyOf"]
assert {"$ref": "#/definitions/InterpolatedString"} in http_requester["properties"]["url_base"]["anyOf"]
assert {"type": "string"} in http_requester["properties"]["path"]["anyOf"]
assert {"type": "string"} in http_requester["properties"]["http_method"]["anyOf"]
assert {"type": "string", "enum": ["GET", "POST"]} in http_requester["properties"]["http_method"]["anyOf"]
assert http_requester["properties"]["request_options_provider"]["$ref"] == "#/definitions/InterpolatedRequestOptionsProvider"
assert {"$ref": "#/definitions/DeclarativeOauth2Authenticator"} in http_requester["properties"]["authenticator"]["anyOf"]
assert {"$ref": "#/definitions/ApiKeyAuthenticator"} in http_requester["properties"]["authenticator"]["anyOf"]
assert {"$ref": "#/definitions/BearerAuthenticator"} in http_requester["properties"]["authenticator"]["anyOf"]
assert {"$ref": "#/definitions/BasicHttpAuthenticator"} in http_requester["properties"]["authenticator"]["anyOf"]
assert {"$ref": "#/definitions/CompositeErrorHandler"} in http_requester["properties"]["error_handler"]["anyOf"]
assert {"$ref": "#/definitions/DefaultErrorHandler"} in http_requester["properties"]["error_handler"]["anyOf"]
api_key_authenticator = schema["definitions"]["ApiKeyAuthenticator"]["allOf"][1]
assert {"header", "api_token", "config"}.issubset(api_key_authenticator["required"])
assert {"$ref": "#/definitions/InterpolatedString"} in api_key_authenticator["properties"]["header"]["anyOf"]
assert {"type": "string"} in api_key_authenticator["properties"]["header"]["anyOf"]
assert {"$ref": "#/definitions/InterpolatedString"} in api_key_authenticator["properties"]["api_token"]["anyOf"]
assert {"type": "string"} in api_key_authenticator["properties"]["api_token"]["anyOf"]
default_error_handler = schema["definitions"]["DefaultErrorHandler"]["allOf"][1]
assert default_error_handler["properties"]["response_filters"]["type"] == "array"
assert default_error_handler["properties"]["response_filters"]["items"]["$ref"] == "#/definitions/HttpResponseFilter"
assert default_error_handler["properties"]["max_retries"]["type"] == "integer"
assert default_error_handler["properties"]["backoff_strategies"]["type"] == "array"
limit_paginator = schema["definitions"]["LimitPaginator"]["allOf"][1]
assert {"page_size", "limit_option", "page_token_option", "pagination_strategy", "config", "url_base"}.issubset(
limit_paginator["required"]
)
assert limit_paginator["properties"]["page_size"]["type"] == "integer"
assert limit_paginator["properties"]["limit_option"]["$ref"] == "#/definitions/RequestOption"
assert limit_paginator["properties"]["page_token_option"]["$ref"] == "#/definitions/RequestOption"
assert {"$ref": "#/definitions/CursorPaginationStrategy"} in limit_paginator["properties"]["pagination_strategy"]["anyOf"]
assert {"$ref": "#/definitions/OffsetIncrement"} in limit_paginator["properties"]["pagination_strategy"]["anyOf"]
assert {"$ref": "#/definitions/PageIncrement"} in limit_paginator["properties"]["pagination_strategy"]["anyOf"]
assert limit_paginator["properties"]["decoder"]["$ref"] == "#/definitions/JsonDecoder"
assert {"$ref": "#/definitions/InterpolatedString"} in http_requester["properties"]["url_base"]["anyOf"]
assert {"type": "string"} in http_requester["properties"]["path"]["anyOf"]
cursor_pagination_strategy = schema["definitions"]["CursorPaginationStrategy"]["allOf"][1]
assert {"cursor_value", "config"}.issubset(cursor_pagination_strategy["required"])
assert {"$ref": "#/definitions/InterpolatedString"} in cursor_pagination_strategy["properties"]["cursor_value"]["anyOf"]
assert {"type": "string"} in cursor_pagination_strategy["properties"]["cursor_value"]["anyOf"]
assert {"$ref": "#/definitions/InterpolatedBoolean"} in cursor_pagination_strategy["properties"]["stop_condition"]["anyOf"]
assert {"type": "string"} in cursor_pagination_strategy["properties"]["stop_condition"]["anyOf"]
assert cursor_pagination_strategy["properties"]["decoder"]["$ref"] == "#/definitions/JsonDecoder"
list_stream_slicer = schema["definitions"]["ListStreamSlicer"]["allOf"][1]
assert {"slice_values", "cursor_field", "config"}.issubset(list_stream_slicer["required"])
assert {"type": "array", "items": {"type": "string"}} in list_stream_slicer["properties"]["slice_values"]["anyOf"]
assert {"type": "string"} in list_stream_slicer["properties"]["slice_values"]["anyOf"]
assert {"$ref": "#/definitions/InterpolatedString"} in list_stream_slicer["properties"]["cursor_field"]["anyOf"]
assert {"type": "string"} in list_stream_slicer["properties"]["cursor_field"]["anyOf"]
assert list_stream_slicer["properties"]["request_option"]["$ref"] == "#/definitions/RequestOption"
added_field_definition = schema["definitions"]["AddedFieldDefinition"]
assert {"path", "value"}.issubset(added_field_definition["required"])
assert added_field_definition["properties"]["path"]["type"] == "array"
assert added_field_definition["properties"]["path"]["items"]["type"] == "string"
assert {"$ref": "#/definitions/InterpolatedString"} in added_field_definition["properties"]["value"]["anyOf"]
assert {"type": "string"} in added_field_definition["properties"]["value"]["anyOf"]
# There is something very strange about JsonSchemaMixin.json_schema(). For some reason, when this test is called independently
# it will pass. However, when it is invoked with the entire test file, certain components won't get generated in the schema. Since
# the generate_schema() method is invoked by independently so this doesn't happen under normal circumstance when we generate the
# complete schema. It only happens when the tests are all called together.
# One way to replicate this is to add DefaultErrorHandler.json_schema() to the start of this test and uncomment the assertions below
# assert {"$ref": "#/definitions/ConstantBackoffStrategy"} in default_error_handler["properties"]["backoff_strategies"]["items"]["anyOf"]
# assert {"$ref": "#/definitions/ExponentialBackoffStrategy"} in default_error_handler["properties"]["backoff_strategies"]["items"][
# "anyOf"
# ]
# assert {"$ref": "#/definitions/WaitTimeFromHeaderBackoffStrategy"} in default_error_handler["properties"]["backoff_strategies"][
# "items"
# ]["anyOf"]
# assert {"$ref": "#/definitions/WaitUntilTimeFromHeaderBackoffStrategy"} in default_error_handler["properties"]["backoff_strategies"][
# "items"
# ]["anyOf"]
#
# exponential_backoff_strategy = schema["definitions"]["ExponentialBackoffStrategy"]["allOf"][1]
# assert exponential_backoff_strategy["properties"]["factor"]["type"] == "number"