CDK: Add base pydantic model for connector config and schemas (#8485)
* add base spec model
* fix usage of state_checkpoint_interval in case it is dynamic
* add schema base models, fix spelling, signatures and polishing

Co-authored-by: Eugene Kulak <kulak.eugene@gmail.com>
@@ -1,5 +1,8 @@
 # Changelog
 
+## 0.1.42
+Add base pydantic model for connector config and schemas.
+
 ## 0.1.41
 Fix build error
@@ -92,7 +92,7 @@ class Destination(Connector, ABC):
             return
         config = self.read_config(config_path=parsed_args.config)
         if self.check_config_against_spec or cmd == "check":
-            check_config_against_spec_or_exit(config, spec, self.logger)
+            check_config_against_spec_or_exit(config, spec)
 
         if cmd == "check":
             yield self._run_check(config=config)
@@ -84,7 +84,7 @@ class AirbyteEntrypoint(object):
                 # jsonschema's additionalProperties flag wont fail the validation
                 config, internal_config = split_config(config)
                 if self.source.check_config_against_spec or cmd == "check":
-                    check_config_against_spec_or_exit(config, source_spec, self.logger)
+                    check_config_against_spec_or_exit(config, source_spec)
                 # Put internal flags back to config dict
                 config.update(internal_config.dict())
@@ -171,7 +171,6 @@ class AbstractSource(Source, ABC):
         if stream_state:
             logger.info(f"Setting state of {stream_name} stream to {stream_state}")
 
-        checkpoint_interval = stream_instance.state_checkpoint_interval
         slices = stream_instance.stream_slices(
            cursor_field=configured_stream.cursor_field, sync_mode=SyncMode.incremental, stream_state=stream_state
         )
@@ -186,6 +185,7 @@ class AbstractSource(Source, ABC):
             for record_counter, record_data in enumerate(records, start=1):
                 yield self._as_airbyte_record(stream_name, record_data)
                 stream_state = stream_instance.get_updated_state(stream_state, record_data)
+                checkpoint_interval = stream_instance.state_checkpoint_interval
                 if checkpoint_interval and record_counter % checkpoint_interval == 0:
                     yield self._checkpoint_state(stream_name, stream_state, connector_state, logger)
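These two hunks move the `state_checkpoint_interval` lookup from before the slice loop to inside the record loop, which is what the commit message means by "fix usage of state_checkpoint_interval in case it is dynamic": a stream may implement the interval as a `@property` whose value changes during a sync. A minimal sketch of such a stream; the class name and `_dynamic_interval` attribute are illustrative, not part of this commit:

```python
from typing import Optional

from airbyte_cdk.sources.streams import Stream


class EventsStream(Stream):  # hypothetical stream, for illustration only
    @property
    def state_checkpoint_interval(self) -> Optional[int]:
        # May change while records are being read; caching it once before
        # the loop (the old behaviour) would freeze the initial value.
        return self._dynamic_interval
```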
airbyte-cdk/python/airbyte_cdk/sources/config.py (new file, 65 lines)
@@ -0,0 +1,65 @@
+#
+# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+#
+
+from typing import Any, Dict, List, MutableMapping, Optional
+
+from jsonschema import RefResolver
+from pydantic import BaseModel
+
+
+class BaseConfig(BaseModel):
+    """Base class for connector spec, adds the following behaviour:
+
+    - resolve $ref and replace it with definition
+    - replace all occurrences of anyOf with oneOf
+    - drop description
+    """
+
+    @classmethod
+    def _rename_key(cls, schema: Any, old_key: str, new_key: str) -> None:
+        """Iterate over nested dictionary and replace one key with another. Used to replace anyOf with oneOf. Recursive.
+
+        :param schema: schema that will be patched
+        :param old_key: name of the key to replace
+        :param new_key: new name of the key
+        """
+        if not isinstance(schema, MutableMapping):
+            return
+
+        for key, value in schema.items():
+            cls._rename_key(value, old_key, new_key)
+        if old_key in schema:
+            schema[new_key] = schema.pop(old_key)
+
+    @classmethod
+    def _expand_refs(cls, schema: Any, ref_resolver: Optional[RefResolver] = None) -> None:
+        """Iterate over schema and replace all occurrences of $ref with their definitions. Recursive.
+
+        :param schema: schema that will be patched
+        :param ref_resolver: resolver to get definition from $ref, if None pass it will be instantiated
+        """
+        ref_resolver = ref_resolver or RefResolver.from_schema(schema)
+
+        if isinstance(schema, MutableMapping):
+            if "$ref" in schema:
+                ref_url = schema.pop("$ref")
+                _, definition = ref_resolver.resolve(ref_url)
+                cls._expand_refs(definition, ref_resolver=ref_resolver)  # expand refs in definitions as well
+                schema.update(definition)
+            else:
+                for key, value in schema.items():
+                    cls._expand_refs(value, ref_resolver=ref_resolver)
+        elif isinstance(schema, List):
+            for value in schema:
+                cls._expand_refs(value, ref_resolver=ref_resolver)
+
+    @classmethod
+    def schema(cls, **kwargs) -> Dict[str, Any]:
+        """We're overriding the schema classmethod to enable some post-processing"""
+        schema = super().schema(**kwargs)
+        cls._rename_key(schema, old_key="anyOf", new_key="oneOf")  # UI supports only oneOf
+        cls._expand_refs(schema)  # UI and destination doesn't support $ref's
+        schema.pop("definitions", None)  # remove definitions created by $ref
+        schema.pop("description", None)  # description added from the docstring
+        return schema
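A minimal sketch of how a connector spec might build on `BaseConfig`; every class and field name below is illustrative, not part of this commit:

```python
from typing import Union

from airbyte_cdk.sources.config import BaseConfig
from pydantic import BaseModel, Field


class OAuthCredentials(BaseModel):  # illustrative
    auth_type: str = Field("oauth", const=True)
    client_id: str


class TokenCredentials(BaseModel):  # illustrative
    auth_type: str = Field("token", const=True)
    api_token: str


class MyConnectorConfig(BaseConfig):  # illustrative
    class Config:
        title = "My Connector Spec"

    start_date: str
    credentials: Union[OAuthCredentials, TokenCredentials]


schema = MyConnectorConfig.schema()
assert "definitions" not in schema  # $refs were inlined by _expand_refs
assert "oneOf" in schema["properties"]["credentials"]  # pydantic's anyOf renamed for the UI
```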
@@ -10,13 +10,49 @@ import pkgutil
 from typing import Any, ClassVar, Dict, Mapping, Tuple
 
 import jsonref
 from airbyte_cdk.logger import AirbyteLogger
 from airbyte_cdk.models import ConnectorSpecification
 from jsonschema import validate
 from jsonschema.exceptions import ValidationError
 from pydantic import BaseModel, Field
 
 
+class JsonFileLoader:
+    """
+    Custom json file loader to resolve references to resources located in "shared" directory.
+    We need this for compatability with existing schemas cause all of them have references
+    pointing to shared_schema.json file instead of shared/shared_schema.json
+    """
+
+    def __init__(self, uri_base: str, shared: str):
+        self.shared = shared
+        self.uri_base = uri_base
+
+    def __call__(self, uri: str) -> Dict[str, Any]:
+        uri = uri.replace(self.uri_base, f"{self.uri_base}/{self.shared}/")
+        return json.load(open(uri))
+
+
+def resolve_ref_links(obj: Any) -> Dict[str, Any]:
+    """
+    Scan resolved schema and convert jsonref.JsonRef object to JSON serializable dict.
+
+    :param obj - jsonschema object with ref field resolved.
+    :return JSON serializable object with references without external dependencies.
+    """
+    if isinstance(obj, jsonref.JsonRef):
+        obj = resolve_ref_links(obj.__subject__)
+        # Omit existing definitions for external resource since
+        # we dont need it anymore.
+        obj.pop("definitions", None)
+        return obj
+    elif isinstance(obj, dict):
+        return {k: resolve_ref_links(v) for k, v in obj.items()}
+    elif isinstance(obj, list):
+        return [resolve_ref_links(item) for item in obj]
+    else:
+        return obj
 
 
 class ResourceSchemaLoader:
     """JSONSchema loader from package resources"""
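For context on what `resolve_ref_links` undoes: as far as I can tell, `jsonref.JsonRef.replace_refs` leaves lazy `JsonRef` proxy objects in place of each `{"$ref": ...}` mapping rather than plain dicts. A sketch under that assumption (the schema contents are illustrative):

```python
import jsonref

raw_schema = {
    "definitions": {"user": {"type": "object"}},
    "properties": {"author": {"$ref": "#/definitions/user"}},
}
resolved = jsonref.JsonRef.replace_refs(raw_schema)
# resolved["properties"]["author"] is a JsonRef proxy; resolve_ref_links
# walks the structure, rebuilds plain dicts/lists, and drops leftover
# "definitions" blocks so the result is plainly JSON-serializable.
plain = resolve_ref_links(resolved)
```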
@@ -42,10 +78,8 @@ class ResourceSchemaLoader:
             raise IOError(f"Cannot find file {schema_filename}")
         try:
             raw_schema = json.loads(raw_file)
-        except ValueError:
-            # TODO use proper logging
-            print(f"Invalid JSON file format for file {schema_filename}")
-            raise
+        except ValueError as err:
+            raise RuntimeError(f"Invalid JSON file format for file {schema_filename}") from err
 
         return self.__resolve_schema_references(raw_schema)
@@ -57,58 +91,20 @@ class ResourceSchemaLoader:
         :return JSON serializable object with references without external dependencies.
         """
-
-        class JsonFileLoader:
-            """
-            Custom json file loader to resolve references to resources located in "shared" directory.
-            We need this for compatability with existing schemas cause all of them have references
-            pointing to shared_schema.json file instead of shared/shared_schema.json
-            """
-
-            def __init__(self, uri_base: str, shared: str):
-                self.shared = shared
-                self.uri_base = uri_base
-
-            def __call__(self, uri: str) -> Dict[str, Any]:
-                uri = uri.replace(self.uri_base, f"{self.uri_base}/{self.shared}/")
-                return json.load(open(uri))
-
         package = importlib.import_module(self.package_name)
         base = os.path.dirname(package.__file__) + "/"
 
-        def resolve_ref_links(obj: Any) -> Dict[str, Any]:
-            """
-            Scan resolved schema and convert jsonref.JsonRef object to JSON
-            serializable dict.
-
-            :param obj - jsonschema object with ref field resovled.
-            :return JSON serializable object with references without external dependencies.
-            """
-            if isinstance(obj, jsonref.JsonRef):
-                obj = resolve_ref_links(obj.__subject__)
-                # Omit existance definitions for extenal resource since
-                # we dont need it anymore.
-                obj.pop("definitions", None)
-                return obj
-            elif isinstance(obj, dict):
-                return {k: resolve_ref_links(v) for k, v in obj.items()}
-            elif isinstance(obj, list):
-                return [resolve_ref_links(item) for item in obj]
-            else:
-                return obj
-
         resolved = jsonref.JsonRef.replace_refs(raw_schema, loader=JsonFileLoader(base, "schemas/shared"), base_uri=base)
         resolved = resolve_ref_links(resolved)
         return resolved
 
 
-def check_config_against_spec_or_exit(config: Mapping[str, Any], spec: ConnectorSpecification, logger: AirbyteLogger):
+def check_config_against_spec_or_exit(config: Mapping[str, Any], spec: ConnectorSpecification):
     """
     Check config object against spec. In case of spec is invalid, throws
     an exception with validation error description.
 
     :param config - config loaded from file specified over command line
     :param spec - spec object generated by connector
-    :param logger - Airbyte logger for reporting validation error
     """
     spec_schema = spec.connectionSpecification
     try:
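With the `logger` parameter dropped, callers pass only the config and the spec, and an invalid config surfaces as a raised exception. A usage sketch consistent with the unit tests further down (the spec contents are illustrative, and the import path is assumed from the CDK layout):

```python
from airbyte_cdk.models import ConnectorSpecification
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit

spec = ConnectorSpecification(
    connectionSpecification={
        "type": "object",
        "required": ["api_token"],
        "properties": {"api_token": {"type": "string"}},
    }
)

check_config_against_spec_or_exit({"api_token": "something"}, spec)  # passes silently
check_config_against_spec_or_exit({}, spec)  # raises; the tests verify the error does not echo the config
```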
@@ -122,8 +118,10 @@ class InternalConfig(BaseModel):
     limit: int = Field(None, alias="_limit")
     page_size: int = Field(None, alias="_page_size")
 
-    def dict(self):
-        return super().dict(by_alias=True, exclude_unset=True)
+    def dict(self, *args, **kwargs):
+        kwargs["by_alias"] = True
+        kwargs["exclude_unset"] = True
+        return super().dict(*args, **kwargs)
 
 
 def split_config(config: Mapping[str, Any]) -> Tuple[dict, InternalConfig]:
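`split_config`'s body is not part of this diff; judging by its use in `AirbyteEntrypoint` above, it presumably separates the underscore-prefixed internal flags from the user-facing config. A hedged sketch of the round trip:

```python
config = {"api_token": "secret", "_limit": 100}
config, internal_config = split_config(config)
# config is now {"api_token": "secret"}; the flags live on the model:
assert internal_config.limit == 100
# The by_alias/exclude_unset defaults in the reworked dict() mean only flags
# that were actually provided come back, under their underscore aliases:
config.update(internal_config.dict())  # restores {"_limit": 100}
```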
airbyte-cdk/python/airbyte_cdk/sources/utils/schema_models.py (new file, 76 lines; path inferred from the test imports below)
@@ -0,0 +1,76 @@
+#
+# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+#
+
+from typing import Any, Dict, Optional, Type
+
+from pydantic import BaseModel, Extra
+from pydantic.main import ModelMetaclass
+from pydantic.typing import resolve_annotations
+
+
+class AllOptional(ModelMetaclass):
+    """
+    Metaclass for marking all Pydantic model fields as Optional
+
+    Here is example of declaring model using this metaclass like:
+    '''
+    class MyModel(BaseModel, metaclass=AllOptional):
+        a: str
+        b: str
+    '''
+    it is an equivalent of:
+    '''
+    class MyModel(BaseModel):
+        a: Optional[str]
+        b: Optional[str]
+    '''
+    It would make code more clear and eliminate a lot of manual work.
+    """
+
+    def __new__(mcs, name, bases, namespaces, **kwargs):
+        """
+        Iterate through fields and wrap then with typing.Optional type.
+        """
+        annotations = resolve_annotations(namespaces.get("__annotations__", {}), namespaces.get("__module__", None))
+        for base in bases:
+            annotations = {**annotations, **getattr(base, "__annotations__", {})}
+        for field in annotations:
+            if not field.startswith("__"):
+                annotations[field] = Optional[annotations[field]]
+        namespaces["__annotations__"] = annotations
+        return super().__new__(mcs, name, bases, namespaces, **kwargs)
+
+
+class BaseSchemaModel(BaseModel):
+    """
+    Base class for all schema models. It has some extra schema postprocessing.
+    Can be used in combination with AllOptional metaclass
+    """
+
+    class Config:
+        extra = Extra.allow
+
+        @classmethod
+        def schema_extra(cls, schema: Dict[str, Any], model: Type[BaseModel]) -> None:
+            """Modify generated jsonschema, remove "title", "description" and "required" fields.
+
+            Pydantic doesn't treat Union[None, Any] type correctly when generate jsonschema,
+            so we can't set field as nullable (i.e. field that can have either null and non-null values),
+            We generate this jsonschema value manually.
+
+            :param schema: generated jsonschema
+            :param model:
+            """
+            schema.pop("title", None)
+            schema.pop("description", None)
+            schema.pop("required", None)
+            for name, prop in schema.get("properties", {}).items():
+                prop.pop("title", None)
+                prop.pop("description", None)
+                allow_none = model.__fields__[name].allow_none
+                if allow_none:
+                    if "type" in prop:
+                        prop["type"] = ["null", prop["type"]]
+                    elif "$ref" in prop:
+                        ref = prop.pop("$ref")
+                        prop["oneOf"] = [{"type": "null"}, {"$ref": ref}]
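A small sketch of the jsonschema these helpers produce; the `User` model is illustrative, and the expected output mirrors the unit tests further down:

```python
from typing import Optional

from airbyte_cdk.sources.utils.schema_models import BaseSchemaModel


class User(BaseSchemaModel):  # illustrative
    id: int
    nickname: Optional[str]


print(User.schema())
# {'type': 'object', 'properties': {'id': {'type': 'integer'},
#                                   'nickname': {'type': ['null', 'string']}}}
# — titles, descriptions and "required" are stripped, and the Optional field
# is emitted as an explicitly nullable jsonschema type.
```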
@@ -15,7 +15,7 @@ README = (HERE / "README.md").read_text()
 
 setup(
     name="airbyte-cdk",
-    version="0.1.41",
+    version="0.1.42",
     description="A framework for writing Airbyte Connectors.",
     long_description=README,
     long_description_content_type="text/markdown",
@@ -172,7 +172,7 @@ class TestRun:
         # Affirm to Mypy that this is indeed a method on this mock
         destination.check.assert_called_with(logger=ANY, config=dummy_config)  # type: ignore
         # Check if config validation has been called
-        validate_mock.assert_called_with(dummy_config, spec_msg, destination.logger)
+        validate_mock.assert_called_with(dummy_config, spec_msg)
 
         # verify output was correct
         assert _wrapped(expected_check_result) == returned_check_result
@@ -224,7 +224,7 @@ class TestRun:
             input_messages=OrderedIterableMatcher(mocked_input),
         )
         # Check if config validation has been called
-        validate_mock.assert_called_with(dummy_config, spec_msg, destination.logger)
+        validate_mock.assert_called_with(dummy_config, spec_msg)
 
         # verify output was correct
         assert expected_write_result == returned_write_result
airbyte-cdk/python/unit_tests/sources/test_config.py (new file, 82 lines)
@@ -0,0 +1,82 @@
+#
+# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+#
+
+from typing import List, Union
+
+from airbyte_cdk.sources.config import BaseConfig
+from pydantic import BaseModel, Field
+
+
+class InnerClass(BaseModel):
+    field1: str
+    field2: int
+
+
+class Choice1(BaseModel):
+    selected_strategy = Field("option1", const=True)
+
+    name: str
+    count: int
+
+
+class Choice2(BaseModel):
+    selected_strategy = Field("option2", const=True)
+
+    sequence: List[str]
+
+
+class SomeSourceConfig(BaseConfig):
+    class Config:
+        title = "Some Source"
+
+    items: List[InnerClass]
+    choice: Union[Choice1, Choice2]
+
+
+class TestBaseConfig:
+    EXPECTED_SCHEMA = {
+        "properties": {
+            "choice": {
+                "oneOf": [
+                    {
+                        "properties": {
+                            "count": {"title": "Count", "type": "integer"},
+                            "name": {"title": "Name", "type": "string"},
+                            "selected_strategy": {"const": "option1", "title": "Selected " "Strategy", "type": "string"},
+                        },
+                        "required": ["name", "count"],
+                        "title": "Choice1",
+                        "type": "object",
+                    },
+                    {
+                        "properties": {
+                            "selected_strategy": {"const": "option2", "title": "Selected " "Strategy", "type": "string"},
+                            "sequence": {"items": {"type": "string"}, "title": "Sequence", "type": "array"},
+                        },
+                        "required": ["sequence"],
+                        "title": "Choice2",
+                        "type": "object",
+                    },
+                ],
+                "title": "Choice",
+            },
+            "items": {
+                "items": {
+                    "properties": {"field1": {"title": "Field1", "type": "string"}, "field2": {"title": "Field2", "type": "integer"}},
+                    "required": ["field1", "field2"],
+                    "title": "InnerClass",
+                    "type": "object",
+                },
+                "title": "Items",
+                "type": "array",
+            },
+        },
+        "required": ["items", "choice"],
+        "title": "Some Source",
+        "type": "object",
+    }
+
+    def test_schema_postprocessing(self):
+        schema = SomeSourceConfig.schema()
+        assert schema == self.EXPECTED_SCHEMA
@@ -58,7 +58,7 @@ def spec_object():
 def test_check_config_against_spec_or_exit_does_not_print_schema(capsys, spec_object):
     config = {"super_secret_token": "really_a_secret"}
     with pytest_raises(Exception) as ex_info:
-        check_config_against_spec_or_exit(config, spec_object, logger)
+        check_config_against_spec_or_exit(config, spec_object)
     exc = ex_info.value
     traceback.print_exception(type(exc), exc, exc.__traceback__)
     out, err = capsys.readouterr()
@@ -67,7 +67,7 @@ def test_check_config_against_spec_or_exit_does_not_print_schema(capsys, spec_object):
 
 def test_should_not_fail_validation_for_valid_config(spec_object):
     config = {"api_token": "something"}
-    check_config_against_spec_or_exit(config, spec_object, logger)
+    check_config_against_spec_or_exit(config, spec_object)
     assert True, "should pass validation with valid config"
@@ -0,0 +1,58 @@
+#
+# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+#
+
+from typing import List, Optional
+
+from airbyte_cdk.sources.utils.schema_models import AllOptional, BaseSchemaModel
+
+
+class InnerClass(BaseSchemaModel):
+    field1: Optional[str]
+    field2: int
+
+
+class SchemaWithFewNullables(BaseSchemaModel):
+    name: Optional[str]
+    optional_item: Optional[InnerClass]
+    items: List[InnerClass]
+
+
+class SchemaWithAllOptional(BaseSchemaModel, metaclass=AllOptional):
+    object_id: int
+    item: InnerClass
+
+
+class TestSchemaWithFewNullables:
+    EXPECTED_SCHEMA = {
+        "type": "object",
+        "properties": {
+            "name": {"type": ["null", "string"]},
+            "optional_item": {"oneOf": [{"type": "null"}, {"$ref": "#/definitions/InnerClass"}]},
+            "items": {"type": "array", "items": {"$ref": "#/definitions/InnerClass"}},
+        },
+        "definitions": {
+            "InnerClass": {"type": "object", "properties": {"field1": {"type": ["null", "string"]}, "field2": {"type": "integer"}}}
+        },
+    }
+
+    def test_schema_postprocessing(self):
+        schema = SchemaWithFewNullables.schema()
+        assert schema == self.EXPECTED_SCHEMA
+
+
+class TestSchemaWithAllOptional:
+    EXPECTED_SCHEMA = {
+        "type": "object",
+        "properties": {
+            "object_id": {"type": ["null", "integer"]},
+            "item": {"oneOf": [{"type": "null"}, {"$ref": "#/definitions/InnerClass"}]},
+        },
+        "definitions": {
+            "InnerClass": {"type": "object", "properties": {"field1": {"type": ["null", "string"]}, "field2": {"type": "integer"}}}
+        },
+    }
+
+    def test_schema_postprocessing(self):
+        schema = SchemaWithAllOptional.schema()
+        assert schema == self.EXPECTED_SCHEMA