File-based CDK: ensure no errors in Sentry given empty CSV (#29944)
This commit is contained in:
committed by
GitHub
parent
636da8ecf2
commit
399b4d1fca
@@ -11,6 +11,7 @@ from functools import partial
|
||||
from io import IOBase
|
||||
from typing import Any, Callable, Dict, Generator, Iterable, List, Mapping, Optional, Set
|
||||
|
||||
from airbyte_cdk.models import FailureType
|
||||
from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat, CsvHeaderAutogenerated, CsvHeaderUserProvided, InferenceType
|
||||
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
|
||||
from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, RecordParseError
|
||||
@@ -18,6 +19,7 @@ from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFile
|
||||
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
|
||||
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
|
||||
from airbyte_cdk.sources.file_based.schema_helpers import TYPE_PYTHON_MAPPING, SchemaType
|
||||
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
|
||||
|
||||
DIALECT_NAME = "_config_dialect"
|
||||
|
||||
@@ -141,6 +143,12 @@ class CsvParser(FileTypeParser):
|
||||
if read_bytes >= self._MAX_BYTES_PER_FILE_FOR_SCHEMA_INFERENCE:
|
||||
break
|
||||
|
||||
if not type_inferrer_by_field:
|
||||
raise AirbyteTracedException(
|
||||
message=f"Could not infer schema as there are no rows in {file.uri}. If having an empty CSV file is expected, ignore this. "
|
||||
f"Else, please contact Airbyte.",
|
||||
failure_type=FailureType.config_error,
|
||||
)
|
||||
schema = {header.strip(): {"type": type_inferred.infer()} for header, type_inferred in type_inferrer_by_field.items()}
|
||||
data_generator.close()
|
||||
return schema
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
#
|
||||
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
@@ -15,11 +14,3 @@ class RemoteFile(BaseModel):
|
||||
|
||||
uri: str
|
||||
last_modified: datetime
|
||||
|
||||
def extension_agrees_with_file_type(self, file_type: Optional[str]) -> bool:
|
||||
extensions = self.uri.split(".")[1:]
|
||||
if not extensions:
|
||||
return True
|
||||
if not file_type:
|
||||
return True
|
||||
return any(file_type.casefold() in e.casefold() for e in extensions)
|
||||
|
||||
@@ -13,6 +13,7 @@ from unittest import TestCase, mock
|
||||
from unittest.mock import Mock
|
||||
|
||||
import pytest
|
||||
from airbyte_cdk.models import FailureType
|
||||
from airbyte_cdk.sources.file_based.config.csv_format import (
|
||||
DEFAULT_FALSE_VALUES,
|
||||
DEFAULT_TRUE_VALUES,
|
||||
@@ -26,6 +27,7 @@ from airbyte_cdk.sources.file_based.exceptions import RecordParseError
|
||||
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader, FileReadMode
|
||||
from airbyte_cdk.sources.file_based.file_types.csv_parser import CsvParser, _CsvReader
|
||||
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
|
||||
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
|
||||
|
||||
PROPERTY_TYPES = {
|
||||
"col1": "null",
|
||||
@@ -169,7 +171,7 @@ class SchemaInferenceTestCase(TestCase):
|
||||
self._config.get_input_schema.return_value = None
|
||||
self._config.format = self._config_format
|
||||
|
||||
self._file = Mock(spec=RemoteFile)
|
||||
self._file = RemoteFile(uri="a uri", last_modified=datetime.now())
|
||||
self._stream_reader = Mock(spec=AbstractFileBasedStreamReader)
|
||||
self._logger = Mock(spec=logging.Logger)
|
||||
self._csv_reader = Mock(spec=_CsvReader)
|
||||
@@ -222,6 +224,12 @@ class SchemaInferenceTestCase(TestCase):
|
||||
# since the type is number, we know the string at the end was not considered
|
||||
assert inferred_schema == {self._HEADER_NAME: {"type": "number"}}
|
||||
|
||||
def test_given_empty_csv_file_when_infer_schema_then_raise_config_error(self) -> None:
|
||||
self._csv_reader.read_data.return_value = []
|
||||
with pytest.raises(AirbyteTracedException) as exception:
|
||||
self._infer_schema()
|
||||
assert exception.value.failure_type == FailureType.config_error
|
||||
|
||||
def _test_infer_schema(self, rows: List[str], expected_type: str) -> None:
|
||||
self._csv_reader.read_data.return_value = ({self._HEADER_NAME: row} for row in rows)
|
||||
inferred_schema = self._infer_schema()
|
||||
@@ -260,7 +268,7 @@ class CsvReaderTest(unittest.TestCase):
|
||||
self._config.name = self._CONFIG_NAME
|
||||
self._config.format = self._config_format
|
||||
|
||||
self._file = Mock(spec=RemoteFile)
|
||||
self._file = RemoteFile(uri="a uri", last_modified=datetime.now())
|
||||
self._stream_reader = Mock(spec=AbstractFileBasedStreamReader)
|
||||
self._logger = Mock(spec=logging.Logger)
|
||||
self._csv_reader = _CsvReader()
|
||||
|
||||
Reference in New Issue
Block a user