1
0
mirror of synced 2025-12-25 02:09:19 -05:00

[file-based cdk] spec schema improvements and fixes (#28263)

* fix spec schema incompatibility with ui and improve spec documentation and titles

* fix schema to account for latest changes pulled from main

* tests

* remove duplicate test
This commit is contained in:
Brian Lai
2023-07-13 15:14:05 -04:00
committed by GitHub
parent 633c939d46
commit 8e835963c1
4 changed files with 117 additions and 49 deletions

View File

@@ -19,9 +19,7 @@ class AbstractFileBasedSpec(BaseModel):
streams: List[FileBasedStreamConfig] = Field(
title="The list of streams to sync",
description="Streams defines the behavior for grouping files together that will be synced to the downstream destination. Each "
"stream has it own independent configuration to handle which files to sync, how files should be parsed, and the "
"validation of records against the schema.",
description='Each instance of this configuration defines a <a href="https://docs.airbyte.com/cloud/core-concepts#stream">stream</a>. Use this to define which files belong in the stream, their format, and how they should be parsed and validated. When sending data to warehouse destination such as Snowflake or BigQuery, each stream is a separate table.',
order=10,
)
@@ -40,16 +38,18 @@ class AbstractFileBasedSpec(BaseModel):
schema = super().schema(*args, **kwargs)
transformed_schema = copy.deepcopy(schema)
schema_helpers.expand_refs(transformed_schema)
cls.remove_enum_allOf(transformed_schema)
cls.replace_enum_allOf_and_anyOf(transformed_schema)
cls.add_legacy_format(transformed_schema)
return transformed_schema
@staticmethod
def remove_enum_allOf(schema: dict) -> dict:
def replace_enum_allOf_and_anyOf(schema: dict) -> dict:
"""
allOfs are not supported by the UI, but pydantic is automatically writing them for enums.
Unpacks the enums under allOf and moves them up a level under the enum key
anyOfs are also not supported by the UI, so we replace them with the similar oneOf, with the
additional validation that an incoming config only matches exactly one of a field's types.
"""
# this will need to add ["anyOf"] once we have more than one format type and loop over the list of elements
objects_to_check = schema["properties"]["streams"]["items"]["properties"]["format"]
@@ -59,6 +59,12 @@ class AbstractFileBasedSpec(BaseModel):
if "allOf" in object_property and "enum" in object_property["allOf"][0]:
object_property["enum"] = object_property["allOf"][0]["enum"]
object_property.pop("allOf")
properties_to_change = ["primary_key", "input_schema"]
for property_to_change in properties_to_change:
schema["properties"]["streams"]["items"]["properties"][property_to_change]["oneOf"] = schema["properties"]["streams"]["items"][
"properties"
][property_to_change].pop("anyOf")
return schema
@staticmethod
@@ -69,7 +75,14 @@ class AbstractFileBasedSpec(BaseModel):
config must be adjusted to support the generic mapping object. Once configs no longer adhere to the old
format we can remove this change.
"""
legacy_format_options = {
"title": "Legacy Format",
# Explicitly require this field to make it mutually exclusive (oneOf) with the new format mapping file_type -> format
"required": ["filetype"],
"type": "object",
"properties": {"filetype": {"title": "Filetype", "type": "string"}},
}
csv_format_options = schema["properties"]["streams"]["items"]["properties"]["format"]
union_format = {"anyOf": [csv_format_options, {"type": "object"}]}
union_format = {"oneOf": [csv_format_options, legacy_format_options]}
schema["properties"]["streams"]["items"]["properties"]["format"] = union_format
return schema

View File

@@ -7,9 +7,9 @@ from enum import Enum
from typing import Any, List, Mapping, Optional, Union
from airbyte_cdk.sources.file_based.schema_helpers import type_mapping_to_jsonschema
from pydantic import BaseModel, validator
from pydantic import BaseModel, Field, validator
PrimaryKeyType = Optional[Union[str, List[str], List[List[str]]]]
PrimaryKeyType = Optional[Union[str, List[str]]]
VALID_FILE_TYPES = {"avro", "csv", "jsonl", "parquet"}
@@ -22,12 +22,33 @@ class QuotingBehavior(Enum):
class CsvFormat(BaseModel):
delimiter: str = ","
quote_char: str = '"'
escape_char: Optional[str]
encoding: Optional[str] = "utf8"
double_quote: bool
quoting_behavior: Optional[QuotingBehavior] = QuotingBehavior.QUOTE_SPECIAL_CHARACTERS
delimiter: str = Field(
title="Delimiter",
description="The character delimiting individual cells in the CSV data. This may only be a 1-character string. For tab-delimited data enter '\\t'.",
default=",",
)
quote_char: str = Field(
title="Quote Character",
default='"',
description="The character used for quoting CSV values. To disallow quoting, make this field blank.",
)
escape_char: Optional[str] = Field(
title="Escape Character",
default=None,
description="The character used for escaping special characters. To disallow escaping, leave this field blank.",
)
encoding: Optional[str] = Field(
default="utf8",
description='The character encoding of the CSV data. Leave blank to default to <strong>UTF8</strong>. See <a href="https://docs.python.org/3/library/codecs.html#standard-encodings" target="_blank">list of python encodings</a> for allowable options.',
)
double_quote: bool = Field(
title="Double Quote", default=True, description="Whether two quotes in a quoted CSV value denote a single quote in the data."
)
quoting_behavior: Optional[QuotingBehavior] = Field(
title="Quoting Behavior",
default=QuotingBehavior.QUOTE_SPECIAL_CHARACTERS,
description="The quoting behavior determines when a value in a row should have quote marks added around it. For example, if Quote Non-numeric is specified, while reading, quotes are expected for row values that do not contain numbers. Or for Quote All, every row value will be expecting quotes.",
)
# Noting that the existing S3 connector had a config option newlines_in_values. This was only supported by pyarrow and not
# the Python csv package. It has a little adoption, but long term we should ideally phase this out because of the drawbacks
# of using pyarrow
@@ -62,15 +83,37 @@ class CsvFormat(BaseModel):
class FileBasedStreamConfig(BaseModel):
name: str
file_type: str
globs: Optional[List[str]]
validation_policy: str
input_schema: Optional[Union[str, Mapping[str, Any]]]
primary_key: PrimaryKeyType
days_to_sync_if_history_is_full: int = 3
format: Optional[Mapping[str, CsvFormat]] # this will eventually be a Union once we have more than one format type
schemaless: bool = False
name: str = Field(title="Name", description="The name of the stream.")
file_type: str = Field(title="File Type", description="The data file type that is being extracted for a stream.")
globs: Optional[List[str]] = Field(
title="Globs",
description='The pattern used to specify which files should be selected from the file system. For more information on glob pattern matching look <a href="https://en.wikipedia.org/wiki/Glob_(programming)">here</a>.',
)
validation_policy: str = Field(
title="Validation Policy",
description="The name of the validation policy that dictates sync behavior when a record does not adhere to the stream schema.",
)
input_schema: Optional[Union[str, Mapping[str, Any]]] = Field(
title="Input Schema",
description="The schema that will be used to validate records extracted from the file. This will override the stream schema that is auto-detected from incoming files.",
)
primary_key: PrimaryKeyType = Field(
title="Primary Key", description="The column or columns (for a composite key) that serves as the unique identifier of a record."
)
days_to_sync_if_history_is_full: int = Field(
title="Days To Sync If History Is Full",
description="When the state history of the file store is full, syncs will only read files that were last modified in the provided day range.",
default=3,
)
format: Optional[Mapping[str, CsvFormat]] = Field(
title="Format",
description="The configuration options that are used to alter how to read incoming files that deviate from the standard formatting.",
) # this will eventually be a Union once we have more than one format type
schemaless: bool = Field(
title="Schemaless",
description="When enabled, syncs will not validate or structure records against the stream's schema.",
default=False,
)
@validator("file_type", pre=True)
def validate_file_type(cls, v):

View File

@@ -46,9 +46,7 @@ single_csv_scenario = (
"properties": {
"streams": {
"title": "The list of streams to sync",
"description": "Streams defines the behavior for grouping files together that will be synced to the downstream "
"destination. Each stream has it own independent configuration to handle which files to sync, "
"how files should be parsed, and the validation of records against the schema.",
"description": "Each instance of this configuration defines a <a href=\"https://docs.airbyte.com/cloud/core-concepts#stream\">stream</a>. Use this to define which files belong in the stream, their format, and how they should be parsed and validated. When sending data to warehouse destination such as Snowflake or BigQuery, each stream is a separate table.",
"order": 10,
"type": "array",
"items": {
@@ -57,14 +55,17 @@ single_csv_scenario = (
"properties": {
"name": {
"title": "Name",
"description": "The name of the stream.",
"type": "string"
},
"file_type": {
"title": "File Type",
"description": "The data file type that is being extracted for a stream.",
"type": "string"
},
"globs": {
"title": "Globs",
"description": "The pattern used to specify which files should be selected from the file system. For more information on glob pattern matching look <a href=\"https://en.wikipedia.org/wiki/Glob_(programming)\">here</a>.",
"type": "array",
"items": {
"type": "string"
@@ -72,23 +73,27 @@ single_csv_scenario = (
},
"schemaless": {
"title": "Schemaless",
"description": "When enabled, syncs will not validate or structure records against the stream's schema.",
"default": False,
"type": "boolean"
},
"validation_policy": {
"title": "Validation Policy",
"description": "The name of the validation policy that dictates sync behavior when a record does not adhere to the stream schema.",
"type": "string"
},
'input_schema': {
'title': 'Input Schema',
'anyOf': [
{'type': 'object'},
{'type': 'string'},
"input_schema": {
"title": "Input Schema",
"description": "The schema that will be used to validate records extracted from the file. This will override the stream schema that is auto-detected from incoming files.",
"oneOf": [
{"type": "object"},
{"type": "string"},
],
},
"primary_key": {
"title": "Primary Key",
"anyOf": [
"description": "The column or columns (for a composite key) that serves as the unique identifier of a record.",
"oneOf": [
{
"type": "string"
},
@@ -97,27 +102,20 @@ single_csv_scenario = (
"items": {
"type": "string"
}
},
{
"type": "array",
"items": {
"type": "array",
"items": {
"type": "string"
}
}
}
]
},
"days_to_sync_if_history_is_full": {
"title": "Days To Sync If History Is Full",
"description": "When the state history of the file store is full, syncs will only read files that were last modified in the provided day range.",
"default": 3,
"type": "integer"
},
"format": {
"anyOf": [
"oneOf": [
{
"title": "Format",
"description": "The configuration options that are used to alter how to read incoming files that deviate from the standard formatting.",
"type": "object",
"additionalProperties": {
"title": "CsvFormat",
@@ -125,37 +123,52 @@ single_csv_scenario = (
"properties": {
"delimiter": {
"title": "Delimiter",
"description": "The character delimiting individual cells in the CSV data. This may only be a 1-character string. For tab-delimited data enter '\\t'.",
"default": ",",
"type": "string"
},
"quote_char": {
"title": "Quote Char",
"title": "Quote Character",
"description": "The character used for quoting CSV values. To disallow quoting, make this field blank.",
"default": "\"",
"type": "string"
},
"escape_char": {
"title": "Escape Char",
"title": "Escape Character",
"description": "The character used for escaping special characters. To disallow escaping, leave this field blank.",
"type": "string"
},
"encoding": {
"title": "Encoding",
"description": "The character encoding of the CSV data. Leave blank to default to <strong>UTF8</strong>. See <a href=\"https://docs.python.org/3/library/codecs.html#standard-encodings\" target=\"_blank\">list of python encodings</a> for allowable options.",
"default": "utf8",
"type": "string"
},
"double_quote": {
"title": "Double Quote",
"description": "Whether two quotes in a quoted CSV value denote a single quote in the data.",
"default": True,
"type": "boolean"
},
"quoting_behavior": {
"title": "Quoting Behavior",
"description": "The quoting behavior determines when a value in a row should have quote marks added around it. For example, if Quote Non-numeric is specified, while reading, quotes are expected for row values that do not contain numbers. Or for Quote All, every row value will be expecting quotes.",
"default": "Quote Special Characters",
"enum": ["Quote All", "Quote Special Characters", "Quote Non-numeric", "Quote None"]
}
},
"required": ["double_quote"]
}
},
{
"type": "object"
"title": "Legacy Format",
"required": ["filetype"],
"type": "object",
"properties": {
"filetype": {
"title": "Filetype",
"type": "string"
}
},
}
]
}

View File

@@ -222,14 +222,13 @@ def _verify_expected_logs(logs: List[Dict[str, Any]], expected_logs: List[Dict[s
spec_scenarios = [
csv_multi_stream_scenario,
csv_single_stream_scenario,
single_csv_scenario,
]
@pytest.mark.parametrize("scenario", spec_scenarios, ids=[c.name for c in spec_scenarios])
def test_spec(capsys, scenario):
assert spec(capsys, scenario) == single_csv_scenario.expected_spec
assert spec(capsys, scenario) == scenario.expected_spec
check_scenarios = [