1
0
mirror of synced 2025-12-20 02:23:30 -05:00
Files
2025-10-27 12:40:46 +02:00

258 lines
8.9 KiB
Python

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import json
import logging
from copy import deepcopy
import jsonschema
import pytest
from source_file.source import SourceFile
from airbyte_cdk.models import (
AirbyteConnectionStatus,
AirbyteMessage,
AirbyteStream,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
ConnectorSpecification,
DestinationSyncMode,
Status,
SyncMode,
Type,
)
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.utils import AirbyteTracedException
# Module-wide logger passed to every SourceFile call made by these tests.
logger = logging.getLogger(name="airbyte")
@pytest.fixture
def source():
    """Provide a fresh ``SourceFile`` connector instance for the requesting test."""
    connector = SourceFile()
    return connector
@pytest.fixture
def config():
    """Load the connector config stored in ``integration_tests/config.json``.

    The path is relative, so it is resolved against the working directory
    pytest is launched from.

    Returns:
        The parsed JSON document (a fresh object on every call).
    """
    config_path: str = "integration_tests/config.json"
    # Pin the encoding: JSON text is UTF-8 (RFC 8259), while open()'s default
    # follows the platform locale (e.g. cp1252 on Windows).
    with open(config_path, "r", encoding="utf-8") as f:
        return json.load(f)
def test_csv_with_utf16_encoding(absolute_path, test_files):
config_local_csv_utf16 = {
"dataset_name": "AAA",
"format": "csv",
"reader_options": '{"encoding":"utf_16", "parse_dates": ["header5"]}',
"url": f"{absolute_path}/{test_files}/test_utf16.csv",
"provider": {"storage": "local"},
}
expected_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"header1": {"type": ["string", "null"]},
"header2": {"type": ["number", "null"]},
"header3": {"type": ["number", "null"]},
"header4": {"type": ["boolean", "null"]},
"header5": {"type": ["string", "null"], "format": "date-time"},
},
"type": "object",
}
catalog = SourceFile().discover(logger=logger, config=config_local_csv_utf16)
stream = next(iter(catalog.streams))
assert stream.json_schema == expected_schema
def test_zipped_csv_with_utf16_encoding(absolute_path, test_files):
config_local_zipped_csv_utf16 = {
"dataset_name": "AAA",
"format": "csv",
"reader_options": '{"encoding":"utf_16", "parse_dates": ["header5"]}',
"url": f"{absolute_path}/{test_files}/test_utf16.csv.zip",
"provider": {"storage": "local"},
}
expected_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"header1": {"type": ["string", "null"]},
"header2": {"type": ["number", "null"]},
"header3": {"type": ["number", "null"]},
"header4": {"type": ["boolean", "null"]},
"header5": {"type": ["string", "null"], "format": "date-time"},
},
"type": "object",
}
catalog = SourceFile().discover(logger=logger, config=config_local_zipped_csv_utf16)
stream = next(iter(catalog.streams))
assert stream.json_schema == expected_schema
def get_catalog(properties):
    """Build a single-stream, full-refresh ``ConfiguredAirbyteCatalog`` for the stream ``test``.

    Args:
        properties: Mapping used verbatim as the ``properties`` section of the
            stream's draft-07 object schema.

    Returns:
        A catalog with one stream that uses full-refresh reads and overwrite writes.
    """
    json_schema = {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "properties": properties,
    }
    airbyte_stream = AirbyteStream(
        name="test",
        json_schema=json_schema,
        supported_sync_modes=[SyncMode.full_refresh],
    )
    configured_stream = ConfiguredAirbyteStream(
        stream=airbyte_stream,
        sync_mode=SyncMode.full_refresh,
        destination_sync_mode=DestinationSyncMode.overwrite,
    )
    return ConfiguredAirbyteCatalog(streams=[configured_stream])
