
Issue 28893/infer schema csv (#29099)

Maxime Carbonneau-Leclerc
2023-08-14 15:14:46 -04:00
committed by GitHub
parent 2d5939c551
commit 12f1304a67
16 changed files with 762 additions and 311 deletions

View File

@@ -82,7 +82,7 @@ class DefaultFileBasedAvailabilityStrategy(AbstractFileBasedAvailabilityStrategy
parser = stream.get_parser(stream.config.file_type)
try:
record = next(iter(parser.parse_records(stream.config, file, self.stream_reader, logger)))
record = next(iter(parser.parse_records(stream.config, file, self.stream_reader, logger, discovered_schema=None)))
except StopIteration:
# The file is empty. We've verified that we can open it, so will
# consider the connection check successful even though it means
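The change above only threads discovered_schema=None through the availability check. A minimal sketch of that pattern, with simplified (assumed) signatures, treats an empty file as available by catching StopIteration on the first record:

# Hedged sketch, not the CDK implementation: the signatures are simplified assumptions.
def first_record_or_none(parser, config, file, stream_reader, logger):
    try:
        return next(iter(parser.parse_records(config, file, stream_reader, logger, discovered_schema=None)))
    except StopIteration:
        # The file is empty: it could still be opened, so the connection check succeeds.
        return None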

View File

@@ -17,6 +17,11 @@ class QuotingBehavior(Enum):
QUOTE_NONE = "Quote None"
class InferenceType(Enum):
NONE = "None"
PRIMITIVE_TYPES_ONLY = "Primitive Types Only"
DEFAULT_TRUE_VALUES = ["y", "yes", "t", "true", "on", "1"]
DEFAULT_FALSE_VALUES = ["n", "no", "f", "false", "off", "0"]
@@ -81,6 +86,12 @@ class CsvFormat(BaseModel):
default=DEFAULT_FALSE_VALUES,
description="A set of case-sensitive strings that should be interpreted as false values.",
)
inference_type: InferenceType = Field(
title="Inference Type",
default=InferenceType.NONE,
description="How to infer the types of the columns. If none, inference default to strings.",
airbyte_hidden=True,
)
@validator("delimiter")
def validate_delimiter(cls, v: str) -> str:
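For context, a minimal usage sketch of the new inference_type field (assuming the defaults shown above for the other CsvFormat options):

from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat, InferenceType

# With InferenceType.NONE (the default) every column is typed as "string";
# PRIMITIVE_TYPES_ONLY enables boolean/integer/number inference during discovery.
csv_format = CsvFormat(inference_type=InferenceType.PRIMITIVE_TYPES_ONLY)
assert csv_format.inference_type == InferenceType.PRIMITIVE_TYPES_ONLY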

View File

@@ -4,7 +4,7 @@
import logging
import uuid
from typing import Any, Dict, Iterable, Mapping
from typing import Any, Dict, Iterable, Mapping, Optional
import fastavro
from airbyte_cdk.sources.file_based.config.avro_format import AvroFormat
@@ -12,6 +12,7 @@ from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileB
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_helpers import SchemaType
AVRO_TYPE_TO_JSON_TYPE = {
"null": "null",
@@ -47,7 +48,7 @@ class AvroParser(FileTypeParser):
file: RemoteFile,
stream_reader: AbstractFileBasedStreamReader,
logger: logging.Logger,
) -> Dict[str, Any]:
) -> SchemaType:
avro_format = config.format or AvroFormat()
if not isinstance(avro_format, AvroFormat):
raise ValueError(f"Expected ParquetFormat, got {avro_format}")
@@ -132,6 +133,7 @@ class AvroParser(FileTypeParser):
file: RemoteFile,
stream_reader: AbstractFileBasedStreamReader,
logger: logging.Logger,
discovered_schema: Optional[Mapping[str, SchemaType]],
) -> Iterable[Dict[str, Any]]:
avro_format = config.format or AvroFormat()
if not isinstance(avro_format, AvroFormat):

View File

@@ -5,17 +5,19 @@
import csv
import json
import logging
from abc import ABC, abstractmethod
from collections import defaultdict
from functools import partial
from io import IOBase
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Set
from typing import Any, Callable, Dict, Generator, Iterable, List, Mapping, Optional, Set
from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat, QuotingBehavior
from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat, InferenceType, QuotingBehavior
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, RecordParseError
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_helpers import TYPE_PYTHON_MAPPING
from airbyte_cdk.sources.file_based.schema_helpers import TYPE_PYTHON_MAPPING, SchemaType
DIALECT_NAME = "_config_dialect"
@@ -27,45 +29,17 @@ config_to_quoting: Mapping[QuotingBehavior, int] = {
}
class CsvParser(FileTypeParser):
async def infer_schema(
class _CsvReader:
def read_data(
self,
config: FileBasedStreamConfig,
file: RemoteFile,
stream_reader: AbstractFileBasedStreamReader,
logger: logging.Logger,
) -> Dict[str, Any]:
config_format = config.format or CsvFormat()
if not isinstance(config_format, CsvFormat):
raise ValueError(f"Invalid format config: {config_format}")
dialect_name = config.name + DIALECT_NAME
csv.register_dialect(
dialect_name,
delimiter=config_format.delimiter,
quotechar=config_format.quote_char,
escapechar=config_format.escape_char,
doublequote=config_format.double_quote,
quoting=config_to_quoting.get(config_format.quoting_behavior, csv.QUOTE_MINIMAL),
)
with stream_reader.open_file(file, self.file_read_mode, config_format.encoding, logger) as fp:
# todo: the existing InMemoryFilesSource.open_file() test source doesn't currently require an encoding, but actual
# sources will likely require one. Rather than modify the interface now, we can wait until a real use case arises
headers = self._get_headers(fp, config_format, dialect_name)
schema = {field.strip(): {"type": "string"} for field in headers}
csv.unregister_dialect(dialect_name)
return schema
file_read_mode: FileReadMode,
) -> Generator[Dict[str, Any], None, None]:
config_format = _extract_format(config)
def parse_records(
self,
config: FileBasedStreamConfig,
file: RemoteFile,
stream_reader: AbstractFileBasedStreamReader,
logger: logging.Logger,
) -> Iterable[Dict[str, Any]]:
schema: Optional[Mapping[str, Any]] = config.get_input_schema()
config_format = config.format or CsvFormat()
if not isinstance(config_format, CsvFormat):
raise ValueError(f"Invalid format config: {config_format}")
# Formats are configured individually per-stream so a unique dialect should be registered for each stream.
# We don't unregister the dialect because we are lazily parsing each csv file to generate records
# This will potentially be a problem if we ever process multiple streams concurrently
@@ -78,47 +52,132 @@ class CsvParser(FileTypeParser):
doublequote=config_format.double_quote,
quoting=config_to_quoting.get(config_format.quoting_behavior, csv.QUOTE_MINIMAL),
)
with stream_reader.open_file(file, self.file_read_mode, config_format.encoding, logger) as fp:
# todo: the existing InMemoryFilesSource.open_file() test source doesn't currently require an encoding, but actual
# sources will likely require one. Rather than modify the interface now, we can wait until a real use case arises
self._skip_rows_before_header(fp, config_format.skip_rows_before_header)
field_names = self._auto_generate_headers(fp, config_format) if config_format.autogenerate_column_names else None
reader = csv.DictReader(fp, dialect=dialect_name, fieldnames=field_names) # type: ignore
yield from self._read_and_cast_types(reader, schema, config_format, logger)
with stream_reader.open_file(file, file_read_mode, config_format.encoding, logger) as fp:
headers = self._get_headers(fp, config_format, dialect_name)
# we assume that if we autogenerate columns, it is because we don't have headers
# if a user wants to autogenerate_column_names with a CSV that has headers, they can skip rows
rows_to_skip = (
config_format.skip_rows_before_header
+ (0 if config_format.autogenerate_column_names else 1)
+ config_format.skip_rows_after_header
)
self._skip_rows(fp, rows_to_skip)
reader = csv.DictReader(fp, dialect=dialect_name, fieldnames=headers) # type: ignore
try:
for row in reader:
# The row was not properly parsed if any of the values are None. This will most likely occur if there are more columns
# than headers or more headers than columns
if None in row or None in row.values():
raise RecordParseError(FileBasedSourceError.ERROR_PARSING_RECORD)
yield row
finally:
# due to RecordParseError or GeneratorExit
csv.unregister_dialect(dialect_name)
def _get_headers(self, fp: IOBase, config_format: CsvFormat, dialect_name: str) -> List[str]:
"""
Assumes the fp is pointing to the beginning of the files and will reset it as such
"""
# Note that this method assumes the dialect has already been registered if we're parsing the headers
self._skip_rows(fp, config_format.skip_rows_before_header)
if config_format.autogenerate_column_names:
headers = self._auto_generate_headers(fp, dialect_name)
else:
# Then read the header
reader = csv.reader(fp, dialect=dialect_name) # type: ignore
headers = list(next(reader))
fp.seek(0)
return headers
def _auto_generate_headers(self, fp: IOBase, dialect_name: str) -> List[str]:
"""
Generates field names as [f0, f1, ...] in the same way as pyarrow's csv reader with autogenerate_column_names=True.
See https://arrow.apache.org/docs/python/generated/pyarrow.csv.ReadOptions.html
"""
reader = csv.reader(fp, dialect=dialect_name) # type: ignore
number_of_columns = len(next(reader)) # type: ignore
return [f"f{i}" for i in range(number_of_columns)]
@staticmethod
def _skip_rows(fp: IOBase, rows_to_skip: int) -> None:
"""
Skip rows before the header. This has to be done on the file object itself, not the reader
"""
for _ in range(rows_to_skip):
fp.readline()
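A standalone sketch of the rows_to_skip arithmetic used by _CsvReader.read_data above (illustrative, not the CDK method): the header only consumes a line when it is not autogenerated.

def rows_to_skip(skip_rows_before_header: int, autogenerate_column_names: bool, skip_rows_after_header: int) -> int:
    # When headers are autogenerated there is no header line in the file to skip over.
    header_line = 0 if autogenerate_column_names else 1
    return skip_rows_before_header + header_line + skip_rows_after_header

assert rows_to_skip(2, False, 1) == 4  # two prefixed rows, the header itself, one skipped row after it
assert rows_to_skip(0, True, 0) == 0   # nothing to skip when headers are autogenerated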
class CsvParser(FileTypeParser):
_MAX_BYTES_PER_FILE_FOR_SCHEMA_INFERENCE = 1_000_000
def __init__(self, csv_reader: Optional[_CsvReader] = None):
self._csv_reader = csv_reader if csv_reader else _CsvReader()
async def infer_schema(
self,
config: FileBasedStreamConfig,
file: RemoteFile,
stream_reader: AbstractFileBasedStreamReader,
logger: logging.Logger,
) -> SchemaType:
input_schema = config.get_input_schema()
if input_schema:
return input_schema
# todo: the existing InMemoryFilesSource.open_file() test source doesn't currently require an encoding, but actual
# sources will likely require one. Rather than modify the interface now, we can wait until a real use case arises
config_format = _extract_format(config)
type_inferrer_by_field: Dict[str, _TypeInferrer] = defaultdict(
lambda: _JsonTypeInferrer(config_format.true_values, config_format.false_values, config_format.null_values)
if config_format.inference_type != InferenceType.NONE
else _DisabledTypeInferrer()
)
data_generator = self._csv_reader.read_data(config, file, stream_reader, logger, self.file_read_mode)
read_bytes = 0
for row in data_generator:
for header, value in row.items():
type_inferrer_by_field[header].add_value(value)
# This is not an exact count of the bytes read because the csv module does some processing on the value
# before returning it. If we need to be more accurate, we could wrap the IO object with a counting decorator
read_bytes += len(value)
read_bytes += len(row) - 1 # for separators
if read_bytes >= self._MAX_BYTES_PER_FILE_FOR_SCHEMA_INFERENCE:
break
schema = {header.strip(): {"type": type_inferred.infer()} for header, type_inferred in type_inferrer_by_field.items()}
data_generator.close()
return schema
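A simplified, self-contained sketch of the byte-capped inference loop above; the per-column inferrer objects are hypothetical stand-ins, and the byte count is approximate by design.

from collections import defaultdict

def infer_schema_from_rows(rows, make_inferrer, max_bytes=1_000_000):
    # One inferrer per column; stop once roughly max_bytes of cell data has been consumed.
    inferrers = defaultdict(make_inferrer)
    read_bytes = 0
    for row in rows:
        for header, value in row.items():
            inferrers[header].add_value(value)
            read_bytes += len(value)
        read_bytes += len(row) - 1  # rough allowance for the delimiters between cells
        if read_bytes >= max_bytes:
            break
    return {header.strip(): {"type": inferrer.infer()} for header, inferrer in inferrers.items()}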
def parse_records(
self,
config: FileBasedStreamConfig,
file: RemoteFile,
stream_reader: AbstractFileBasedStreamReader,
logger: logging.Logger,
discovered_schema: Optional[Mapping[str, SchemaType]],
) -> Iterable[Dict[str, Any]]:
config_format = _extract_format(config)
cast_fn = CsvParser._get_cast_function(discovered_schema, config_format, logger)
data_generator = self._csv_reader.read_data(config, file, stream_reader, logger, self.file_read_mode)
for row in data_generator:
yield CsvParser._to_nullable(cast_fn(row), config_format.null_values)
data_generator.close()
@property
def file_read_mode(self) -> FileReadMode:
return FileReadMode.READ
@staticmethod
def _read_and_cast_types(
reader: csv.DictReader, schema: Optional[Mapping[str, Any]], config_format: CsvFormat, logger: logging.Logger # type: ignore
) -> Iterable[Dict[str, Any]]:
"""
If the user provided a schema, attempt to cast the record values to the associated type.
If a column is not in the schema or cannot be cast to an appropriate python type,
cast it to a string. Downstream, the user's validation policy will determine whether the
record should be emitted.
"""
cast_fn = CsvParser._get_cast_function(schema, config_format, logger)
for i, row in enumerate(reader):
if i < config_format.skip_rows_after_header:
continue
# The row was not properly parsed if any of the values are None
if any(val is None for val in row.values()):
raise RecordParseError(FileBasedSourceError.ERROR_PARSING_RECORD)
else:
yield CsvParser._to_nullable(cast_fn(row), config_format.null_values)
@staticmethod
def _get_cast_function(
schema: Optional[Mapping[str, Any]], config_format: CsvFormat, logger: logging.Logger
schema: Optional[Mapping[str, SchemaType]], config_format: CsvFormat, logger: logging.Logger
) -> Callable[[Mapping[str, str]], Mapping[str, str]]:
# Only cast values if the schema is provided
if schema:
property_types = {col: prop["type"] for col, prop in schema["properties"].items()}
return partial(_cast_types, property_types=property_types, config_format=config_format, logger=logger)
return partial(CsvParser._cast_types, property_types=property_types, config_format=config_format, logger=logger)
else:
# If no schema is provided, yield the rows as they are
return _no_cast
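For reference, a hedged sketch of the property-type extraction _get_cast_function performs on a discovered schema of the form {"properties": {...}}; the column names below are invented.

schema = {"properties": {"col1": {"type": "integer"}, "col2": {"type": ["null", "string"]}}}
property_types = {col: prop["type"] for col, prop in schema["properties"].items()}
assert property_types == {"col1": "integer", "col2": ["null", "string"]}
# When no schema is discovered, rows are passed through uncast (the _no_cast branch above).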
@@ -129,96 +188,164 @@ class CsvParser(FileTypeParser):
return nullable
@staticmethod
def _skip_rows_before_header(fp: IOBase, rows_to_skip: int) -> None:
def _cast_types(
row: Dict[str, str], property_types: Dict[str, Any], config_format: CsvFormat, logger: logging.Logger
) -> Dict[str, Any]:
"""
Skip rows before the header. This has to be done on the file object itself, not the reader
Casts the values in the input 'row' dictionary according to the types defined in the JSON schema.
Array and object types are only handled if they can be deserialized as JSON.
If any errors are encountered, the value will be emitted as a string.
"""
for _ in range(rows_to_skip):
fp.readline()
warnings = []
result = {}
def _get_headers(self, fp: IOBase, config_format: CsvFormat, dialect_name: str) -> List[str]:
# Note that this method assumes the dialect has already been registered if we're parsing the headers
if config_format.autogenerate_column_names:
return self._auto_generate_headers(fp, config_format)
else:
# If we're not autogenerating column names, we need to skip the rows before the header
self._skip_rows_before_header(fp, config_format.skip_rows_before_header)
# Then read the header
reader = csv.DictReader(fp, dialect=dialect_name) # type: ignore
return next(reader) # type: ignore
for key, value in row.items():
prop_type = property_types.get(key)
cast_value: Any = value
def _auto_generate_headers(self, fp: IOBase, config_format: CsvFormat) -> List[str]:
"""
Generates field names as [f0, f1, ...] in the same way as pyarrow's csv reader with autogenerate_column_names=True.
See https://arrow.apache.org/docs/python/generated/pyarrow.csv.ReadOptions.html
"""
next_line = next(fp).strip()
number_of_columns = len(next_line.split(config_format.delimiter)) # type: ignore
# Reset the file pointer to the beginning of the file so that the first row is not skipped
fp.seek(0)
return [f"f{i}" for i in range(number_of_columns)]
if isinstance(prop_type, list):
prop_type_distinct = set(prop_type)
prop_type_distinct.remove("null")
if len(prop_type_distinct) != 1:
raise ValueError(f"Could not get non nullable type from {prop_type}")
(prop_type,) = prop_type_distinct
if prop_type in TYPE_PYTHON_MAPPING:
_, python_type = TYPE_PYTHON_MAPPING[prop_type]
if python_type is None:
if value == "":
cast_value = None
else:
warnings.append(_format_warning(key, value, prop_type))
elif python_type == bool:
try:
cast_value = _value_to_bool(value, config_format.true_values, config_format.false_values)
except ValueError:
warnings.append(_format_warning(key, value, prop_type))
elif python_type == dict:
try:
# we don't re-use _value_to_object here because we type the column as object as long as there is only one object
cast_value = json.loads(value)
except json.JSONDecodeError:
warnings.append(_format_warning(key, value, prop_type))
elif python_type == list:
try:
cast_value = _value_to_list(value)
except (ValueError, json.JSONDecodeError):
warnings.append(_format_warning(key, value, prop_type))
elif python_type:
try:
cast_value = _value_to_python_type(value, python_type)
except ValueError:
warnings.append(_format_warning(key, value, prop_type))
else:
warnings.append(_format_warning(key, value, prop_type))
result[key] = cast_value
if warnings:
logger.warning(
f"{FileBasedSourceError.ERROR_CASTING_VALUE.value}: {','.join([w for w in warnings])}",
)
return result
def _cast_types(row: Dict[str, str], property_types: Dict[str, Any], config_format: CsvFormat, logger: logging.Logger) -> Dict[str, Any]:
"""
Casts the values in the input 'row' dictionary according to the types defined in the JSON schema.
class _TypeInferrer(ABC):
@abstractmethod
def add_value(self, value: Any) -> None:
pass
Array and object types are only handled if they can be deserialized as JSON.
@abstractmethod
def infer(self) -> str:
pass
If any errors are encountered, the value will be emitted as a string.
"""
warnings = []
result = {}
for key, value in row.items():
prop_type = property_types.get(key)
cast_value: Any = value
class _DisabledTypeInferrer(_TypeInferrer):
def add_value(self, value: Any) -> None:
pass
if prop_type in TYPE_PYTHON_MAPPING and prop_type is not None:
_, python_type = TYPE_PYTHON_MAPPING[prop_type]
def infer(self) -> str:
return "string"
if python_type is None:
if value == "":
cast_value = None
else:
warnings.append(_format_warning(key, value, prop_type))
elif python_type == bool:
try:
cast_value = _value_to_bool(value, config_format.true_values, config_format.false_values)
except ValueError:
warnings.append(_format_warning(key, value, prop_type))
class _JsonTypeInferrer(_TypeInferrer):
_NULL_TYPE = "null"
_BOOLEAN_TYPE = "boolean"
_INTEGER_TYPE = "integer"
_NUMBER_TYPE = "number"
_STRING_TYPE = "string"
elif python_type == dict:
try:
cast_value = json.loads(value)
except json.JSONDecodeError:
warnings.append(_format_warning(key, value, prop_type))
def __init__(self, boolean_trues: Set[str], boolean_falses: Set[str], null_values: Set[str]) -> None:
self._boolean_trues = boolean_trues
self._boolean_falses = boolean_falses
self._null_values = null_values
self._values: Set[str] = set()
elif python_type == list:
try:
parsed_value = json.loads(value)
if isinstance(parsed_value, list):
cast_value = parsed_value
except json.JSONDecodeError:
warnings.append(_format_warning(key, value, prop_type))
def add_value(self, value: Any) -> None:
self._values.add(value)
elif python_type:
try:
cast_value = python_type(value)
except ValueError:
warnings.append(_format_warning(key, value, prop_type))
def infer(self) -> str:
types_by_value = {value: self._infer_type(value) for value in self._values}
types_excluding_null_values = [types for types in types_by_value.values() if self._NULL_TYPE not in types]
if not types_excluding_null_values:
# this is highly unusual but we will consider the column as a string
return self._STRING_TYPE
else:
warnings.append(_format_warning(key, value, prop_type))
types = set.intersection(*types_excluding_null_values)
if self._BOOLEAN_TYPE in types:
return self._BOOLEAN_TYPE
elif self._INTEGER_TYPE in types:
return self._INTEGER_TYPE
elif self._NUMBER_TYPE in types:
return self._NUMBER_TYPE
return self._STRING_TYPE
result[key] = cast_value
def _infer_type(self, value: str) -> Set[str]:
inferred_types = set()
if warnings:
logger.warning(
f"{FileBasedSourceError.ERROR_CASTING_VALUE.value}: {','.join([w for w in warnings])}",
)
return result
if value in self._null_values:
inferred_types.add(self._NULL_TYPE)
if self._is_boolean(value):
inferred_types.add(self._BOOLEAN_TYPE)
if self._is_integer(value):
inferred_types.add(self._INTEGER_TYPE)
inferred_types.add(self._NUMBER_TYPE)
elif self._is_number(value):
inferred_types.add(self._NUMBER_TYPE)
inferred_types.add(self._STRING_TYPE)
return inferred_types
def _is_boolean(self, value: str) -> bool:
try:
_value_to_bool(value, self._boolean_trues, self._boolean_falses)
return True
except ValueError:
return False
@staticmethod
def _is_integer(value: str) -> bool:
try:
_value_to_python_type(value, int)
return True
except ValueError:
return False
@staticmethod
def _is_number(value: str) -> bool:
try:
_value_to_python_type(value, float)
return True
except ValueError:
return False
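A self-contained sketch of the widening rule _JsonTypeInferrer.infer() applies: intersect the candidate types of all non-null values, then pick the narrowest of boolean < integer < number, falling back to string. The helper below is illustrative, not the CDK code.

def pick_narrowest_common_type(type_sets):
    common = set.intersection(*type_sets) if type_sets else set()
    for candidate in ("boolean", "integer", "number"):
        if candidate in common:
            return candidate
    return "string"

assert pick_narrowest_common_type([{"boolean", "integer", "number", "string"}, {"integer", "number", "string"}]) == "integer"
assert pick_narrowest_common_type([{"integer", "number", "string"}, {"number", "string"}]) == "number"
assert pick_narrowest_common_type([{"number", "string"}, {"string"}]) == "string"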
def _value_to_bool(value: str, true_values: Set[str], false_values: Set[str]) -> bool:
@@ -229,9 +356,27 @@ def _value_to_bool(value: str, true_values: Set[str], false_values: Set[str]) ->
raise ValueError(f"Value {value} is not a valid boolean value")
def _value_to_list(value: str) -> List[Any]:
parsed_value = json.loads(value)
if isinstance(parsed_value, list):
return parsed_value
raise ValueError(f"Value {parsed_value} is not a valid list value")
def _value_to_python_type(value: str, python_type: type) -> Any:
return python_type(value)
def _format_warning(key: str, value: str, expected_type: Optional[Any]) -> str:
return f"{key}: value={value},expected_type={expected_type}"
def _no_cast(row: Mapping[str, str]) -> Mapping[str, str]:
return row
def _extract_format(config: FileBasedStreamConfig) -> CsvFormat:
config_format = config.format or CsvFormat()
if not isinstance(config_format, CsvFormat):
raise ValueError(f"Invalid format config: {config_format}")
return config_format
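A hedged, runnable restatement of the module-level casting helpers: _value_to_bool's body is not shown in this hunk, so the version here is an assumption consistent with its error message.

import json
from typing import Any, List, Set

def value_to_bool(value: str, true_values: Set[str], false_values: Set[str]) -> bool:
    if value in true_values:
        return True
    if value in false_values:
        return False
    raise ValueError(f"Value {value} is not a valid boolean value")

def value_to_list(value: str) -> List[Any]:
    parsed_value = json.loads(value)
    if isinstance(parsed_value, list):
        return parsed_value
    raise ValueError(f"Value {parsed_value} is not a valid list value")

assert value_to_bool("yes", {"y", "yes"}, {"n", "no"}) is True
assert value_to_list('["1", "2"]') == ["1", "2"]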

View File

@@ -4,13 +4,13 @@
import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, Iterable
from typing import Any, Dict, Iterable, Mapping, Optional
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_helpers import SchemaType
Schema = Dict[str, str]
Record = Dict[str, Any]
@@ -27,7 +27,7 @@ class FileTypeParser(ABC):
file: RemoteFile,
stream_reader: AbstractFileBasedStreamReader,
logger: logging.Logger,
) -> Schema:
) -> SchemaType:
"""
Infer the JSON Schema for this file.
"""
@@ -40,6 +40,7 @@ class FileTypeParser(ABC):
file: RemoteFile,
stream_reader: AbstractFileBasedStreamReader,
logger: logging.Logger,
discovered_schema: Optional[Mapping[str, SchemaType]],
) -> Iterable[Record]:
"""
Parse and emit each record.
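A hedged sketch of a minimal parser implementing the updated interface; the class name and record values are invented, and file_read_mode is assumed to be part of the abstract interface.

import logging
from typing import Iterable, Mapping, Optional

from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser, Record
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_helpers import SchemaType

class SingleColumnParser(FileTypeParser):
    async def infer_schema(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
    ) -> SchemaType:
        return {"col1": {"type": "string"}}

    def parse_records(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
        discovered_schema: Optional[Mapping[str, SchemaType]],
    ) -> Iterable[Record]:
        yield {"col1": "a value"}

    @property
    def file_read_mode(self) -> FileReadMode:
        return FileReadMode.READ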

View File

@@ -4,14 +4,14 @@
import json
import logging
from typing import Any, Dict, Iterable
from typing import Any, Dict, Iterable, Mapping, Optional
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, RecordParseError
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_helpers import PYTHON_TYPE_MAPPING, merge_schemas
from airbyte_cdk.sources.file_based.schema_helpers import PYTHON_TYPE_MAPPING, SchemaType, merge_schemas
class JsonlParser(FileTypeParser):
@@ -25,12 +25,12 @@ class JsonlParser(FileTypeParser):
file: RemoteFile,
stream_reader: AbstractFileBasedStreamReader,
logger: logging.Logger,
) -> Dict[str, Any]:
) -> SchemaType:
"""
Infers the schema for the file by inferring the schema for each line, and merging
it with the previously-inferred schema.
"""
inferred_schema: Dict[str, Any] = {}
inferred_schema: Mapping[str, Any] = {}
for entry in self._parse_jsonl_entries(file, stream_reader, logger, read_limit=True):
line_schema = self._infer_schema_for_record(entry)
@@ -44,6 +44,7 @@ class JsonlParser(FileTypeParser):
file: RemoteFile,
stream_reader: AbstractFileBasedStreamReader,
logger: logging.Logger,
discovered_schema: Optional[Mapping[str, SchemaType]],
) -> Iterable[Dict[str, Any]]:
"""
This code supports parsing json objects over multiple lines even though this does not align with the JSONL format. This is for
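A hedged sketch of the per-line inference-and-merge loop described above; infer_for_record stands in for the parser's private _infer_schema_for_record helper.

from airbyte_cdk.sources.file_based.schema_helpers import SchemaType, merge_schemas

def infer_from_entries(entries, infer_for_record) -> SchemaType:
    inferred_schema: SchemaType = {}
    for entry in entries:
        line_schema = infer_for_record(entry)
        # Each line widens the schema inferred so far.
        inferred_schema = merge_schemas(inferred_schema, line_schema)
    return inferred_schema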

View File

@@ -5,7 +5,7 @@
import json
import logging
import os
from typing import Any, Dict, Iterable, List, Mapping
from typing import Any, Dict, Iterable, List, Mapping, Optional
from urllib.parse import unquote
import pyarrow as pa
@@ -15,6 +15,7 @@ from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError, Fil
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_helpers import SchemaType
from pyarrow import Scalar
@@ -28,7 +29,7 @@ class ParquetParser(FileTypeParser):
file: RemoteFile,
stream_reader: AbstractFileBasedStreamReader,
logger: logging.Logger,
) -> Dict[str, Any]:
) -> SchemaType:
parquet_format = config.format or ParquetFormat()
if not isinstance(parquet_format, ParquetFormat):
raise ValueError(f"Expected ParquetFormat, got {parquet_format}")
@@ -51,6 +52,7 @@ class ParquetParser(FileTypeParser):
file: RemoteFile,
stream_reader: AbstractFileBasedStreamReader,
logger: logging.Logger,
discovered_schema: Optional[Mapping[str, SchemaType]],
) -> Iterable[Dict[str, Any]]:
parquet_format = config.format or ParquetFormat()
if not isinstance(parquet_format, ParquetFormat):

View File

@@ -11,7 +11,7 @@ from typing import Any, Dict, List, Literal, Mapping, Optional, Tuple, Type, Uni
from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError, FileBasedSourceError, SchemaInferenceError
JsonSchemaSupportedType = Union[List[str], Literal["string"], str]
SchemaType = Dict[str, Dict[str, JsonSchemaSupportedType]]
SchemaType = Mapping[str, Mapping[str, JsonSchemaSupportedType]]
schemaless_schema = {"type": "object", "properties": {"data": {"type": "object"}}}
@@ -99,7 +99,7 @@ def merge_schemas(schema1: SchemaType, schema2: SchemaType) -> SchemaType:
if not isinstance(t, dict) or "type" not in t or not _is_valid_type(t["type"]):
raise SchemaInferenceError(FileBasedSourceError.UNRECOGNIZED_TYPE, key=k, type=t)
merged_schema: Dict[str, Any] = deepcopy(schema1)
merged_schema: Dict[str, Any] = deepcopy(schema1) # type: ignore # as of 2023-08-08, deepcopy can copy Mapping
for k2, t2 in schema2.items():
t1 = merged_schema.get(k2)
if t1 is None:
@@ -116,7 +116,7 @@ def _is_valid_type(t: JsonSchemaSupportedType) -> bool:
return t == "array" or get_comparable_type(t) is not None
def _choose_wider_type(key: str, t1: Dict[str, Any], t2: Dict[str, Any]) -> Dict[str, Any]:
def _choose_wider_type(key: str, t1: Mapping[str, Any], t2: Mapping[str, Any]) -> Mapping[str, Any]:
if (t1["type"] == "array" or t2["type"] == "array") and t1 != t2:
raise SchemaInferenceError(
FileBasedSourceError.SCHEMA_INFERENCE_ERROR,
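An illustrative usage sketch of merge_schemas with the Mapping-based SchemaType; the column names are invented, and the widening of integer to number reflects the comparable-type ordering assumed here.

from airbyte_cdk.sources.file_based.schema_helpers import merge_schemas

schema1 = {"col1": {"type": "integer"}}
schema2 = {"col1": {"type": "number"}, "col2": {"type": "string"}}
# merge_schemas keeps the wider of the two types per column, so col1 widens to "number".
merged = merge_schemas(schema1, schema2)
assert merged == {"col1": {"type": "number"}, "col2": {"type": "string"}}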

View File

@@ -6,7 +6,7 @@ import asyncio
import itertools
import traceback
from functools import cache
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Set, Union
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Set, Union
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level
from airbyte_cdk.models import Type as MessageType
@@ -20,7 +20,7 @@ from airbyte_cdk.sources.file_based.exceptions import (
StopSyncPerValidationPolicy,
)
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_helpers import merge_schemas, schemaless_schema
from airbyte_cdk.sources.file_based.schema_helpers import SchemaType, merge_schemas, schemaless_schema
from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream
from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor
from airbyte_cdk.sources.file_based.types import StreamSlice
@@ -84,7 +84,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):
n_skipped = line_no = 0
try:
for record in parser.parse_records(self.config, file, self._stream_reader, self.logger):
for record in parser.parse_records(self.config, file, self._stream_reader, self.logger, schema):
line_no += 1
if self.config.schemaless:
record = {"data": record}
@@ -231,8 +231,8 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):
Each file type has a corresponding `infer_schema` handler.
Dispatch on file type.
"""
base_schema: Dict[str, Any] = {}
pending_tasks: Set[asyncio.tasks.Task[Dict[str, Any]]] = set()
base_schema: SchemaType = {}
pending_tasks: Set[asyncio.tasks.Task[SchemaType]] = set()
n_started, n_files = 0, len(files)
files_iterator = iter(files)
@@ -251,7 +251,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):
return base_schema
async def _infer_file_schema(self, file: RemoteFile) -> Dict[str, Any]:
async def _infer_file_schema(self, file: RemoteFile) -> SchemaType:
try:
return await self.get_parser(self.config.file_type).infer_schema(self.config, file, self._stream_reader, self.logger)
except Exception as exc:
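A hedged, simplified sketch of inferring and merging schemas across files, loosely mirroring the loop above; the helper names are illustrative and the concurrency limit handling is omitted.

import asyncio
from airbyte_cdk.sources.file_based.schema_helpers import SchemaType, merge_schemas

async def infer_schema_for_files(files, infer_file_schema) -> SchemaType:
    base_schema: SchemaType = {}
    # Schedule one inference task per file, then fold the results into a single schema.
    tasks = [asyncio.create_task(infer_file_schema(file)) for file in files]
    for schema in await asyncio.gather(*tasks):
        base_schema = merge_schemas(base_schema, schema)
    return base_schema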

View File

@@ -15,7 +15,7 @@ from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat
pytest.param(0, False, None, id="test_no_skip_rows_before_header_and_no_autogenerate_column_names"),
]
)
def test_csv_format(skip_rows_before_header, autogenerate_column_names, expected_error):
def test_csv_format_skip_rows_and_autogenerate_column_names(skip_rows_before_header, autogenerate_column_names, expected_error) -> None:
if expected_error:
with pytest.raises(expected_error):
CsvFormat(skip_rows_before_header=skip_rows_before_header, autogenerate_column_names=autogenerate_column_names)

View File

@@ -3,18 +3,21 @@
#
import asyncio
import csv
import io
import logging
import unittest
from datetime import datetime
from unittest import mock
from unittest.mock import MagicMock, Mock
from typing import Any, Dict, Generator, List, Set
from unittest import TestCase, mock
from unittest.mock import Mock
import pytest
from airbyte_cdk.sources.file_based.config.csv_format import DEFAULT_FALSE_VALUES, DEFAULT_TRUE_VALUES, CsvFormat
from airbyte_cdk.sources.file_based.config.csv_format import DEFAULT_FALSE_VALUES, DEFAULT_TRUE_VALUES, CsvFormat, InferenceType
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.exceptions import RecordParseError
from airbyte_cdk.sources.file_based.file_based_stream_reader import FileReadMode
from airbyte_cdk.sources.file_based.file_types.csv_parser import CsvParser, _cast_types
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
from airbyte_cdk.sources.file_based.file_types.csv_parser import CsvParser, _CsvReader
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
PROPERTY_TYPES = {
@@ -27,6 +30,7 @@ PROPERTY_TYPES = {
"col7": "array",
"col8": "array",
"col9": "array",
"col10": ["null", "string"],
}
logger = logging.getLogger()
@@ -46,6 +50,7 @@ logger = logging.getLogger()
"col7": '[1, 2]',
"col8": '["1", "2"]',
"col9": '[{"a": "b"}, {"a": "c"}]',
"col10": 'asdf',
},
DEFAULT_TRUE_VALUES,
DEFAULT_FALSE_VALUES,
@@ -59,6 +64,7 @@ logger = logging.getLogger()
"col7": [1, 2],
"col8": ["1", "2"],
"col9": [{"a": "b"}, {"a": "c"}],
"col10": 'asdf',
}, id="cast-all-cols"),
pytest.param({"col1": "1"}, DEFAULT_TRUE_VALUES, DEFAULT_FALSE_VALUES, {"col1": "1"}, id="cannot-cast-to-null"),
pytest.param({"col2": "1"}, DEFAULT_TRUE_VALUES, DEFAULT_FALSE_VALUES, {"col2": True}, id="cast-1-to-bool"),
@@ -74,38 +80,276 @@ logger = logging.getLogger()
pytest.param({"col7": "['a', 'b']"}, DEFAULT_TRUE_VALUES, DEFAULT_FALSE_VALUES, {"col7": "['a', 'b']"}, id="cannot-cast-to-list-of-ints"),
pytest.param({"col8": "['a', 'b']"}, DEFAULT_TRUE_VALUES, DEFAULT_FALSE_VALUES, {"col8": "['a', 'b']"}, id="cannot-cast-to-list-of-strings"),
pytest.param({"col9": "['a', 'b']"}, DEFAULT_TRUE_VALUES, DEFAULT_FALSE_VALUES, {"col9": "['a', 'b']"}, id="cannot-cast-to-list-of-objects"),
pytest.param({"col10": "x"}, DEFAULT_TRUE_VALUES, DEFAULT_FALSE_VALUES, {"col10": "x"}, id="item-not-in-props-doesn't-error"),
pytest.param({"col11": "x"}, DEFAULT_TRUE_VALUES, DEFAULT_FALSE_VALUES, {"col11": "x"}, id="item-not-in-props-doesn't-error"),
]
)
def test_cast_to_python_type(row, true_values, false_values, expected_output):
def test_cast_to_python_type(row: Dict[str, str], true_values: Set[str], false_values: Set[str], expected_output: Dict[str, Any]) -> None:
csv_format = CsvFormat(true_values=true_values, false_values=false_values)
assert _cast_types(row, PROPERTY_TYPES, csv_format, logger) == expected_output
assert CsvParser._cast_types(row, PROPERTY_TYPES, csv_format, logger) == expected_output
@pytest.mark.parametrize(
"reader_values, expected_rows", [
pytest.param([{"col1": "1", "col2": None}], None, id="raise_exception_if_any_value_is_none"),
pytest.param([{"col1": "1", "col2": "2"}], [{"col1": "1", "col2": "2"}], id="read_no_cast"),
]
)
def test_read_and_cast_types(reader_values, expected_rows):
reader = MagicMock()
reader.__iter__.return_value = reader_values
schema = {}
config_format = CsvFormat()
logger = Mock()
_DEFAULT_TRUE_VALUES = {"1", "yes", "yeah", "right"}
_DEFAULT_FALSE_VALUES = {"0", "no", "nop", "wrong"}
parser = CsvParser()
expected_rows = expected_rows
if expected_rows is None:
class SchemaInferenceTestCase(TestCase):
_A_NULL_VALUE = "null"
_HEADER_NAME = "header"
def setUp(self) -> None:
self._config_format = CsvFormat()
self._config_format.true_values = _DEFAULT_TRUE_VALUES
self._config_format.false_values = _DEFAULT_FALSE_VALUES
self._config_format.null_values = {self._A_NULL_VALUE}
self._config_format.inference_type = InferenceType.NONE
self._config = Mock()
self._config.get_input_schema.return_value = None
self._config.format = self._config_format
self._file = Mock(spec=RemoteFile)
self._stream_reader = Mock(spec=AbstractFileBasedStreamReader)
self._logger = Mock(spec=logging.Logger)
self._csv_reader = Mock(spec=_CsvReader)
self._parser = CsvParser(self._csv_reader)
def test_given_user_schema_defined_when_infer_schema_then_return_user_schema(self) -> None:
self._config.get_input_schema.return_value = {self._HEADER_NAME: {"type": "potato"}}
self._test_infer_schema(list(_DEFAULT_TRUE_VALUES.union(_DEFAULT_FALSE_VALUES)), "potato")
def test_given_booleans_only_when_infer_schema_then_type_is_boolean(self) -> None:
self._config_format.inference_type = InferenceType.PRIMITIVE_TYPES_ONLY
self._test_infer_schema(list(_DEFAULT_TRUE_VALUES.union(_DEFAULT_FALSE_VALUES)), "boolean")
def test_given_integers_only_when_infer_schema_then_type_is_integer(self) -> None:
self._config_format.inference_type = InferenceType.PRIMITIVE_TYPES_ONLY
self._test_infer_schema(["2", "90329", "5645"], "integer")
def test_given_integer_overlap_with_bool_value_only_when_infer_schema_then_type_is_integer(self) -> None:
self._config_format.inference_type = InferenceType.PRIMITIVE_TYPES_ONLY
self._test_infer_schema(["1", "90329", "5645"], "integer") # here, "1" is also considered a boolean
def test_given_numbers_and_integers_when_infer_schema_then_type_is_number(self) -> None:
self._config_format.inference_type = InferenceType.PRIMITIVE_TYPES_ONLY
self._test_infer_schema(["2", "90329", "2.312"], "number")
def test_given_arrays_when_infer_schema_then_type_is_string(self) -> None:
self._config_format.inference_type = InferenceType.PRIMITIVE_TYPES_ONLY
self._test_infer_schema(['["first_item", "second_item"]', '["first_item_again", "second_item_again"]'], "string")
def test_given_objects_when_infer_schema_then_type_is_object(self) -> None:
self._config_format.inference_type = InferenceType.PRIMITIVE_TYPES_ONLY
self._test_infer_schema(['{"object1_key": 1}', '{"object2_key": 2}'], "string")
def test_given_strings_only_when_infer_schema_then_type_is_string(self) -> None:
self._config_format.inference_type = InferenceType.PRIMITIVE_TYPES_ONLY
self._test_infer_schema(["a string", "another string"], "string")
def test_given_a_null_value_when_infer_then_ignore_null(self) -> None:
self._config_format.inference_type = InferenceType.PRIMITIVE_TYPES_ONLY
self._test_infer_schema(["2", "90329", "5645", self._A_NULL_VALUE], "integer")
def test_given_only_null_values_when_infer_then_type_is_string(self) -> None:
self._config_format.inference_type = InferenceType.PRIMITIVE_TYPES_ONLY
self._test_infer_schema([self._A_NULL_VALUE, self._A_NULL_VALUE, self._A_NULL_VALUE], "string")
def test_given_big_file_when_infer_schema_then_stop_early(self) -> None:
self._config_format.inference_type = InferenceType.PRIMITIVE_TYPES_ONLY
self._csv_reader.read_data.return_value = ({self._HEADER_NAME: row} for row in ["2." + "2" * 1_000_000] + ["this is a string"])
inferred_schema = self._infer_schema()
# since the type is number, we know the string at the end was not considered
assert inferred_schema == {self._HEADER_NAME: {"type": "number"}}
def _test_infer_schema(self, rows: List[str], expected_type: str) -> None:
self._csv_reader.read_data.return_value = ({self._HEADER_NAME: row} for row in rows)
inferred_schema = self._infer_schema()
assert inferred_schema == {self._HEADER_NAME: {"type": expected_type}}
def _infer_schema(self):
loop = asyncio.new_event_loop()
task = loop.create_task(self._parser.infer_schema(self._config, self._file, self._stream_reader, self._logger))
loop.run_until_complete(task)
return task.result()
class CsvFileBuilder:
def __init__(self) -> None:
self._prefixed_rows: List[str] = []
self._data: List[str] = []
def with_prefixed_rows(self, rows: List[str]) -> 'CsvFileBuilder':
self._prefixed_rows = rows
return self
def with_data(self, data: List[str]) -> 'CsvFileBuilder':
self._data = data
return self
def build(self) -> io.StringIO:
return io.StringIO("\n".join(self._prefixed_rows + self._data))
class CsvReaderTest(unittest.TestCase):
_CONFIG_NAME = "config_name"
def setUp(self) -> None:
self._config_format = CsvFormat()
self._config = Mock()
self._config.name = self._CONFIG_NAME
self._config.format = self._config_format
self._file = Mock(spec=RemoteFile)
self._stream_reader = Mock(spec=AbstractFileBasedStreamReader)
self._logger = Mock(spec=logging.Logger)
self._csv_reader = _CsvReader()
def test_given_skip_rows_when_read_data_then_do_not_consider_prefixed_rows(self) -> None:
self._config_format.skip_rows_before_header = 2
self._stream_reader.open_file.return_value = CsvFileBuilder().with_prefixed_rows(["first line", "second line"]).with_data([
"header",
"a value",
"another value",
]).build()
data_generator = self._read_data()
assert list(data_generator) == [{"header": "a value"}, {"header": "another value"}]
def test_given_autogenerated_headers_when_read_data_then_generate_headers_with_format_fX(self) -> None:
self._config_format.autogenerate_column_names = True
self._stream_reader.open_file.return_value = CsvFileBuilder().with_data([
'0,1,2,3,4,5,6'
]).build()
data_generator = self._read_data()
assert list(data_generator) == [{"f0": "0", "f1": "1", "f2": "2", "f3": "3", "f4": "4", "f5": "5", "f6": "6"}]
def test_given_skip_rows_after_header_when_read_data_then_do_not_parse_skipped_rows(self) -> None:
self._config_format.skip_rows_after_header = 1
self._stream_reader.open_file.return_value = CsvFileBuilder().with_data([
"header1,header2",
"skipped row: important that the is no comma in this string to test if columns do not match in skipped rows",
"a value 1,a value 2",
"another value 1,another value 2"
]).build()
data_generator = self._read_data()
assert list(data_generator) == [
{"header1": "a value 1", "header2": "a value 2"},
{"header1": "another value 1", "header2": "another value 2"}
]
def test_given_pipe_delimiter_when_read_data_then_parse_properly(self) -> None:
self._config_format.delimiter = "|"
self._stream_reader.open_file.return_value = CsvFileBuilder().with_data([
"header1|header2",
"a value 1|a value 2",
]).build()
data_generator = self._read_data()
assert list(data_generator) == [{"header1": "a value 1", "header2": "a value 2"}]
def test_given_quote_char_when_read_data_then_parse_properly(self) -> None:
self._config_format.quote_char = "|"
self._stream_reader.open_file.return_value = CsvFileBuilder().with_data([
"header1,header2",
"|a,value,1|,|a,value,2|",
]).build()
data_generator = self._read_data()
assert list(data_generator) == [{"header1": "a,value,1", "header2": "a,value,2"}]
def test_given_escape_char_when_read_data_then_parse_properly(self) -> None:
self._config_format.escape_char = "|"
self._stream_reader.open_file.return_value = CsvFileBuilder().with_data([
"header1,header2",
'"a |"value|", 1",a value 2',
]).build()
data_generator = self._read_data()
assert list(data_generator) == [{"header1": 'a "value", 1', "header2": "a value 2"}]
def test_given_double_quote_on_when_read_data_then_parse_properly(self) -> None:
self._config_format.double_quote = True
self._stream_reader.open_file.return_value = CsvFileBuilder().with_data([
"header1,header2",
'1,"Text with doublequote: ""This is a text."""',
]).build()
data_generator = self._read_data()
assert list(data_generator) == [{"header1": "1", "header2": 'Text with doublequote: "This is a text."'}]
def test_given_double_quote_off_when_read_data_then_parse_properly(self) -> None:
self._config_format.double_quote = False
self._stream_reader.open_file.return_value = CsvFileBuilder().with_data([
"header1,header2",
'1,"Text with doublequote: ""This is a text."""',
]).build()
data_generator = self._read_data()
assert list(data_generator) == [{"header1": "1", "header2": 'Text with doublequote: "This is a text."""'}]
def test_given_generator_closed_when_read_data_then_unregister_dialect(self) -> None:
self._stream_reader.open_file.return_value = CsvFileBuilder().with_data([
"header",
"a value",
"another value",
]).build()
data_generator = self._read_data()
next(data_generator)
assert f"{self._CONFIG_NAME}_config_dialect" in csv.list_dialects()
data_generator.close()
assert f"{self._CONFIG_NAME}_config_dialect" not in csv.list_dialects()
def test_given_too_many_values_for_columns_when_read_data_then_raise_exception_and_unregister_dialect(self) -> None:
self._stream_reader.open_file.return_value = CsvFileBuilder().with_data([
"header",
"a value",
"too many values,value,value,value",
]).build()
data_generator = self._read_data()
next(data_generator)
assert f"{self._CONFIG_NAME}_config_dialect" in csv.list_dialects()
with pytest.raises(RecordParseError):
list(parser._read_and_cast_types(reader, schema, config_format, logger))
else:
assert expected_rows == list(parser._read_and_cast_types(reader, schema, config_format, logger))
next(data_generator)
assert f"{self._CONFIG_NAME}_config_dialect" not in csv.list_dialects()
def test_given_too_few_values_for_columns_when_read_data_then_raise_exception_and_unregister_dialect(self) -> None:
self._stream_reader.open_file.return_value = CsvFileBuilder().with_data([
"header1,header2,header3",
"value1,value2,value3",
"a value",
]).build()
data_generator = self._read_data()
next(data_generator)
assert f"{self._CONFIG_NAME}_config_dialect" in csv.list_dialects()
with pytest.raises(RecordParseError):
next(data_generator)
assert f"{self._CONFIG_NAME}_config_dialect" not in csv.list_dialects()
def _read_data(self) -> Generator[Dict[str, str], None, None]:
data_generator = self._csv_reader.read_data(
self._config,
self._file,
self._stream_reader,
self._logger,
FileReadMode.READ,
)
return data_generator
def test_encoding_is_passed_to_stream_reader():
def test_encoding_is_passed_to_stream_reader() -> None:
parser = CsvParser()
encoding = "ascii"
stream_reader = Mock()
@@ -119,7 +363,7 @@ def test_encoding_is_passed_to_stream_reader():
file_type="csv",
format=CsvFormat(encoding=encoding)
)
list(parser.parse_records(config, file, stream_reader, logger))
list(parser.parse_records(config, file, stream_reader, logger, {"properties": {"c1": {"type": "string"}, "c2": {"type": "string"}}}))
stream_reader.open_file.assert_has_calls([
mock.call(file, FileReadMode.READ, encoding, logger),
mock.call().__enter__(),
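The _infer_schema helper in this test module drives the coroutine with a manually created event loop; an equivalent (assumed) shorthand for test code would be asyncio.run:

import asyncio

def run_infer(parser, config, file, stream_reader, logger):
    # Convenience wrapper for tests; assumes no event loop is already running.
    return asyncio.run(parser.infer_schema(config, file, stream_reader, logger))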

View File

@@ -18,23 +18,23 @@ JSONL_CONTENT_WITHOUT_MULTILINE_JSON_OBJECTS = [
b'{"a": 2, "b": "2"}',
]
JSONL_CONTENT_WITH_MULTILINE_JSON_OBJECTS = [
b'{',
b"{",
b' "a": 1,',
b' "b": "1"',
b'}',
b'{',
b"}",
b"{",
b' "a": 2,',
b' "b": "2"',
b'}',
b"}",
]
INVALID_JSON_CONTENT = [
b'{',
b"{",
b' "a": 1,',
b' "b": "1"',
b'{',
b"{",
b' "a": 2,',
b' "b": "2"',
b'}',
b"}",
]
@@ -52,9 +52,7 @@ def _infer_schema(stream_reader: MagicMock) -> Dict[str, Any]:
def test_when_infer_then_return_proper_types(stream_reader: MagicMock) -> None:
record = {"col1": 1, "col2": 2.2, "col3": "3", "col4": ["a", "list"], "col5": {"inner": "obj"}, "col6": None, "col7": True}
stream_reader.open_file.return_value.__enter__.return_value = io.BytesIO(
json.dumps(record).encode("utf-8")
)
stream_reader.open_file.return_value.__enter__.return_value = io.BytesIO(json.dumps(record).encode("utf-8"))
schema = _infer_schema(stream_reader)
@@ -82,7 +80,7 @@ def test_given_no_records_when_infer_then_return_empty_schema(stream_reader: Mag
def test_given_limit_hit_when_infer_then_stop_considering_records(stream_reader: MagicMock) -> None:
jsonl_file_content = ('{"key": 2.' + "2" * JsonlParser.MAX_BYTES_PER_FILE_FOR_SCHEMA_INFERENCE + '}\n{"key": "a string"}')
jsonl_file_content = '{"key": 2.' + "2" * JsonlParser.MAX_BYTES_PER_FILE_FOR_SCHEMA_INFERENCE + '}\n{"key": "a string"}'
stream_reader.open_file.return_value.__enter__.return_value = io.BytesIO(jsonl_file_content.encode("utf-8"))
schema = _infer_schema(stream_reader)
@@ -91,9 +89,9 @@ def test_given_limit_hit_when_infer_then_stop_considering_records(stream_reader:
def test_given_multiline_json_objects_and_read_limit_hit_when_infer_then_return_parse_until_at_least_one_record(
stream_reader: MagicMock
stream_reader: MagicMock,
) -> None:
jsonl_file_content = ('{\n"key": 2.' + "2" * JsonlParser.MAX_BYTES_PER_FILE_FOR_SCHEMA_INFERENCE + "\n}")
jsonl_file_content = '{\n"key": 2.' + "2" * JsonlParser.MAX_BYTES_PER_FILE_FOR_SCHEMA_INFERENCE + "\n}"
stream_reader.open_file.return_value.__enter__.return_value = io.BytesIO(jsonl_file_content.encode("utf-8"))
schema = _infer_schema(stream_reader)
@@ -101,9 +99,7 @@ def test_given_multiline_json_objects_and_read_limit_hit_when_infer_then_return_
assert schema == {"key": {"type": "number"}}
def test_given_multiline_json_objects_and_hits_read_limit_when_infer_then_return_proper_types(
stream_reader: MagicMock
) -> None:
def test_given_multiline_json_objects_and_hits_read_limit_when_infer_then_return_proper_types(stream_reader: MagicMock) -> None:
stream_reader.open_file.return_value.__enter__.return_value = JSONL_CONTENT_WITH_MULTILINE_JSON_OBJECTS
schema = _infer_schema(stream_reader)
assert schema == {"a": {"type": "integer"}, "b": {"type": "string"}}
@@ -117,7 +113,7 @@ def test_given_multiple_records_then_merge_types(stream_reader: MagicMock) -> No
def test_given_one_json_per_line_when_parse_records_then_return_records(stream_reader: MagicMock) -> None:
stream_reader.open_file.return_value.__enter__.return_value = JSONL_CONTENT_WITHOUT_MULTILINE_JSON_OBJECTS
records = list(JsonlParser().parse_records(Mock(), Mock(), stream_reader, Mock()))
records = list(JsonlParser().parse_records(Mock(), Mock(), stream_reader, Mock(), None))
assert records == [{"a": 1, "b": "1"}, {"a": 2, "b": "2"}]
@@ -125,14 +121,14 @@ def test_given_one_json_per_line_when_parse_records_then_do_not_send_warning(str
stream_reader.open_file.return_value.__enter__.return_value = JSONL_CONTENT_WITHOUT_MULTILINE_JSON_OBJECTS
logger = Mock()
list(JsonlParser().parse_records(Mock(), Mock(), stream_reader, logger))
list(JsonlParser().parse_records(Mock(), Mock(), stream_reader, logger, None))
assert logger.warning.call_count == 0
def test_given_multiline_json_object_when_parse_records_then_return_records(stream_reader: MagicMock) -> None:
stream_reader.open_file.return_value.__enter__.return_value = JSONL_CONTENT_WITH_MULTILINE_JSON_OBJECTS
records = list(JsonlParser().parse_records(Mock(), Mock(), stream_reader, Mock()))
records = list(JsonlParser().parse_records(Mock(), Mock(), stream_reader, Mock(), None))
assert records == [{"a": 1, "b": "1"}, {"a": 2, "b": "2"}]
@@ -140,7 +136,7 @@ def test_given_multiline_json_object_when_parse_records_then_log_once_one_record
stream_reader.open_file.return_value.__enter__.return_value = JSONL_CONTENT_WITH_MULTILINE_JSON_OBJECTS
logger = Mock()
next(iter(JsonlParser().parse_records(Mock(), Mock(), stream_reader, logger)))
next(iter(JsonlParser().parse_records(Mock(), Mock(), stream_reader, logger, None)))
assert logger.warning.call_count == 1
@@ -150,5 +146,5 @@ def test_given_unparsable_json_when_parse_records_then_raise_error(stream_reader
logger = Mock()
with pytest.raises(RecordParseError):
list(JsonlParser().parse_records(Mock(), Mock(), stream_reader, logger))
list(JsonlParser().parse_records(Mock(), Mock(), stream_reader, logger, None))
assert logger.warning.call_count == 0

View File

@@ -8,7 +8,7 @@ from unit_tests.sources.file_based.scenarios.scenario_builder import TestScenari
single_csv_scenario = (
TestScenarioBuilder()
.set_name("single_csv_stream")
.set_name("single_csv_scenario")
.set_config(
{
"streams": [
@@ -133,6 +133,15 @@ single_csv_scenario = (
],
"type": "string"
},
"inference_type": {
"default": "None",
"description": "How to infer the types of the columns. If none, inference default to strings.",
"title": "Inference Type",
"enum": [
"None",
"Primitive Types Only",
]
},
"delimiter": {
"title": "Delimiter",
"description": "The character delimiting individual cells in the CSV data. This may only be a 1-character string. For tab-delimited data enter '\\t'.",
@@ -234,7 +243,7 @@ single_csv_scenario = (
"type": "string"
},
"uniqueItems": True
}
},
}
},
{
@@ -551,7 +560,6 @@ multi_csv_stream_n_file_exceeds_limit_for_inference = (
},
]
)
.set_expected_logs({"discover": [{"level": "WARN", "message": "Refusing to infer schema for all 2 files; using 1 files."}]})
.set_discovery_policy(LowInferenceLimitDiscoveryPolicy())
).build()
@@ -1851,6 +1859,7 @@ csv_newline_in_values_not_quoted_scenario = (
"message": "Error parsing record. This could be due to a mismatch between the config's file type and the actual file type, or because the file or record is not parseable. stream=stream1 file=a.csv line_no=2 n_skipped=0",
}
]})
.set_expected_discover_error(SchemaInferenceError, FileBasedSourceError.SCHEMA_INFERENCE_ERROR.value)
).build()
csv_escape_char_is_set_scenario = (

View File

@@ -218,11 +218,11 @@ single_stream_user_input_schema_scenario_skip_nonconforming_records = (
'message': 'Records in file did not pass validation policy. stream=stream1 file=a.csv n_skipped=2 validation_policy=skip_record',
},
{
'level': 'WARNING',
'level': "WARN",
'message': 'Could not cast the value to the expected type.: col1: value=val11,expected_type=integer',
},
{
'level': 'WARNING',
'level': "WARN",
'message': 'Could not cast the value to the expected type.: col1: value=val21,expected_type=integer',
},
]
@@ -553,11 +553,11 @@ multi_stream_user_input_schema_scenario_emit_nonconforming_records = (
.set_expected_logs({
"read": [
{
'level': 'WARNING',
'level': "WARN",
'message': 'Could not cast the value to the expected type.: col2: value=val12b,expected_type=integer',
},
{
'level': 'WARNING',
'level': "WARN",
'message': 'Could not cast the value to the expected type.: col2: value=val22b,expected_type=integer',
},
]
@@ -696,11 +696,11 @@ multi_stream_user_input_schema_scenario_skip_nonconforming_records = (
'message': 'Records in file did not pass validation policy. stream=stream2 file=b.csv n_skipped=2 validation_policy=skip_record',
},
{
'level': 'WARNING',
'level': "WARN",
'message': 'Could not cast the value to the expected type.: col2: value=val12b,expected_type=integer',
},
{
'level': 'WARNING',
'level': "WARN",
'message': 'Could not cast the value to the expected type.: col2: value=val22b,expected_type=integer',
},
]

View File

@@ -9,19 +9,19 @@ _base_single_stream_scenario = (
TestScenarioBuilder()
.set_files(
{
"a.csv": { # The records in this file do not conform to the schema
"a.csv": {
"contents": [
("col1", "col2"),
("val_a_11", "val_a_21"),
("val_a_12", "val_a_22"),
("val_a_11", "1"),
("val_a_12", "2"),
],
"last_modified": "2023-06-05T03:54:07.000Z",
},
"b.csv": {
"b.csv": { # The records in this file do not conform to the schema
"contents": [
("col1",),
("val_b_11",),
("val_b_12",),
("col1", "col2"),
("val_b_11", "this is text that will trigger validation policy"),
("val_b_12", "2"),
],
"last_modified": "2023-06-05T03:54:07.000Z",
},
@@ -29,7 +29,7 @@ _base_single_stream_scenario = (
"contents": [
("col1",),
("val_c_11",),
("val_c_12","val_c_22"), # This record doesn't conform to the schema
("val_c_12", "val_c_22"), # This record is not parsable
("val_c_13",),
],
"last_modified": "2023-06-05T03:54:07.000Z",
@@ -55,9 +55,9 @@ _base_single_stream_scenario = (
"col1": {
"type": "string",
},
# "col2": { # remove this so the record does not conform to the schema
# "type": "string",
# },
"col2": {
"type": "integer",
},
"_ab_source_file_last_modified": {
"type": "string"
},
@@ -80,19 +80,19 @@ _base_multi_stream_scenario = (
TestScenarioBuilder()
.set_files(
{
"a/a1.csv": { # The records in this file do not conform to the schema
"a/a1.csv": {
"contents": [
("col1", "col2"),
("val_aa1_11", "val_aa1_21"),
("val_aa1_12", "val_aa1_22"),
("val_aa1_11", "1"),
("val_aa1_12", "2"),
],
"last_modified": "2023-06-05T03:54:07.000Z",
},
"a/a2.csv": {
"contents": [
("col1",),
("val_aa2_11",),
("val_aa2_12",),
("col1", "col2"),
("val_aa2_11", "this is text that will trigger validation policy"),
("val_aa2_12", "2"),
],
"last_modified": "2023-06-05T03:54:07.000Z",
},
@@ -100,7 +100,7 @@ _base_multi_stream_scenario = (
"contents": [
("col1",),
("val_aa3_11",),
("val_aa3_12", "val_aa3_22"), # This record does not conform to the schema
("val_aa3_12", "val_aa3_22"), # This record is not parsable
("val_aa3_13",),
],
"last_modified": "2023-06-05T03:54:07.000Z",
@@ -115,17 +115,17 @@ _base_multi_stream_scenario = (
"b/b1.csv": { # The records in this file do not conform to the schema
"contents": [
("col1",),
("val_bb1_11",),
("val_bb1_12",),
("col1", "col2"),
("val_bb1_11", "1"),
("val_bb1_12", "2"),
],
"last_modified": "2023-06-05T03:54:07.000Z",
},
"b/b2.csv": {
"contents": [
("col1", "col2"),
("val_bb2_11", "val_bb2_21"),
("val_bb2_12", "val_bb2_22"),
("val_bb2_11", "this is text that will trigger validation policy"),
("val_bb2_12", "2"),
],
"last_modified": "2023-06-05T03:54:07.000Z",
},
@@ -152,9 +152,9 @@ _base_multi_stream_scenario = (
"col1": {
"type": "string",
},
# "col2": { # remove this so the record does not conform to the schema
# "type": "string",
# },
"col2": {
"type": "integer",
},
"_ab_source_file_last_modified": {
"type": "string"
},
@@ -175,9 +175,9 @@ _base_multi_stream_scenario = (
"col1": {
"type": "string",
},
# "col2": { # remove this so the record does not conform to the schema
# "type": "string",
# },
"col2": {
"type": "integer",
},
"_ab_source_file_last_modified": {
"type": "string"
},
@@ -213,13 +213,13 @@ skip_record_scenario_single_stream = (
)
.set_expected_records(
[
# {"data": {"col1": "val_a_11", "col2": "val_a_21", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a.csv"}, "stream": "stream1"}, # This record is skipped because it does not conform
# {"data": {"col1": "val_a_12", "col2": "val_a_22", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a.csv"}, "stream": "stream1"}, # This record is skipped because it does not conform
{"data": {"col1": "val_b_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b.csv"}, "stream": "stream1"},
{"data": {"col1": "val_b_12", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b.csv"}, "stream": "stream1"},
{"data": {"col1": "val_a_11", "col2": 1, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
{"data": {"col1": "val_a_12", "col2": 2, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_b_11", "col2": "this is text that will trigger validation policy", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b.csv"}, "stream": "stream1"}, # This record is skipped because it does not conform
{"data": {"col1": "val_b_12", "col2": 2, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b.csv"}, "stream": "stream1"},
{"data": {"col1": "val_c_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "c.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_c_12", None: "val_c_22", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "c.csv"}, "stream": "stream1"}, # This record is malformed so should not be emitted
{"data": {"col1": "val_c_13", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "c.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_c_13", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "c.csv"}, "stream": "stream1"}, # Skipped since previous record is malformed
{"data": {"col1": "val_d_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "d.csv"}, "stream": "stream1"},
]
)
@@ -227,11 +227,15 @@ skip_record_scenario_single_stream = (
"read": [
{
"level": "WARN",
"message": "Records in file did not pass validation policy. stream=stream1 file=a.csv n_skipped=2 validation_policy=skip_record",
"message": "Records in file did not pass validation policy. stream=stream1 file=b.csv n_skipped=1 validation_policy=skip_record",
},
{
"level": "ERROR",
"message": "Error parsing record. This could be due to a mismatch between the config's file type and the actual file type, or because the file or record is not parseable. stream=stream1 file=c.csv line_no=2 n_skipped=0",
},
{
"level": "WARN",
"message": "Records in file did not pass validation policy. stream=stream1 file=c.csv n_skipped=1 validation_policy=skip_record",
"message": "Could not cast the value to the expected type.: col2: value=this is text that will trigger validation policy,expected_type=integer",
},
]
})
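
With type inference enabled, col2 is discovered as an integer (see the expected catalog schema above), so the string "this is text that will trigger validation policy" cannot be cast and the record falls through to the skip_record policy. The following is a minimal sketch of that cast-and-warn step, assuming a hypothetical cast_row helper; it is not the CDK's actual implementation, only an illustration of the behaviour the asserted warning implies.

import logging

logger = logging.getLogger(__name__)


def cast_row(row: dict, schema: dict) -> dict:
    # Hypothetical helper: cast each raw CSV string against the discovered column type,
    # keeping the raw value (and logging a warning) when the cast fails so the
    # validation policy can decide what happens to the record afterwards.
    casted = {}
    for column, value in row.items():
        expected_type = schema.get(column, {}).get("type", "string")
        if expected_type == "integer":
            try:
                casted[column] = int(value)
                continue
            except ValueError:
                logger.warning(
                    "Could not cast the value to the expected type.: "
                    f"{column}: value={value},expected_type={expected_type}"
                )
        casted[column] = value
    return casted

In the scenarios above, a row like {"col1": "val_b_11", "col2": "this is text that will trigger validation policy"} would log the asserted warning and keep col2 as a string, which skip_record then drops while emit_record still emits.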
@@ -262,18 +266,18 @@ skip_record_scenario_multi_stream = (
)
.set_expected_records(
[
# {"data": {"col1": "val_aa1_11", "col2": "val_aa1_21", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a1.csv"}, "stream": "stream1"}, # This record is skipped because it does not conform
# {"data": {"col1": "val_aa1_12", "col2": "val_aa1_22", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a1.csv"}, "stream": "stream1"}, # This record is skipped because it does not conform
{"data": {"col1": "val_aa2_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a2.csv"}, "stream": "stream1"},
{"data": {"col1": "val_aa2_12", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a2.csv"}, "stream": "stream1"},
{"data": {"col1": "val_aa1_11", "col2": 1, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a1.csv"}, "stream": "stream1"},
{"data": {"col1": "val_aa1_12", "col2": 2, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a1.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_aa2_11", "col2": "this is text that will trigger validation policy", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a2.csv"}, "stream": "stream1"}, # This record is skipped because it does not conform
{"data": {"col1": "val_aa2_12", "col2": 2, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a2.csv"}, "stream": "stream1"},
{"data": {"col1": "val_aa3_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a3.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_aa3_12", None: "val_aa3_22", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a3.csv"}, "stream": "stream1"}, # This record is malformed so should not be emitted
{"data": {"col1": "val_aa3_13", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a3.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_aa3_13", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a3.csv"}, "stream": "stream1"}, # Skipped since previous record is malformed
{"data": {"col1": "val_aa4_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a4.csv"}, "stream": "stream1"},
{"data": {"col1": "val_bb1_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b1.csv"}, "stream": "stream2"}, # This record is skipped because it does not conform
{"data": {"col1": "val_bb1_12", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b1.csv"}, "stream": "stream2"}, # This record is skipped because it does not conform
# {"data": {"col1": "val_bb2_11", "col2": "val_bb2_21", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b2.csv"}, "stream": "stream2"},
# {"data": {"col1": "val_bb2_12", "col2": "val_bb2_21", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b2.csv"}, "stream": "stream2"},
{"data": {"col1": "val_bb1_11", "col2": 1, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b1.csv"}, "stream": "stream2"},
{"data": {"col1": "val_bb1_12", "col2": 2, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b1.csv"}, "stream": "stream2"},
# {"data": {"col1": "val_bb2_11", "col2": "val_bb2_21", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b2.csv"}, "stream": "stream2"}, # This record is skipped because it does not conform
{"data": {"col1": "val_bb2_12", "col2": 2, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b2.csv"}, "stream": "stream2"},
{"data": {"col1": "val_bb3_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b3.csv"}, "stream": "stream2"},
{"data": {"col1": "val_bb3_12", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b3.csv"}, "stream": "stream2"},
]
@@ -282,15 +286,23 @@ skip_record_scenario_multi_stream = (
"read": [
{
"level": "WARN",
"message": "Records in file did not pass validation policy. stream=stream1 file=a/a1.csv n_skipped=2 validation_policy=skip_record",
"message": "Records in file did not pass validation policy. stream=stream1 file=a/a2.csv n_skipped=1 validation_policy=skip_record",
},
{
"level": "ERROR",
"message": "Error parsing record. This could be due to a mismatch between the config's file type and the actual file type, or because the file or record is not parseable. stream=stream1 file=a/a3.csv line_no=2 n_skipped=0",
},
{
"level": "WARN",
"message": "Records in file did not pass validation policy. stream=stream1 file=a/a3.csv n_skipped=1 validation_policy=skip_record",
"message": "Records in file did not pass validation policy. stream=stream2 file=b/b2.csv n_skipped=1 validation_policy=skip_record",
},
{
"level": "WARN",
"message": "Records in file did not pass validation policy. stream=stream2 file=b/b2.csv n_skipped=2 validation_policy=skip_record",
"message": "Could not cast the value to the expected type.: col2: value=this is text that will trigger validation policy,expected_type=integer",
},
{
"level": "WARN",
"message": "Could not cast the value to the expected type.: col2: value=this is text that will trigger validation policy,expected_type=integer",
},
]
})
@@ -314,14 +326,14 @@ emit_record_scenario_single_stream = (
)
.set_expected_records(
[
{"data": {"col1": "val_a_11", "col2": "val_a_21", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
{"data": {"col1": "val_a_12", "col2": "val_a_22", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
{"data": {"col1": "val_b_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b.csv"}, "stream": "stream1"},
{"data": {"col1": "val_b_12", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b.csv"}, "stream": "stream1"},
{"data": {"col1": "val_a_11", "col2": 1, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
{"data": {"col1": "val_a_12", "col2": 2, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
{"data": {"col1": "val_b_11", "col2": "this is text that will trigger validation policy", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b.csv"}, "stream": "stream1"}, # This record is skipped because it does not conform
{"data": {"col1": "val_b_12", "col2": 2, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b.csv"}, "stream": "stream1"},
{"data": {"col1": "val_c_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "c.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_c_12", None: "val_c_22", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "c.csv"}, "stream": "stream1"}, # This record is malformed so should not be emitted
# {"data": {"col1": "val_c_13", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "c.csv"}, "stream": "stream1"}, # No more records from this stream are emitted after we hit a parse error
# {"data": {"col1": "val_d_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "d.csv"}, "stream": "stream1"},
{"data": {"col1": "val_d_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "d.csv"}, "stream": "stream1"},
]
)
.set_expected_logs({
@@ -330,6 +342,10 @@ emit_record_scenario_single_stream = (
"level": "ERROR",
"message": f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream=stream1 file=c.csv line_no=2 n_skipped=0",
},
{
"level": "WARN",
"message": "Could not cast the value to the expected type.: col2: value=this is text that will trigger validation policy,expected_type=integer",
},
]
})
).build()
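
The emit_record expectations above also show the updated parse-error behaviour: the malformed row in c.csv drops the rest of that file, but records already read (val_c_11) stay emitted and d.csv is still synced. A minimal sketch of that per-file loop follows; the names are hypothetical and this is not the CDK's actual read path.

import logging
from typing import Callable, Iterable, Iterator

logger = logging.getLogger(__name__)


def read_all_files(files: Iterable[str], parse_file: Callable[[str], Iterator[dict]]) -> Iterator[dict]:
    # Hypothetical read loop: a parse error aborts the current file only, so rows
    # already yielded stay emitted and the remaining files are still synced.
    for file in files:
        try:
            yield from parse_file(file)
        except RuntimeError:  # stand-in for the CDK's record-parsing error
            logger.error(f"Error parsing record. file={file}")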
@@ -359,18 +375,18 @@ emit_record_scenario_multi_stream = (
)
.set_expected_records(
[
{"data": {"col1": "val_aa1_11", "col2": "val_aa1_21", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a1.csv"}, "stream": "stream1"},
{"data": {"col1": "val_aa1_12", "col2": "val_aa1_22", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a1.csv"}, "stream": "stream1"},
{"data": {"col1": "val_aa2_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a2.csv"}, "stream": "stream1"},
{"data": {"col1": "val_aa2_12", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a2.csv"}, "stream": "stream1"},
{"data": {"col1": "val_aa1_11", "col2": 1, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a1.csv"}, "stream": "stream1"},
{"data": {"col1": "val_aa1_12", "col2": 2, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a1.csv"}, "stream": "stream1"},
{"data": {"col1": "val_aa2_11", "col2": "this is text that will trigger validation policy", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a2.csv"}, "stream": "stream1"},
{"data": {"col1": "val_aa2_12", "col2": 2, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a2.csv"}, "stream": "stream1"},
{"data": {"col1": "val_aa3_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a3.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_aa3_12", None: "val_aa3_22", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a3.csv"}, "stream": "stream1"}, # This record is malformed so should not be emitted
# {"data": {"col1": "val_aa3_13", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a3.csv"}, "stream": "stream1"}, # No more records from this stream are emitted after we hit a parse error
# {"data": {"col1": "val_aa4_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a4.csv"}, "stream": "stream1"},
{"data": {"col1": "val_bb1_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b1.csv"}, "stream": "stream2"},
{"data": {"col1": "val_bb1_12", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b1.csv"}, "stream": "stream2"},
{"data": {"col1": "val_bb2_11", "col2": "val_bb2_21", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b2.csv"}, "stream": "stream2"},
{"data": {"col1": "val_bb2_12", "col2": "val_bb2_22", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b2.csv"}, "stream": "stream2"},
# {"data": {"col1": "val_aa3_13", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a3.csv"}, "stream": "stream1"}, # Skipped since previous record is malformed
{"data": {"col1": "val_aa4_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a4.csv"}, "stream": "stream1"},
{"data": {"col1": "val_bb1_11", "col2": 1, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b1.csv"}, "stream": "stream2"},
{"data": {"col1": "val_bb1_12", "col2": 2, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b1.csv"}, "stream": "stream2"},
{"data": {"col1": "val_bb2_11", "col2": "this is text that will trigger validation policy", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b2.csv"}, "stream": "stream2"},
{"data": {"col1": "val_bb2_12", "col2": 2, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b2.csv"}, "stream": "stream2"},
{"data": {"col1": "val_bb3_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b3.csv"}, "stream": "stream2"},
{"data": {"col1": "val_bb3_12", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b3.csv"}, "stream": "stream2"},
]
@@ -381,6 +397,14 @@ emit_record_scenario_multi_stream = (
"level": "ERROR",
"message": f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream=stream1 file=a/a3.csv line_no=2 n_skipped=0",
},
{
"level": "WARN",
"message": "Could not cast the value to the expected type.: col2: value=this is text that will trigger validation policy,expected_type=integer",
},
{
"level": "WARN",
"message": "Could not cast the value to the expected type.: col2: value=this is text that will trigger validation policy,expected_type=integer",
},
]
})
).build()
@@ -401,14 +425,20 @@ wait_for_rediscovery_scenario_single_stream = (
]
}
)
.set_expected_records(
[] # No records are expected because the very first file did not conform to the schema
)
.set_expected_records([
{"data": {"col1": "val_a_11", "col2": 1, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
{"data": {"col1": "val_a_12", "col2": 2, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
# No more records are expected after this point because the first record of the second file did not conform to the schema
])
.set_expected_logs({
"read": [
{
"level": "WARN",
"message": "Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema. stream=stream1 file=a.csv validation_policy=Wait for Discover n_skipped=0",
"message": "Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema. stream=stream1 file=b.csv validation_policy=Wait for Discover n_skipped=0",
},
{
"level": "WARN",
"message": "Could not cast the value to the expected type.: col2: value=this is text that will trigger validation policy,expected_type=integer",
},
]
})
@@ -439,18 +469,18 @@ wait_for_rediscovery_scenario_multi_stream = (
)
.set_expected_records(
[
# {"data": {"col1": "val_aa1_11", "col2": "val_aa1_21", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a1.csv"}, "stream": "stream1"}, # The first record does not conform so we don't sync anything from this stream
# {"data": {"col1": "val_aa1_12", "col2": "val_aa1_22", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a1.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_aa2_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a2.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_aa2_12", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a2.csv"}, "stream": "stream1"},
{"data": {"col1": "val_aa1_11", "col2": 1, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a1.csv"}, "stream": "stream1"},
{"data": {"col1": "val_aa1_12", "col2": 2, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a1.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_aa2_11", "col2": "this is text that will trigger validation policy", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a2.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_aa2_12", "col2": 2, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a2.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_aa3_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a3.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_aa3_12", None: "val_aa3_22", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a3.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_aa3_13", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a3.csv"}, "stream": "stream1"},
# {"data": {"col1": "val_aa4_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "a/a4.csv"}, "stream": "stream1"},
{"data": {"col1": "val_bb1_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b1.csv"}, "stream": "stream2"},
{"data": {"col1": "val_bb1_12", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b1.csv"}, "stream": "stream2"},
# {"data": {"col1": "val_bb2_11", "col2": "val_bb2_21", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b2.csv"}, "stream": "stream2"}, # No more records from this stream are emitted after a nonconforming record is encountered
# {"data": {"col1": "val_bb2_12", "col2": "val_bb2_21", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b2.csv"}, "stream": "stream2"},
{"data": {"col1": "val_bb1_11", "col2": 1, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b1.csv"}, "stream": "stream2"},
{"data": {"col1": "val_bb1_12", "col2": 2, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b1.csv"}, "stream": "stream2"},
# {"data": {"col1": "val_bb2_11", "col2": "this is text that will trigger validation policy", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b2.csv"}, "stream": "stream2"},
# {"data": {"col1": "val_bb2_12", "col2": 2, "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b2.csv"}, "stream": "stream2"},
# {"data": {"col1": "val_bb3_11", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b3.csv"}, "stream": "stream2"},
# {"data": {"col1": "val_bb3_12", "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", "_ab_source_file_url": "b/b3.csv"}, "stream": "stream2"},
]
@@ -459,12 +489,20 @@ wait_for_rediscovery_scenario_multi_stream = (
"read": [
{
"level": "WARN",
"message": "Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema. stream=stream1 file=a/a1.csv validation_policy=Wait for Discover n_skipped=0",
"message": "Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema. stream=stream1 file=a/a2.csv validation_policy=Wait for Discover n_skipped=0",
},
{
"level": "WARN",
"message": "Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema. stream=stream2 file=b/b2.csv validation_policy=Wait for Discover n_skipped=0",
},
{
"level": "WARN",
"message": "Could not cast the value to the expected type.: col2: value=this is text that will trigger validation policy,expected_type=integer",
},
{
"level": "WARN",
"message": "Could not cast the value to the expected type.: col2: value=this is text that will trigger validation policy,expected_type=integer",
},
]
})
).build()
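
Taken together, the three scenario groups above differ only in what happens once a record fails the schema check. A minimal sketch of that dispatch is shown below; the names and simplified log messages are hypothetical, not the CDK's actual classes.

import logging

logger = logging.getLogger(__name__)


def handle_nonconforming_record(policy: str, stream: str, file: str) -> bool:
    # Return True to keep reading the stream, False to stop it.
    if policy == "emit_record":
        return True  # the record is emitted anyway; only the cast warning is logged
    if policy == "skip_record":
        logger.warning(f"Record did not pass validation policy. stream={stream} file={file}")
        return True  # drop the record, keep reading the file
    if policy == "wait_for_discover":
        logger.warning(f"Stopping sync for this stream. stream={stream} file={file}")
        return False  # no further files from this stream until schemas are rediscovered
    raise ValueError(f"Unknown validation policy: {policy}")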

View File

@@ -11,6 +11,7 @@ import pytest
from _pytest.capture import CaptureFixture
from _pytest.reports import ExceptionInfo
from airbyte_cdk.entrypoint import launch
from airbyte_cdk.logger import AirbyteLogFormatter
from airbyte_cdk.models import SyncMode
from freezegun import freeze_time
from pytest import LogCaptureFixture
@@ -232,6 +233,7 @@ read_scenarios = discover_scenarios + [
@pytest.mark.parametrize("scenario", read_scenarios, ids=[s.name for s in read_scenarios])
@freeze_time("2023-06-09T00:00:00Z")
def test_read(capsys: CaptureFixture[str], caplog: LogCaptureFixture, tmp_path: PosixPath, scenario: TestScenario) -> None:
    caplog.handler.setFormatter(AirbyteLogFormatter())
    if scenario.incremental_scenario_config:
        run_test_read_incremental(capsys, caplog, tmp_path, scenario)
    else:
@@ -371,23 +373,23 @@ def discover(capsys: CaptureFixture[str], tmp_path: PosixPath, scenario: TestSce
def read(capsys: CaptureFixture[str], caplog: LogCaptureFixture, tmp_path: PosixPath, scenario: TestScenario) -> Dict[str, Any]:
    launch(
        scenario.source,
        [
            "read",
            "--config",
            make_file(tmp_path / "config.json", scenario.config),
            "--catalog",
            make_file(tmp_path / "catalog.json", scenario.configured_catalog(SyncMode.full_refresh)),
        ],
    )
    captured = capsys.readouterr().out.splitlines()
    logs = caplog.records
    return {
        "records": [msg for msg in (json.loads(line) for line in captured) if msg["type"] == "RECORD"],
        "logs": [msg["log"] for msg in (json.loads(line) for line in captured) if msg["type"] == "LOG"]
        + [{"level": log.levelname, "message": log.message} for log in logs],
    }
    with caplog.handler.stream as logger_stream:
        launch(
            scenario.source,
            [
                "read",
                "--config",
                make_file(tmp_path / "config.json", scenario.config),
                "--catalog",
                make_file(tmp_path / "catalog.json", scenario.configured_catalog(SyncMode.full_refresh)),
            ],
        )
        captured = capsys.readouterr().out.splitlines() + logger_stream.getvalue().split("\n")[:-1]
        return {
            "records": [msg for msg in (json.loads(line) for line in captured) if msg["type"] == "RECORD"],
            "logs": [msg["log"] for msg in (json.loads(line) for line in captured) if msg["type"] == "LOG"],
        }
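
The rewritten read() helper leans on the AirbyteLogFormatter installed in test_read above: once the caplog handler formats captured records as Airbyte messages, the log stream and stdout can be parsed the same way. The sketch below shows that shared parsing step under the assumption that every formatted line is a JSON Airbyte message; parse_airbyte_lines is a hypothetical name used only for illustration.

import json
from typing import Any, Dict, Iterable


def parse_airbyte_lines(lines: Iterable[str]) -> Dict[str, Any]:
    # Assumes each non-empty line is a JSON Airbyte message with a "type" field.
    messages = [json.loads(line) for line in lines if line.strip()]
    return {
        "records": [m for m in messages if m["type"] == "RECORD"],
        "logs": [m["log"] for m in messages if m["type"] == "LOG"],
    }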
def read_with_state(