🎉 Source File: Convert 'nan' to 'null' (#15768)
Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
This commit is contained in:
@@ -17,5 +17,5 @@ COPY source_file ./source_file
|
||||
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
|
||||
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
|
||||
|
||||
LABEL io.airbyte.version=0.2.18
|
||||
LABEL io.airbyte.version=0.2.19
|
||||
LABEL io.airbyte.name=airbyte/source-file
|
||||
|
||||
@@ -0,0 +1,5 @@
|
||||
col1;col2;col3
|
||||
key1;1.11;
|
||||
key2;;2.22
|
||||
key3;;
|
||||
key4;3.33;
|
||||
|
@@ -13,6 +13,7 @@ from urllib.parse import urlparse
|
||||
import boto3
|
||||
import botocore
|
||||
import google
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import smart_open
|
||||
from airbyte_cdk.entrypoint import logger
|
||||
@@ -357,7 +358,7 @@ class Client:
|
||||
fp = self._cache_stream(fp)
|
||||
for df in self.load_dataframes(fp):
|
||||
columns = fields.intersection(set(df.columns)) if fields else df.columns
|
||||
df = df.where(pd.notnull(df), None)
|
||||
df.replace({np.nan: None}, inplace=True)
|
||||
yield from df[list(columns)].to_dict(orient="records")
|
||||
|
||||
def _cache_stream(self, fp):
|
||||
|
||||
@@ -3,9 +3,10 @@
|
||||
#
|
||||
|
||||
|
||||
import logging
|
||||
import traceback
|
||||
from datetime import datetime
|
||||
from typing import Generator, Iterable, Mapping
|
||||
from typing import Any, Iterable, Iterator, Mapping, MutableMapping
|
||||
|
||||
from airbyte_cdk import AirbyteLogger
|
||||
from airbyte_cdk.models import (
|
||||
@@ -108,8 +109,12 @@ class SourceFile(Source):
|
||||
return AirbyteCatalog(streams=streams)
|
||||
|
||||
def read(
|
||||
self, logger: AirbyteLogger, config: Mapping, catalog: ConfiguredAirbyteCatalog, state_path: Mapping[str, any]
|
||||
) -> Generator[AirbyteMessage, None, None]:
|
||||
self,
|
||||
logger: logging.Logger,
|
||||
config: Mapping[str, Any],
|
||||
catalog: ConfiguredAirbyteCatalog,
|
||||
state: MutableMapping[str, Any] = None,
|
||||
) -> Iterator[AirbyteMessage]:
|
||||
"""Returns a generator of the AirbyteMessages generated by reading the source with the given configuration, catalog, and state."""
|
||||
client = self._get_client(config)
|
||||
fields = self.selected_fields(catalog)
|
||||
|
||||
@@ -2,9 +2,11 @@
|
||||
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
|
||||
#
|
||||
|
||||
import json
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from airbyte_cdk.models import AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, DestinationSyncMode, SyncMode
|
||||
from source_file.source import SourceFile
|
||||
|
||||
HERE = Path(__file__).parent.absolute()
|
||||
@@ -33,3 +35,47 @@ def test_csv_with_utf16_encoding():
|
||||
catalog = SourceFile().discover(logger=logging.getLogger("airbyte"), config=config_local_csv_utf16)
|
||||
stream = next(iter(catalog.streams))
|
||||
assert stream.json_schema == expected_schema
|
||||
|
||||
|
||||
def get_catalog(properties):
    """Build a configured catalog that contains one stream named "test".

    The stream's JSON schema is a draft-07 object schema whose "properties"
    entry is the `properties` argument. The stream is configured for a
    full-refresh sync with an overwrite destination mode.
    """
    schema = {"$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": properties}
    test_stream = AirbyteStream(name="test", json_schema=schema)
    configured_stream = ConfiguredAirbyteStream(
        stream=test_stream,
        sync_mode=SyncMode.full_refresh,
        destination_sync_mode=DestinationSyncMode.overwrite,
    )
    return ConfiguredAirbyteCatalog(streams=[configured_stream])
|
||||
|
||||
|
||||
def test_nan_to_null():
    """Empty cells of the semicolon-separated sample file must be read as None, not NaN."""
    source_config = {
        "dataset_name": "test",
        "format": "csv",
        "reader_options": json.dumps({"sep": ";"}),
        "url": f"{HERE}/../integration_tests/sample_files/test_nan.csv",
        "provider": {"storage": "local"},
    }

    # col1 holds the row key; col2 and col3 are numeric columns with gaps.
    schema_properties = {
        "col1": {"type": ["string", "null"]},
        "col2": {"type": ["number", "null"]},
        "col3": {"type": ["number", "null"]},
    }
    configured_catalog = get_catalog(schema_properties)

    expected_rows = [
        {"col1": "key1", "col2": 1.11, "col3": None},
        {"col1": "key2", "col2": None, "col3": 2.22},
        {"col1": "key3", "col2": None, "col3": None},
        {"col1": "key4", "col2": 3.33, "col3": None},
    ]

    messages = SourceFile().read(
        logger=logging.getLogger("airbyte"),
        config=source_config,
        catalog=configured_catalog,
    )
    actual_rows = [message.record.data for message in messages]

    assert actual_rows == expected_rows
|
||||
|
||||
Reference in New Issue
Block a user