1
0
mirror of synced 2025-12-25 02:09:19 -05:00

[low-code] replace emptySchemaLoader with DefaultSchemaLoader (#18947)

* replace emptySchemaLoader with DefaultSchemaLoader

* fix test name

* fix test

* add logging for when we default to the empty schema

* increment patch version

* fix formatting

* update changelog
This commit is contained in:
Brian Lai
2022-11-03 23:25:01 -04:00
committed by GitHub
parent 605fb921c4
commit 186580a6ee
12 changed files with 92 additions and 48 deletions

View File

@@ -1,5 +1,9 @@
# Changelog
## 0.5.3
Low-code: Replace EmptySchemaLoader with DefaultSchemaLoader to retain backwards compatibility
Low-code: Evaluate backoff strategies at runtime
## 0.5.2
Low-code: Allow for read even when schemas are not defined for a connector yet

View File

@@ -7,7 +7,7 @@ from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.schema import EmptySchemaLoader
from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.declarative.types import Config, StreamSlice
@@ -48,7 +48,7 @@ class DeclarativeStream(Stream, JsonSchemaMixin):
def __post_init__(self, options: Mapping[str, Any]):
self.stream_cursor_field = self.stream_cursor_field or []
self.transformations = self.transformations or []
self._schema_loader = self.schema_loader if self.schema_loader else EmptySchemaLoader(config=self.config, options=options)
self._schema_loader = self.schema_loader if self.schema_loader else DefaultSchemaLoader(config=self.config, options=options)
@property
def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:

View File

@@ -32,7 +32,6 @@ from airbyte_cdk.sources.declarative.requesters.paginators.strategies.cursor_pag
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.offset_increment import OffsetIncrement
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.page_increment import PageIncrement
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
from airbyte_cdk.sources.declarative.schema import EmptySchemaLoader
from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.stream_slicers.cartesian_product_stream_slicer import CartesianProductStreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer
@@ -59,7 +58,6 @@ CLASS_TYPES_REGISTRY: Mapping[str, Type] = {
"DefaultErrorHandler": DefaultErrorHandler,
"DefaultPaginator": DefaultPaginator,
"DpathExtractor": DpathExtractor,
"EmptySchemaLoader": EmptySchemaLoader,
"ExponentialBackoffStrategy": ExponentialBackoffStrategy,
"HttpRequester": HttpRequester,
"InterpolatedBoolean": InterpolatedBoolean,

View File

@@ -30,7 +30,7 @@ from airbyte_cdk.sources.declarative.requesters.request_options.request_options_
from airbyte_cdk.sources.declarative.requesters.requester import Requester
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
from airbyte_cdk.sources.declarative.schema import EmptySchemaLoader
from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.declarative.stream_slicers.single_slice import SingleSlice
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
@@ -58,7 +58,7 @@ DEFAULT_IMPLEMENTATIONS_REGISTRY: Mapping[Type, Type] = {
RequestOptionsProvider: InterpolatedRequestOptionsProvider,
Requester: HttpRequester,
Retriever: SimpleRetriever,
SchemaLoader: EmptySchemaLoader,
SchemaLoader: DefaultSchemaLoader,
Stream: DeclarativeStream,
StreamSlicer: SingleSlice,
}

View File

@@ -2,8 +2,8 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from airbyte_cdk.sources.declarative.schema.empty_schema_loader import EmptySchemaLoader
from airbyte_cdk.sources.declarative.schema.default_schema_loader import DefaultSchemaLoader
from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
__all__ = ["JsonFileSchemaLoader", "EmptySchemaLoader", "SchemaLoader"]
__all__ = ["JsonFileSchemaLoader", "DefaultSchemaLoader", "SchemaLoader"]

View File

@@ -0,0 +1,46 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
import logging
from dataclasses import InitVar, dataclass
from typing import Any, Mapping
from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.declarative.types import Config
from dataclasses_jsonschema import JsonSchemaMixin
@dataclass
class DefaultSchemaLoader(SchemaLoader, JsonSchemaMixin):
"""
Loads a schema from the default location or returns an empty schema for streams that have not defined their schema file yet.
Attributes:
config (Config): The user-provided configuration as specified by the source's spec
options (Mapping[str, Any]): Additional arguments to pass to the string interpolation if needed
"""
config: Config
options: InitVar[Mapping[str, Any]]
def __post_init__(self, options: Mapping[str, Any]):
self._options = options
self.default_loader = JsonFileSchemaLoader(options=options, config=self.config)
def get_json_schema(self) -> Mapping[str, Any]:
"""
Attempts to retrieve a schema from the default filepath location or returns the empty schema if a schema cannot be found.
:return: The empty schema
"""
try:
return self.default_loader.get_json_schema()
except FileNotFoundError:
# A slight hack since we don't directly have the stream name. However, when building the default filepath we assume the
# runtime options stores stream name 'name' so we'll do the same here
stream_name = self._options.get("name", "")
logging.info(f"Could not find schema for stream {stream_name}, defaulting to the empty schema")
return {}

View File

@@ -1,36 +0,0 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from dataclasses import InitVar, dataclass
from typing import Any, Mapping
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.declarative.types import Config
from dataclasses_jsonschema import JsonSchemaMixin
@dataclass
class EmptySchemaLoader(SchemaLoader, JsonSchemaMixin):
"""
Loads an empty schema for streams that have not defined their schema file yet.
Attributes:
config (Config): The user-provided configuration as specified by the source's spec
options (Mapping[str, Any]): Additional arguments to pass to the string interpolation if needed
"""
config: Config
options: InitVar[Mapping[str, Any]]
def __post_init__(self, options: Mapping[str, Any]):
pass
def get_json_schema(self) -> Mapping[str, Any]:
"""
Returns by default the empty schema.
:return: The empty schema
"""
return {}

View File

@@ -15,7 +15,7 @@ README = (HERE / "README.md").read_text()
setup(
name="airbyte-cdk",
version="0.5.2",
version="0.5.3",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",

View File

@@ -0,0 +1,32 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from unittest.mock import MagicMock
import pytest
from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader
@pytest.mark.parametrize(
"found_schema, found_error, expected_schema",
[
pytest.param(
{"type": "object", "properties": {}}, None, {"type": "object", "properties": {}}, id="test_has_schema_in_default_location"
),
pytest.param(None, FileNotFoundError, {}, id="test_schema_file_does_not_exist"),
],
)
def test_get_json_schema(found_schema, found_error, expected_schema):
default_schema_loader = DefaultSchemaLoader({}, {})
json_file_schema_loader = MagicMock()
if found_schema:
json_file_schema_loader.get_json_schema.return_value = {"type": "object", "properties": {}}
if found_error:
json_file_schema_loader.get_json_schema.side_effect = found_error
default_schema_loader.default_loader = json_file_schema_loader
actual_schema = default_schema_loader.get_json_schema()
assert actual_schema == expected_schema

View File

@@ -34,7 +34,7 @@ from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_req
)
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
from airbyte_cdk.sources.declarative.schema import EmptySchemaLoader
from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader
from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer
@@ -603,7 +603,7 @@ def test_config_with_defaults():
assert type(stream) == DeclarativeStream
assert stream.primary_key == "id"
assert stream.name == "lists"
assert type(stream.schema_loader) == EmptySchemaLoader
assert type(stream.schema_loader) == DefaultSchemaLoader
assert type(stream.retriever) == SimpleRetriever
assert stream.retriever.requester.http_method == HttpMethod.GET

View File

@@ -268,7 +268,7 @@ def test_generate_schema():
declarative_stream = schema["definitions"]["DeclarativeStream"]
assert {"retriever", "config"}.issubset(declarative_stream["required"])
assert {"$ref": "#/definitions/EmptySchemaLoader"} in declarative_stream["properties"]["schema_loader"]["anyOf"]
assert {"$ref": "#/definitions/DefaultSchemaLoader"} in declarative_stream["properties"]["schema_loader"]["anyOf"]
assert {"$ref": "#/definitions/JsonFileSchemaLoader"} in declarative_stream["properties"]["schema_loader"]["anyOf"]
assert declarative_stream["properties"]["retriever"]["$ref"] == "#/definitions/SimpleRetriever"
assert declarative_stream["properties"]["name"]["type"] == "string"