def test_nan_to_null(absolute_path, test_files):
    """Make sure ``numpy.nan`` values are emitted as ``None``.

    The same source and catalog are then reused for two more checks:
    reading the YAML demo file yields no RECORD messages for this catalog, and
    switching the provider to SSH (placeholder host ``"host"``) makes ``read`` fail.
    """
    config = {
        "dataset_name": "test",
        "format": "csv",
        "reader_options": json.dumps({"sep": ";"}),
        "url": f"{absolute_path}/{test_files}/test_nan.csv",
        "provider": {"storage": "local"},
    }
    catalog = get_catalog(
        {"col1": {"type": ["string", "null"]}, "col2": {"type": ["number", "null"]}, "col3": {"type": ["number", "null"]}}
    )
    source = SourceFile()

    messages = source.read(logger=logger, config=deepcopy(config), catalog=catalog)
    records = [message.record.data for message in messages if message.type == MessageType.RECORD]
    assert records == [
        {"col1": "key1", "col2": 1.11, "col3": None},
        {"col1": "key2", "col2": None, "col3": 2.22},
        {"col1": "key3", "col2": None, "col3": None},
        {"col1": "key4", "col2": 3.33, "col3": None},
    ]

    config.update({"format": "yaml", "url": f"{absolute_path}/{test_files}/formats/yaml/demo.yaml"})
    messages = source.read(logger=logger, config=deepcopy(config), catalog=catalog)
    records = [message.record.data for message in messages if message.type == MessageType.RECORD]
    assert records == []

    config.update({"provider": {"storage": "SSH", "user": "user", "host": "host"}})
    # NOTE(review): the exact exception type raised for the unreachable SSH host
    # isn't visible here, so the broad ``Exception`` check is kept.
    with pytest.raises(Exception):
        # read() may be lazy (a generator), so drain it inside the block to
        # make sure the failure actually surfaces. deepcopy keeps the call
        # consistent with the reads above.
        list(source.read(logger=logger, config=deepcopy(config), catalog=catalog))
def test_spec(source):
    """``spec`` returns a ``ConnectorSpecification`` even when no logger is supplied."""
    assert isinstance(source.spec(None), ConnectorSpecification)
def test_check(source, config):
    """The integration config passes ``check`` with status SUCCEEDED."""
    status = source.check(logger=logger, config=config)
    assert status == AirbyteConnectionStatus(status=Status.SUCCEEDED)
def test_check_invalid_config(source, invalid_config):
    """An invalid config makes ``check`` report FAILED; only the status field is compared."""
    result = source.check(logger=logger, config=invalid_config)
    assert result.status == Status.FAILED
def test_check_invalid_reader_options(source, invalid_reader_options_config):
    """``check`` raises when ``reader_options`` is not a JSON object of key-value pairs.

    NOTE(review): presumably the fixture holds syntactically valid JSON that is
    not an object; confirm against the ``invalid_reader_options_config`` fixture.
    """
    expected_message = "Field 'reader_options' is not a valid JSON object. Please provide key-value pairs"
    with pytest.raises(AirbyteTracedException, match=expected_message):
        source.check(logger=logger, config=invalid_reader_options_config)
def test_discover_dropbox_link(source, config_dropbox_link):
    """Smoke test: ``discover`` with the ``config_dropbox_link`` fixture completes without raising."""
    _ = source.discover(logger=logger, config=config_dropbox_link)
def test_discover(source, config, client):
    """Every discovered stream schema must itself be a valid draft-07 JSON Schema.

    The catalog is wrapped in a CATALOG ``AirbyteMessage`` before inspection.
    The ``client`` fixture is requested but not referenced here, presumably for
    its setup side effects (TODO confirm).
    """
    discovered = source.discover(logger=logger, config=config)
    message = AirbyteMessage(type=Type.CATALOG, catalog=discovered)
    for stream in message.catalog.streams:
        jsonschema.Draft7Validator.check_schema(stream.json_schema)
