[26763] csv config options validation and use by reader (#27850)
* csv options validation, applying dialect to reader, and refactoring parser interfaces a bit
* fix tests
* pr feedback
* add quoting behavior config format
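For context, the options introduced here are meant to be translated into a Python csv dialect before a file is read. The following is a minimal, self-contained sketch of that translation using only the standard library; the option values are illustrative and not taken from any real connector config:

import csv
import io

# Illustrative values; the field names mirror the CsvFormat model added in this commit.
sample_format = {
    "delimiter": "#",
    "quote_char": "|",
    "escape_char": "!",
    "double_quote": True,
}

# Register a temporary dialect from the config, the same way CsvParser does below.
csv.register_dialect(
    "example_dialect",
    delimiter=sample_format["delimiter"],
    quotechar=sample_format["quote_char"],
    escapechar=sample_format["escape_char"],
    doublequote=sample_format["double_quote"],
    quoting=csv.QUOTE_MINIMAL,  # "Quote Special Characters" maps to QUOTE_MINIMAL
)

data = io.StringIO("col1#col2\nval1#|val#2|\n")
print(list(csv.DictReader(data, dialect="example_dialect")))
# [{'col1': 'val1', 'col2': 'val#2'}]
csv.unregister_dialect("example_dialect")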
@@ -0,0 +1,83 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import codecs
from enum import Enum
from typing import Any, List, Mapping, Optional, Union

from airbyte_cdk.models import ConfiguredAirbyteCatalog
from pydantic import BaseModel, validator

PrimaryKeyType = Optional[Union[str, List[str], List[List[str]]]]

VALID_FILE_TYPES = {"avro", "csv", "jsonl", "parquet"}


class QuotingBehavior(Enum):
    QUOTE_ALL = "Quote All"
    QUOTE_SPECIAL_CHARACTERS = "Quote Special Characters"
    QUOTE_NONNUMERIC = "Quote Non-numeric"
    QUOTE_NONE = "Quote None"


class CsvFormat(BaseModel):
    delimiter: str = ","
    quote_char: str = '"'
    escape_char: Optional[str]
    encoding: Optional[str] = "utf8"
    double_quote: bool
    quoting_behavior: Optional[QuotingBehavior] = QuotingBehavior.QUOTE_SPECIAL_CHARACTERS
    # Noting that the existing S3 connector had a config option newlines_in_values. This was only supported by pyarrow and not
    # the Python csv package. It has little adoption, but long term we should ideally phase this out because of the drawbacks
    # of using pyarrow

    @validator("delimiter")
    def validate_delimiter(cls, v):
        if len(v) != 1:
            raise ValueError("delimiter should only be one character")
        if v in {"\r", "\n"}:
            raise ValueError(f"delimiter cannot be {v}")
        return v

    @validator("quote_char")
    def validate_quote_char(cls, v):
        if len(v) != 1:
            raise ValueError("quote_char should only be one character")
        return v

    @validator("escape_char")
    def validate_escape_char(cls, v):
        if len(v) != 1:
            raise ValueError("escape_char should only be one character")
        return v

    @validator("encoding")
    def validate_encoding(cls, v):
        try:
            codecs.lookup(v)
        except LookupError:
            raise ValueError(f"invalid encoding format: {v}")
        return v


class FileBasedStreamConfig(BaseModel):
    name: str
    file_type: str
    globs: Optional[List[str]]
    validation_policy: str
    catalog_schema: Optional[ConfiguredAirbyteCatalog]
    input_schema: Optional[Mapping[str, Any]]
    primary_key: PrimaryKeyType
    max_history_size: Optional[int]
    days_to_sync_if_history_is_full: Optional[int]
    format: Optional[Mapping[str, CsvFormat]]  # this will eventually be a Union once we have more than one format type

    @validator("format", pre=True)
    def transform_format(cls, v):
        if isinstance(v, Mapping):
            file_type = v.get("filetype", "")
            if file_type.casefold() not in VALID_FILE_TYPES:
                raise ValueError(f"Format filetype {file_type} is not a supported file type")
            return {file_type: {key: val for key, val in v.items()}}
        return v
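As a quick illustration of how the transform_format validator above re-keys the raw format block by its filetype, here is a hypothetical stream entry (the values are made up for the example):

# Hypothetical raw stream entry as it would appear in a source config.
raw_stream = {
    "name": "stream1",
    "file_type": "csv",
    "validation_policy": "emit_record_on_schema_mismatch",
    "format": {"filetype": "csv", "delimiter": ";", "double_quote": True},
}

config = FileBasedStreamConfig(**raw_stream)
# transform_format re-keyed the mapping by file type, so a parser can look up its options with:
csv_format = config.format["csv"]
print(csv_format.delimiter)  # ";"
print(csv_format.quoting_behavior)  # QuotingBehavior.QUOTE_SPECIAL_CHARACTERS (the default)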
@@ -64,7 +64,7 @@ class DefaultFileBasedAvailabilityStrategy(AvailabilityStrategy):
        parser = stream.get_parser(stream.config.file_type)

        try:
            record = next(iter(parser.parse_records(file, self.stream_reader)))
            record = next(iter(parser.parse_records(stream.config, file, self.stream_reader)))
        except StopIteration:
            # The file is empty. We've verified that we can open it, so will
            # consider the connection check successful even though it means
@@ -8,6 +8,7 @@ from abc import ABC
from typing import Any, Dict, List, Mapping, Optional, Tuple, Type

from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.default_file_based_availability_strategy import DefaultFileBasedAvailabilityStrategy
from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy, DefaultDiscoveryPolicy
from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError, FileBasedSourceError
@@ -17,7 +18,6 @@ from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.schema_validation_policies import AbstractSchemaValidationPolicy, DefaultSchemaValidationPolicy
from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream, DefaultFileBasedStream
from airbyte_cdk.sources.file_based.stream.cursor.default_file_based_cursor import DefaultFileBasedCursor
from airbyte_cdk.sources.file_based.stream.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from pydantic.error_wrappers import ValidationError
@@ -85,7 +85,7 @@ class FileBasedSource(AbstractSource, ABC):
            stream_config = FileBasedStreamConfig(**stream)
            streams.append(
                DefaultFileBasedStream(
                    config=FileBasedStreamConfig(**stream),
                    config=stream_config,
                    stream_reader=self.stream_reader,
                    availability_strategy=self.availability_strategy,
                    discovery_policy=self.discovery_policy,
@@ -4,14 +4,19 @@

from typing import Any, Dict, Iterable

from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile


class AvroParser(FileTypeParser):
    async def infer_schema(self, file: RemoteFile, stream_reader: AbstractFileBasedStreamReader) -> Dict[str, Any]:
    async def infer_schema(
        self, config: FileBasedStreamConfig, file: RemoteFile, stream_reader: AbstractFileBasedStreamReader
    ) -> Dict[str, Any]:
        ...

    def parse_records(self, file: RemoteFile, stream_reader: AbstractFileBasedStreamReader) -> Iterable[Dict[str, Any]]:
    def parse_records(
        self, config: FileBasedStreamConfig, file: RemoteFile, stream_reader: AbstractFileBasedStreamReader
    ) -> Iterable[Dict[str, Any]]:
        ...
@@ -5,18 +5,70 @@
import csv
from typing import Any, Dict, Iterable

from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig, QuotingBehavior
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile

DIALECT_NAME = "_config_dialect"

config_to_quoting: Dict[QuotingBehavior, int] = {
    QuotingBehavior.QUOTE_ALL: csv.QUOTE_ALL,
    QuotingBehavior.QUOTE_SPECIAL_CHARACTERS: csv.QUOTE_MINIMAL,
    QuotingBehavior.QUOTE_NONNUMERIC: csv.QUOTE_NONNUMERIC,
    QuotingBehavior.QUOTE_NONE: csv.QUOTE_NONE,
}


class CsvParser(FileTypeParser):
    async def infer_schema(self, file: RemoteFile, stream_reader: AbstractFileBasedStreamReader) -> Dict[str, Any]:
        with stream_reader.open_file(file) as fp:
            reader = csv.DictReader(fp)
            return {field.strip(): {"type": ["null", "string"]} for field in next(reader)}
    async def infer_schema(
        self, config: FileBasedStreamConfig, file: RemoteFile, stream_reader: AbstractFileBasedStreamReader
    ) -> Dict[str, Any]:
        config_format = config.format.get(config.file_type) if config.format else None
        if config_format:
            dialect_name = config.name + DIALECT_NAME
            csv.register_dialect(
                dialect_name,
                delimiter=config_format.delimiter,
                quotechar=config_format.quote_char,
                escapechar=config_format.escape_char,
                doublequote=config_format.double_quote,
                quoting=config_to_quoting.get(config_format.quoting_behavior, csv.QUOTE_MINIMAL),
            )
            with stream_reader.open_file(file) as fp:
                # todo: the existing InMemoryFilesSource.open_file() test source doesn't currently require an encoding, but actual
                # sources will likely require one. Rather than modify the interface now we can wait until the real use case
                reader = csv.DictReader(fp, dialect=dialect_name)
                schema = {field.strip(): {"type": ["null", "string"]} for field in next(reader)}
                csv.unregister_dialect(dialect_name)
                return schema
        else:
            with stream_reader.open_file(file) as fp:
                reader = csv.DictReader(fp)
                return {field.strip(): {"type": ["null", "string"]} for field in next(reader)}

    def parse_records(self, file: RemoteFile, stream_reader: AbstractFileBasedStreamReader) -> Iterable[Dict[str, Any]]:
        with stream_reader.open_file(file) as fp:
            reader = csv.DictReader(fp)
            yield from reader
    def parse_records(
        self, config: FileBasedStreamConfig, file: RemoteFile, stream_reader: AbstractFileBasedStreamReader
    ) -> Iterable[Dict[str, Any]]:
        config_format = config.format.get(config.file_type) if config.format else None
        if config_format:
            # Formats are configured individually per-stream so a unique dialect should be registered for each stream.
            # We don't unregister the dialect because we are lazily parsing each csv file to generate records
            dialect_name = config.name + DIALECT_NAME
            csv.register_dialect(
                dialect_name,
                delimiter=config_format.delimiter,
                quotechar=config_format.quote_char,
                escapechar=config_format.escape_char,
                doublequote=config_format.double_quote,
                quoting=config_to_quoting.get(config_format.quoting_behavior, csv.QUOTE_MINIMAL),
            )
            with stream_reader.open_file(file) as fp:
                # todo: the existing InMemoryFilesSource.open_file() test source doesn't currently require an encoding, but actual
                # sources will likely require one. Rather than modify the interface now we can wait until the real use case
                reader = csv.DictReader(fp, dialect=dialect_name)
                yield from reader
        else:
            with stream_reader.open_file(file) as fp:
                reader = csv.DictReader(fp)
                yield from reader
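To make the quoting_behavior mapping above concrete, here is a small standalone comparison of two of the csv quoting constants it targets; the row data and dialect options are made up:

import csv
import io

row = ["plain", "needs#quoting", 42]
for quoting in (csv.QUOTE_MINIMAL, csv.QUOTE_ALL):
    buf = io.StringIO()
    csv.writer(buf, delimiter="#", quotechar="|", quoting=quoting).writerow(row)
    print(buf.getvalue().strip())
# QUOTE_MINIMAL ("Quote Special Characters"): plain#|needs#quoting|#42
# QUOTE_ALL ("Quote All"):                    |plain|#|needs#quoting|#|42|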
@@ -5,6 +5,7 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, Iterable

from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.remote_file import RemoteFile

@@ -19,14 +20,16 @@ class FileTypeParser(ABC):
    """

    @abstractmethod
    async def infer_schema(self, file: RemoteFile, stream_reader: AbstractFileBasedStreamReader) -> Schema:
    async def infer_schema(self, config: FileBasedStreamConfig, file: RemoteFile, stream_reader: AbstractFileBasedStreamReader) -> Schema:
        """
        Infer the JSON Schema for this file.
        """
        ...

    @abstractmethod
    def parse_records(self, file: RemoteFile, stream_reader: AbstractFileBasedStreamReader) -> Iterable[Record]:
    def parse_records(
        self, config: FileBasedStreamConfig, file: RemoteFile, stream_reader: AbstractFileBasedStreamReader
    ) -> Iterable[Record]:
        """
        Parse and emit each record.
        """
@@ -4,6 +4,7 @@

from typing import Any, Dict, Iterable

from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
@@ -13,8 +14,12 @@ class JsonlParser(FileTypeParser):

    MAX_BYTES_PER_FILE_FOR_SCHEMA_INFERENCE = 1_000_000

    async def infer_schema(self, file: RemoteFile, stream_reader: AbstractFileBasedStreamReader) -> Dict[str, Any]:
    async def infer_schema(
        self, config: FileBasedStreamConfig, file: RemoteFile, stream_reader: AbstractFileBasedStreamReader
    ) -> Dict[str, Any]:
        ...

    def parse_records(self, file: RemoteFile, stream_reader: AbstractFileBasedStreamReader) -> Iterable[Dict[str, Any]]:
    def parse_records(
        self, config: FileBasedStreamConfig, file: RemoteFile, stream_reader: AbstractFileBasedStreamReader
    ) -> Iterable[Dict[str, Any]]:
        ...
@@ -4,14 +4,19 @@

from typing import Any, Dict, Iterable

from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile


class ParquetParser(FileTypeParser):
    async def infer_schema(self, file: RemoteFile, stream_reader: AbstractFileBasedStreamReader) -> Dict[str, Any]:
    async def infer_schema(
        self, config: FileBasedStreamConfig, file: RemoteFile, stream_reader: AbstractFileBasedStreamReader
    ) -> Dict[str, Any]:
        ...

    def parse_records(self, file: RemoteFile, stream_reader: AbstractFileBasedStreamReader) -> Iterable[Dict[str, Any]]:
    def parse_records(
        self, config: FileBasedStreamConfig, file: RemoteFile, stream_reader: AbstractFileBasedStreamReader
    ) -> Iterable[Dict[str, Any]]:
        ...
@@ -7,13 +7,13 @@ from functools import cached_property
from typing import Any, Dict, Iterable, List, Mapping, Optional, Type

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig, PrimaryKeyType
from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy
from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, UndefinedParserError
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_validation_policies import AbstractSchemaValidationPolicy
from airbyte_cdk.sources.file_based.stream.file_based_stream_config import FileBasedStreamConfig, PrimaryKeyType
from airbyte_cdk.sources.file_based.types import StreamSlice, StreamState
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
@@ -66,7 +66,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):
            # only serialize the datetime once
            file_datetime_string = file.last_modified.strftime("%Y-%m-%dT%H:%M:%SZ")
            try:
                for record in parser.parse_records(file, self._stream_reader):
                for record in parser.parse_records(self.config, file, self._stream_reader):
                    if not self.record_passes_validation_policy(record):
                        logging.warning(f"Record did not pass validation policy: {record}")
                        continue
@@ -147,7 +147,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):

    async def _infer_file_schema(self, file: RemoteFile) -> Mapping[str, Any]:
        try:
            return await self.get_parser(self.config.file_type).infer_schema(file, self._stream_reader)
            return await self.get_parser(self.config.file_type).infer_schema(self.config, file, self._stream_reader)
        except Exception as exc:
            raise SchemaInferenceError(
                FileBasedSourceError.SCHEMA_INFERENCE_ERROR,
@@ -1,22 +0,0 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import Any, List, Mapping, Optional, Union

from airbyte_cdk.models import ConfiguredAirbyteCatalog
from pydantic import BaseModel

PrimaryKeyType = Optional[Union[str, List[str], List[List[str]]]]


class FileBasedStreamConfig(BaseModel):
    name: str
    file_type: str
    globs: Optional[List[str]]
    validation_policy: str
    catalog_schema: Optional[ConfiguredAirbyteCatalog]
    input_schema: Optional[Mapping[str, Any]]
    primary_key: PrimaryKeyType
    max_history_size: Optional[int]
    days_to_sync_if_history_is_full: Optional[int]
@@ -0,0 +1,38 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import pytest as pytest
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig, QuotingBehavior
from pydantic import ValidationError


@pytest.mark.parametrize(
    "file_type, input_format, expected_format, expected_error",
    [
        pytest.param("csv", {"filetype": "csv", "delimiter": "d", "quote_char": "q", "escape_char": "e", "encoding": "ascii", "double_quote": True, "quoting_behavior": "Quote All"}, {"delimiter": "d", "quote_char": "q", "escape_char": "e", "encoding": "ascii", "double_quote": True, "quoting_behavior": QuotingBehavior.QUOTE_ALL}, None, id="test_valid_format"),
        pytest.param("csv", {"filetype": "csv", "double_quote": False}, {"delimiter": ",", "quote_char": "\"", "encoding": "utf8", "double_quote": False, "quoting_behavior": QuotingBehavior.QUOTE_SPECIAL_CHARACTERS}, None, id="test_default_format_values"),
        pytest.param("csv", {"filetype": "csv", "delimiter": "nope", "double_quote": True}, None, ValidationError, id="test_invalid_delimiter"),
        pytest.param("csv", {"filetype": "csv", "quote_char": "nope", "double_quote": True}, None, ValidationError, id="test_invalid_quote_char"),
        pytest.param("csv", {"filetype": "csv", "escape_char": "nope", "double_quote": True}, None, ValidationError, id="test_invalid_escape_char"),
        pytest.param("csv", {"filetype": "csv", "delimiter": ",", "quote_char": "\"", "encoding": "not_a_format", "double_quote": True}, {}, ValidationError, id="test_invalid_encoding_type"),
        pytest.param("csv", {"filetype": "csv", "double_quote": True, "quoting_behavior": "Quote Invalid"}, None, ValidationError, id="test_invalid_quoting_behavior"),
        pytest.param("invalid", {"filetype": "invalid", "double_quote": False}, {}, ValidationError, id="test_config_format_file_type_mismatch"),
    ]
)
def test_csv_config(file_type, input_format, expected_format, expected_error):
    stream_config = {
        "name": "stream1",
        "file_type": file_type,
        "globs": ["*"],
        "validation_policy": "emit_record_on_schema_mismatch",
        "format": input_format,
    }
    if expected_error:
        with pytest.raises(expected_error):
            FileBasedStreamConfig(**stream_config)
    else:
        actual_config = FileBasedStreamConfig(**stream_config)
        assert not hasattr(actual_config.format[file_type], "filetype")
        for expected_format_field, expected_format_value in expected_format.items():
            assert getattr(actual_config.format[file_type], expected_format_field) == expected_format_value
@@ -6,7 +6,7 @@ import csv
import io
from datetime import datetime
from io import IOBase
from typing import Dict, Iterable, List
from typing import Any, Dict, Iterable, List, Optional

from airbyte_cdk.sources.file_based.default_file_based_availability_strategy import DefaultFileBasedAvailabilityStrategy
from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy
@@ -28,8 +28,9 @@ class InMemoryFilesSource(FileBasedSource):
        validation_policies: AbstractSchemaValidationPolicy,
        parsers: Dict[str, FileTypeParser],
        stream_reader: AbstractFileBasedStreamReader,
        file_write_options: Dict[str, Any]
    ):
        stream_reader = stream_reader or InMemoryFilesStreamReader(files=files, file_type=file_type)
        stream_reader = stream_reader or InMemoryFilesStreamReader(files=files, file_type=file_type, file_write_options=file_write_options)
        availability_strategy = availability_strategy or DefaultFileBasedAvailabilityStrategy(stream_reader)
        super().__init__(
            stream_reader,
@@ -47,6 +48,7 @@ class InMemoryFilesSource(FileBasedSource):
class InMemoryFilesStreamReader(AbstractFileBasedStreamReader):
    files: Dict[str, dict]
    file_type: str
    file_write_options: Optional[Dict[str, Any]]

    def get_matching_files(
        self,
@@ -68,6 +70,12 @@ class InMemoryFilesStreamReader(AbstractFileBasedStreamReader):

    def _make_csv_file_contents(self, file_name: str) -> str:
        fh = io.StringIO()
        writer = csv.writer(fh)
        writer.writerows(self.files[file_name]["contents"])
        if self.file_write_options:
            csv.register_dialect("in_memory_dialect", **self.file_write_options)
            writer = csv.writer(fh, dialect="in_memory_dialect")
            writer.writerows(self.files[file_name]["contents"])
            csv.unregister_dialect("in_memory_dialect")
        else:
            writer = csv.writer(fh)
            writer.writerows(self.files[file_name]["contents"])
        return fh.getvalue()
@@ -485,3 +485,222 @@ csv_multi_stream_scenario = (
        ]
    )
).build()

csv_custom_format_scenario = (
    TestScenarioBuilder()
    .set_name("csv_custom_format")
    .set_config(
        {
            "streams": [
                {
                    "name": "stream1",
                    "file_type": "csv",
                    "globs": ["*"],
                    "validation_policy": "emit_record_on_schema_mismatch",
                    "format": {
                        "filetype": "csv",
                        "delimiter": "#",
                        "quote_char": "|",
                        "escape_char": "!",
                        "double_quote": True,
                        "quoting_behavior": "Quote Special Characters"
                    }
                }
            ]
        }
    )
    .set_files(
        {
            "a.csv": {
                "contents": [
                    ("col1", "col2", "col3"),
                    ("val11", "val12", "val |13|"),
                    ("val21", "val22", "val23"),
                    ("val,31", "val |,32|", "val, !!!! 33"),
                ],
                "last_modified": "2023-06-05T03:54:07.000Z",
            }
        }
    )
    .set_file_type("csv")
    .set_expected_catalog(
        {
            "streams": [
                {
                    "json_schema": {
                        "type": "object",
                        "properties": {
                            "col1": {
                                "type": ["null", "string"]
                            },
                            "col2": {
                                "type": ["null", "string"]
                            },
                            "col3": {
                                "type": ["null", "string"]
                            },
                            "_ab_source_file_last_modified": {
                                "type": "string"
                            },
                            "_ab_source_file_url": {
                                "type": "string"
                            },
                        },
                    },
                    "name": "stream1",
                    "source_defined_cursor": True,
                    "default_cursor_field": ["_ab_source_file_last_modified"],
                    "supported_sync_modes": ["full_refresh", "incremental"],
                }
            ]
        }
    )
    .set_expected_records(
        [
            {"data": {"col1": "val11", "col2": "val12", "col3": "val |13|", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
                      "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
            {"data": {"col1": "val21", "col2": "val22", "col3": "val23", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
                      "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
            {"data": {"col1": "val,31", "col2": "val |,32|", "col3": "val, !! 33", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
                      "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
        ]
    )
    .set_file_write_options(
        {
            "delimiter": "#",
            "quotechar": "|",
        }
    )
).build()

multi_stream_custom_format = (
    TestScenarioBuilder()
    .set_name("multi_stream_custom_format_scenario")
    .set_config(
        {
            "streams": [
                {
                    "name": "stream1",
                    "file_type": "csv",
                    "globs": ["*.csv"],
                    "validation_policy": "emit_record_on_schema_mismatch",
                    "format": {
                        "filetype": "csv",
                        "delimiter": "#",
                        "escape_char": "!",
                        "double_quote": True,
                        "newlines_in_values": False
                    }
                },
                {
                    "name": "stream2",
                    "file_type": "csv",
                    "globs": ["b.csv"],
                    "validation_policy": "emit_record_on_schema_mismatch",
                    "format": {
                        "filetype": "csv",
                        "delimiter": "#",
                        "escape_char": "@",
                        "double_quote": True,
                        "newlines_in_values": False,
                        "quoting_behavior": "Quote All"
                    }
                }
            ]
        }
    )
    .set_files(
        {
            "a.csv": {
                "contents": [
                    ("col1", "col2"),
                    ("val11a", "val !! 12a"),
                    ("val !! 21a", "val22a"),
                ],
                "last_modified": "2023-06-05T03:54:07.000Z",
            },
            "b.csv": {
                "contents": [
                    ("col3",),
                    ("val @@@@ 13b",),
                    ("val23b",),
                ],
                "last_modified": "2023-06-05T03:54:07.000Z",
            },
        }
    )
    .set_file_type("csv")
    .set_expected_catalog(
        {
            "streams": [
                {
                    "json_schema": {
                        "type": "object",
                        "properties": {
                            "col1": {
                                "type": ["null", "string"]
                            },
                            "col2": {
                                "type": ["null", "string"]
                            },
                            "col3": {
                                "type": ["null", "string"]
                            },
                            "_ab_source_file_last_modified": {
                                "type": "string"
                            },
                            "_ab_source_file_url": {
                                "type": "string"
                            },
                        },
                    },
                    "name": "stream1",
                    "supported_sync_modes": ["full_refresh", "incremental"],
                    "source_defined_cursor": True,
                    "default_cursor_field": ["_ab_source_file_last_modified"],
                },
                {
                    "json_schema": {
                        "type": "object",
                        "properties": {
                            "col3": {
                                "type": ["null", "string"]
                            },
                            "_ab_source_file_last_modified": {
                                "type": "string"
                            },
                            "_ab_source_file_url": {
                                "type": "string"
                            },
                        },
                    },
                    "name": "stream2",
                    "source_defined_cursor": True,
                    "default_cursor_field": ["_ab_source_file_last_modified"],
                    "supported_sync_modes": ["full_refresh", "incremental"],
                },
            ]
        }
    )
    .set_expected_records(
        [
            {"data": {"col1": "val11a", "col2": "val ! 12a", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
                      "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
            {"data": {"col1": "val ! 21a", "col2": "val22a", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
                      "_ab_source_file_url": "a.csv"}, "stream": "stream1"},
            {"data": {"col3": "val @@@@ 13b", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "b.csv"},
             "stream": "stream1"},
            {"data": {"col3": "val23b", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "b.csv"},
             "stream": "stream1"},
            {"data": {"col3": "val @@ 13b", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "b.csv"},
             "stream": "stream2"},
            {"data": {"col3": "val23b", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z", "_ab_source_file_url": "b.csv"},
             "stream": "stream2"},
        ]
    )
    .set_file_write_options(
        {
            "delimiter": "#",
        }
    )
).build()
@@ -41,6 +41,7 @@ class TestScenario:
        expected_discover_error: Tuple[Optional[Type[Exception]], Optional[str]],
        expected_read_error: Tuple[Optional[Type[Exception]], Optional[str]],
        incremental_scenario_config: Optional[IncrementalScenarioConfig],
        file_write_options: Dict[str, Any]
    ):
        self.name = name
        self.config = config
@@ -58,6 +59,7 @@ class TestScenario:
            validation_policies,
            parsers,
            stream_reader,
            file_write_options,
        )
        self.incremental_scenario_config = incremental_scenario_config
        self.validate()
@@ -108,6 +110,7 @@ class TestScenarioBuilder:
        self._expected_discover_error = None, None
        self._expected_read_error = None, None
        self._incremental_scenario_config = None
        self._file_write_options = {}

    def set_name(self, name: str):
        self._name = name
@@ -173,6 +176,10 @@ class TestScenarioBuilder:
        self._expected_read_error = error, message
        return self

    def set_file_write_options(self, file_write_options: Dict[str, Any]):
        self._file_write_options = file_write_options
        return self

    def copy(self):
        return deepcopy(self)

@@ -193,5 +200,6 @@ class TestScenarioBuilder:
            self._expected_check_error,
            self._expected_discover_error,
            self._expected_read_error,
            self._incremental_scenario_config
            self._incremental_scenario_config,
            self._file_write_options,
        )
@@ -24,11 +24,13 @@ from unit_tests.sources.file_based.scenarios.check_scenarios import (
    success_user_provided_schema_scenario,
)
from unit_tests.sources.file_based.scenarios.csv_scenarios import (
    csv_custom_format_scenario,
    csv_multi_stream_scenario,
    csv_single_stream_scenario,
    invalid_csv_scenario,
    multi_csv_scenario,
    multi_csv_stream_n_file_exceeds_limit_for_inference,
    multi_stream_custom_format,
    single_csv_scenario,
)
from unit_tests.sources.file_based.scenarios.incremental_scenarios import (
@@ -71,6 +73,8 @@ scenarios = [
    multi_csv_sync_files_within_history_time_window_if_history_is_incomplete_different_timestamps_scenario,
    single_csv_file_is_skipped_if_same_modified_at_as_in_history,
    single_csv_file_is_synced_if_modified_at_is_more_recent_than_in_history,
    csv_custom_format_scenario,
    multi_stream_custom_format,
]