[Low-Code CDK] Handle forward references in manifest (#20893)
[Low-Code CDK] Handle forward references in manifest
This commit is contained in:
@@ -53,8 +53,7 @@ class ManifestDeclarativeSource(DeclarativeSource):
|
||||
"""
|
||||
self.logger = logging.getLogger(f"airbyte.{self.name}")
|
||||
|
||||
evaluated_manifest = {}
|
||||
resolved_source_config = ManifestReferenceResolver().preprocess_manifest(source_config, evaluated_manifest, "")
|
||||
resolved_source_config = ManifestReferenceResolver().preprocess_manifest(source_config)
|
||||
self._source_config = resolved_source_config
|
||||
self._debug = debug
|
||||
self._factory = DeclarativeComponentFactory()
|
||||
|
||||
@@ -3,6 +3,15 @@
|
||||
#
|
||||
|
||||
|
||||
class CircularReferenceException(Exception):
    """
    Error signalling that a manifest reference chain loops back onto itself.
    """

    def __init__(self, reference):
        # The offending reference is embedded in the message so the user can locate it.
        message = f"Circular reference found: {reference}"
        super().__init__(message)
|
||||
|
||||
|
||||
class UndefinedReferenceException(Exception):
|
||||
"""
|
||||
Raised when refering to an undefined reference.
|
||||
@@ -2,10 +2,10 @@
|
||||
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
|
||||
#
|
||||
|
||||
from copy import deepcopy
|
||||
from typing import Any, Mapping, Tuple, Union
|
||||
import re
|
||||
from typing import Any, Mapping, Set, Tuple, Union
|
||||
|
||||
from airbyte_cdk.sources.declarative.parsers.undefined_reference_exception import UndefinedReferenceException
|
||||
from airbyte_cdk.sources.declarative.parsers.custom_exceptions import CircularReferenceException, UndefinedReferenceException
|
||||
|
||||
|
||||
class ManifestReferenceResolver:
|
||||
@@ -96,91 +96,102 @@ class ManifestReferenceResolver:
|
||||
|
||||
ref_tag = "$ref"
|
||||
|
||||
def preprocess_manifest(self, manifest: Mapping[str, Any], evaluated_mapping: Mapping[str, Any], path: Union[str, Tuple[str]]):
|
||||
|
||||
def preprocess_manifest(self, manifest):
|
||||
"""
|
||||
:param manifest: incoming manifest that could have references to previously defined components
|
||||
:param evaluated_mapping: mapping produced by dereferencing the content of input_mapping
|
||||
:param path: curent path in configuration traversal
|
||||
:return:
|
||||
"""
|
||||
d = {}
|
||||
if self.ref_tag in manifest:
|
||||
partial_ref_string = manifest[self.ref_tag]
|
||||
d = deepcopy(self._preprocess(partial_ref_string, evaluated_mapping, path))
|
||||
return self._evaluate_node(manifest, manifest)
|
||||
|
||||
for key, value in manifest.items():
|
||||
if key == self.ref_tag:
|
||||
continue
|
||||
full_path = self._resolve_value(key, path)
|
||||
if full_path in evaluated_mapping:
|
||||
raise Exception(f"Databag already contains key={key} with path {full_path}")
|
||||
processed_value = self._preprocess(value, evaluated_mapping, full_path)
|
||||
evaluated_mapping[full_path] = processed_value
|
||||
d[key] = processed_value
|
||||
|
||||
return d
|
||||
|
||||
def _get_ref_key(self, s: str) -> str:
|
||||
ref_start = s.find("*ref(")
|
||||
if ref_start == -1:
|
||||
return None
|
||||
return s[ref_start + 5 : s.find(")")]
|
||||
|
||||
def _resolve_value(self, value: str, path):
|
||||
if path:
|
||||
return *path, value
|
||||
else:
|
||||
return (value,)
|
||||
|
||||
def _preprocess(self, value, evaluated_config: Mapping[str, Any], path):
|
||||
if isinstance(value, str):
|
||||
ref_key = self._get_ref_key(value)
|
||||
if ref_key is None:
|
||||
return value
|
||||
def _evaluate_node(self, node: Any, manifest: Mapping[str, Any], visited: Set = None):
|
||||
if isinstance(node, dict):
|
||||
evaluated_dict = {k: self._evaluate_node(v, manifest) for k, v in node.items() if not self._is_ref_key(k)}
|
||||
if self.ref_tag in node:
|
||||
# The node includes a $ref key, so we splat the referenced value(s) into the evaluated dict
|
||||
evaluated_ref = self._evaluate_node(node[self.ref_tag], manifest)
|
||||
if not isinstance(evaluated_ref, dict):
|
||||
return evaluated_ref
|
||||
else:
|
||||
# The values defined on the component take precedence over the reference values
|
||||
return evaluated_ref | evaluated_dict
|
||||
else:
|
||||
"""
|
||||
references are ambiguous because one could define a key containing with `.`
|
||||
in this example, we want to refer to the limit key in the dict object:
|
||||
dict:
|
||||
limit: 50
|
||||
limit_ref: "*ref(dict.limit)"
|
||||
|
||||
whereas here we want to access the `nested.path` value.
|
||||
nested:
|
||||
path: "first one"
|
||||
nested.path: "uh oh"
|
||||
value: "ref(nested.path)
|
||||
|
||||
to resolve the ambiguity, we try looking for the reference key at the top level, and then traverse the structs downward
|
||||
until we find a key with the given path, or until there is nothing to traverse.
|
||||
"""
|
||||
key = (ref_key,)
|
||||
while key[-1]:
|
||||
if key in evaluated_config:
|
||||
return evaluated_config[key]
|
||||
else:
|
||||
split = key[-1].split(".")
|
||||
key = *key[:-1], split[0], ".".join(split[1:])
|
||||
raise UndefinedReferenceException(path, ref_key)
|
||||
elif isinstance(value, dict):
|
||||
return self.preprocess_manifest(value, evaluated_config, path)
|
||||
elif type(value) == list:
|
||||
evaluated_list = [
|
||||
# pass in elem's path instead of the list's path
|
||||
self._preprocess(v, evaluated_config, self._get_path_for_list_item(path, index))
|
||||
for index, v in enumerate(value)
|
||||
]
|
||||
# Add the list's element to the evaluated config so they can be referenced
|
||||
for index, elem in enumerate(evaluated_list):
|
||||
evaluated_config[self._get_path_for_list_item(path, index)] = elem
|
||||
return evaluated_list
|
||||
return evaluated_dict
|
||||
elif isinstance(node, list):
|
||||
return [self._evaluate_node(v, manifest) for v in node]
|
||||
elif isinstance(node, str) and node.startswith("*ref("):
|
||||
if visited is None:
|
||||
visited = set()
|
||||
if node in visited:
|
||||
raise CircularReferenceException(node)
|
||||
visited.add(node)
|
||||
ret = self._evaluate_node(self._lookup_reference_value(node, manifest), manifest, visited)
|
||||
visited.remove(node)
|
||||
return ret
|
||||
else:
|
||||
return value
|
||||
return node
|
||||
|
||||
def _get_path_for_list_item(self, path, index):
|
||||
# An elem's path is {path_to_list}[{index}]
|
||||
if len(path) > 1:
|
||||
return path[:-1], f"{path[-1]}[{index}]"
|
||||
else:
|
||||
return (f"{path[-1]}[{index}]",)
|
||||
def _is_ref_key(self, key):
|
||||
return key == self.ref_tag
|
||||
|
||||
def _lookup_reference_value(self, reference: str, manifest: Mapping[str, Any]) -> Any:
|
||||
path = re.match("\\*ref\\(([^)]+)\\)", reference).groups()[0]
|
||||
if not path:
|
||||
raise UndefinedReferenceException(path, reference)
|
||||
try:
|
||||
return self._read_reference_value(path, manifest)
|
||||
except (KeyError, IndexError):
|
||||
raise UndefinedReferenceException(path, reference)
|
||||
|
||||
@staticmethod
|
||||
def _read_reference_value(ref: str, manifest_node: Mapping[str, Any]) -> Any:
|
||||
"""
|
||||
Read the value at the referenced location of the manifest.
|
||||
|
||||
References are ambiguous because one could define a key containing `.`
|
||||
In this example, we want to refer to the `limit` key in the `dict` object:
|
||||
dict:
|
||||
limit: 50
|
||||
limit_ref: "*ref(dict.limit)"
|
||||
|
||||
Whereas here we want to access the `nested.path` value.
|
||||
nested:
|
||||
path: "first one"
|
||||
nested.path: "uh oh"
|
||||
value: "ref(nested.path)
|
||||
|
||||
To resolve the ambiguity, we try looking for the reference key at the top level, and then traverse the structs downward
|
||||
until we find a key with the given path, or until there is nothing to traverse.
|
||||
|
||||
Consider the path foo.bar.baz. To resolve the ambiguity, we first try 'foo.bar.baz' in its entirety as a top-level key. If this
|
||||
fails, we try 'foo' as the top-level key, and if this succeeds, pass 'bar.baz' on as the key to be tried at the next level.
|
||||
"""
|
||||
while ref:
|
||||
try:
|
||||
return manifest_node[ref]
|
||||
except (KeyError, TypeError):
|
||||
head, ref = _parse_path(ref)
|
||||
manifest_node = manifest_node[head]
|
||||
return manifest_node
|
||||
|
||||
|
||||
def _parse_path(ref: str) -> Tuple[Union[str, int], str]:
|
||||
"""
|
||||
Return the next path component, together with the rest of the path.
|
||||
|
||||
A path component may be a string key, or an int index.
|
||||
|
||||
>>> _parse_path("foo.bar")
|
||||
"foo", "bar"
|
||||
>>> _parse_path("foo[7][8].bar")
|
||||
"foo", "[7][8].bar"
|
||||
>>> _parse_path("[7][8].bar")
|
||||
7, "[8].bar"
|
||||
>>> _parse_path("[8].bar")
|
||||
8, "bar"
|
||||
"""
|
||||
if match := re.match(r"^\[([0-9]+)\]\.?(.*)", ref):
|
||||
idx, rest = match.groups()
|
||||
result = int(idx), rest
|
||||
else:
|
||||
result = re.match(r"([^[.]*)\.?(.*)", ref).groups()
|
||||
return result
|
||||
|
||||
@@ -3,65 +3,33 @@
|
||||
#
|
||||
|
||||
import pytest
|
||||
from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ManifestReferenceResolver
|
||||
from airbyte_cdk.sources.declarative.parsers.undefined_reference_exception import UndefinedReferenceException
|
||||
from airbyte_cdk.sources.declarative.parsers.custom_exceptions import CircularReferenceException, UndefinedReferenceException
|
||||
from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ManifestReferenceResolver, _parse_path
|
||||
|
||||
resolver = ManifestReferenceResolver()
|
||||
|
||||
|
||||
def test_get_ref():
|
||||
s = "*ref(limit)"
|
||||
ref_key = resolver._get_ref_key(s)
|
||||
assert ref_key == "limit"
|
||||
|
||||
|
||||
def test_get_ref_no_ref():
|
||||
s = "limit: 50"
|
||||
|
||||
ref_key = resolver._get_ref_key(s)
|
||||
assert ref_key is None
|
||||
|
||||
|
||||
def test_refer():
|
||||
content = {
|
||||
"limit": 50,
|
||||
"limit_ref": "*ref(limit)"
|
||||
}
|
||||
config = resolver.preprocess_manifest(content, {}, "")
|
||||
content = {"limit": 50, "limit_ref": "*ref(limit)"}
|
||||
config = resolver.preprocess_manifest(content)
|
||||
assert config["limit_ref"] == 50
|
||||
|
||||
|
||||
def test_refer_to_inner():
|
||||
content = {
|
||||
"dict": {
|
||||
"limit": 50
|
||||
},
|
||||
"limit_ref": "*ref(dict.limit)"
|
||||
}
|
||||
config = resolver.preprocess_manifest(content, {}, "")
|
||||
content = {"dict": {"limit": 50}, "limit_ref": "*ref(dict.limit)"}
|
||||
config = resolver.preprocess_manifest(content)
|
||||
assert config["limit_ref"] == 50
|
||||
|
||||
|
||||
def test_refer_to_non_existant_struct():
|
||||
content = {
|
||||
"dict": {
|
||||
"limit": 50
|
||||
},
|
||||
"limit_ref": "*ref(not_dict)"
|
||||
}
|
||||
content = {"dict": {"limit": 50}, "limit_ref": "*ref(not_dict)"}
|
||||
with pytest.raises(UndefinedReferenceException):
|
||||
resolver.preprocess_manifest(content, {}, "")
|
||||
resolver.preprocess_manifest(content)
|
||||
|
||||
|
||||
def test_refer_in_dict():
|
||||
content = {
|
||||
"limit": 50,
|
||||
"offset_request_parameters": {
|
||||
"offset": "{{ next_page_token['offset'] }}",
|
||||
"limit": "*ref(limit)"
|
||||
}
|
||||
}
|
||||
config = resolver.preprocess_manifest(content, {}, "")
|
||||
content = {"limit": 50, "offset_request_parameters": {"offset": "{{ next_page_token['offset'] }}", "limit": "*ref(limit)"}}
|
||||
config = resolver.preprocess_manifest(content)
|
||||
assert config["offset_request_parameters"]["offset"] == "{{ next_page_token['offset'] }}"
|
||||
assert config["offset_request_parameters"]["limit"] == 50
|
||||
|
||||
@@ -69,16 +37,13 @@ def test_refer_in_dict():
|
||||
def test_refer_to_dict():
|
||||
content = {
|
||||
"limit": 50,
|
||||
"offset_request_parameters": {
|
||||
"offset": "{{ next_page_token['offset'] }}",
|
||||
"limit": "*ref(limit)"
|
||||
},
|
||||
"offset_request_parameters": {"offset": "{{ next_page_token['offset'] }}", "limit": "*ref(limit)"},
|
||||
"offset_pagination_request_parameters": {
|
||||
"class": "InterpolatedRequestParameterProvider",
|
||||
"request_parameters": "*ref(offset_request_parameters)"
|
||||
}
|
||||
"request_parameters": "*ref(offset_request_parameters)",
|
||||
},
|
||||
}
|
||||
config = resolver.preprocess_manifest(content, {}, "")
|
||||
config = resolver.preprocess_manifest(content)
|
||||
assert config["limit"] == 50
|
||||
assert config["offset_request_parameters"]["limit"] == 50
|
||||
assert len(config["offset_pagination_request_parameters"]) == 2
|
||||
@@ -90,16 +55,10 @@ def test_refer_and_overwrite():
|
||||
content = {
|
||||
"limit": 50,
|
||||
"custom_limit": 25,
|
||||
"offset_request_parameters": {
|
||||
"offset": "{{ next_page_token['offset'] }}",
|
||||
"limit": "*ref(limit)"
|
||||
},
|
||||
"custom_request_parameters": {
|
||||
"$ref": "*ref(offset_request_parameters)",
|
||||
"limit": "*ref(custom_limit)"
|
||||
}
|
||||
"offset_request_parameters": {"offset": "{{ next_page_token['offset'] }}", "limit": "*ref(limit)"},
|
||||
"custom_request_parameters": {"$ref": "*ref(offset_request_parameters)", "limit": "*ref(custom_limit)"},
|
||||
}
|
||||
config = resolver.preprocess_manifest(content, {}, "")
|
||||
config = resolver.preprocess_manifest(content)
|
||||
assert config["offset_request_parameters"]["limit"] == 50
|
||||
assert config["custom_request_parameters"]["limit"] == 25
|
||||
|
||||
@@ -110,22 +69,13 @@ def test_refer_and_overwrite():
|
||||
def test_collision():
|
||||
content = {
|
||||
"example": {
|
||||
"nested":{
|
||||
"path": "first one",
|
||||
"more_nested": {
|
||||
"value": "found it!"
|
||||
}
|
||||
},
|
||||
"nested": {"path": "first one", "more_nested": {"value": "found it!"}},
|
||||
"nested.path": "uh oh",
|
||||
},
|
||||
"reference_to_nested_path": {
|
||||
"$ref": "*ref(example.nested.path)"
|
||||
},
|
||||
"reference_to_nested_nested_value": {
|
||||
"$ref": "*ref(example.nested.more_nested.value)"
|
||||
}
|
||||
"reference_to_nested_path": {"$ref": "*ref(example.nested.path)"},
|
||||
"reference_to_nested_nested_value": {"$ref": "*ref(example.nested.more_nested.value)"},
|
||||
}
|
||||
config = resolver.preprocess_manifest(content, {}, "")
|
||||
config = resolver.preprocess_manifest(content)
|
||||
assert config["example"]["nested"]["path"] == "first one"
|
||||
assert config["example"]["nested.path"] == "uh oh"
|
||||
assert config["reference_to_nested_path"] == "uh oh"
|
||||
@@ -133,11 +83,55 @@ def test_collision():
|
||||
assert config["reference_to_nested_nested_value"] == "found it!"
|
||||
|
||||
|
||||
def test_list():
|
||||
def test_internal_collision():
|
||||
content = {
|
||||
"list": ["A", "B"],
|
||||
"elem_ref": "*ref(list[0])"
|
||||
"example": {
|
||||
"nested": {"path": {"internal": "uh oh"}, "path.internal": "found it!"},
|
||||
},
|
||||
"reference": {"$ref": "*ref(example.nested.path.internal)"},
|
||||
}
|
||||
config = resolver.preprocess_manifest(content, {}, "")
|
||||
config = resolver.preprocess_manifest(content)
|
||||
assert config["example"]["nested"]["path"]["internal"] == "uh oh"
|
||||
assert config["example"]["nested"]["path.internal"] == "found it!"
|
||||
assert config["reference"] == "found it!"
|
||||
|
||||
|
||||
def test_parse_path():
    """Check that _parse_path splits off string keys and integer indices as expected."""
    expectations = {
        "foo.bar": ("foo", "bar"),
        "foo[7][8].bar": ("foo", "[7][8].bar"),
        "[7][8].bar": (7, "[8].bar"),
        "[8].bar": (8, "bar"),
    }
    for path, expected in expectations.items():
        assert _parse_path(path) == expected