def test_check_wrong_reader_options(source, config):
    """Malformed JSON in ``reader_options`` makes ``check`` raise a traced exception."""
    # Deliberately broken JSON: the opening quote before `encoding` is missing.
    config["reader_options"] = '{encoding":"utf_16"}'
    expected_message = "Field 'reader_options' is not valid JSON object. https://www.json.org/"
    with pytest.raises(AirbyteTracedException, match=expected_message):
        source.check(logger=logger, config=config)
def test_check_google_spreadsheets_url(source, config):
    """A Google Sheets URL is rejected and the user is pointed at the dedicated connector."""
    config["url"] = "https://docs.google.com/spreadsheets/d/"
    expected_message = (
        "Failed to load https://docs.google.com/spreadsheets/d/: please use the Official Google Sheets Source connector"
    )
    with pytest.raises(AirbyteTracedException, match=expected_message):
        source.check(logger=logger, config=config)
def test_pandas_header_not_none(absolute_path, test_files):
    """With empty ``reader_options``, the first CSV row supplies the column names."""
    csv_url = f"{absolute_path}/{test_files}/test_no_header.csv"
    config = {
        "dataset_name": "test",
        "format": "csv",
        "reader_options": json.dumps({}),
        "url": csv_url,
        "provider": {"storage": "local"},
    }
    catalog = get_catalog(
        {
            "text11": {"type": ["string", "null"]},
            "text12": {"type": ["string", "null"]},
        }
    )
    messages = SourceFile().read(logger=logger, config=deepcopy(config), catalog=catalog)
    records = [message.record.data for message in messages if message.type == MessageType.RECORD]
    # Only the second file row is data; the first became the header.
    assert records == [{"text11": "text21", "text12": "text22"}]
def test_pandas_header_none(absolute_path, test_files):
    """With ``header: None``, every CSV row is data and columns are named by position."""
    csv_url = f"{absolute_path}/{test_files}/test_no_header.csv"
    config = {
        "dataset_name": "test",
        "format": "csv",
        "reader_options": json.dumps({"header": None}),
        "url": csv_url,
        "provider": {"storage": "local"},
    }
    catalog = get_catalog(
        {
            "0": {"type": ["string", "null"]},
            "1": {"type": ["string", "null"]},
        }
    )
    messages = SourceFile().read(logger=logger, config=deepcopy(config), catalog=catalog)
    records = [message.record.data for message in messages if message.type == MessageType.RECORD]
    expected_records = [
        {"0": "text11", "1": "text12"},
        {"0": "text21", "1": "text22"},
    ]
    assert records == expected_records
def test_incorrect_reader_options(absolute_path, test_files):
    """Reader options that break CSV parsing surface as ``AirbyteTracedException``.

    Both ``discover`` and ``read`` must raise, and the message must point the
    user at the pandas I/O documentation.
    """
    config = {
        "dataset_name": "test",
        "format": "csv",
        # NOTE(review): presumably sep="4" makes test_parser_error.csv unparseable; confirm against the fixture file.
        "reader_options": json.dumps({"sep": "4", "nrows": 20}),
        "url": f"{absolute_path}/{test_files}/test_parser_error.csv",
        "provider": {"storage": "local"},
    }
    source = SourceFile()
    # pytest.raises treats `match` as a regex (re.search); the unescaped dots still match literal dots.
    expected_message = "can not be parsed. Please check your reader_options. https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html"

    with pytest.raises(AirbyteTracedException, match=expected_message):
        source.discover(logger=logger, config=deepcopy(config))

    # Build the catalog outside pytest.raises so that only the read itself can satisfy the expectation.
    catalog = get_catalog({"0": {"type": ["string", "null"]}, "1": {"type": ["string", "null"]}})
    with pytest.raises(AirbyteTracedException, match=expected_message):
        # read() may be lazy (a generator); drain it so the parse error surfaces inside the block.
        list(source.read(logger=logger, config=deepcopy(config), catalog=catalog))