
File-based CDK: implement schema validation policy options (#27816)

This commit is contained in:
Catherine Noll
2023-07-06 03:35:48 -04:00
committed by GitHub
parent aa57cc21ba
commit cfec41b1e5
25 changed files with 1374 additions and 216 deletions

View File

@@ -110,6 +110,7 @@ class AbstractSource(Source, ABC):
f"The requested stream {configured_stream.stream.name} was not found in the source."
f" Available streams: {stream_instances.keys()}"
)
try:
timer.start_event(f"Syncing stream {configured_stream.stream.name}")
stream_is_available, reason = stream_instance.check_availability(logger, self)

View File

@@ -4,10 +4,10 @@
import codecs
from enum import Enum
from typing import Any, List, Mapping, Optional, Union
from typing import Any, Dict, List, Mapping, Optional, Union
from airbyte_cdk.models import ConfiguredAirbyteCatalog
from pydantic import BaseModel, validator
from pydantic import BaseModel, root_validator, validator
PrimaryKeyType = Optional[Union[str, List[str], List[List[str]]]]
@@ -65,9 +65,10 @@ class FileBasedStreamConfig(BaseModel):
name: str
file_type: str
globs: Optional[List[str]]
validation_policy: str
validation_policy: Union[str, Any]
validation_policies: Dict[str, Any]
catalog_schema: Optional[ConfiguredAirbyteCatalog]
input_schema: Optional[Mapping[str, Any]]
input_schema: Optional[Dict[str, Any]]
primary_key: PrimaryKeyType
max_history_size: Optional[int]
days_to_sync_if_history_is_full: Optional[int]
@@ -81,3 +82,15 @@ class FileBasedStreamConfig(BaseModel):
raise ValueError(f"Format filetype {file_type} is not a supported file type")
return {file_type: {key: val for key, val in v.items()}}
return v
@root_validator
def set_validation_policy(cls, values):
validation_policy_key = values.get("validation_policy")
validation_policies = values.get("validation_policies")
if validation_policy_key not in validation_policies:
raise ValueError(f"validation_policy must be one of {list(validation_policies.keys())}")
values["validation_policy"] = validation_policies[validation_policy_key]
return values
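
For context: the new root validator resolves the user-supplied `validation_policy` string into a policy object at config-construction time. Below is a minimal standalone sketch of that lookup pattern, assuming pydantic v1 (matching the `root_validator` import above); `_StubPolicy` and `_ConfigSketch` are illustrative names, not part of the CDK.

from typing import Any, Dict
from pydantic import BaseModel, root_validator

class _StubPolicy:
    # stand-in for an AbstractSchemaValidationPolicy implementation (illustrative only)
    name = "emit_record"

class _ConfigSketch(BaseModel):
    validation_policy: Any
    validation_policies: Dict[str, Any]

    @root_validator
    def _resolve_policy(cls, values):
        key = values.get("validation_policy")
        policies = values.get("validation_policies") or {}
        if key not in policies:
            raise ValueError(f"validation_policy must be one of {list(policies.keys())}")
        values["validation_policy"] = policies[key]  # swap the key for the policy instance
        return values

cfg = _ConfigSketch(validation_policy="emit_record", validation_policies={"emit_record": _StubPolicy()})
assert isinstance(cfg.validation_policy, _StubPolicy)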

View File

@@ -40,9 +40,8 @@ class DefaultFileBasedAvailabilityStrategy(AvailabilityStrategy):
try:
files = self._check_list_files(stream)
self._check_parse_record(stream, files[0])
except CheckAvailabilityError as exc:
tb = traceback.format_exception(etype=type(exc), value=exc, tb=exc.__traceback__)
return False, "".join(tb)
except CheckAvailabilityError:
return False, "".join(traceback.format_exc())
return True, None

View File

@@ -17,12 +17,16 @@ class FileBasedSourceError(Enum):
ERROR_READING_FILE = (
"Error opening file. Please check the credentials provided in the config and verify that they provide permission to read files."
)
ERROR_PARSING_FILE = "Error parsing file. This could be due to a mismatch between the config's file type and the actual file type, or because the file is not parseable."
ERROR_PARSING_RECORD = "Error parsing record. This could be due to a mismatch between the config's file type and the actual file type, or because the file or record is not parseable."
ERROR_PARSING_USER_PROVIDED_SCHEMA = "The provided schema could not be transformed into valid JSON Schema." # TODO
ERROR_VALIDATING_RECORD = "One or more records do not pass the schema validation policy. Please modify your input schema, or select a more lenient validation policy."
STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY = (
"Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema."
)
NULL_VALUE_IN_SCHEMA = "Error during schema inference: no type was detected for key."
UNRECOGNIZED_TYPE = "Error during schema inference: unrecognized type."
SCHEMA_INFERENCE_ERROR = "Error inferring schema for file. Is the file valid?"
INVALID_SCHEMA_ERROR = "No fields were identified for this schema. This may happen if the stream is empty. Please check your configuration to verify that there are files that match the stream's glob patterns."
CONFIG_VALIDATION_ERROR = "Error creating stream config object."
MISSING_SCHEMA = "Expected `json_schema` in the configured catalog but it is missing."
UNDEFINED_PARSER = "No parser is defined for this file type."
@@ -39,6 +43,10 @@ class ConfigValidationError(BaseFileBasedSourceError):
pass
class InvalidSchemaError(BaseFileBasedSourceError):
pass
class MissingSchemaError(BaseFileBasedSourceError):
pass
@@ -57,3 +65,7 @@ class CheckAvailabilityError(BaseFileBasedSourceError):
class UndefinedParserError(BaseFileBasedSourceError):
pass
class StopSyncPerValidationPolicy(BaseFileBasedSourceError):
pass

View File

@@ -5,8 +5,9 @@
import logging
import traceback
from abc import ABC
from typing import Any, Dict, List, Mapping, Optional, Tuple, Type
from typing import Any, Dict, List, Mapping, Optional, Tuple
from airbyte_cdk.models import ConfiguredAirbyteCatalog
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.default_file_based_availability_strategy import DefaultFileBasedAvailabilityStrategy
@@ -15,7 +16,7 @@ from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError, Fil
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.file_types import default_parsers
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.schema_validation_policies import AbstractSchemaValidationPolicy, DefaultSchemaValidationPolicy
from airbyte_cdk.sources.file_based.schema_validation_policies import DEFAULT_SCHEMA_VALIDATION_POLICIES, AbstractSchemaValidationPolicy
from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream, DefaultFileBasedStream
from airbyte_cdk.sources.file_based.stream.cursor.default_file_based_cursor import DefaultFileBasedCursor
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
@@ -23,23 +24,21 @@ from pydantic.error_wrappers import ValidationError
class FileBasedSource(AbstractSource, ABC):
"""
All file-based sources must provide a `stream_reader`.
"""
def __init__(
self,
stream_reader: AbstractFileBasedStreamReader,
availability_strategy: AvailabilityStrategy,
catalog: Optional[ConfiguredAirbyteCatalog],
availability_strategy: Optional[AvailabilityStrategy],
discovery_policy: AbstractDiscoveryPolicy = DefaultDiscoveryPolicy(),
parsers: Dict[str, FileTypeParser] = None,
validation_policies: Type[AbstractSchemaValidationPolicy] = Type[DefaultSchemaValidationPolicy],
validation_policies: Dict[str, AbstractSchemaValidationPolicy] = DEFAULT_SCHEMA_VALIDATION_POLICIES,
):
self.stream_reader = stream_reader
self.availability_strategy = availability_strategy or DefaultFileBasedAvailabilityStrategy(stream_reader)
self.discovery_policy = discovery_policy
self.parsers = parsers or default_parsers
self.validation_policies = validation_policies
self.stream_schemas = {s.stream.name: s.stream.json_schema for s in catalog.streams} if catalog else {}
def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
"""
@@ -82,16 +81,16 @@ class FileBasedSource(AbstractSource, ABC):
try:
streams = []
for stream in config["streams"]:
stream_config = FileBasedStreamConfig(**stream)
stream_config = FileBasedStreamConfig(validation_policies=self.validation_policies, **stream)
streams.append(
DefaultFileBasedStream(
config=stream_config,
catalog_schema=self.stream_schemas.get(stream_config.name),
stream_reader=self.stream_reader,
availability_strategy=self.availability_strategy,
discovery_policy=self.discovery_policy,
parsers=self.parsers,
cursor=DefaultFileBasedCursor(stream_config.max_history_size, stream_config.days_to_sync_if_history_is_full),
validation_policies=self.validation_policies,
)
)
return streams

View File

@@ -39,13 +39,13 @@ class CsvParser(FileTypeParser):
# todo: the existing InMemoryFilesSource.open_file() test source doesn't currently require an encoding, but actual
# sources will likely require one. Rather than modify the interface now we can wait until the real use case
reader = csv.DictReader(fp, dialect=dialect_name)
schema = {field.strip(): {"type": ["null", "string"]} for field in next(reader)}
schema = {field.strip(): {"type": "string"} for field in next(reader)}
csv.unregister_dialect(dialect_name)
return schema
else:
with stream_reader.open_file(file) as fp:
reader = csv.DictReader(fp)
return {field.strip(): {"type": ["null", "string"]} for field in next(reader)}
return {field.strip(): {"type": "string"} for field in next(reader)}
def parse_records(
self, config: FileBasedStreamConfig, file: RemoteFile, stream_reader: AbstractFileBasedStreamReader

View File

@@ -3,6 +3,8 @@
#
from copy import deepcopy
from enum import Enum
from functools import total_ordering
from typing import Any, Dict, List, Literal, Mapping, Union
from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, SchemaInferenceError
@@ -11,7 +13,52 @@ type_widths = {str: 0}
JsonSchemaSupportedType = Union[List, Literal["string"], str]
SchemaType = Dict[str, Dict[str, JsonSchemaSupportedType]]
supported_types = {"null", "string"}
@total_ordering
class ComparableType(Enum):
NULL = 0
BOOLEAN = 1
INTEGER = 2
NUMBER = 3
STRING = 4
OBJECT = 5
def __lt__(self, other):
if self.__class__ is other.__class__:
return self.value < other.value
else:
return NotImplemented
def get_comparable_type(value: Any) -> ComparableType:
if value == "null":
return ComparableType.NULL
if value == "boolean":
return ComparableType.BOOLEAN
if value == "integer":
return ComparableType.INTEGER
if value == "number":
return ComparableType.NUMBER
if value == "string":
return ComparableType.STRING
if value == "object":
return ComparableType.OBJECT
def get_inferred_type(value: Any) -> ComparableType:
if value is None:
return ComparableType.NULL
if isinstance(value, bool):
return ComparableType.BOOLEAN
if isinstance(value, int):
return ComparableType.INTEGER
if isinstance(value, float):
return ComparableType.NUMBER
if isinstance(value, str):
return ComparableType.STRING
if isinstance(value, dict):
return ComparableType.OBJECT
def merge_schemas(schema1: SchemaType, schema2: SchemaType) -> SchemaType:
@@ -31,38 +78,62 @@ def merge_schemas(schema1: SchemaType, schema2: SchemaType) -> SchemaType:
and nothing else.
"""
for k, t in list(schema1.items()) + list(schema2.items()):
if not _is_valid_type(t["type"]):
if not isinstance(t, dict) or not _is_valid_type(t.get("type")):
raise SchemaInferenceError(FileBasedSourceError.UNRECOGNIZED_TYPE, key=k, type=t)
merged_schema = deepcopy(schema1)
for k2, t2 in schema2.items():
t1 = merged_schema.get(k2)
t1_type = t1["type"] if t1 else None
t2_type = t2["type"]
if t1_type is None:
if t1 is None:
merged_schema[k2] = t2
elif t1_type == t2_type:
elif t1 == t2:
continue
else:
merged_schema[k2]["type"] = _choose_wider_type(k2, t1_type, t2_type)
merged_schema[k2] = _choose_wider_type(k2, t1, t2)
return merged_schema
def _is_valid_type(t: JsonSchemaSupportedType) -> bool:
if isinstance(t, list):
return all(_t in supported_types for _t in t)
return t in supported_types
return t == "array" or get_comparable_type(t) is not None
def _choose_wider_type(key: str, t1: JsonSchemaSupportedType, t2: JsonSchemaSupportedType) -> JsonSchemaSupportedType:
# TODO: update with additional types.
if t1 is None and t2 is None:
raise SchemaInferenceError(FileBasedSourceError.NULL_VALUE_IN_SCHEMA, key=key)
elif t1 is None or t2 is None:
return t1 or t2
def _choose_wider_type(key: str, t1: Dict[str, Any], t2: Dict[str, Any]) -> Dict[str, Any]:
if (t1["type"] == "array" or t2["type"] == "array") and t1 != t2:
raise SchemaInferenceError(
FileBasedSourceError.SCHEMA_INFERENCE_ERROR,
details="Cannot merge schema for unequal array types.",
key=key,
detected_types=f"{t1},{t2}",
)
elif (t1["type"] == "object" or t2["type"] == "object") and t1 != t2:
raise SchemaInferenceError(
FileBasedSourceError.SCHEMA_INFERENCE_ERROR,
details="Cannot merge schema for unequal object types.",
key=key,
detected_types=f"{t1},{t2}",
)
else:
raise SchemaInferenceError(FileBasedSourceError.UNRECOGNIZED_TYPE, key=key, detected_types=f"{t1},{t2}")
comparable_t1 = get_comparable_type(t1["type"])
comparable_t2 = get_comparable_type(t2["type"])
if not comparable_t1 and comparable_t2:
raise SchemaInferenceError(FileBasedSourceError.UNRECOGNIZED_TYPE, key=key, detected_types=f"{t1},{t2}")
return max([t1, t2], key=lambda x: ComparableType(get_comparable_type(x["type"])))
def is_equal_or_narrower_type(value: Any, expected_type: str):
if isinstance(value, list):
# We do not compare lists directly; the individual items are compared.
# If we hit this condition, it means that the expected type is not
# compatible with the inferred type.
return False
inferred_type = ComparableType(get_inferred_type(value))
if inferred_type is None:
return False
return ComparableType(inferred_type) <= ComparableType(get_comparable_type(expected_type))
def conforms_to_schema(record: Mapping[str, Any], schema: Mapping[str, str]) -> bool:
@@ -74,7 +145,29 @@ def conforms_to_schema(record: Mapping[str, Any], schema: Mapping[str, str]) ->
- For every column in the record, that column's type is equal to or narrower than the same column's
type in the schema.
"""
...
schema_columns = set(schema.get("properties", {}).keys())
record_columns = set(record.keys())
if not record_columns.issubset(schema_columns):
return False
for column, definition in schema.get("properties", {}).items():
expected_type = definition.get("type")
value = record.get(column)
if value is not None:
if expected_type == "object":
return isinstance(value, dict)
elif expected_type == "array":
if not isinstance(value, list):
return False
array_type = definition.get("items", {}).get("type")
if not all(is_equal_or_narrower_type(v, array_type) for v in value):
return False
elif not is_equal_or_narrower_type(value, expected_type):
return False
return True
def type_mapping_to_jsonschema(type_mapping: Mapping[str, Any]) -> Mapping[str, str]:
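
The helpers above order JSON-schema types by width so that, when two files disagree on a column, schema inference keeps the wider type (and refuses to merge unequal array or object definitions). A self-contained sketch of that ordering follows; `widen` is an illustrative stand-in for `_choose_wider_type`, which additionally operates on full property dicts and raises `SchemaInferenceError` on array/object mismatches.

from enum import Enum
from functools import total_ordering

@total_ordering
class ComparableType(Enum):
    NULL = 0
    BOOLEAN = 1
    INTEGER = 2
    NUMBER = 3
    STRING = 4
    OBJECT = 5

    def __lt__(self, other):
        if self.__class__ is other.__class__:
            return self.value < other.value
        return NotImplemented

def widen(t1: str, t2: str) -> str:
    # keep whichever JSON-schema type name is "wider" under the ordering above
    return max(t1, t2, key=lambda t: ComparableType[t.upper()])

assert widen("integer", "number") == "number"
assert widen("boolean", "string") == "string"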

View File

@@ -1,4 +1,15 @@
from airbyte_cdk.sources.file_based.schema_validation_policies.abstract_schema_validation_policy import AbstractSchemaValidationPolicy
from airbyte_cdk.sources.file_based.schema_validation_policies.default_schema_validation_policy import DefaultSchemaValidationPolicy
from airbyte_cdk.sources.file_based.schema_validation_policies.default_schema_validation_policies import (
DEFAULT_SCHEMA_VALIDATION_POLICIES,
EmitRecordPolicy,
SkipRecordPolicy,
WaitForDiscoverPolicy,
)
__all__ = ["AbstractSchemaValidationPolicy", "DefaultSchemaValidationPolicy"]
__all__ = [
"DEFAULT_SCHEMA_VALIDATION_POLICIES",
"AbstractSchemaValidationPolicy",
"EmitRecordPolicy",
"SkipRecordPolicy",
"WaitForDiscoverPolicy",
]

View File

@@ -2,12 +2,13 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from abc import abstractmethod
from enum import Enum
from abc import ABC, abstractmethod
from typing import Any, Mapping
class AbstractSchemaValidationPolicy(Enum):
class AbstractSchemaValidationPolicy(ABC):
name: str
@abstractmethod
def record_passes_validation_policy(self, record: Mapping[str, Any], schema: Mapping[str, Any]) -> bool:
"""

View File

@@ -0,0 +1,39 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from typing import Any, Mapping
from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, StopSyncPerValidationPolicy
from airbyte_cdk.sources.file_based.schema_helpers import conforms_to_schema
from airbyte_cdk.sources.file_based.schema_validation_policies import AbstractSchemaValidationPolicy
class EmitRecordPolicy(AbstractSchemaValidationPolicy):
name = "emit_record"
def record_passes_validation_policy(self, record: Mapping[str, Any], schema: Mapping[str, Any]) -> bool:
return True
class SkipRecordPolicy(AbstractSchemaValidationPolicy):
name = "skip_record"
def record_passes_validation_policy(self, record: Mapping[str, Any], schema: Mapping[str, Any]) -> bool:
return conforms_to_schema(record, schema)
class WaitForDiscoverPolicy(AbstractSchemaValidationPolicy):
name = "wait_for_discover"
def record_passes_validation_policy(self, record: Mapping[str, Any], schema: Mapping[str, Any]) -> bool:
if not conforms_to_schema(record, schema):
raise StopSyncPerValidationPolicy(FileBasedSourceError.STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY)
return True
DEFAULT_SCHEMA_VALIDATION_POLICIES = {
"emit_record": EmitRecordPolicy(),
"skip_record": SkipRecordPolicy(),
"wait_for_discover": WaitForDiscoverPolicy(),
}
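
A short illustration of how the three built-in policies behave at runtime, assuming the package from this commit is installed; the record and schema values here are made up for the example.

from airbyte_cdk.sources.file_based.exceptions import StopSyncPerValidationPolicy
from airbyte_cdk.sources.file_based.schema_validation_policies import DEFAULT_SCHEMA_VALIDATION_POLICIES

schema = {"type": "object", "properties": {"col1": {"type": "string"}}}
bad_record = {"col1": "val", "col2": "column not in the schema"}

# emit_record always passes the record through
assert DEFAULT_SCHEMA_VALIDATION_POLICIES["emit_record"].record_passes_validation_policy(bad_record, schema)
# skip_record drops it (record_passes_validation_policy returns False)
assert not DEFAULT_SCHEMA_VALIDATION_POLICIES["skip_record"].record_passes_validation_policy(bad_record, schema)
# wait_for_discover stops the sync for the stream
try:
    DEFAULT_SCHEMA_VALIDATION_POLICIES["wait_for_discover"].record_passes_validation_policy(bad_record, schema)
except StopSyncPerValidationPolicy:
    print("sync stopped per validation policy")

During `read_records` (see DefaultFileBasedStream below), a False return increments `n_skipped` and the record is not emitted, while the exception breaks out of the file and yields an INFO log message.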

View File

@@ -1,19 +0,0 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from typing import Any, Mapping
from airbyte_cdk.sources.file_based.schema_validation_policies import AbstractSchemaValidationPolicy
class DefaultSchemaValidationPolicy(AbstractSchemaValidationPolicy):
SKIP_RECORD = "skip_record_on_schema_mismatch"
EMIT_RECORD = "emit_record_on_schema_mismatch"
WAIT = "wait_for_discover_on_schema_mismatch"
def record_passes_validation_policy(self, record: Mapping[str, Any], schema: Mapping[str, Any]) -> bool:
"""
Return True if the record passes the user's validation policy.
"""
return True

View File

@@ -4,16 +4,15 @@
from abc import abstractmethod
from functools import cached_property
from typing import Any, Dict, Iterable, List, Mapping, Optional, Type
from typing import Any, Dict, Iterable, List, Mapping, Optional
from airbyte_cdk.models import SyncMode
from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig, PrimaryKeyType
from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy
from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, UndefinedParserError
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_validation_policies import AbstractSchemaValidationPolicy
from airbyte_cdk.sources.file_based.types import StreamSlice, StreamState
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
@@ -38,20 +37,19 @@ class AbstractFileBasedStream(Stream):
def __init__(
self,
config: FileBasedStreamConfig,
catalog_schema: Optional[ConfiguredAirbyteCatalog],
stream_reader: AbstractFileBasedStreamReader,
availability_strategy: AvailabilityStrategy,
discovery_policy: AbstractDiscoveryPolicy,
parsers: Dict[str, FileTypeParser],
validation_policies: Type[AbstractSchemaValidationPolicy],
):
super().__init__()
self.config = config
self._catalog_schema = {} # TODO: wire through configured catalog
self._catalog_schema = catalog_schema
self._stream_reader = stream_reader
self._discovery_policy = discovery_policy
self._availability_strategy = availability_strategy
self._parsers = parsers
self.validation_policy = validation_policies(self.config.validation_policy)
@property
@abstractmethod
@@ -124,7 +122,7 @@ class AbstractFileBasedStream(Stream):
raise UndefinedParserError(FileBasedSourceError.UNDEFINED_PARSER, stream=self.name, file_type=file_type)
def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
return self.validation_policy.record_passes_validation_policy(record, self.get_json_schema())
return self.config.validation_policy.record_passes_validation_policy(record, self._catalog_schema)
@cached_property
def availability_strategy(self):

View File

@@ -5,16 +5,25 @@
import asyncio
import itertools
import logging
import traceback
from functools import cache
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Union
from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, MissingSchemaError, RecordParseError, SchemaInferenceError
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, Type
from airbyte_cdk.sources.file_based.exceptions import (
FileBasedSourceError,
InvalidSchemaError,
MissingSchemaError,
SchemaInferenceError,
StopSyncPerValidationPolicy,
)
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_helpers import merge_schemas
from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream
from airbyte_cdk.sources.file_based.stream.cursor import FileBasedCursor
from airbyte_cdk.sources.file_based.types import StreamSlice
from airbyte_cdk.sources.streams import IncrementalMixin
from airbyte_cdk.sources.streams.core import JsonSchema
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
@@ -65,17 +74,49 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):
for file in stream_slice["files"]:
# only serialize the datetime once
file_datetime_string = file.last_modified.strftime("%Y-%m-%dT%H:%M:%SZ")
n_skipped = line_no = 0
try:
for record in parser.parse_records(self.config, file, self._stream_reader):
line_no += 1
if not self.record_passes_validation_policy(record):
logging.warning(f"Record did not pass validation policy: {record}")
n_skipped += 1
continue
record[self.ab_last_mod_col] = file_datetime_string
record[self.ab_file_name_col] = file.uri
yield stream_data_to_airbyte_message(self.name, record)
self._cursor.add_file(file)
except StopSyncPerValidationPolicy:
yield AirbyteMessage(
type=Type.LOG,
log=AirbyteLogMessage(
level=Level.INFO,
message=f"Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema. stream={self.name} file={file.uri} validation_policy={self.config.validation_policy.name} n_skipped={n_skipped}",
),
)
break
except Exception as exc:
raise RecordParseError(FileBasedSourceError.ERROR_PARSING_FILE, stream=self.name) from exc
yield AirbyteMessage(
type=Type.LOG,
log=AirbyteLogMessage(
level=Level.ERROR,
message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}",
stack_trace="\n".join(traceback.format_exception(etype=type(exc), value=exc, tb=exc.__traceback__)),
),
)
else:
if n_skipped:
yield AirbyteMessage(
type=Type.LOG,
log=AirbyteLogMessage(
level=Level.INFO,
message=f"Records in file did not pass validation policy. stream={self.name} file={file.uri} n_skipped={n_skipped} validation_policy={self.config.validation_policy.name}",
),
)
@property
def cursor_field(self) -> Union[str, List[str]]:
@@ -86,18 +127,27 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):
return self.ab_last_mod_col
@cache
def get_json_schema(self) -> Mapping[str, Any]:
def get_json_schema(self) -> JsonSchema:
extra_fields = {
self.ab_last_mod_col: {"type": "string"},
self.ab_file_name_col: {"type": "string"},
}
schema = self._get_raw_json_schema()
schema["properties"] = {**extra_fields, **schema.get("properties", {})}
return schema
try:
schema = self._get_raw_json_schema()
except Exception as exc:
raise SchemaInferenceError(FileBasedSourceError.SCHEMA_INFERENCE_ERROR, stream=self.name) from exc
else:
if not schema:
raise InvalidSchemaError(
FileBasedSourceError.INVALID_SCHEMA_ERROR,
details=f"Empty schema. Please check that the files are valid {self.config.file_type}",
stream=self.name,
)
return {"type": "object", "properties": {**extra_fields, **schema}}
def _get_raw_json_schema(self) -> Mapping[str, Any]:
def _get_raw_json_schema(self) -> JsonSchema:
if self.config.input_schema:
type_mapping = self.config.input_schema
schema = self.config.input_schema
else:
files = self.list_files()
max_n_files_for_schema_inference = self._discovery_policy.max_n_files_for_schema_inference
@@ -105,8 +155,9 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):
# Use the most recent files for schema inference, so we pick up schema changes during discovery.
files = sorted(files, key=lambda x: x.last_modified, reverse=True)[:max_n_files_for_schema_inference]
logging.warning(f"Refusing to infer schema for {len(files)} files; using {max_n_files_for_schema_inference} files.")
type_mapping = self.infer_schema(files)
return type_mapping
schema = self.infer_schema(files)
return schema
@cache
def list_files(self) -> List[RemoteFile]:
@@ -128,7 +179,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):
Each file type has a corresponding `infer_schema` handler.
Dispatch on file type.
"""
base_schema = {}
base_schema: Dict[str, str] = {}
pending_tasks = set()
n_started, n_files = 0, len(files)
@@ -143,7 +194,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):
for task in done:
base_schema = merge_schemas(base_schema, task.result())
return {"type": "object", "properties": base_schema}
return base_schema
async def _infer_file_schema(self, file: RemoteFile) -> Mapping[str, Any]:
try:
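
The change above moves the `{"type": "object", "properties": ...}` wrapper out of `infer_schema` and into `get_json_schema`, which also prepends the two bookkeeping columns and raises if inference produced an empty schema. A quick sketch of the resulting shape; `col1` and `col2` are made-up column names.

inferred = {"col1": {"type": "string"}, "col2": {"type": "string"}}  # what _get_raw_json_schema now returns
extra_fields = {
    "_ab_source_file_last_modified": {"type": "string"},
    "_ab_source_file_url": {"type": "string"},
}
json_schema = {"type": "object", "properties": {**extra_fields, **inferred}}
assert list(json_schema["properties"]) == [
    "_ab_source_file_last_modified",
    "_ab_source_file_url",
    "col1",
    "col2",
]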

View File

@@ -27,6 +27,8 @@ if typing.TYPE_CHECKING:
# AirbyteMessage: An AirbyteMessage. Could be of any type
StreamData = Union[Mapping[str, Any], AirbyteMessage]
JsonSchema = Mapping[str, Any]
def package_name_from_class(cls: object) -> str:
"""Find the package name given a class name"""

View File

@@ -4,6 +4,7 @@
import pytest as pytest
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig, QuotingBehavior
from airbyte_cdk.sources.file_based.schema_validation_policies import EmitRecordPolicy
from pydantic import ValidationError
@@ -25,7 +26,8 @@ def test_csv_config(file_type, input_format, expected_format, expected_error):
"name": "stream1",
"file_type": file_type,
"globs": ["*"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
"validation_policies": {"emit_record": EmitRecordPolicy()},
"format": input_format,
}
if expected_error:

View File

@@ -4,14 +4,22 @@
from datetime import datetime
from io import IOBase
from typing import Any, List, Mapping, Optional
from typing import Any, Dict, List, Mapping, Optional
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.discovery_policy import DefaultDiscoveryPolicy
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.file_types.csv_parser import CsvParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_validation_policies import AbstractSchemaValidationPolicy
from unit_tests.sources.file_based.in_memory_files_source import InMemoryFilesStreamReader
class EmptySchemaParser(CsvParser):
async def infer_schema(self, config: FileBasedStreamConfig, file: RemoteFile, stream_reader: AbstractFileBasedStreamReader) -> Dict[str, Any]:
return {}
class LowInferenceLimitDiscoveryPolicy(DefaultDiscoveryPolicy):
@property
def max_n_files_for_schema_inference(self):

View File

@@ -8,13 +8,14 @@ from datetime import datetime
from io import IOBase
from typing import Any, Dict, Iterable, List, Optional
from airbyte_cdk.models import ConfiguredAirbyteCatalog
from airbyte_cdk.sources.file_based.default_file_based_availability_strategy import DefaultFileBasedAvailabilityStrategy
from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy
from airbyte_cdk.sources.file_based.file_based_source import FileBasedSource
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_validation_policies import AbstractSchemaValidationPolicy, DefaultSchemaValidationPolicy
from airbyte_cdk.sources.file_based.schema_validation_policies import DEFAULT_SCHEMA_VALIDATION_POLICIES, AbstractSchemaValidationPolicy
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
@@ -25,19 +26,21 @@ class InMemoryFilesSource(FileBasedSource):
file_type,
availability_strategy: AvailabilityStrategy,
discovery_policy: AbstractDiscoveryPolicy,
validation_policies: AbstractSchemaValidationPolicy,
validation_policies: Dict[str, AbstractSchemaValidationPolicy],
parsers: Dict[str, FileTypeParser],
stream_reader: AbstractFileBasedStreamReader,
file_write_options: Dict[str, Any]
catalog: Optional[Dict[str, Any]],
file_write_options: Dict[str, Any],
):
stream_reader = stream_reader or InMemoryFilesStreamReader(files=files, file_type=file_type, file_write_options=file_write_options)
availability_strategy = availability_strategy or DefaultFileBasedAvailabilityStrategy(stream_reader)
super().__init__(
stream_reader,
catalog=ConfiguredAirbyteCatalog(streams=catalog["streams"]) if catalog else None,
availability_strategy=availability_strategy,
discovery_policy=discovery_policy,
parsers=parsers,
validation_policies=validation_policies or DefaultSchemaValidationPolicy,
validation_policies=validation_policies or DEFAULT_SCHEMA_VALIDATION_POLICIES,
)
# Attributes required for test purposes

View File

@@ -2,7 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError
from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError, FileBasedSourceError
from unit_tests.sources.file_based.helpers import (
FailingSchemaValidationPolicy,
TestErrorListMatchingFilesInMemoryFilesStreamReader,
@@ -19,7 +19,7 @@ _base_success_scenario = (
"name": "stream1",
"file_type": "csv",
"globs": ["*.csv"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
}
]
}
@@ -57,13 +57,13 @@ success_multi_stream_scenario = (
"name": "stream1",
"file_type": "csv",
"globs": ["*.csv", "*.gz"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
},
{
"name": "stream2",
"file_type": "csv",
"globs": ["*.csv", "*.gz"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
}
]
}
@@ -81,7 +81,7 @@ success_extensionless_scenario = (
"name": "stream1",
"file_type": "csv",
"globs": ["*"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
}
]
}
@@ -111,7 +111,7 @@ success_user_provided_schema_scenario = (
"name": "stream1",
"file_type": "csv",
"globs": ["*.csv"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
"input_schema": {"col1": "string", "col2": "string"},
}
],
@@ -175,7 +175,7 @@ error_record_validation_user_provided_schema_scenario = (
}
)
.set_validation_policies(FailingSchemaValidationPolicy)
.set_expected_check_error(None, FileBasedSourceError.ERROR_VALIDATING_RECORD)
.set_expected_check_error(ConfigValidationError, FileBasedSourceError.ERROR_VALIDATING_RECORD)
).build()
@@ -189,13 +189,13 @@ error_multi_stream_scenario = (
"name": "stream1",
"file_type": "csv",
"globs": ["*.csv"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
},
{
"name": "stream2",
"file_type": "jsonl",
"globs": ["*.csv"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
}
],
}

View File

@@ -2,8 +2,8 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, RecordParseError, SchemaInferenceError
from unit_tests.sources.file_based.helpers import LowInferenceLimitDiscoveryPolicy
from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, InvalidSchemaError, SchemaInferenceError
from unit_tests.sources.file_based.helpers import EmptySchemaParser, LowInferenceLimitDiscoveryPolicy
from unit_tests.sources.file_based.scenarios.scenario_builder import TestScenarioBuilder
single_csv_scenario = (
@@ -16,7 +16,7 @@ single_csv_scenario = (
"name": "stream1",
"file_type": "csv",
"globs": ["*"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
}
]
}
@@ -43,10 +43,10 @@ single_csv_scenario = (
"type": "object",
"properties": {
"col1": {
"type": ["null", "string"]
"type": "string"
},
"col2": {
"type": ["null", "string"]
"type": "string"
},
"_ab_source_file_last_modified": {
"type": "string"
@@ -83,7 +83,7 @@ multi_csv_scenario = (
"name": "stream1",
"file_type": "csv",
"globs": ["*"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
}
]
}
@@ -118,13 +118,13 @@ multi_csv_scenario = (
"type": "object",
"properties": {
"col1": {
"type": ["null", "string"]
"type": "string"
},
"col2": {
"type": ["null", "string"]
"type": "string"
},
"col3": {
"type": ["null", "string"]
"type": "string"
},
"_ab_source_file_last_modified": {
"type": "string"
@@ -165,7 +165,7 @@ multi_csv_stream_n_file_exceeds_limit_for_inference = (
"name": "stream1",
"file_type": "csv",
"globs": ["*"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
}
]
}
@@ -200,9 +200,9 @@ multi_csv_stream_n_file_exceeds_limit_for_inference = (
"type": "object",
"properties": {
"col1": {
"type": ["null", "string"]
"type": "string"
}, "col2": {
"type": ["null", "string"]
"type": "string"
},
"_ab_source_file_last_modified": {
"type": "string"
@@ -236,7 +236,7 @@ multi_csv_stream_n_file_exceeds_limit_for_inference = (
invalid_csv_scenario = (
TestScenarioBuilder()
.set_name("invalid_csv_stream")
.set_name("invalid_csv_scenario")
.set_config(
{
"streams": [
@@ -244,7 +244,7 @@ invalid_csv_scenario = (
"name": "stream1",
"file_type": "csv",
"globs": ["*"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
}
]
}
@@ -253,7 +253,7 @@ invalid_csv_scenario = (
{
"a.csv": {
"contents": [
(),
("col1",),
("val11", "val12"),
("val21", "val22"),
],
@@ -271,9 +271,9 @@ invalid_csv_scenario = (
"type": "object",
"properties": {
"col1": {
"type": ["null", "string"]
"type": "string"
}, "col2": {
"type": ["null", "string"]
"type": "string"
},
"_ab_source_file_last_modified": {
"type": "string"
@@ -290,19 +290,21 @@ invalid_csv_scenario = (
]
}
)
.set_expected_records(
.set_expected_records([])
.set_expected_discover_error(SchemaInferenceError, FileBasedSourceError.SCHEMA_INFERENCE_ERROR.value)
.set_expected_logs(
[
{"col1": "val11", "col2": "val12", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a.csv"},
{"col1": "val21", "col2": "val22", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "b.csv"},
{
"level": "ERROR",
"message": f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream=stream1 file=a.csv line_no=1 n_skipped=0",
},
]
)
.set_expected_discover_error(SchemaInferenceError, FileBasedSourceError.SCHEMA_INFERENCE_ERROR.value)
.set_expected_read_error(RecordParseError, FileBasedSourceError.ERROR_PARSING_FILE.value)
).build()
csv_single_stream_scenario = (
TestScenarioBuilder()
.set_name("csv_single_stream")
.set_name("csv_single_stream_scenario")
.set_config(
{
"streams": [
@@ -310,7 +312,7 @@ csv_single_stream_scenario = (
"name": "stream1",
"file_type": "csv",
"globs": ["*.csv"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
}
]
}
@@ -343,10 +345,10 @@ csv_single_stream_scenario = (
"type": "object",
"properties": {
"col1": {
"type": ["null", "string"]
"type": "string"
},
"col2": {
"type": ["null", "string"]
"type": "string"
},
"_ab_source_file_last_modified": {
"type": "string"
@@ -384,13 +386,13 @@ csv_multi_stream_scenario = (
"name": "stream1",
"file_type": "csv",
"globs": ["*.csv"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
},
{
"name": "stream2",
"file_type": "csv",
"globs": ["b.csv"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
}
]
}
@@ -424,13 +426,13 @@ csv_multi_stream_scenario = (
"type": "object",
"properties": {
"col1": {
"type": ["null", "string"]
"type": "string"
},
"col2": {
"type": ["null", "string"]
"type": "string"
},
"col3": {
"type": ["null", "string"]
"type": "string"
},
"_ab_source_file_last_modified": {
"type": "string"
@@ -450,7 +452,7 @@ csv_multi_stream_scenario = (
"type": "object",
"properties": {
"col3": {
"type": ["null", "string"]
"type": "string"
},
"_ab_source_file_last_modified": {
"type": "string"
@@ -486,6 +488,7 @@ csv_multi_stream_scenario = (
)
).build()
csv_custom_format_scenario = (
TestScenarioBuilder()
.set_name("csv_custom_format")
@@ -496,7 +499,7 @@ csv_custom_format_scenario = (
"name": "stream1",
"file_type": "csv",
"globs": ["*"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
"format": {
"filetype": "csv",
"delimiter": "#",
@@ -531,13 +534,13 @@ csv_custom_format_scenario = (
"type": "object",
"properties": {
"col1": {
"type": ["null", "string"]
"type": "string",
},
"col2": {
"type": ["null", "string"]
"type": "string",
},
"col3": {
"type": ["null", "string"]
"type": "string",
},
"_ab_source_file_last_modified": {
"type": "string"
@@ -573,6 +576,7 @@ csv_custom_format_scenario = (
)
).build()
multi_stream_custom_format = (
TestScenarioBuilder()
.set_name("multi_stream_custom_format_scenario")
@@ -583,7 +587,7 @@ multi_stream_custom_format = (
"name": "stream1",
"file_type": "csv",
"globs": ["*.csv"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
"format": {
"filetype": "csv",
"delimiter": "#",
@@ -596,7 +600,7 @@ multi_stream_custom_format = (
"name": "stream2",
"file_type": "csv",
"globs": ["b.csv"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
"format": {
"filetype": "csv",
"delimiter": "#",
@@ -638,13 +642,13 @@ multi_stream_custom_format = (
"type": "object",
"properties": {
"col1": {
"type": ["null", "string"]
"type": "string",
},
"col2": {
"type": ["null", "string"]
"type": "string",
},
"col3": {
"type": ["null", "string"]
"type": "string",
},
"_ab_source_file_last_modified": {
"type": "string"
@@ -664,7 +668,7 @@ multi_stream_custom_format = (
"type": "object",
"properties": {
"col3": {
"type": ["null", "string"]
"type": "string",
},
"_ab_source_file_last_modified": {
"type": "string"
@@ -704,3 +708,73 @@ multi_stream_custom_format = (
}
)
).build()
empty_schema_inference_scenario = (
TestScenarioBuilder()
.set_name("empty_schema_inference_scenario")
.set_config(
{
"streams": [
{
"name": "stream1",
"file_type": "csv",
"globs": ["*"],
"validation_policy": "emit_record",
}
]
}
)
.set_files(
{
"a.csv": {
"contents": [
("col1", "col2"),
("val11", "val12"),
("val21", "val22"),
],
"last_modified": "2023-06-05T03:54:07.000Z",
}
}
)
.set_file_type("csv")
.set_expected_catalog(
{
"streams": [
{
"default_cursor_field": ["_ab_source_file_last_modified"],
"json_schema": {
"type": "object",
"properties": {
"col1": {
"type": "string"
},
"col2": {
"type": "string"
},
"_ab_source_file_last_modified": {
"type": "string"
},
"_ab_source_file_url": {
"type": "string"
},
},
},
"name": "stream1",
"source_defined_cursor": True,
"supported_sync_modes": ["full_refresh", "incremental"],
}
]
}
)
.set_parsers({'csv': EmptySchemaParser()})
.set_expected_discover_error(InvalidSchemaError, FileBasedSourceError.INVALID_SCHEMA_ERROR.value)
.set_expected_records(
[
{"data": {"col1": "val11", "col2": "val12", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
"_ab_source_file_url": "a.csv"}, "stream": "stream1"},
{"data": {"col1": "val21", "col2": "val22", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
"_ab_source_file_url": "a.csv"}, "stream": "stream1"},
]
)
).build()

View File

@@ -14,7 +14,7 @@ single_csv_input_state_is_earlier_scenario = (
"name": "stream1",
"file_type": "csv",
"globs": ["*.csv"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
}
]
}
@@ -71,9 +71,9 @@ single_csv_input_state_is_earlier_scenario = (
"type": "object",
"properties": {
"col1": {
"type": ["null", "string"]
"type": "string",
}, "col2": {
"type": ["null", "string"]
"type": "string",
},
"_ab_source_file_last_modified": {
"type": "string"
@@ -100,7 +100,7 @@ single_csv_file_is_skipped_if_same_modified_at_as_in_history = (
"name": "stream1",
"file_type": "csv",
"globs": ["*.csv"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
}
]
}
@@ -154,9 +154,9 @@ single_csv_file_is_skipped_if_same_modified_at_as_in_history = (
"type": "object",
"properties": {
"col1": {
"type": ["null", "string"]
"type": "string",
}, "col2": {
"type": ["null", "string"]
"type": "string",
},
"_ab_source_file_last_modified": {
"type": "string"
@@ -183,7 +183,7 @@ single_csv_file_is_synced_if_modified_at_is_more_recent_than_in_history = (
"name": "stream1",
"file_type": "csv",
"globs": ["*.csv"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
}
]
}
@@ -239,9 +239,9 @@ single_csv_file_is_synced_if_modified_at_is_more_recent_than_in_history = (
"type": "object",
"properties": {
"col1": {
"type": ["null", "string"]
"type": "string",
}, "col2": {
"type": ["null", "string"]
"type": "string",
},
"_ab_source_file_last_modified": {
"type": "string"
@@ -268,7 +268,7 @@ single_csv_no_input_state_scenario = (
"name": "stream1",
"file_type": "csv",
"globs": ["*.csv"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
}
]
}
@@ -297,9 +297,9 @@ single_csv_no_input_state_scenario = (
"type": "object",
"properties": {
"col1": {
"type": ["null", "string"]
"type": "string",
}, "col2": {
"type": ["null", "string"]
"type": "string",
},
"_ab_source_file_last_modified": {
"type": "string"
@@ -341,7 +341,7 @@ multi_csv_same_timestamp_scenario = (
"name": "stream1",
"file_type": "csv",
"globs": ["*.csv"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
}
]
}
@@ -376,13 +376,13 @@ multi_csv_same_timestamp_scenario = (
"type": "object",
"properties": {
"col1": {
"type": ["null", "string"]
"type": "string",
},
"col2": {
"type": ["null", "string"]
"type": "string",
},
"col3": {
"type": ["null", "string"]
"type": "string",
},
"_ab_source_file_last_modified": {
"type": "string"
@@ -431,7 +431,7 @@ single_csv_input_state_is_later_scenario = (
"name": "stream1",
"file_type": "csv",
"globs": ["*.csv"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
}
]
}
@@ -460,9 +460,9 @@ single_csv_input_state_is_later_scenario = (
"type": "object",
"properties": {
"col1": {
"type": ["null", "string"]
"type": "string",
}, "col2": {
"type": ["null", "string"]
"type": "string",
},
"_ab_source_file_last_modified": {
"type": "string"
@@ -516,7 +516,7 @@ multi_csv_different_timestamps_scenario = (
"name": "stream1",
"file_type": "csv",
"globs": ["*.csv"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
}
]
}
@@ -551,13 +551,13 @@ multi_csv_different_timestamps_scenario = (
"type": "object",
"properties": {
"col1": {
"type": ["null", "string"]
"type": "string",
},
"col2": {
"type": ["null", "string"]
"type": "string",
},
"col3": {
"type": ["null", "string"]
"type": "string",
},
"_ab_source_file_last_modified": {
"type": "string"
@@ -613,7 +613,7 @@ multi_csv_per_timestamp_scenario = (
"name": "stream1",
"file_type": "csv",
"globs": ["*.csv"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
}
]
}
@@ -656,13 +656,13 @@ multi_csv_per_timestamp_scenario = (
"type": "object",
"properties": {
"col1": {
"type": ["null", "string"]
"type": "string",
},
"col2": {
"type": ["null", "string"]
"type": "string",
},
"col3": {
"type": ["null", "string"]
"type": "string",
},
"_ab_source_file_last_modified": {
"type": "string"
@@ -724,7 +724,7 @@ multi_csv_skip_file_if_already_in_history = (
"name": "stream1",
"file_type": "csv",
"globs": ["*.csv"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
}
]
}
@@ -767,13 +767,13 @@ multi_csv_skip_file_if_already_in_history = (
"type": "object",
"properties": {
"col1": {
"type": ["null", "string"]
"type": "string",
},
"col2": {
"type": ["null", "string"]
"type": "string",
},
"col3": {
"type": ["null", "string"]
"type": "string",
},
"_ab_source_file_last_modified": {
"type": "string"
@@ -844,7 +844,7 @@ multi_csv_include_missing_files_within_history_range = (
"name": "stream1",
"file_type": "csv",
"globs": ["*.csv"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
}
]
}
@@ -887,13 +887,13 @@ multi_csv_include_missing_files_within_history_range = (
"type": "object",
"properties": {
"col1": {
"type": ["null", "string"]
"type": "string",
},
"col2": {
"type": ["null", "string"]
"type": "string",
},
"col3": {
"type": ["null", "string"]
"type": "string",
},
"_ab_source_file_last_modified": {
"type": "string"
@@ -957,7 +957,7 @@ multi_csv_remove_old_files_if_history_is_full_scenario = (
"name": "stream1",
"file_type": "csv",
"globs": ["*.csv"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
"max_history_size": 3,
}
]
@@ -1001,13 +1001,13 @@ multi_csv_remove_old_files_if_history_is_full_scenario = (
"type": "object",
"properties": {
"col1": {
"type": ["null", "string"]
"type": "string",
},
"col2": {
"type": ["null", "string"]
"type": "string",
},
"col3": {
"type": ["null", "string"]
"type": "string",
},
"_ab_source_file_last_modified": {
"type": "string"
@@ -1092,7 +1092,7 @@ multi_csv_same_timestamp_more_files_than_history_size_scenario = (
"name": "stream1",
"file_type": "csv",
"globs": ["*.csv"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
"max_history_size": 3,
"days_to_sync_if_history_is_full": 3,
}
@@ -1145,13 +1145,13 @@ multi_csv_same_timestamp_more_files_than_history_size_scenario = (
"type": "object",
"properties": {
"col1": {
"type": ["null", "string"]
"type": "string",
},
"col2": {
"type": ["null", "string"]
"type": "string",
},
"col3": {
"type": ["null", "string"]
"type": "string",
},
"_ab_source_file_last_modified": {
"type": "string"
@@ -1209,7 +1209,7 @@ multi_csv_sync_recent_files_if_history_is_incomplete_scenario = (
"name": "stream1",
"file_type": "csv",
"globs": ["*.csv"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
"max_history_size": 3,
"days_to_sync_if_history_is_full": 3,
}
@@ -1262,13 +1262,13 @@ multi_csv_sync_recent_files_if_history_is_incomplete_scenario = (
"type": "object",
"properties": {
"col1": {
"type": ["null", "string"]
"type": "string",
},
"col2": {
"type": ["null", "string"]
"type": "string",
},
"col3": {
"type": ["null", "string"]
"type": "string",
},
"_ab_source_file_last_modified": {
"type": "string"
@@ -1325,7 +1325,7 @@ multi_csv_sync_files_within_time_window_if_history_is_incomplete__different_time
"name": "stream1",
"file_type": "csv",
"globs": ["*.csv"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
"max_history_size": 3,
"days_to_sync_if_history_is_full": 3,
}
@@ -1378,13 +1378,13 @@ multi_csv_sync_files_within_time_window_if_history_is_incomplete__different_time
"type": "object",
"properties": {
"col1": {
"type": ["null", "string"]
"type": "string",
},
"col2": {
"type": ["null", "string"]
"type": "string",
},
"col3": {
"type": ["null", "string"]
"type": "string",
},
"_ab_source_file_last_modified": {
"type": "string"
@@ -1447,7 +1447,7 @@ multi_csv_sync_files_within_history_time_window_if_history_is_incomplete_differe
"name": "stream1",
"file_type": "csv",
"globs": ["*.csv"],
"validation_policy": "emit_record_on_schema_mismatch",
"validation_policy": "emit_record",
"max_history_size": 3,
"days_to_sync_if_history_is_full": 3,
}
@@ -1500,13 +1500,13 @@ multi_csv_sync_files_within_history_time_window_if_history_is_incomplete_differe
"type": "object",
"properties": {
"col1": {
"type": ["null", "string"]
"type": "string",
},
"col2": {
"type": ["null", "string"]
"type": "string",
},
"col3": {
"type": ["null", "string"]
"type": "string",
},
"_ab_source_file_last_modified": {
"type": "string"

View File

@@ -30,11 +30,12 @@ class TestScenario:
files: Dict[str, Any],
file_type: str,
expected_check_status: Optional[str],
expected_catalog: Dict[str, Any],
expected_catalog: Optional[Dict[str, Any]],
expected_logs: Optional[Dict[str, Any]],
expected_records: Optional[Dict[str, Any]],
availability_strategy: Optional[AvailabilityStrategy],
discovery_policy: Optional[AbstractDiscoveryPolicy],
validation_policies: Optional[AbstractSchemaValidationPolicy],
validation_policies: Optional[Dict[str, AbstractSchemaValidationPolicy]],
parsers: Optional[Dict[str, FileTypeParser]],
stream_reader: Optional[AbstractFileBasedStreamReader],
expected_check_error: Tuple[Optional[Exception], Optional[str]],
@@ -47,6 +48,7 @@ class TestScenario:
self.config = config
self.expected_check_status = expected_check_status
self.expected_catalog = expected_catalog
self.expected_logs = expected_logs
self.expected_records = expected_records
self.expected_check_error = expected_check_error
self.expected_discover_error = expected_discover_error
@@ -59,6 +61,7 @@ class TestScenario:
validation_policies,
parsers,
stream_reader,
self.configured_catalog(SyncMode.incremental if incremental_scenario_config else SyncMode.full_refresh),
file_write_options,
)
self.incremental_scenario_config = incremental_scenario_config
@@ -72,7 +75,9 @@ class TestScenario:
expected_streams = {s["name"] for s in self.expected_catalog["streams"]}
assert expected_streams <= streams
def configured_catalog(self, sync_mode: SyncMode) -> Dict[str, Any]:
def configured_catalog(self, sync_mode: SyncMode) -> Optional[Dict[str, Any]]:
if not self.expected_catalog:
return
catalog = {"streams": []}
for stream in self.expected_catalog["streams"]:
catalog["streams"].append(
@@ -100,6 +105,7 @@ class TestScenarioBuilder:
self._file_type = None
self._expected_check_status = None
self._expected_catalog = {}
self._expected_logs = {}
self._expected_records = {}
self._availability_strategy = None
self._discovery_policy = DefaultDiscoveryPolicy()
@@ -136,6 +142,10 @@ class TestScenarioBuilder:
self._expected_catalog = expected_catalog
return self
def set_expected_logs(self, expected_logs: Dict[str, Any]):
self._expected_logs = expected_logs
return self
def set_expected_records(self, expected_records: Dict[str, Any]):
self._expected_records = expected_records
return self
@@ -191,6 +201,7 @@ class TestScenarioBuilder:
self._file_type,
self._expected_check_status,
self._expected_catalog,
self._expected_logs,
self._expected_records,
self._availability_strategy,
self._discovery_policy,

View File

@@ -0,0 +1,508 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError, FileBasedSourceError
from unit_tests.sources.file_based.scenarios.scenario_builder import TestScenarioBuilder
_base_single_stream_scenario = (
TestScenarioBuilder()
.set_files(
{
"a.csv": { # The records in this file do not conform to the schema
"contents": [
("col1", "col2"),
("val_a_11", "val_a_21"),
("val_a_12", "val_a_22"),
],
"last_modified": "2023-06-05T03:54:07.000Z",
},
"b.csv": {
"contents": [
("col1",),
("val_b_11",),
("val_b_12",),
],
"last_modified": "2023-06-05T03:54:07.000Z",
},
"c.csv": {
"contents": [
("col1",),
("val_c_11",),
("val_c_12","val_c_22"), # This record doesn't conform to the schema
("val_c_13",),
],
"last_modified": "2023-06-05T03:54:07.000Z",
},
"d.csv": {
"contents": [
("col1",),
("val_d_11",),
],
"last_modified": "2023-06-05T03:54:07.000Z",
},
}
)
.set_file_type("csv")
.set_expected_catalog(
{
"streams": [
{
'default_cursor_field': ['_ab_source_file_last_modified'],
"json_schema": {
"type": "object",
"properties": {
"col1": {
"type": "string",
},
# "col2": { # remove this so the record does not conform to the schema
# "type": "string",
# },
"_ab_source_file_last_modified": {
"type": "string"
},
"_ab_source_file_url": {
"type": "string"
},
},
},
"name": "stream1",
"source_defined_cursor": True,
"supported_sync_modes": ["full_refresh", "incremental"],
}
]
}
)
)
_base_multi_stream_scenario = (
TestScenarioBuilder()
.set_files(
{
"a/a1.csv": { # The records in this file do not conform to the schema
"contents": [
("col1", "col2"),
("val_aa1_11", "val_aa1_21"),
("val_aa1_12", "val_aa1_22"),
],
"last_modified": "2023-06-05T03:54:07.000Z",
},
"a/a2.csv": {
"contents": [
("col1",),
("val_aa2_11",),
("val_aa2_12",),
],
"last_modified": "2023-06-05T03:54:07.000Z",
},
"a/a3.csv": {
"contents": [
("col1",),
("val_aa3_11",),
("val_aa3_12", "val_aa3_22"), # This record does not conform to the schema
("val_aa3_13",),
],
"last_modified": "2023-06-05T03:54:07.000Z",
},
"a/a4.csv": {
"contents": [
("col1",),
("val_aa4_11",),
],
"last_modified": "2023-06-05T03:54:07.000Z",
},
"b/b1.csv": { # The records in this file do not conform to the schema
"contents": [
("col1",),
("val_bb1_11",),
("val_bb1_12",),
],
"last_modified": "2023-06-05T03:54:07.000Z",
},
"b/b2.csv": {
"contents": [
("col1", "col2"),
("val_bb2_11", "val_bb2_21"),
("val_bb2_12", "val_bb2_22"),
],
"last_modified": "2023-06-05T03:54:07.000Z",
},
"b/b3.csv": {
"contents": [
("col1",),
("val_bb3_11",),
("val_bb3_12",),
],
"last_modified": "2023-06-05T03:54:07.000Z",
},
}
)
.set_file_type("csv")
.set_expected_catalog(
{
"streams": [
{
'default_cursor_field': ['_ab_source_file_last_modified'],
"json_schema": {
"type": "object",
"properties": {
"col1": {
"type": "string",
},
# "col2": { # remove this so the record does not conform to the schema
# "type": "string",
# },
"_ab_source_file_last_modified": {
"type": "string"
},
"_ab_source_file_url": {
"type": "string"
},
},
},
"name": "stream1",
"source_defined_cursor": True,
"supported_sync_modes": ["full_refresh", "incremental"],
},
{
"json_schema": {
'default_cursor_field': ['_ab_source_file_last_modified'],
"type": "object",
"properties": {
"col1": {
"type": "string",
},
# "col2": { # remove this so the record does not conform to the schema
# "type": "string",
# },
"_ab_source_file_last_modified": {
"type": "string"
},
"_ab_source_file_url": {
"type": "string"
},
},
},
"name": "stream2",
"source_defined_cursor": True,
"supported_sync_modes": ["full_refresh", "incremental"],
},
]
}
)
)
skip_record_scenario_single_stream = (
_base_single_stream_scenario.copy()
.set_name("skip_record_scenario_single_stream")
.set_config(
{
"streams": [
{
"name": "stream1",
"file_type": "csv",
"globs": ["*.csv"],
"validation_policy": "skip_record",
}
]
}
)
.set_expected_records(
[
# {"data": {"col1": "val_a_11", "col2": "val_a_21", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a.csv"}, "stream": "stream1"}, # This record is skipped because it does not conform
# {"data": {"col1": "val_a_12", "col2": "val_a_22", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a.csv"}, "stream": "stream1"}, # This record is skipped because it does not conform
{"data": {"col1": "val_b_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "b.csv"}, "stream": "stream1"},
{"data": {"col1": "val_b_12", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "b.csv"}, "stream": "stream1"},
{"data": {"col1": "val_c_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "c.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_c_12", None: "val_c_22", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "c.csv"}, "stream": "stream1"}, # This record is malformed so should not be emitted
{"data": {"col1": "val_c_13", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "c.csv"}, "stream": "stream1"},
{"data": {"col1": "val_d_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "d.csv"}, "stream": "stream1"},
]
)
.set_expected_logs(
[
{
"level": "INFO",
"message": "Records in file did not pass validation policy. stream=stream1 file=a.csv n_skipped=2 validation_policy=skip_record",
},
{
"level": "INFO",
"message": "Records in file did not pass validation policy. stream=stream1 file=c.csv n_skipped=1 validation_policy=skip_record",
},
]
)
).build()
skip_record_scenario_multi_stream = (
_base_multi_stream_scenario.copy()
.set_name("skip_record_scenario_multi_stream")
.set_config(
{
"streams": [
{
"name": "stream1",
"file_type": "csv",
"globs": ["a/*.csv"],
"validation_policy": "skip_record",
},
{
"name": "stream2",
"file_type": "csv",
"globs": ["b/*.csv"],
"validation_policy": "skip_record",
}
]
}
)
.set_expected_records(
[
# {"data": {"col1": "val_aa1_11", "col2": "val_aa1_21", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a/a1.csv"}, "stream": "stream1"}, # This record is skipped because it does not conform
# {"data": {"col1": "val_aa1_12", "col2": "val_aa1_22", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a/a1.csv"}, "stream": "stream1"}, # This record is skipped because it does not conform
{"data": {"col1": "val_aa2_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a/a2.csv"}, "stream": "stream1"},
{"data": {"col1": "val_aa2_12", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a/a2.csv"}, "stream": "stream1"},
{"data": {"col1": "val_aa3_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a/a3.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_aa3_12", None: "val_aa3_22", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a/a3.csv"}, "stream": "stream1"}, # This record is malformed so should not be emitted
{"data": {"col1": "val_aa3_13", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a/a3.csv"}, "stream": "stream1"},
{"data": {"col1": "val_aa4_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a/a4.csv"}, "stream": "stream1"},
{"data": {"col1": "val_bb1_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "b/b1.csv"}, "stream": "stream2"}, # This record is skipped because it does not conform
{"data": {"col1": "val_bb1_12", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "b/b1.csv"}, "stream": "stream2"}, # This record is skipped because it does not conform
# {"data": {"col1": "val_bb2_11", "col2": "val_bb2_21", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "b/b2.csv"}, "stream": "stream2"},
# {"data": {"col1": "val_bb2_12", "col2": "val_bb2_21", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "b/b2.csv"}, "stream": "stream2"},
{"data": {"col1": "val_bb3_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "b/b3.csv"}, "stream": "stream2"},
{"data": {"col1": "val_bb3_12", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "b/b3.csv"}, "stream": "stream2"},
]
)
.set_expected_logs(
[
{
"level": "INFO",
"message": "Records in file did not pass validation policy. stream=stream1 file=a/a1.csv n_skipped=2 validation_policy=skip_record",
},
{
"level": "INFO",
"message": "Records in file did not pass validation policy. stream=stream1 file=a/a3.csv n_skipped=1 validation_policy=skip_record",
},
{
"level": "INFO",
"message": "Records in file did not pass validation policy. stream=stream2 file=b/b2.csv n_skipped=2 validation_policy=skip_record",
},
]
)
).build()
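# The "emit_record" scenarios below expect nonconforming records to be emitted as-is;
# only genuinely unparseable rows stop the rest of the file, which surfaces as an
# ERROR_PARSING_RECORD log rather than a validation-policy log.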
emit_record_scenario_single_stream = (
_base_single_stream_scenario.copy()
.set_name("emit_record_scenario_single_stream")
.set_config(
{
"streams": [
{
"name": "stream1",
"file_type": "csv",
"globs": ["*.csv"],
"validation_policy": "emit_record",
}
]
}
)
.set_expected_records(
[
{"data": {"col1": "val_a_11", "col2": "val_a_21", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
{"data": {"col1": "val_a_12", "col2": "val_a_22", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
{"data": {"col1": "val_b_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "b.csv"}, "stream": "stream1"},
{"data": {"col1": "val_b_12", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "b.csv"}, "stream": "stream1"},
{"data": {"col1": "val_c_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "c.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_c_12", None: "val_c_22", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "c.csv"}, "stream": "stream1"}, # This record is malformed so should not be emitted
# {"data": {"col1": "val_c_13", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "c.csv"}, "stream": "stream1"}, # No more records from this file are emitted after we hit a parse error
{"data": {"col1": "val_d_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "d.csv"}, "stream": "stream1"},
]
)
.set_expected_logs(
[
{
"level": "ERROR",
"message": f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream=stream1 file=c.csv line_no=2 n_skipped=0",
},
]
)
).build()
emit_record_scenario_multi_stream = (
_base_multi_stream_scenario.copy()
.set_name("emit_record_scenario_multi_stream")
.set_config(
{
"streams": [
{
"name": "stream1",
"file_type": "csv",
"globs": ["a/*.csv"],
"validation_policy": "emit_record",
},
{
"name": "stream2",
"file_type": "csv",
"globs": ["b/*.csv"],
"validation_policy": "emit_record",
}
]
}
)
.set_expected_records(
[
{"data": {"col1": "val_aa1_11", "col2": "val_aa1_21", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a/a1.csv"}, "stream": "stream1"},
{"data": {"col1": "val_aa1_12", "col2": "val_aa1_22", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a/a1.csv"}, "stream": "stream1"},
{"data": {"col1": "val_aa2_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a/a2.csv"}, "stream": "stream1"},
{"data": {"col1": "val_aa2_12", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a/a2.csv"}, "stream": "stream1"},
{"data": {"col1": "val_aa3_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a/a3.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_aa3_12", None: "val_aa3_22", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a/a3.csv"}, "stream": "stream1"}, # This record is malformed so should not be emitted
# {"data": {"col1": "val_aa3_13", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a/a3.csv"}, "stream": "stream1"}, # No more records from this file are emitted after we hit a parse error
{"data": {"col1": "val_aa4_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a/a4.csv"}, "stream": "stream1"},
{"data": {"col1": "val_bb1_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "b/b1.csv"}, "stream": "stream2"},
{"data": {"col1": "val_bb1_12", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "b/b1.csv"}, "stream": "stream2"},
{"data": {"col1": "val_bb2_11", "col2": "val_bb2_21", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "b/b2.csv"}, "stream": "stream2"},
{"data": {"col1": "val_bb2_12", "col2": "val_bb2_22", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "b/b2.csv"}, "stream": "stream2"},
{"data": {"col1": "val_bb3_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "b/b3.csv"}, "stream": "stream2"},
{"data": {"col1": "val_bb3_12", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "b/b3.csv"}, "stream": "stream2"},
]
)
.set_expected_logs(
[
{
"level": "ERROR",
"message": f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream=stream1 file=a/a3.csv line_no=2 n_skipped=0",
},
]
)
).build()
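# The "wait_for_discover" scenarios below expect the sync for a stream to stop at the
# first nonconforming record: nothing further is emitted for that stream, and an INFO
# log names the configured validation policy. Other streams are unaffected.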
wait_for_rediscovery_scenario_single_stream = (
_base_single_stream_scenario.copy()
.set_name("wait_for_rediscovery_scenario_single_stream")
.set_config(
{
"streams": [
{
"name": "stream1",
"file_type": "csv",
"globs": ["*.csv"],
"validation_policy": "wait_for_discover",
}
]
}
)
.set_expected_records(
[] # No records are expected because the very first file did not conform to the schema
)
.set_expected_logs(
[
{
"level": "INFO",
"message": "Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema. stream=stream1 file=a.csv validation_policy=wait_for_discover n_skipped=0",
},
]
)
).build()
wait_for_rediscovery_scenario_multi_stream = (
_base_multi_stream_scenario.copy()
.set_name("wait_for_rediscovery_scenario_multi_stream")
.set_config(
{
"streams": [
{
"name": "stream1",
"file_type": "csv",
"globs": ["a/*.csv"],
"validation_policy": "wait_for_discover",
},
{
"name": "stream2",
"file_type": "csv",
"globs": ["b/*.csv"],
"validation_policy": "wait_for_discover",
}
]
}
)
.set_expected_records(
[
# {"data": {"col1": "val_aa1_11", "col2": "val_aa1_21", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a/a1.csv"}, "stream": "stream1"}, # The first record does not conform so we don't sync anything from this stream
# {"data": {"col1": "val_aa1_12", "col2": "val_aa1_22", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a/a1.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_aa2_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a/a2.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_aa2_12", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a/a2.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_aa3_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a/a3.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_aa3_12", None: "val_aa3_22", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a/a3.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_aa3_13", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a/a3.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_aa4_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "a/a4.csv"}, "stream": "stream1"},
{"data": {"col1": "val_bb1_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "b/b1.csv"}, "stream": "stream2"},
{"data": {"col1": "val_bb1_12", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "b/b1.csv"}, "stream": "stream2"},
# {"data": {"col1": "val_bb2_11", "col2": "val_bb2_21", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "b/b2.csv"}, "stream": "stream2"}, # No more records from this stream are emitted after a nonconforming record is encountered
# {"data": {"col1": "val_bb2_12", "col2": "val_bb2_21", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "b/b2.csv"}, "stream": "stream2"},
# {"data": {"col1": "val_bb3_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "b/b3.csv"}, "stream": "stream2"},
# {"data": {"col1": "val_bb3_12", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "b/b3.csv"}, "stream": "stream2"},
]
)
.set_expected_logs(
[
{
"level": "INFO",
"message": "Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema. stream=stream1 file=a/a1.csv validation_policy=wait_for_discover n_skipped=0",
},
{
"level": "INFO",
"message": "Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema. stream=stream2 file=b/b2.csv validation_policy=wait_for_discover n_skipped=0",
},
]
)
).build()
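# The two scenarios below cover misconfiguration: an unknown or empty validation_policy
# is expected to fail config validation before any records are read.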
invalid_validation_policy = (
_base_single_stream_scenario.copy()
.set_name("invalid_validation_policy")
.set_config(
{
"streams": [
{
"name": "stream1",
"file_type": "csv",
"globs": ["*.csv"],
"validation_policy": "x",
}
]
}
)
.set_expected_read_error(ConfigValidationError, FileBasedSourceError.CONFIG_VALIDATION_ERROR.value)
).build()
no_validation_policy = (
_base_single_stream_scenario.copy()
.set_name("empty_validation_policy")
.set_config(
{
"streams": [
{
"name": "stream1",
"file_type": "csv",
"globs": ["*.csv"],
"validation_policy": "",
}
]
}
)
.set_expected_read_error(ConfigValidationError, FileBasedSourceError.CONFIG_VALIDATION_ERROR.value)
).build()

View File

@@ -0,0 +1,56 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from typing import Any, Mapping
import pytest
from airbyte_cdk.sources.file_based.exceptions import StopSyncPerValidationPolicy
from airbyte_cdk.sources.file_based.schema_validation_policies import DEFAULT_SCHEMA_VALIDATION_POLICIES
CONFORMING_RECORD = {
"col1": "val1",
"col2": 1,
}
NONCONFORMING_RECORD = {
"col1": "val1",
"extra_col": "x",
}
SCHEMA = {
"type": "object",
"properties": {
"col1": {
"type": "string"
},
"col2": {
"type": "integer"
},
}
}
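# A minimal usage sketch, inferred from the parametrized cases below: policies are
# looked up by key and queried per record, e.g.
#
#     policy = DEFAULT_SCHEMA_VALIDATION_POLICIES["skip_record"]
#     policy.record_passes_validation_policy(NONCONFORMING_RECORD, SCHEMA)  # -> False
#
# "emit_record" always passes, "skip_record" passes only conforming records, and
# "wait_for_discover" raises StopSyncPerValidationPolicy on a nonconforming record.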
@pytest.mark.parametrize(
"record,schema,validation_policy,expected_result",
[
pytest.param(CONFORMING_RECORD, SCHEMA, "emit_record", True, id="record-conforms_emit_record"),
pytest.param(NONCONFORMING_RECORD, SCHEMA, "emit_record", True, id="nonconforming_emit_record"),
pytest.param(CONFORMING_RECORD, SCHEMA, "skip_record", True, id="record-conforms_skip_record"),
pytest.param(NONCONFORMING_RECORD, SCHEMA, "skip_record", False, id="nonconforming_skip_record"),
pytest.param(CONFORMING_RECORD, SCHEMA, "wait_for_discover", True, id="record-conforms_wait_for_discover"),
pytest.param(NONCONFORMING_RECORD, SCHEMA, "wait_for_discover", False, id="nonconforming_wait_for_discover"),
]
)
def test_record_passes_validation_policy(
record: Mapping[str, Any],
schema: Mapping[str, Any],
validation_policy: str,
expected_result: bool
):
if validation_policy == "wait_for_discover" and expected_result is False:
with pytest.raises(StopSyncPerValidationPolicy):
DEFAULT_SCHEMA_VALIDATION_POLICIES[validation_policy].record_passes_validation_policy(record, schema)
else:
assert DEFAULT_SCHEMA_VALIDATION_POLICIES[validation_policy].record_passes_validation_policy(record, schema) == expected_result

View File

@@ -27,6 +27,7 @@ from unit_tests.sources.file_based.scenarios.csv_scenarios import (
csv_custom_format_scenario,
csv_multi_stream_scenario,
csv_single_stream_scenario,
empty_schema_inference_scenario,
invalid_csv_scenario,
multi_csv_scenario,
multi_csv_stream_n_file_exceeds_limit_for_inference,
@@ -50,8 +51,18 @@ from unit_tests.sources.file_based.scenarios.incremental_scenarios import (
single_csv_input_state_is_later_scenario,
single_csv_no_input_state_scenario,
)
from unit_tests.sources.file_based.scenarios.validation_policy_scenarios import (
emit_record_scenario_multi_stream,
emit_record_scenario_single_stream,
invalid_validation_policy,
no_validation_policy,
skip_record_scenario_multi_stream,
skip_record_scenario_single_stream,
wait_for_rediscovery_scenario_multi_stream,
wait_for_rediscovery_scenario_single_stream,
)
scenarios = [
discover_scenarios = [
csv_multi_stream_scenario,
csv_single_stream_scenario,
invalid_csv_scenario,
@@ -75,10 +86,11 @@ scenarios = [
single_csv_file_is_synced_if_modified_at_is_more_recent_than_in_history,
csv_custom_format_scenario,
multi_stream_custom_format,
empty_schema_inference_scenario,
]
@pytest.mark.parametrize("scenario", scenarios, ids=[s.name for s in scenarios])
@pytest.mark.parametrize("scenario", discover_scenarios, ids=[s.name for s in discover_scenarios])
def test_discover(capsys, tmp_path, json_spec, scenario):
expected_exc, expected_msg = scenario.expected_discover_error
if expected_exc:
@@ -89,7 +101,19 @@ def test_discover(capsys, tmp_path, json_spec, scenario):
assert discover(capsys, tmp_path, scenario) == scenario.expected_catalog
@pytest.mark.parametrize("scenario", scenarios, ids=[s.name for s in scenarios])
read_scenarios = discover_scenarios + [
emit_record_scenario_multi_stream,
emit_record_scenario_single_stream,
invalid_validation_policy,
no_validation_policy,
skip_record_scenario_multi_stream,
skip_record_scenario_single_stream,
wait_for_rediscovery_scenario_multi_stream,
wait_for_rediscovery_scenario_single_stream,
]
@pytest.mark.parametrize("scenario", read_scenarios, ids=[s.name for s in read_scenarios])
@freeze_time("2023-06-09T00:00:00Z")
def test_read(capsys, tmp_path, json_spec, scenario):
if scenario.incremental_scenario_config:
@@ -100,17 +124,31 @@ def test_read(capsys, tmp_path, json_spec, scenario):
def run_test_read_full_refresh(capsys, tmp_path, scenario):
expected_exc, expected_msg = scenario.expected_read_error
expected_records = scenario.expected_records
expected_logs = scenario.expected_logs
if expected_exc:
with pytest.raises(expected_exc) as exc:
read(capsys, tmp_path, scenario)
assert expected_msg in get_error_message_from_exc(exc)
else:
output = read(capsys, tmp_path, scenario)
expected_output = scenario.expected_records
assert len(output) == len(expected_output)
for actual, expected in zip(output, expected_output):
assert actual["record"]["data"] == expected["data"]
assert actual["record"]["stream"] == expected["stream"]
records, logs = output["records"], output["logs"]
assert len(records) == len(expected_records)
assert len(logs) == len(expected_logs)
assert_expected_records_match_output(records, expected_records)
assert_expected_logs_match_output(logs, expected_logs)
def assert_expected_records_match_output(output: List[Dict[str, Any]], expected_output: List[Dict[str, Any]]):
for actual, expected in zip(output, expected_output):
assert actual["record"]["data"] == expected["data"]
assert actual["record"]["stream"] == expected["stream"]
def assert_expected_logs_match_output(logs: List[Dict[str, Any]], expected_logs: List[Dict[str, Any]]):
for actual, expected in zip(logs, expected_logs):
assert actual["log"]["level"] == expected["level"]
assert actual["log"]["message"] == expected["message"]
def run_test_read_incremental(capsys, tmp_path, scenario):
@@ -146,11 +184,17 @@ check_scenarios = [
@pytest.mark.parametrize("scenario", check_scenarios, ids=[c.name for c in check_scenarios])
def test_check(capsys, tmp_path, json_spec, scenario):
expected_exc, expected_msg = scenario.expected_check_error
output = check(capsys, tmp_path, scenario)
if expected_msg:
assert expected_msg.value in output["message"]
assert output["status"] == scenario.expected_check_status
if expected_exc:
with pytest.raises(expected_exc):
output = check(capsys, tmp_path, scenario)
if expected_msg:
assert expected_msg.value in output["message"]
assert output["status"] == scenario.expected_check_status
else:
output = check(capsys, tmp_path, scenario)
assert output["status"] == scenario.expected_check_status
def check(capsys, tmp_path, scenario) -> Dict[str, Any]:
@@ -182,12 +226,19 @@ def read(capsys, tmp_path, scenario):
make_file(tmp_path / "catalog.json", scenario.configured_catalog(SyncMode.full_refresh)),
],
)
captured = capsys.readouterr()
return [
msg
for msg in (json.loads(line) for line in captured.out.splitlines())
if msg["type"] == "RECORD"
]
captured = capsys.readouterr().out.splitlines()
return {
"records": [
msg
for msg in (json.loads(line) for line in captured)
if msg["type"] == "RECORD"
],
"logs": [
msg
for msg in (json.loads(line) for line in captured)
if msg["type"] == "LOG"
]
}
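# read() now returns both RECORD and LOG messages so the validation-policy scenarios
# above can assert on the warning/error logs they are expected to produce.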
def read_with_state(capsys, tmp_path, scenario):

View File

@@ -0,0 +1,245 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from typing import Any, Mapping
import pytest
from airbyte_cdk.sources.file_based.exceptions import SchemaInferenceError
from airbyte_cdk.sources.file_based.schema_helpers import ComparableType, conforms_to_schema, merge_schemas
COMPLETE_CONFORMING_RECORD = {
"null_field": None,
"boolean_field": True,
"integer_field": 1,
"number_field": 1.5,
"string_field": "val1",
"array_field": [1.1, 2.2],
"object_field": {"col": "val"},
}
NONCONFORMING_EXTRA_COLUMN_RECORD = {
"null_field": None,
"boolean_field": True,
"integer_field": 1,
"number_field": 1.5,
"string_field": "val1",
"array_field": [1.1, 2.2],
"object_field": {"col": "val"},
"column_x": "extra"
}
CONFORMING_WITH_MISSING_COLUMN_RECORD = {
"null_field": None,
"boolean_field": True,
"integer_field": 1,
"number_field": 1.5,
"string_field": "val1",
"array_field": [1.1, 2.2],
}
CONFORMING_WITH_NARROWER_TYPE_RECORD = {
"null_field": None,
"boolean_field": True,
"integer_field": True,
"number_field": True,
"string_field": True,
"array_field": [1.1, 2.2],
"object_field": {"col": "val"},
}
NONCONFORMING_WIDER_TYPE_RECORD = {
"null_field": "not None",
"boolean_field": True,
"integer_field": 1,
"number_field": 1.5,
"string_field": "val1",
"array_field": [1.1, 2.2],
"object_field": {"col": "val"},
}
NONCONFORMING_NON_OBJECT_RECORD = {
"null_field": None,
"boolean_field": True,
"integer_field": 1,
"number_field": 1.5,
"string_field": "val1",
"array_field": [1.1, 2.2],
"object_field": "not an object",
}
NONCONFORMING_NON_ARRAY_RECORD = {
"null_field": None,
"boolean_field": True,
"integer_field": 1,
"number_field": 1.5,
"string_field": "val1",
"array_field": "not an array",
"object_field": {"col": "val"},
}
CONFORMING_MIXED_TYPE_NARROWER_RECORD = {
"null_field": None,
"boolean_field": True,
"integer_field": 1,
"number_field": 1.5,
"string_field": "val1",
"array_field": [1.1, 2.2],
"object_field": {"col": "val"},
}
NONCONFORMING_MIXED_TYPE_WIDER_RECORD = {
"null_field": None,
"boolean_field": True,
"integer_field": 1,
"number_field": 1.5,
"string_field": "val1",
"array_field": [1.1, 2.2],
"object_field": {"col": "val"},
}
CONFORMING_MIXED_TYPE_WITHIN_TYPE_RANGE_RECORD = {
"null_field": None,
"boolean_field": True,
"integer_field": 1,
"number_field": 1.5,
"string_field": "val1",
"array_field": [1.1, 2.2],
"object_field": {"col": "val"},
}
NONCONFORMING_INVALID_ARRAY_RECORD = {
"null_field": None,
"boolean_field": True,
"integer_field": 1,
"number_field": 1.5,
"string_field": ["this should not be an array"],
"array_field": [1.1, 2.2],
"object_field": {"col": "val"},
}
NONCONFORMING_TOO_WIDE_ARRAY_RECORD = {
"null_field": None,
"boolean_field": True,
"integer_field": 1,
"number_field": 1.5,
"string_field": "okay",
"array_field": ["val1", "val2"],
"object_field": {"col": "val"},
}
CONFORMING_NARROWER_ARRAY_RECORD = {
"null_field": None,
"boolean_field": True,
"integer_field": 1,
"number_field": 1.5,
"string_field": "okay",
"array_field": [1, 2],
"object_field": {"col": "val"},
}
NONCONFORMING_INVALID_OBJECT_RECORD = {
"null_field": None,
"boolean_field": True,
"integer_field": 1,
"number_field": 1.5,
"string_field": {"this": "should not be an object"},
"array_field": [1.1, 2.2],
"object_field": {"col": "val"},
}
SCHEMA = {
"type": "object",
"properties": {
"null_field": {
"type": "null"
},
"boolean_field": {
"type": "boolean"
},
"integer_field": {
"type": "integer"
},
"number_field": {
"type": "number"
},
"string_field": {
"type": "string"
},
"array_field": {
"type": "array",
"items": {
"type": "number",
},
},
"object_field": {
"type": "object"
},
}
}
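# Inferred from the fixtures above and the cases below: conforms_to_schema tolerates
# missing columns and values of a narrower type (per ComparableType), but rejects extra
# columns, wider types, and array/object mismatches.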
@pytest.mark.parametrize(
"record,schema,expected_result",
[
pytest.param(COMPLETE_CONFORMING_RECORD, SCHEMA, True, id="record-conforms"),
pytest.param(NONCONFORMING_EXTRA_COLUMN_RECORD, SCHEMA, False, id="nonconforming-extra-column"),
pytest.param(CONFORMING_WITH_MISSING_COLUMN_RECORD, SCHEMA, True, id="record-conforms-with-missing-column"),
pytest.param(CONFORMING_WITH_NARROWER_TYPE_RECORD, SCHEMA, True, id="record-conforms-with-narrower-type"),
pytest.param(NONCONFORMING_WIDER_TYPE_RECORD, SCHEMA, False, id="nonconforming-wider-type"),
pytest.param(NONCONFORMING_NON_OBJECT_RECORD, SCHEMA, False, id="nonconforming-string-is-not-an-object"),
pytest.param(NONCONFORMING_NON_ARRAY_RECORD, SCHEMA, False, id="nonconforming-string-is-not-an-array"),
pytest.param(NONCONFORMING_TOO_WIDE_ARRAY_RECORD, SCHEMA, False, id="nonconforming-array-values-too-wide"),
pytest.param(CONFORMING_NARROWER_ARRAY_RECORD, SCHEMA, True, id="conforming-array-values-narrower-than-schema"),
pytest.param(NONCONFORMING_INVALID_ARRAY_RECORD, SCHEMA, False, id="nonconforming-array-is-not-a-string"),
pytest.param(NONCONFORMING_INVALID_OBJECT_RECORD, SCHEMA, False, id="nonconforming-object-is-not-a-string"),
]
)
def test_conforms_to_schema(
record: Mapping[str, Any],
schema: Mapping[str, Any],
expected_result: bool
):
assert conforms_to_schema(record, schema) == expected_result
def test_comparable_types():
assert ComparableType.OBJECT > ComparableType.STRING
assert ComparableType.STRING > ComparableType.NUMBER
assert ComparableType.NUMBER > ComparableType.INTEGER
assert ComparableType.INTEGER > ComparableType.BOOLEAN
assert ComparableType["OBJECT"] == ComparableType.OBJECT
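# Per the cases below, merge_schemas is expected to union keys and widen scalar types
# toward the wider ComparableType, and to raise SchemaInferenceError for null values,
# invalid types, or mismatched object/array schemas.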
@pytest.mark.parametrize(
"schema1,schema2,expected_result",
[
pytest.param({}, {}, {}, id="empty-schemas"),
pytest.param({"a": None}, {}, None, id="null-value-in-schema"),
pytest.param({"a": {"type": "integer"}}, {}, {"a": {"type": "integer"}}, id="single-key-schema1"),
pytest.param({}, {"a": {"type": "integer"}}, {"a": {"type": "integer"}}, id="single-key-schema2"),
pytest.param({"a": {"type": "integer"}}, {"a": {"type": "integer"}}, {"a": {"type": "integer"}}, id="single-key-both-schemas"),
pytest.param({"a": {"type": "integer"}}, {"a": {"type": "number"}}, {"a": {"type": "number"}}, id="single-key-schema2-is-wider"),
pytest.param({"a": {"type": "number"}}, {"a": {"type": "integer"}}, {"a": {"type": "number"}}, id="single-key-schema1-is-wider"),
pytest.param({"a": {"type": "array"}}, {"a": {"type": "integer"}}, None, id="single-key-with-array-schema1"),
pytest.param({"a": {"type": "integer"}}, {"a": {"type": "array"}}, None, id="single-key-with-array-schema2"),
pytest.param({"a": {"type": "object", "properties": {"b": {"type": "integer"}}}}, {"a": {"type": "object", "properties": {"b": {"type": "integer"}}}}, {"a": {"type": "object", "properties": {"b": {"type": "integer"}}}}, id="single-key-same-object"),
pytest.param({"a": {"type": "object", "properties": {"b": {"type": "integer"}}}}, {"a": {"type": "object", "properties": {"b": {"type": "string"}}}}, None, id="single-key-different-objects"),
pytest.param({"a": {"type": "object", "properties": {"b": {"type": "integer"}}}}, {"a": {"type": "number"}}, None, id="single-key-with-object-schema1"),
pytest.param({"a": {"type": "number"}}, {"a": {"type": "object", "properties": {"b": {"type": "integer"}}}}, None, id="single-key-with-object-schema2"),
pytest.param({"a": {"type": "array", "items": {"type": "number"}}}, {"a": {"type": "array", "items": {"type": "number"}}}, {"a": {"type": "array", "items": {"type": "number"}}}, id="equal-arrays-in-both-schemas"),
pytest.param({"a": {"type": "array", "items": {"type": "integer"}}}, {"a": {"type": "array", "items": {"type": "number"}}}, None, id="different-arrays-in-both-schemas"),
pytest.param({"a": {"type": "integer"}, "b": {"type": "string"}}, {"c": {"type": "number"}}, {"a": {"type": "integer"}, "b": {"type": "string"}, "c": {"type": "number"}}, id=""),
pytest.param({"a": {"type": "invalid_type"}}, {"b": {"type": "integer"}}, None, id="invalid-type"),
]
)
def test_merge_schemas(schema1, schema2, expected_result):
if expected_result is not None:
assert merge_schemas(schema1, schema2) == expected_result
else:
with pytest.raises(SchemaInferenceError):
merge_schemas(schema1, schema2)