1
0
mirror of synced 2025-12-23 21:03:15 -05:00

CDK: Improve schema detection (#26741)

* improve schema detection

* improve schema detection

* review comment

* Automated Commit - Formatting Changes

---------

Co-authored-by: flash1293 <flash1293@users.noreply.github.com>
This commit is contained in:
Joe Reuter
2023-05-31 15:57:08 +02:00
committed by GitHub
parent 812aa3aca6
commit ec5aa7bab6
2 changed files with 148 additions and 3 deletions

View File

@@ -8,6 +8,7 @@ from typing import Any, Dict, List, Optional, Union
from airbyte_cdk.models import AirbyteRecordMessage
from genson import SchemaBuilder
from genson.schema.strategies.object import Object
from genson.schema.strategies.scalar import Number
class NoRequiredObj(Object):
@@ -22,8 +23,18 @@ class NoRequiredObj(Object):
return schema
class IntegerToNumber(Number):
"""
This class has the regular Number behaviour, but it will never emit an integer type.
"""
def __init__(self, node_class):
super().__init__(node_class)
self._type = "number"
class NoRequiredSchemaBuilder(SchemaBuilder):
EXTRA_STRATEGIES = (NoRequiredObj,)
EXTRA_STRATEGIES = (NoRequiredObj, IntegerToNumber)
# This type is inferred from the genson lib, but there is no alias provided for it - creating it here for type safety
@@ -56,11 +67,34 @@ class SchemaInferrer:
"""
schemas = {}
for stream_name, builder in self.stream_to_builder.items():
schemas[stream_name] = builder.to_schema()
schemas[stream_name] = self._clean(builder.to_schema())
return schemas
def _clean(self, node: InferredSchema):
"""
Recursively cleans up a produced schema:
- remove anyOf if one of them is just a null value
- remove properties of type "null"
"""
if isinstance(node, dict):
if "anyOf" in node:
if len(node["anyOf"]) == 2 and {"type": "null"} in node["anyOf"]:
real_type = node["anyOf"][1] if node["anyOf"][0]["type"] == "null" else node["anyOf"][0]
node.update(real_type)
node["type"] = [node["type"], "null"]
node.pop("anyOf")
if "properties" in node:
for key, value in list(node["properties"].items()):
if isinstance(value, dict) and value.get("type", None) == "null":
node["properties"].pop(key)
else:
self._clean(value)
if "items" in node:
self._clean(node["items"])
return node
def get_stream_schema(self, stream_name: str) -> Optional[InferredSchema]:
"""
Returns the inferred JSON schema for the specified stream. Might be `None` if there were no records for the given stream name.
"""
return self.stream_to_builder[stream_name].to_schema() if stream_name in self.stream_to_builder else None
return self._clean(self.stream_to_builder[stream_name].to_schema()) if stream_name in self.stream_to_builder else None

View File

@@ -48,6 +48,117 @@ NOW = 1234567
},
id="test_derive_schema_for_nested_structures",
),
pytest.param(
[
{"stream": "my_stream", "data": {"field_A": 1}},
{"stream": "my_stream", "data": {"field_A": 2}},
],
{"my_stream": {"field_A": {"type": "number"}}},
id="test_integer_number",
),
pytest.param(
[
{"stream": "my_stream", "data": {"field_A": None}},
],
{"my_stream": {}},
id="test_null",
),
pytest.param(
[
{"stream": "my_stream", "data": {"field_A": None}},
{"stream": "my_stream", "data": {"field_A": "abc"}},
],
{"my_stream": {"field_A": {"type": ["null", "string"]}}},
id="test_null_optional",
),
pytest.param(
[
{"stream": "my_stream", "data": {"field_A": None}},
{"stream": "my_stream", "data": {"field_A": {"nested": "abc"}}},
],
{"my_stream": {"field_A": {"type": ["object", "null"], "properties": {"nested": {"type": "string"}}}}},
id="test_any_of",
),
pytest.param(
[
{"stream": "my_stream", "data": {"field_A": None}},
{"stream": "my_stream", "data": {"field_A": {"nested": "abc", "nully": None}}},
],
{"my_stream": {"field_A": {"type": ["object", "null"], "properties": {"nested": {"type": "string"}}}}},
id="test_any_of_with_null",
),
pytest.param(
[
{"stream": "my_stream", "data": {"field_A": None}},
{"stream": "my_stream", "data": {"field_A": {"nested": "abc", "nully": None}}},
{"stream": "my_stream", "data": {"field_A": {"nested": "abc", "nully": "a string"}}},
],
{
"my_stream": {
"field_A": {
"type": ["object", "null"],
"properties": {"nested": {"type": "string"}, "nully": {"type": ["null", "string"]}},
}
}
},
id="test_any_of_with_null_union",
),
pytest.param(
[
{"stream": "my_stream", "data": {"field_A": {"nested": "abc", "nully": "a string"}}},
{"stream": "my_stream", "data": {"field_A": None}},
{"stream": "my_stream", "data": {"field_A": {"nested": "abc", "nully": None}}},
],
{
"my_stream": {
"field_A": {
"type": ["object", "null"],
"properties": {"nested": {"type": "string"}, "nully": {"type": ["null", "string"]}},
}
}
},
id="test_any_of_with_null_union_changed_order",
),
pytest.param(
[
{"stream": "my_stream", "data": {"field_A": "abc", "nested": {"field_B": None}}},
],
{"my_stream": {"field_A": {"type": "string"}, "nested": {"type": "object", "properties": {}}}},
id="test_nested_null",
),
pytest.param(
[
{"stream": "my_stream", "data": {"field_A": "abc", "nested": [{"field_B": None, "field_C": "abc"}]}},
],
{
"my_stream": {
"field_A": {"type": "string"},
"nested": {"type": "array", "items": {"type": "object", "properties": {"field_C": {"type": "string"}}}},
}
},
id="test_array_nested_null",
),
pytest.param(
[
{"stream": "my_stream", "data": {"field_A": "abc", "nested": None}},
{"stream": "my_stream", "data": {"field_A": "abc", "nested": [{"field_B": None, "field_C": "abc"}]}},
],
{
"my_stream": {
"field_A": {"type": "string"},
"nested": {"type": ["array", "null"], "items": {"type": "object", "properties": {"field_C": {"type": "string"}}}},
}
},
id="test_array_top_level_null",
),
pytest.param(
[
{"stream": "my_stream", "data": {"field_A": None}},
{"stream": "my_stream", "data": {"field_A": "abc"}},
],
{"my_stream": {"field_A": {"type": ["null", "string"]}}},
id="test_null_string",
),
],
)
def test_schema_derivation(input_records: List, expected_schemas: Mapping):