|
||||
|
||||
|
||||
def test_list():
    """A reference may point at an element of a top-level list."""
    manifest = {"list": ["A", "B"], "elem_ref": "*ref(list[0])"}
    resolved = resolver.preprocess_manifest(manifest)
    assert resolved["elem_ref"] == "A"
|
||||
|
||||
|
||||
def test_nested_list():
    """A reference may chain several list indices."""
    manifest = {"list": [["A"], ["B"]], "elem_ref": "*ref(list[1][0])"}
    resolved = resolver.preprocess_manifest(manifest)
    assert resolved["elem_ref"] == "B"
|
||||
|
||||
|
||||
def test_list_of_dicts():
    """A reference may index into a list and then read a key of the selected dict."""
    manifest = {"list": [{"A": "a"}, {"B": "b"}], "elem_ref": "*ref(list[1].B)"}
    resolved = resolver.preprocess_manifest(manifest)
    assert resolved["elem_ref"] == "b"
|
||||
|
||||
|
||||
def test_multiple_levels_of_indexing():
    """A reference may alternate between list indices and dict keys."""
    manifest = {"list": [{"A": ["a1", "a2"]}, {"B": ["b1", "b2"]}], "elem_ref": "*ref(list[1].B[0])"}
    resolved = resolver.preprocess_manifest(manifest)
    assert resolved["elem_ref"] == "b1"
|
||||
|
||||
|
||||
def test_circular_reference():
    """Two references pointing at each other must be reported as a circular reference."""
    manifest = {"elem_ref1": "*ref(elem_ref2)", "elem_ref2": "*ref(elem_ref1)"}
    with pytest.raises(CircularReferenceException):
        resolver.preprocess_manifest(manifest)
|
||||
|
||||
@@ -34,7 +34,7 @@ from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_req
|
||||
)
|
||||
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
|
||||
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
|
||||
from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader
|
||||
from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader, InlineSchemaLoader
|
||||
from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
|
||||
from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer
|
||||
from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer
|
||||
@@ -45,10 +45,10 @@ from jsonschema import ValidationError
|
||||
|
||||
factory = DeclarativeComponentFactory()
|
||||
|
||||
resolver = ManifestReferenceResolver()
|
||||
|
||||
input_config = {"apikey": "verysecrettoken", "repos": ["airbyte", "airbyte-cloud"]}
|
||||
|
||||
resolver = ManifestReferenceResolver()
|
||||
|
||||
|
||||
def test_factory():
|
||||
content = """
|
||||
@@ -64,7 +64,7 @@ def test_factory():
|
||||
request_body_json:
|
||||
body_offset: "{{ next_page_token['offset'] }}"
|
||||
"""
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "")
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content))
|
||||
|
||||
factory.create_component(config["request_options"], input_config, False)
|
||||
|
||||
@@ -89,7 +89,7 @@ def test_interpolate_config():
|
||||
body_field: "yoyoyo"
|
||||
interpolated_body_field: "{{ config['apikey'] }}"
|
||||
"""
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "")
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content))
|
||||
|
||||
factory.create_component(config["authenticator"], input_config, False)
|
||||
|
||||
@@ -111,7 +111,7 @@ def test_list_based_stream_slicer_with_values_refd():
|
||||
slice_values: "*ref(repositories)"
|
||||
cursor_field: repository
|
||||
"""
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "")
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content))
|
||||
|
||||
factory.create_component(config["stream_slicer"], input_config, False)
|
||||
|
||||
@@ -129,7 +129,7 @@ def test_list_based_stream_slicer_with_values_defined_in_config():
|
||||
inject_into: header
|
||||
field_name: repository
|
||||
"""
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "")
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content))
|
||||
|
||||
factory.create_component(config["stream_slicer"], input_config, False)
|
||||
|
||||
@@ -181,7 +181,7 @@ def test_create_substream_slicer():
|
||||
parent_key: someid
|
||||
stream_slice_field: word_id
|
||||
"""
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "")
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content))
|
||||
|
||||
stream_slicer = factory.create_component(config["stream_slicer"], input_config)()
|
||||
parent_stream_configs = stream_slicer.parent_stream_configs
|
||||
@@ -216,7 +216,7 @@ def test_create_cartesian_stream_slicer():
|
||||
- "*ref(stream_slicer_A)"
|
||||
- "*ref(stream_slicer_B)"
|
||||
"""
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "")
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content))
|
||||
|
||||
factory.create_component(config["stream_slicer"], input_config, False)
|
||||
|
||||
@@ -249,7 +249,7 @@ def test_datetime_stream_slicer():
|
||||
field_name: created[gte]
|
||||
"""
|
||||
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "")
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content))
|
||||
|
||||
factory.create_component(config["stream_slicer"], input_config, False)
|
||||
|
||||
@@ -363,7 +363,7 @@ spec:
|
||||
description: Test API Key
|
||||
order: 0
|
||||
"""
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "")
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content))
|
||||
|
||||
factory.create_component(config["list_stream"], input_config, False)
|
||||
|
||||
@@ -430,7 +430,7 @@ def test_create_record_selector(test_name, record_selector, expected_runtime_sel
|
||||
$ref: "*ref(extractor)"
|
||||
field_pointer: ["{record_selector}"]
|
||||
"""
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "")
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content))
|
||||
|
||||
factory.create_component(config["selector"], input_config, False)
|
||||
|
||||
@@ -477,7 +477,7 @@ def test_create_record_selector(test_name, record_selector, expected_runtime_sel
|
||||
],
|
||||
)
|
||||
def test_options_propagation(test_name, content, expected_field_pointer_value):
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "")
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content))
|
||||
|
||||
selector = factory.create_component(config["selector"], input_config, True)()
|
||||
assert selector.extractor.field_pointer[0].eval(input_config) == expected_field_pointer_value
|
||||
@@ -547,7 +547,7 @@ def test_create_requester(test_name, error_handler):
|
||||
header: header_value
|
||||
{error_handler}
|
||||
"""
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "")
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content))
|
||||
|
||||
factory.create_component(config["requester"], input_config, False)
|
||||
|
||||
@@ -577,7 +577,7 @@ def test_create_composite_error_handler():
|
||||
- http_codes: [ 403 ]
|
||||
action: RETRY
|
||||
"""
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "")
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content))
|
||||
|
||||
factory.create_component(config["error_handler"], input_config, False)
|
||||
|
||||
@@ -626,7 +626,7 @@ def test_config_with_defaults():
|
||||
streams:
|
||||
- "*ref(lists_stream)"
|
||||
"""
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "")
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content))
|
||||
|
||||
factory.create_component(config["lists_stream"], input_config, False)
|
||||
|
||||
@@ -663,7 +663,7 @@ def test_create_default_paginator():
|
||||
page_size: 50
|
||||
cursor_value: "{{ response._metadata.next }}"
|
||||
"""
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "")
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content))
|
||||
|
||||
factory.create_component(config["paginator"], input_config, False)
|
||||
|
||||
@@ -702,7 +702,7 @@ class TestCreateTransformations:
|
||||
$options:
|
||||
{self.base_options}
|
||||
"""
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "")
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content))
|
||||
|
||||
factory.create_component(config["the_stream"], input_config, False)
|
||||
|
||||
@@ -722,7 +722,7 @@ class TestCreateTransformations:
|
||||
- ["path", "to", "field1"]
|
||||
- ["path2"]
|
||||
"""
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "")
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content))
|
||||
|
||||
factory.create_component(config["the_stream"], input_config, False)
|
||||
|
||||
@@ -743,7 +743,7 @@ class TestCreateTransformations:
|
||||
- path: ["field1"]
|
||||
value: "static_value"
|
||||
"""
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "")
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content))
|
||||
|
||||
factory.create_component(config["the_stream"], input_config, False)
|
||||
|
||||
@@ -761,6 +761,31 @@ class TestCreateTransformations:
|
||||
]
|
||||
assert expected == component.transformations
|
||||
|
||||
def test_forward_references(self):
|
||||
content = f"""
|
||||
the_stream:
|
||||
type: DeclarativeStream
|
||||
$options:
|
||||
{self.base_options}
|
||||
schema_loader:
|
||||
type: InlineSchemaLoader
|
||||
schema: "*ref(schemas.the_stream_schema)"
|
||||
schemas:
|
||||
the_stream_schema:
|
||||
type: object
|
||||
properties:
|
||||
title:
|
||||
type: string
|
||||
"""
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content))
|
||||
|
||||
factory.create_component(config["the_stream"], input_config, False)
|
||||
|
||||
component = factory.create_component(config["the_stream"], input_config)()
|
||||
assert isinstance(component.schema_loader, InlineSchemaLoader)
|
||||
expected = {"type": "object", "properties": {"title": {"type": "string"}}}
|
||||
assert expected == component.schema_loader.schema
|
||||
|
||||
|
||||
def test_validation_wrong_input_type():
|
||||
content = """
|
||||
@@ -775,7 +800,7 @@ def test_validation_wrong_input_type():
|
||||
$ref: "*ref(extractor)"
|
||||
field_pointer: 408
|
||||
"""
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "")
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content))
|
||||
with pytest.raises(ValidationError):
|
||||
factory.create_component(config["selector"], input_config, False)
|
||||
|
||||
@@ -798,7 +823,7 @@ def test_validation_type_missing_required_fields():
|
||||
field_name: created[gte]
|
||||
"""
|
||||
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "")
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content))
|
||||
with pytest.raises(ValidationError):
|
||||
factory.create_component(config["stream_slicer"], input_config, False)
|
||||
|
||||
@@ -818,7 +843,7 @@ def test_validation_wrong_interface_type():
|
||||
type: "MinMaxDatetime"
|
||||
datetime: "{{ response._metadata.next }}"
|
||||
"""
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "")
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content))
|
||||
with pytest.raises(ValidationError):
|
||||
factory.create_component(config["paginator"], input_config, False)
|
||||
|
||||
@@ -834,7 +859,7 @@ def test_validation_create_composite_error_handler():
|
||||
- response_filters:
|
||||
- http_codes: [ 403 ]
|
||||
"""
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "")
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content))
|
||||
with pytest.raises(ValidationError):
|
||||
factory.create_component(config["error_handler"], input_config, False)
|
||||
|
||||
@@ -859,7 +884,7 @@ def test_validation_wrong_object_type():
|
||||
type: "MinMaxDatetime"
|
||||
datetime: "{{ response._metadata.next }}"
|
||||
"""
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "")
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content))
|
||||
factory.create_component(config["paginator"], input_config, False)
|
||||
|
||||
|
||||
@@ -873,7 +898,7 @@ def test_validate_types_nested_in_list():
|
||||
- type: DpathExtractor
|
||||
field_pointer: ["result"]
|
||||
"""
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "")
|
||||
config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content))
|
||||
factory.create_component(config["error_handler"], input_config, False)
|
||||
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@ import os
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
from airbyte_cdk.sources.declarative.parsers.undefined_reference_exception import UndefinedReferenceException
|
||||
from airbyte_cdk.sources.declarative.parsers.custom_exceptions import UndefinedReferenceException
|
||||
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
|
||||
from yaml.parser import ParserError
|
||||
|
||||
|
||||
Reference in New Issue
Block a user