diff --git a/.github/workflows/gradle.yml b/.github/workflows/gradle.yml index fe1e6d038b9..bb9e161bbee 100644 --- a/.github/workflows/gradle.yml +++ b/.github/workflows/gradle.yml @@ -162,6 +162,13 @@ jobs: name: "Airbyte CDK: Build" timeout-minutes: 90 steps: + - name: Checkout master + uses: actions/checkout@v3 + # We check out master to run mypy on the CDK files that were modified in the PR. + # An alternative would be to use a GH action to detect the files and pass them as arguments to the build command + # But the additional complexity of detecting the files and passing them to the script through the gradle command doesn't seem worth it. + with: + ref: master - name: Checkout Airbyte uses: actions/checkout@v3 @@ -251,6 +258,13 @@ jobs: name: "Connectors Base: Build" timeout-minutes: 90 steps: + - name: Checkout master + uses: actions/checkout@v3 + # We check out master to run mypy on the CDK files that were modified in the PR. + # An alternative would be to use a GH action to detect the files and pass them as arguments to the build command + # But the additional complexity of detecting the files and passing them to the script through the gradle command doesn't seem worth it. + with: + ref: master - name: Checkout Airbyte uses: actions/checkout@v3 with: diff --git a/airbyte-cdk/python/.mypy.ini b/airbyte-cdk/python/.mypy.ini deleted file mode 100644 index b255df24bae..00000000000 --- a/airbyte-cdk/python/.mypy.ini +++ /dev/null @@ -1,4 +0,0 @@ -[mypy] -# The jsonschema package does not have signature definitions checked into typeshed. -ignore_missing_imports = True - diff --git a/airbyte-cdk/python/README.md b/airbyte-cdk/python/README.md index e1789af3f6b..214bdb44578 100644 --- a/airbyte-cdk/python/README.md +++ b/airbyte-cdk/python/README.md @@ -63,7 +63,8 @@ pip install -e ".[dev]" # [dev] installs development-only dependencies * Iterate on the code locally * Run tests via `python -m pytest -s unit_tests` -* Perform static type checks using `mypy airbyte_cdk`. `MyPy` configuration is in `.mypy.ini`. +* Perform static type checks using `mypy airbyte_cdk`. `MyPy` configuration is in `mypy.ini`. + * Run `mypy <path to file(s)>` to only check specific files. This is useful as the CDK still contains code that is not compliant. * The `type_check_and_test.sh` script bundles both type checking and testing in one convenient command. Feel free to use it! ##### Autogenerated files diff --git a/airbyte-cdk/python/airbyte_cdk/models/__init__.py b/airbyte-cdk/python/airbyte_cdk/models/__init__.py index e3127e1d519..b0ecf17e6ce 100644 --- a/airbyte-cdk/python/airbyte_cdk/models/__init__.py +++ b/airbyte-cdk/python/airbyte_cdk/models/__init__.py @@ -5,5 +5,58 @@ # are just wrappers on top of that stand-alone package which do some namespacing magic # to make the airbyte_protocol python classes available to the airbyte-cdk consumer as part # of airbyte-cdk rather than a standalone package.
-from .airbyte_protocol import * -from .well_known_types import * +from .airbyte_protocol import ( + AdvancedAuth, + AirbyteCatalog, + AirbyteConnectionStatus, + AirbyteControlConnectorConfigMessage, + AirbyteControlMessage, + AirbyteErrorTraceMessage, + AirbyteEstimateTraceMessage, + AirbyteGlobalState, + AirbyteLogMessage, + AirbyteMessage, + AirbyteProtocol, + AirbyteRecordMessage, + AirbyteStateBlob, + AirbyteStateMessage, + AirbyteStateType, + AirbyteStream, + AirbyteStreamState, + AirbyteStreamStatus, + AirbyteStreamStatusTraceMessage, + AirbyteTraceMessage, + AuthFlowType, + AuthSpecification, + AuthType, + ConfiguredAirbyteCatalog, + ConfiguredAirbyteStream, + ConnectorSpecification, + DestinationSyncMode, + EstimateType, + FailureType, + Level, + OAuth2Specification, + OAuthConfigSpecification, + OrchestratorType, + Status, + StreamDescriptor, + SyncMode, + TraceType, + Type, +) +from .well_known_types import ( + BinaryData, + Boolean, + Date, + Integer, + IntegerEnum, + Model, + Number, + NumberEnum, + String, + TimestampWithoutTimezone, + TimestampWithTimezone, + TimeWithoutTimezone, + TimeWithTimezone, +) diff --git a/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py b/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py index e634c18acd6..74639c8bf3c 100644 --- a/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py +++ b/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py @@ -2,4 +2,4 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # -from airbyte_protocol.models.airbyte_protocol import * +from airbyte_protocol.models import * diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/config/file_based_stream_config.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/config/file_based_stream_config.py index 34d8a451bd4..51ff650211d 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/config/file_based_stream_config.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/config/file_based_stream_config.py @@ -44,7 +44,7 @@ class CsvFormat(BaseModel): double_quote: bool = Field( title="Double Quote", default=True, description="Whether two quotes in a quoted CSV value denote a single quote in the data." ) - quoting_behavior: Optional[QuotingBehavior] = Field( + quoting_behavior: QuotingBehavior = Field( title="Quoting Behavior", default=QuotingBehavior.QUOTE_SPECIAL_CHARACTERS, description="The quoting behavior determines when a value in a row should have quote marks added around it. For example, if Quote Non-numeric is specified, while reading, quotes are expected for row values that do not contain numbers. 
Or for Quote All, every row value will be expecting quotes.", @@ -54,7 +54,7 @@ class CsvFormat(BaseModel): # of using pyarrow @validator("delimiter") - def validate_delimiter(cls, v): + def validate_delimiter(cls, v: str) -> str: if len(v) != 1: raise ValueError("delimiter should only be one character") if v in {"\r", "\n"}: @@ -62,19 +62,19 @@ class CsvFormat(BaseModel): return v @validator("quote_char") - def validate_quote_char(cls, v): + def validate_quote_char(cls, v: str) -> str: if len(v) != 1: raise ValueError("quote_char should only be one character") return v @validator("escape_char") - def validate_escape_char(cls, v): + def validate_escape_char(cls, v: str) -> str: if len(v) != 1: raise ValueError("escape_char should only be one character") return v @validator("encoding") - def validate_encoding(cls, v): + def validate_encoding(cls, v: str) -> str: try: codecs.lookup(v) except LookupError: @@ -116,13 +116,13 @@ class FileBasedStreamConfig(BaseModel): ) @validator("file_type", pre=True) - def validate_file_type(cls, v): + def validate_file_type(cls, v: str) -> str: if v not in VALID_FILE_TYPES: raise ValueError(f"Format filetype {v} is not a supported file type") return v @validator("format", pre=True) - def transform_format(cls, v): + def transform_format(cls, v: Mapping[str, str]) -> Any: if isinstance(v, Mapping): file_type = v.get("filetype", "") if file_type: @@ -132,6 +132,8 @@ class FileBasedStreamConfig(BaseModel): return v @validator("input_schema", pre=True) - def transform_input_schema(cls, v): + def transform_input_schema(cls, v: Optional[Union[str, Mapping[str, Any]]]) -> Optional[Mapping[str, Any]]: if v: return type_mapping_to_jsonschema(v) + else: + return None diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/default_file_based_availability_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/default_file_based_availability_strategy.py index 6b051f0b1af..104ec66aae4 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/default_file_based_availability_strategy.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/default_file_based_availability_strategy.py @@ -13,15 +13,14 @@ from airbyte_cdk.sources.file_based.remote_file import RemoteFile from airbyte_cdk.sources.file_based.schema_helpers import conforms_to_schema from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy +from airbyte_cdk.sources.streams.core import Stream class DefaultFileBasedAvailabilityStrategy(AvailabilityStrategy): def __init__(self, stream_reader: AbstractFileBasedStreamReader): self.stream_reader = stream_reader - def check_availability( - self, stream: AbstractFileBasedStream, logger: logging.Logger, _: Optional[Source] - ) -> Tuple[bool, Optional[str]]: + def check_availability(self, stream: Stream, logger: logging.Logger, _: Optional[Source]) -> Tuple[bool, Optional[str]]: """ Perform a connection check for the stream. @@ -38,6 +37,8 @@ class DefaultFileBasedAvailabilityStrategy(AvailabilityStrategy): - If the user provided a schema in the config, check that a subset of records in one file conform to the schema via a call to stream.conforms_to_schema(schema). 
""" + if not isinstance(stream, AbstractFileBasedStream): + raise ValueError(f"Stream {stream.name} is not a file-based stream.") try: files = self._check_list_files(stream) self._check_parse_record(stream, files[0], logger) @@ -60,7 +61,7 @@ class DefaultFileBasedAvailabilityStrategy(AvailabilityStrategy): return files - def _check_parse_record(self, stream: AbstractFileBasedStream, file: RemoteFile, logger: logging.Logger): + def _check_parse_record(self, stream: AbstractFileBasedStream, file: RemoteFile, logger: logging.Logger) -> None: parser = stream.get_parser(stream.config.file_type) try: @@ -75,7 +76,7 @@ class DefaultFileBasedAvailabilityStrategy(AvailabilityStrategy): schema = stream.catalog_schema or stream.config.input_schema if schema and stream.validation_policy.validate_schema_before_sync: - if not conforms_to_schema(record, schema): + if not conforms_to_schema(record, schema): # type: ignore raise CheckAvailabilityError( FileBasedSourceError.ERROR_VALIDATING_RECORD, stream=stream.name, diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/exceptions.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/exceptions.py index a5829bc8434..ba82603eb9c 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/exceptions.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/exceptions.py @@ -37,7 +37,7 @@ class FileBasedSourceError(Enum): class BaseFileBasedSourceError(Exception): - def __init__(self, error: FileBasedSourceError, **kwargs): + def __init__(self, error: FileBasedSourceError, **kwargs): # type: ignore # noqa super().__init__( f"{FileBasedSourceError(error).value} Contact Support if you need assistance.\n{' '.join([f'{k}={v}' for k, v in kwargs.items()])}" ) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_based_source.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_based_source.py index 0d175a96dc7..ec270a3b7c7 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_based_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_based_source.py @@ -5,10 +5,9 @@ import logging import traceback from abc import ABC -from typing import Any, Dict, List, Mapping, Optional, Tuple, Type +from typing import Any, List, Mapping, Optional, Tuple, Type -from airbyte_cdk.models import ConfiguredAirbyteCatalog -from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification +from airbyte_cdk.models import ConfiguredAirbyteCatalog, ConnectorSpecification from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig @@ -21,6 +20,7 @@ from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeP from airbyte_cdk.sources.file_based.schema_validation_policies import DEFAULT_SCHEMA_VALIDATION_POLICIES, AbstractSchemaValidationPolicy from airbyte_cdk.sources.file_based.stream import AbstractFileBasedStream, DefaultFileBasedStream from airbyte_cdk.sources.file_based.stream.cursor.default_file_based_cursor import DefaultFileBasedCursor +from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy from pydantic.error_wrappers import ValidationError @@ -35,15 +35,15 @@ class FileBasedSource(AbstractSource, ABC): availability_strategy: Optional[AvailabilityStrategy], spec_class: Type[AbstractFileBasedSpec], discovery_policy: AbstractDiscoveryPolicy = 
DefaultDiscoveryPolicy(), - parsers: Dict[str, FileTypeParser] = None, - validation_policies: Dict[str, AbstractSchemaValidationPolicy] = DEFAULT_SCHEMA_VALIDATION_POLICIES, + parsers: Mapping[str, FileTypeParser] = default_parsers, + validation_policies: Mapping[str, AbstractSchemaValidationPolicy] = DEFAULT_SCHEMA_VALIDATION_POLICIES, max_history_size: int = DEFAULT_MAX_HISTORY_SIZE, ): self.stream_reader = stream_reader self.availability_strategy = availability_strategy or DefaultFileBasedAvailabilityStrategy(stream_reader) self.spec_class = spec_class self.discovery_policy = discovery_policy - self.parsers = parsers or default_parsers + self.parsers = parsers self.validation_policies = validation_policies self.stream_schemas = {s.stream.name: s.stream.json_schema for s in catalog.streams} if catalog else {} self.max_history_size = max_history_size @@ -70,6 +70,8 @@ class FileBasedSource(AbstractSource, ABC): errors = [] for stream in streams: + if not isinstance(stream, AbstractFileBasedStream): + raise ValueError(f"Stream {stream} is not a file-based stream.") try: ( stream_is_available, @@ -78,18 +80,18 @@ class FileBasedSource(AbstractSource, ABC): except Exception: errors.append(f"Unable to connect to stream {stream} - {''.join(traceback.format_exc())}") else: - if not stream_is_available: + if not stream_is_available and reason: errors.append(reason) return not bool(errors), (errors or None) - def streams(self, config: Mapping[str, Any]) -> List[AbstractFileBasedStream]: + def streams(self, config: Mapping[str, Any]) -> List[Stream]: """ Return a list of this source's streams. """ try: parsed_config = self.spec_class(**config) - streams = [] + streams: List[Stream] = [] for stream_config in parsed_config.streams: self._validate_input_schema(stream_config) streams.append( @@ -119,13 +121,13 @@ class FileBasedSource(AbstractSource, ABC): connectionSpecification=self.spec_class.schema(), ) - def _validate_and_get_validation_policy(self, stream_config: FileBasedStreamConfig): + def _validate_and_get_validation_policy(self, stream_config: FileBasedStreamConfig) -> AbstractSchemaValidationPolicy: if stream_config.validation_policy not in self.validation_policies: raise ValidationError( f"`validation_policy` must be one of {list(self.validation_policies.keys())}", model=FileBasedStreamConfig ) return self.validation_policies[stream_config.validation_policy] - def _validate_input_schema(self, stream_config: FileBasedStreamConfig): + def _validate_input_schema(self, stream_config: FileBasedStreamConfig) -> None: if stream_config.schemaless and stream_config.input_schema: raise ValidationError("`input_schema` and `schemaless` options cannot both be set", model=FileBasedStreamConfig) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/__init__.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/__init__.py index b9e472915b1..80922439a45 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/__init__.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/__init__.py @@ -1,9 +1,12 @@ +from typing import Mapping + from .avro_parser import AvroParser from .csv_parser import CsvParser +from .file_type_parser import FileTypeParser from .jsonl_parser import JsonlParser from .parquet_parser import ParquetParser -default_parsers = { +default_parsers: Mapping[str, FileTypeParser] = { "avro": AvroParser(), "csv": CsvParser(), "jsonl": JsonlParser(), diff --git 
a/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/avro_parser.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/avro_parser.py index 338ada88017..546e928d568 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/avro_parser.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/avro_parser.py @@ -19,7 +19,7 @@ class AvroParser(FileTypeParser): stream_reader: AbstractFileBasedStreamReader, logger: logging.Logger, ) -> Dict[str, Any]: - ... + raise NotImplementedError() def parse_records( self, @@ -28,4 +28,4 @@ class AvroParser(FileTypeParser): stream_reader: AbstractFileBasedStreamReader, logger: logging.Logger, ) -> Iterable[Dict[str, Any]]: - ... + raise NotImplementedError() diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/csv_parser.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/csv_parser.py index ad297d2afc0..5aca4a82fca 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/csv_parser.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/csv_parser.py @@ -17,7 +17,7 @@ from airbyte_cdk.sources.file_based.schema_helpers import TYPE_PYTHON_MAPPING DIALECT_NAME = "_config_dialect" -config_to_quoting: [QuotingBehavior, int] = { +config_to_quoting: Mapping[QuotingBehavior, int] = { QuotingBehavior.QUOTE_ALL: csv.QUOTE_ALL, QuotingBehavior.QUOTE_SPECIAL_CHARACTERS: csv.QUOTE_MINIMAL, QuotingBehavior.QUOTE_NONNUMERIC: csv.QUOTE_NONNUMERIC, @@ -47,13 +47,13 @@ class CsvParser(FileTypeParser): with stream_reader.open_file(file) as fp: # todo: the existing InMemoryFilesSource.open_file() test source doesn't currently require an encoding, but actual # sources will likely require one. Rather than modify the interface now we can wait until the real use case - reader = csv.DictReader(fp, dialect=dialect_name) + reader = csv.DictReader(fp, dialect=dialect_name) # type: ignore schema = {field.strip(): {"type": "string"} for field in next(reader)} csv.unregister_dialect(dialect_name) return schema else: with stream_reader.open_file(file) as fp: - reader = csv.DictReader(fp) + reader = csv.DictReader(fp) # type: ignore return {field.strip(): {"type": "string"} for field in next(reader)} def parse_records( @@ -63,7 +63,7 @@ class CsvParser(FileTypeParser): stream_reader: AbstractFileBasedStreamReader, logger: logging.Logger, ) -> Iterable[Dict[str, Any]]: - schema = config.input_schema + schema: Mapping[str, Any] = config.input_schema # type: ignore config_format = config.format.get(config.file_type) if config.format else None if config_format: # Formats are configured individually per-stream so a unique dialect should be registered for each stream. @@ -80,16 +80,16 @@ class CsvParser(FileTypeParser): with stream_reader.open_file(file) as fp: # todo: the existing InMemoryFilesSource.open_file() test source doesn't currently require an encoding, but actual # sources will likely require one. 
Rather than modify the interface now we can wait until the real use case - reader = csv.DictReader(fp, dialect=dialect_name) + reader = csv.DictReader(fp, dialect=dialect_name) # type: ignore yield from self._read_and_cast_types(reader, schema, logger) else: with stream_reader.open_file(file) as fp: - reader = csv.DictReader(fp) + reader = csv.DictReader(fp) # type: ignore yield from self._read_and_cast_types(reader, schema, logger) @staticmethod def _read_and_cast_types( - reader: csv.DictReader, schema: Optional[Mapping[str, str]], logger: logging.Logger + reader: csv.DictReader, schema: Optional[Mapping[str, Any]], logger: logging.Logger # type: ignore ) -> Iterable[Dict[str, Any]]: """ If the user provided a schema, attempt to cast the record values to the associated type. @@ -120,9 +120,9 @@ def cast_types(row: Dict[str, str], property_types: Dict[str, Any], logger: logg for key, value in row.items(): prop_type = property_types.get(key) - cast_value = value + cast_value: Any = value - if prop_type in TYPE_PYTHON_MAPPING: + if prop_type in TYPE_PYTHON_MAPPING and prop_type is not None: _, python_type = TYPE_PYTHON_MAPPING[prop_type] if python_type is None: @@ -169,5 +169,5 @@ def cast_types(row: Dict[str, str], property_types: Dict[str, Any], logger: logg return result -def _format_warning(key: str, value: str, expected_type: str) -> str: +def _format_warning(key: str, value: str, expected_type: Optional[Any]) -> str: return f"{key}: value={value},expected_type={expected_type}" diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/jsonl_parser.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/jsonl_parser.py index ec5a06b8f0a..95dec35345b 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/jsonl_parser.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/jsonl_parser.py @@ -22,7 +22,7 @@ class JsonlParser(FileTypeParser): stream_reader: AbstractFileBasedStreamReader, logger: logging.Logger, ) -> Dict[str, Any]: - ... + raise NotImplementedError() def parse_records( self, @@ -31,4 +31,4 @@ class JsonlParser(FileTypeParser): stream_reader: AbstractFileBasedStreamReader, logger: logging.Logger, ) -> Iterable[Dict[str, Any]]: - ... 
+ raise NotImplementedError() diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/remote_file.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/remote_file.py index 118033fc7d0..9dcf21684ee 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/remote_file.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/remote_file.py @@ -21,4 +21,6 @@ class RemoteFile(BaseModel): extensions = self.uri.split(".")[1:] if not extensions: return True + if not self.file_type: + return True return any(self.file_type.casefold() in e.casefold() for e in extensions) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/schema_helpers.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/schema_helpers.py index 61a445f64e8..20efe39a785 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/schema_helpers.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/schema_helpers.py @@ -6,11 +6,11 @@ import json from copy import deepcopy from enum import Enum from functools import total_ordering -from typing import Any, Dict, List, Literal, Mapping, Optional, Union +from typing import Any, Dict, List, Literal, Mapping, Optional, Tuple, Type, Union from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError, FileBasedSourceError, SchemaInferenceError -JsonSchemaSupportedType = Union[List, Literal["string"], str] +JsonSchemaSupportedType = Union[List[str], Literal["string"], str] SchemaType = Dict[str, Dict[str, JsonSchemaSupportedType]] schemaless_schema = {"type": "object", "properties": {"data": {"type": "object"}}} @@ -25,14 +25,14 @@ class ComparableType(Enum): STRING = 4 OBJECT = 5 - def __lt__(self, other): + def __lt__(self, other: Any) -> bool: if self.__class__ is other.__class__: - return self.value < other.value + return self.value < other.value # type: ignore else: return NotImplemented -TYPE_PYTHON_MAPPING = { +TYPE_PYTHON_MAPPING: Mapping[str, Tuple[str, Optional[Type[Any]]]] = { "null": ("null", None), "array": ("array", list), "boolean": ("boolean", bool), @@ -44,7 +44,7 @@ TYPE_PYTHON_MAPPING = { } -def get_comparable_type(value: Any) -> ComparableType: +def get_comparable_type(value: Any) -> Optional[ComparableType]: if value == "null": return ComparableType.NULL if value == "boolean": @@ -57,9 +57,11 @@ def get_comparable_type(value: Any) -> ComparableType: return ComparableType.STRING if value == "object": return ComparableType.OBJECT + else: + return None -def get_inferred_type(value: Any) -> ComparableType: +def get_inferred_type(value: Any) -> Optional[ComparableType]: if value is None: return ComparableType.NULL if isinstance(value, bool): @@ -72,6 +74,8 @@ def get_inferred_type(value: Any) -> ComparableType: return ComparableType.STRING if isinstance(value, dict): return ComparableType.OBJECT + else: + return None def merge_schemas(schema1: SchemaType, schema2: SchemaType) -> SchemaType: @@ -91,10 +95,10 @@ def merge_schemas(schema1: SchemaType, schema2: SchemaType) -> SchemaType: and nothing else. 
""" for k, t in list(schema1.items()) + list(schema2.items()): - if not isinstance(t, dict) or not _is_valid_type(t.get("type")): + if not isinstance(t, dict) or "type" not in t or not _is_valid_type(t["type"]): raise SchemaInferenceError(FileBasedSourceError.UNRECOGNIZED_TYPE, key=k, type=t) - merged_schema = deepcopy(schema1) + merged_schema: Dict[str, Any] = deepcopy(schema1) for k2, t2 in schema2.items(): t1 = merged_schema.get(k2) if t1 is None: @@ -136,7 +140,7 @@ def _choose_wider_type(key: str, t1: Dict[str, Any], t2: Dict[str, Any]) -> Dict ) # accessing the type_mapping value -def is_equal_or_narrower_type(value: Any, expected_type: str): +def is_equal_or_narrower_type(value: Any, expected_type: str) -> bool: if isinstance(value, list): # We do not compare lists directly; the individual items are compared. # If we hit this condition, it means that the expected type is not @@ -151,7 +155,7 @@ def is_equal_or_narrower_type(value: Any, expected_type: str): return ComparableType(inferred_type) <= ComparableType(get_comparable_type(expected_type)) -def conforms_to_schema(record: Mapping[str, Any], schema: Mapping[str, str]) -> bool: +def conforms_to_schema(record: Mapping[str, Any], schema: Mapping[str, Any]) -> bool: """ Return true iff the record conforms to the supplied schema. @@ -185,9 +189,12 @@ def conforms_to_schema(record: Mapping[str, Any], schema: Mapping[str, str]) -> return True -def _parse_json_input(input_schema: Optional[Union[str, Dict[str, str]]]) -> Optional[Mapping[str, str]]: +def _parse_json_input(input_schema: Union[str, Mapping[str, str]]) -> Optional[Mapping[str, str]]: try: - schema = json.loads(input_schema) + if isinstance(input_schema, str): + schema: Mapping[str, str] = json.loads(input_schema) + else: + schema = input_schema if not all(isinstance(s, str) for s in schema.values()): raise ConfigValidationError( FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA, details="Invalid input schema; nested schemas are not supported." @@ -199,7 +206,7 @@ def _parse_json_input(input_schema: Optional[Union[str, Dict[str, str]]]) -> Opt return schema -def type_mapping_to_jsonschema(input_schema: Optional[Union[str, Mapping[str, str]]]) -> Optional[Mapping[str, str]]: +def type_mapping_to_jsonschema(input_schema: Optional[Union[str, Mapping[str, str]]]) -> Optional[Mapping[str, Any]]: """ Return the user input schema (type mapping), transformed to JSON Schema format. 
diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/schema_validation_policies/abstract_schema_validation_policy.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/schema_validation_policies/abstract_schema_validation_policy.py index 4b64d74c7da..004139b78b1 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/schema_validation_policies/abstract_schema_validation_policy.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/schema_validation_policies/abstract_schema_validation_policy.py @@ -3,7 +3,7 @@ # from abc import ABC, abstractmethod -from typing import Any, Mapping +from typing import Any, Mapping, Optional class AbstractSchemaValidationPolicy(ABC): @@ -11,8 +11,8 @@ class AbstractSchemaValidationPolicy(ABC): validate_schema_before_sync = False # Whether to verify that records conform to the schema during the stream's availability check @abstractmethod - def record_passes_validation_policy(self, record: Mapping[str, Any], schema: Mapping[str, Any]) -> bool: + def record_passes_validation_policy(self, record: Mapping[str, Any], schema: Optional[Mapping[str, Any]]) -> bool: """ Return True if the record passes the user's validation policy. """ - ... + raise NotImplementedError() diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/schema_validation_policies/default_schema_validation_policies.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/schema_validation_policies/default_schema_validation_policies.py index 1877542342a..fa8ff1e0985 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/schema_validation_policies/default_schema_validation_policies.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/schema_validation_policies/default_schema_validation_policies.py @@ -2,7 +2,7 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
# -from typing import Any, Mapping +from typing import Any, Mapping, Optional from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, StopSyncPerValidationPolicy from airbyte_cdk.sources.file_based.schema_helpers import conforms_to_schema @@ -12,23 +12,23 @@ from airbyte_cdk.sources.file_based.schema_validation_policies import AbstractSc class EmitRecordPolicy(AbstractSchemaValidationPolicy): name = "emit_record" - def record_passes_validation_policy(self, record: Mapping[str, Any], schema: Mapping[str, Any]) -> bool: + def record_passes_validation_policy(self, record: Mapping[str, Any], schema: Optional[Mapping[str, Any]]) -> bool: return True class SkipRecordPolicy(AbstractSchemaValidationPolicy): name = "skip_record" - def record_passes_validation_policy(self, record: Mapping[str, Any], schema: Mapping[str, Any]) -> bool: - return conforms_to_schema(record, schema) + def record_passes_validation_policy(self, record: Mapping[str, Any], schema: Optional[Mapping[str, Any]]) -> bool: + return schema is not None and conforms_to_schema(record, schema) class WaitForDiscoverPolicy(AbstractSchemaValidationPolicy): name = "wait_for_discover" validate_schema_before_sync = True - def record_passes_validation_policy(self, record: Mapping[str, Any], schema: Mapping[str, Any]) -> bool: - if not conforms_to_schema(record, schema): + def record_passes_validation_policy(self, record: Mapping[str, Any], schema: Optional[Mapping[str, Any]]) -> bool: + if schema is None or not conforms_to_schema(record, schema): raise StopSyncPerValidationPolicy(FileBasedSourceError.STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY) return True diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/abstract_file_based_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/abstract_file_based_stream.py index 9e880e713ad..fc510784dc3 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/abstract_file_based_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/abstract_file_based_stream.py @@ -3,7 +3,7 @@ # from abc import abstractmethod -from functools import cached_property +from functools import cached_property, lru_cache from typing import Any, Dict, Iterable, List, Mapping, Optional from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode @@ -14,7 +14,7 @@ from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFile from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser from airbyte_cdk.sources.file_based.remote_file import RemoteFile from airbyte_cdk.sources.file_based.schema_validation_policies import AbstractSchemaValidationPolicy -from airbyte_cdk.sources.file_based.types import StreamSlice, StreamState +from airbyte_cdk.sources.file_based.types import StreamSlice from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy @@ -69,15 +69,17 @@ class AbstractFileBasedStream(Stream): def read_records( self, sync_mode: SyncMode, - cursor_field: List[str] = None, + cursor_field: Optional[List[str]] = None, stream_slice: Optional[StreamSlice] = None, - stream_state: Optional[StreamState] = None, + stream_state: Optional[Mapping[str, Any]] = None, ) -> Iterable[Mapping[str, Any]]: """ Yield all records from all remote files in `list_files_for_this_sync`. This method acts as an adapter between the generic Stream interface and the file-based stream, since file-based streams manage their own states.
""" + if stream_slice is None: + raise ValueError("stream_slice must be set") return self.read_records_from_slice(stream_slice) @abstractmethod @@ -88,7 +90,7 @@ class AbstractFileBasedStream(Stream): ... def stream_slices( - self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: StreamState = None + self, *, sync_mode: SyncMode, cursor_field: Optional[List[str]] = None, stream_state: Optional[Mapping[str, Any]] = None ) -> Iterable[Optional[Mapping[str, Any]]]: """ This method acts as an adapter between the generic Stream interface and the file-based's @@ -105,6 +107,7 @@ class AbstractFileBasedStream(Stream): ... @abstractmethod + @lru_cache(maxsize=None) def get_json_schema(self) -> Mapping[str, Any]: """ Return the JSON Schema for a stream. @@ -133,7 +136,7 @@ class AbstractFileBasedStream(Stream): ) @cached_property - def availability_strategy(self): + def availability_strategy(self) -> AvailabilityStrategy: return self._availability_strategy @property diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/cursor/default_file_based_cursor.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/cursor/default_file_based_cursor.py index 1646094487f..9d4ab4047c5 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/cursor/default_file_based_cursor.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/cursor/default_file_based_cursor.py @@ -4,7 +4,7 @@ import logging from datetime import datetime, timedelta -from typing import Iterable, Mapping, Optional +from typing import Iterable, MutableMapping, Optional from airbyte_cdk.sources.file_based.remote_file import RemoteFile from airbyte_cdk.sources.file_based.stream.cursor.file_based_cursor import FileBasedCursor @@ -16,7 +16,7 @@ class DefaultFileBasedCursor(FileBasedCursor): DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" def __init__(self, max_history_size: int, days_to_sync_if_history_is_full: Optional[int]): - self._file_to_datetime_history: Mapping[str:datetime] = {} + self._file_to_datetime_history: MutableMapping[str, str] = {} self._max_history_size = max_history_size self._time_window_if_history_is_full = timedelta( days=days_to_sync_if_history_is_full or self.DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL @@ -62,7 +62,7 @@ class DefaultFileBasedCursor(FileBasedCursor): def _should_sync_file(self, file: RemoteFile, logger: logging.Logger) -> bool: if file.uri in self._file_to_datetime_history: # If the file's uri is in the history, we should sync the file if it has been modified since it was synced - updated_at_from_history = datetime.strptime(self._file_to_datetime_history.get(file.uri), self.DATE_TIME_FORMAT) + updated_at_from_history = datetime.strptime(self._file_to_datetime_history[file.uri], self.DATE_TIME_FORMAT) if file.last_modified < updated_at_from_history: logger.warning( f"The file {file.uri}'s last modified date is older than the last time it was synced. This is unexpected. Skipping the file." 
@@ -71,6 +71,8 @@ class DefaultFileBasedCursor(FileBasedCursor): return file.last_modified > updated_at_from_history if self._is_history_full(): + if self._initial_earliest_file_in_history is None: + return True if file.last_modified > self._initial_earliest_file_in_history.last_modified: # If the history is partial and the file's datetime is strictly greater than the earliest file in the history, # we should sync it diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/cursor/file_based_cursor.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/cursor/file_based_cursor.py index 07aa10681f7..6e2fbbfd427 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/cursor/file_based_cursor.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/cursor/file_based_cursor.py @@ -5,7 +5,7 @@ import logging from abc import ABC, abstractmethod from datetime import datetime -from typing import Any, Iterable, Mapping +from typing import Any, Iterable, MutableMapping from airbyte_cdk.sources.file_based.remote_file import RemoteFile from airbyte_cdk.sources.file_based.types import StreamState @@ -32,7 +32,7 @@ class FileBasedCursor(ABC): """ @abstractmethod - def get_state(self) -> Mapping[str, Any]: + def get_state(self) -> MutableMapping[str, Any]: """ Get the state of the cursor. """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py index 67a17dddbd4..e94137f0aa4 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py @@ -6,9 +6,11 @@ import asyncio import itertools import traceback from functools import cache -from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Union +from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Set, Union -from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, Type +from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level +from airbyte_cdk.models import Type as MessageType +from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType from airbyte_cdk.sources.file_based.exceptions import ( FileBasedSourceError, InvalidSchemaError, @@ -36,7 +38,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin): ab_file_name_col = "_ab_source_file_url" airbyte_columns = [ab_last_mod_col, ab_file_name_col] - def __init__(self, cursor: FileBasedCursor, **kwargs): + def __init__(self, cursor: FileBasedCursor, **kwargs: Any): super().__init__(**kwargs) self._cursor = cursor @@ -45,12 +47,12 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin): return self._cursor.get_state() @state.setter - def state(self, value: MutableMapping[str, Any]): + def state(self, value: MutableMapping[str, Any]) -> None: """State setter, accept state serialized by state getter.""" self._cursor.set_initial_state(value) @property - def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]: + def primary_key(self) -> PrimaryKeyType: return self.config.primary_key def compute_slices(self) -> Iterable[Optional[Mapping[str, Any]]]: @@ -93,7 +95,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin): except StopSyncPerValidationPolicy: yield AirbyteMessage( -
type=Type.LOG, + type=MessageType.LOG, log=AirbyteLogMessage( level=Level.WARN, message=f"Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema. stream={self.name} file={file.uri} validation_policy={self.config.validation_policy} n_skipped={n_skipped}", @@ -103,7 +105,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin): except Exception: yield AirbyteMessage( - type=Type.LOG, + type=MessageType.LOG, log=AirbyteLogMessage( level=Level.ERROR, message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}", @@ -115,7 +117,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin): else: if n_skipped: yield AirbyteMessage( - type=Type.LOG, + type=MessageType.LOG, log=AirbyteLogMessage( level=Level.WARN, message=f"Records in file did not pass validation policy. stream={self.name} file={file.uri} n_skipped={n_skipped} validation_policy={self.validation_policy.name}", @@ -141,12 +143,11 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin): except Exception as exc: raise SchemaInferenceError(FileBasedSourceError.SCHEMA_INFERENCE_ERROR, stream=self.name) from exc else: - schema["properties"] = {**extra_fields, **schema["properties"]} - return schema + return {"type": "object", "properties": {**extra_fields, **schema["properties"]}} def _get_raw_json_schema(self) -> JsonSchema: if self.config.input_schema: - return self.config.input_schema + return self.config.input_schema # type: ignore elif self.config.schemaless: return schemaless_schema else: @@ -180,7 +181,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin): The output of this method is cached so we don't need to list the files more than once. This means we won't pick up changes to the files during a sync. """ - return list(self._stream_reader.get_matching_files(self.config.globs)) + return list(self._stream_reader.get_matching_files(self.config.globs or [])) def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]: loop = asyncio.get_event_loop() @@ -193,13 +194,13 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin): Each file type has a corresponding `infer_schema` handler. Dispatch on file type. 
""" - base_schema: Dict[str, str] = {} - pending_tasks = set() + base_schema: Dict[str, Any] = {} + pending_tasks: Set[asyncio.tasks.Task[Dict[str, Any]]] = set() n_started, n_files = 0, len(files) - files = iter(files) + files_iterator = iter(files) while pending_tasks or n_started < n_files: - while len(pending_tasks) <= self._discovery_policy.n_concurrent_requests and (file := next(files, None)): + while len(pending_tasks) <= self._discovery_policy.n_concurrent_requests and (file := next(files_iterator, None)): pending_tasks.add(asyncio.create_task(self._infer_file_schema(file))) n_started += 1 # Return when the first task is completed so that we can enqueue a new task as soon as the @@ -210,7 +211,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin): return base_schema - async def _infer_file_schema(self, file: RemoteFile) -> Mapping[str, Any]: + async def _infer_file_schema(self, file: RemoteFile) -> Dict[str, Any]: try: return await self.get_parser(self.config.file_type).infer_schema(self.config, file, self._stream_reader, self.logger) except Exception as exc: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/types.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/types.py index 3e81516467b..b83bf37a37a 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/types.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/types.py @@ -4,7 +4,7 @@ from __future__ import annotations -from typing import Any, Mapping +from typing import Any, Mapping, MutableMapping StreamSlice = Mapping[str, Any] -StreamState = Mapping[str, Any] +StreamState = MutableMapping[str, Any] diff --git a/airbyte-cdk/python/airbyte_cdk/sources/utils/casing.py b/airbyte-cdk/python/airbyte_cdk/sources/utils/casing.py index bf0ac86b16c..806e077ae00 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/utils/casing.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/utils/casing.py @@ -7,6 +7,6 @@ import re # https://stackoverflow.com/a/1176023 -def camel_to_snake(s): +def camel_to_snake(s: str) -> str: s = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", s) return re.sub("([a-z0-9])([A-Z])", r"\1_\2", s).lower() diff --git a/airbyte-cdk/python/bin/run-mypy-on-modified-files.sh b/airbyte-cdk/python/bin/run-mypy-on-modified-files.sh new file mode 100755 index 00000000000..af4ed62f1be --- /dev/null +++ b/airbyte-cdk/python/bin/run-mypy-on-modified-files.sh @@ -0,0 +1,2 @@ +set -e +git diff --name-only --relative --diff-filter=d remotes/origin/master -- . 
| grep -E '\.py$' | xargs .venv/bin/python -m mypy --config-file mypy.ini --install-types --non-interactive \ No newline at end of file diff --git a/airbyte-cdk/python/build.gradle b/airbyte-cdk/python/build.gradle index 8e21b03a7f5..9624bcac580 100644 --- a/airbyte-cdk/python/build.gradle +++ b/airbyte-cdk/python/build.gradle @@ -23,7 +23,13 @@ task runLowCodeConnectorUnitTests(type: Exec) { commandLine 'bin/low-code-unit-tests.sh' } -blackFormat.dependsOn generateComponentManifestClassFiles +task runMypyOnModifiedFiles(type: Exec) { + environment 'ROOT_DIR', rootDir.absolutePath + commandLine 'bin/run-mypy-on-modified-files.sh' +} + +blackFormat.dependsOn runMypyOnModifiedFiles isortFormat.dependsOn generateComponentManifestClassFiles flakeCheck.dependsOn generateComponentManifestClassFiles installReqs.dependsOn generateComponentManifestClassFiles +runMypyOnModifiedFiles.dependsOn generateComponentManifestClassFiles \ No newline at end of file diff --git a/airbyte-cdk/python/mypy.ini b/airbyte-cdk/python/mypy.ini new file mode 100644 index 00000000000..f51c0846fb8 --- /dev/null +++ b/airbyte-cdk/python/mypy.ini @@ -0,0 +1,26 @@ +# Global options: + +[mypy] +warn_unused_configs = True +warn_redundant_casts = True +ignore_missing_imports = True +strict_equality = True +check_untyped_defs = True +disallow_untyped_decorators = False +disallow_any_generics = True +disallow_untyped_calls = True +disallow_incomplete_defs = True +disallow_untyped_defs = True +warn_return_any = True + +# Only alert on the files we want to check +follow_imports = silent + +# Allow re-exporting types for airbyte-protocol +no_implicit_reexport = False + +[tool.mypy] +plugins = ["pydantic.mypy", "pendulum", "pytest-mypy-plugins"] + +[mypy-airbyte_cdk.models] +ignore_errors = True diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index afe63bdb5cf..ead2ae2b4d8 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -69,7 +69,7 @@ setup( extras_require={ "dev": [ "freezegun", - "MyPy~=0.812", + "mypy", "pytest", "pytest-cov", "pytest-mock", diff --git a/airbyte-cdk/python/type_check_and_test.sh b/airbyte-cdk/python/type_check_and_test.sh index 93fccd7e1e1..37220d8f8e5 100755 --- a/airbyte-cdk/python/type_check_and_test.sh +++ b/airbyte-cdk/python/type_check_and_test.sh @@ -5,7 +5,7 @@ # Static Type Checking echo "Running MyPy to static check and test files." -mypy airbyte_cdk/ unit_tests/ +mypy airbyte_cdk/ unit_tests/ --config mypy.ini printf "\n" diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/config/test_file_based_stream_config.py b/airbyte-cdk/python/unit_tests/sources/file_based/config/test_file_based_stream_config.py index 3391b495400..cb190b449ee 100644 --- a/airbyte-cdk/python/unit_tests/sources/file_based/config/test_file_based_stream_config.py +++ b/airbyte-cdk/python/unit_tests/sources/file_based/config/test_file_based_stream_config.py @@ -2,6 +2,8 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
# +from typing import Any, Mapping, Type + import pytest as pytest from airbyte_cdk.sources.file_based.config.file_based_stream_config import CsvFormat, FileBasedStreamConfig, QuotingBehavior from pydantic import ValidationError @@ -20,7 +22,7 @@ from pydantic import ValidationError pytest.param("invalid", {"filetype": "invalid", "double_quote": False}, {}, ValidationError, id="test_config_format_file_type_mismatch"), ] ) -def test_csv_config(file_type, input_format, expected_format, expected_error): +def test_csv_config(file_type: str, input_format: Mapping[str, Any], expected_format: Mapping[str, QuotingBehavior], expected_error: Type[Exception]) -> None: stream_config = { "name": "stream1", "file_type": file_type, @@ -36,13 +38,16 @@ def test_csv_config(file_type, input_format, expected_format, expected_error): FileBasedStreamConfig(**stream_config) else: actual_config = FileBasedStreamConfig(**stream_config) - assert not hasattr(actual_config.format[file_type], "filetype") - for expected_format_field, expected_format_value in expected_format.items(): - assert isinstance(actual_config.format[file_type], CsvFormat) - assert getattr(actual_config.format[file_type], expected_format_field) == expected_format_value + if actual_config.format is not None: + assert not hasattr(actual_config.format[file_type], "filetype") + for expected_format_field, expected_format_value in expected_format.items(): + assert isinstance(actual_config.format[file_type], CsvFormat) + assert getattr(actual_config.format[file_type], expected_format_field) == expected_format_value + else: + assert False, "Expected format to be set" -def test_legacy_format(): +def test_legacy_format() -> None: """ This test verifies that we can process the legacy format of the config object used by the existing S3 source with a single `format` option as opposed to the current file_type -> format mapping. @@ -73,6 +78,9 @@ def test_legacy_format(): } actual_config = FileBasedStreamConfig(**stream_config) - assert isinstance(actual_config.format["csv"], CsvFormat) - for expected_format_field, expected_format_value in expected_format.items(): - assert getattr(actual_config.format["csv"], expected_format_field) == expected_format_value + if actual_config.format: + assert isinstance(actual_config.format["csv"], CsvFormat) + for expected_format_field, expected_format_value in expected_format.items(): + assert getattr(actual_config.format["csv"], expected_format_field) == expected_format_value + else: + assert False, "Expected format to be set" diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/helpers.py b/airbyte-cdk/python/unit_tests/sources/file_based/helpers.py index 1ee4e4e9a92..495ed3caba6 100644 --- a/airbyte-cdk/python/unit_tests/sources/file_based/helpers.py +++ b/airbyte-cdk/python/unit_tests/sources/file_based/helpers.py @@ -2,6 +2,7 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
# +import logging from datetime import datetime from io import IOBase from typing import Any, Dict, List, Mapping, Optional @@ -16,13 +17,13 @@ from unit_tests.sources.file_based.in_memory_files_source import InMemoryFilesSt class EmptySchemaParser(CsvParser): - async def infer_schema(self, config: FileBasedStreamConfig, file: RemoteFile, stream_reader: AbstractFileBasedStreamReader) -> Dict[str, Any]: + async def infer_schema(self, config: FileBasedStreamConfig, file: RemoteFile, stream_reader: AbstractFileBasedStreamReader, logger: logging.Logger) -> Dict[str, Any]: return {} class LowInferenceLimitDiscoveryPolicy(DefaultDiscoveryPolicy): @property - def max_n_files_for_schema_inference(self): + def max_n_files_for_schema_inference(self) -> int: return 1 @@ -44,7 +45,7 @@ class FailingSchemaValidationPolicy(AbstractSchemaValidationPolicy): ALWAYS_FAIL = "always_fail" validate_schema_before_sync = True - def record_passes_validation_policy(self, record: Mapping[str, Any], schema: Mapping[str, Any]) -> bool: + def record_passes_validation_policy(self, record: Mapping[str, Any], schema: Optional[Mapping[str, Any]]) -> bool: return False diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/in_memory_files_source.py b/airbyte-cdk/python/unit_tests/sources/file_based/in_memory_files_source.py index e5b2354d811..d7089de358d 100644 --- a/airbyte-cdk/python/unit_tests/sources/file_based/in_memory_files_source.py +++ b/airbyte-cdk/python/unit_tests/sources/file_based/in_memory_files_source.py @@ -7,7 +7,7 @@ import io import tempfile from datetime import datetime from io import IOBase -from typing import Any, Dict, Iterable, List, Optional +from typing import Any, Iterable, List, Mapping, Optional import pandas as pd import pyarrow as pa @@ -15,7 +15,7 @@ import pyarrow.parquet as pq from airbyte_cdk.models import ConfiguredAirbyteCatalog from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec from airbyte_cdk.sources.file_based.default_file_based_availability_strategy import DefaultFileBasedAvailabilityStrategy -from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy +from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy, DefaultDiscoveryPolicy from airbyte_cdk.sources.file_based.file_based_source import DEFAULT_MAX_HISTORY_SIZE, FileBasedSource from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser @@ -28,15 +28,15 @@ from pydantic import AnyUrl, Field class InMemoryFilesSource(FileBasedSource): def __init__( self, - files, - file_type, - availability_strategy: AvailabilityStrategy, - discovery_policy: AbstractDiscoveryPolicy, - validation_policies: Dict[str, AbstractSchemaValidationPolicy], - parsers: Dict[str, FileTypeParser], - stream_reader: AbstractFileBasedStreamReader, - catalog: Optional[Dict[str, Any]], - file_write_options: Dict[str, Any], + files: Mapping[str, Any], + file_type: str, + availability_strategy: Optional[AvailabilityStrategy], + discovery_policy: Optional[AbstractDiscoveryPolicy], + validation_policies: Mapping[str, AbstractSchemaValidationPolicy], + parsers: Mapping[str, FileTypeParser], + stream_reader: Optional[AbstractFileBasedStreamReader], + catalog: Optional[Mapping[str, Any]], + file_write_options: Mapping[str, Any], max_history_size: int, ): stream_reader = stream_reader or InMemoryFilesStreamReader(files=files,
file_type=file_type, file_write_options=file_write_options) @@ -46,7 +46,7 @@ class InMemoryFilesSource(FileBasedSource): catalog=ConfiguredAirbyteCatalog(streams=catalog["streams"]) if catalog else None, availability_strategy=availability_strategy, spec_class=InMemorySpec, - discovery_policy=discovery_policy, + discovery_policy=discovery_policy or DefaultDiscoveryPolicy(), parsers=parsers, validation_policies=validation_policies or DEFAULT_SCHEMA_VALIDATION_POLICIES, max_history_size=max_history_size or DEFAULT_MAX_HISTORY_SIZE @@ -58,9 +58,9 @@ class InMemoryFilesSource(FileBasedSource): class InMemoryFilesStreamReader(AbstractFileBasedStreamReader): - files: Dict[str, dict] + files: Mapping[str, Mapping[str, Any]] file_type: str - file_write_options: Optional[Dict[str, Any]] + file_write_options: Optional[Mapping[str, Any]] def get_matching_files( self, @@ -74,7 +74,7 @@ class InMemoryFilesStreamReader(AbstractFileBasedStreamReader): def open_file(self, file: RemoteFile) -> IOBase: return io.StringIO(self._make_file_contents(file.uri)) - def _make_file_contents(self, file_name: str): + def _make_file_contents(self, file_name: str) -> str: if self.file_type == "csv": return self._make_csv_file_contents(file_name) else: @@ -96,7 +96,7 @@ class InMemoryFilesStreamReader(AbstractFileBasedStreamReader): class InMemorySpec(AbstractFileBasedSpec): @classmethod def documentation_url(cls) -> AnyUrl: - return AnyUrl(scheme="https", url="https://docs.airbyte.com/integrations/sources/in_memory_files") + return AnyUrl(scheme="https", url="https://docs.airbyte.com/integrations/sources/in_memory_files") # type: ignore start_date: Optional[str] = Field( title="Start Date", @@ -114,9 +114,9 @@ class TemporaryParquetFilesStreamReader(InMemoryFilesStreamReader): """ def open_file(self, file: RemoteFile) -> IOBase: - return io.BytesIO(self._make_file_contents(file.uri)) + return io.BytesIO(self._create_file(file.uri)) - def _make_file_contents(self, file_name: str) -> bytes: + def _create_file(self, file_name: str) -> bytes: contents = self.files[file_name]["contents"] schema = self.files[file_name].get("schema") diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/check_scenarios.py b/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/check_scenarios.py index 218c1e2d5cf..5b44de8a67b 100644 --- a/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/check_scenarios.py +++ b/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/check_scenarios.py @@ -130,7 +130,7 @@ error_empty_stream_scenario = ( _base_failure_scenario.copy() .set_name("error_empty_stream_scenario") .set_files({}) - .set_expected_check_error(None, FileBasedSourceError.EMPTY_STREAM) + .set_expected_check_error(None, FileBasedSourceError.EMPTY_STREAM.value) ).build() @@ -138,7 +138,7 @@ error_extension_mismatch_scenario = ( _base_failure_scenario.copy() .set_name("error_extension_mismatch_scenario") .set_file_type("jsonl") - .set_expected_check_error(None, FileBasedSourceError.EXTENSION_MISMATCH) + .set_expected_check_error(None, FileBasedSourceError.EXTENSION_MISMATCH.value) ).build() @@ -146,7 +146,7 @@ error_listing_files_scenario = ( _base_failure_scenario.copy() .set_name("error_listing_files_scenario") .set_stream_reader(TestErrorListMatchingFilesInMemoryFilesStreamReader(files=_base_failure_scenario._files, file_type="csv")) - .set_expected_check_error(None, FileBasedSourceError.ERROR_LISTING_FILES) + .set_expected_check_error(None, FileBasedSourceError.ERROR_LISTING_FILES.value) ).build() @@ 
-154,7 +154,7 @@ error_reading_file_scenario = ( _base_failure_scenario.copy() .set_name("error_reading_file_scenario") .set_stream_reader(TestErrorOpenFileInMemoryFilesStreamReader(files=_base_failure_scenario._files, file_type="csv")) - .set_expected_check_error(None, FileBasedSourceError.ERROR_READING_FILE) + .set_expected_check_error(None, FileBasedSourceError.ERROR_READING_FILE.value) ).build() @@ -175,7 +175,7 @@ error_record_validation_user_provided_schema_scenario = ( } ) .set_validation_policies({FailingSchemaValidationPolicy.ALWAYS_FAIL: FailingSchemaValidationPolicy()}) - .set_expected_check_error(None, FileBasedSourceError.ERROR_VALIDATING_RECORD) + .set_expected_check_error(None, FileBasedSourceError.ERROR_VALIDATING_RECORD.value) ).build() @@ -200,5 +200,5 @@ error_multi_stream_scenario = ( ], } ) - .set_expected_check_error(None, FileBasedSourceError.ERROR_READING_FILE) + .set_expected_check_error(None, FileBasedSourceError.ERROR_READING_FILE.value) ).build() diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/scenario_builder.py b/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/scenario_builder.py index 0477febc930..e4aea02f686 100644 --- a/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/scenario_builder.py +++ b/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/scenario_builder.py @@ -4,9 +4,9 @@ from copy import deepcopy from dataclasses import dataclass, field -from typing import Any, Dict, List, Mapping, Optional, Tuple, Type +from typing import Any, List, Mapping, Optional, Tuple, Type -from airbyte_cdk.models.airbyte_protocol import SyncMode +from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy, DefaultDiscoveryPolicy from airbyte_cdk.sources.file_based.file_based_source import DEFAULT_MAX_HISTORY_SIZE, default_parsers from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader @@ -27,23 +27,23 @@ class TestScenario: self, name: str, config: Mapping[str, Any], - files: Dict[str, Any], + files: Mapping[str, Any], file_type: str, - expected_spec: Optional[Dict[str, Any]], + expected_spec: Optional[Mapping[str, Any]], expected_check_status: Optional[str], - expected_catalog: Optional[Dict[str, Any]], - expected_logs: Optional[Dict[str, Dict[str, Any]]], - expected_records: Optional[Dict[str, Any]], + expected_catalog: Optional[Mapping[str, Any]], + expected_logs: Optional[Mapping[str, Mapping[str, Any]]], + expected_records: List[Mapping[str, Any]], availability_strategy: Optional[AvailabilityStrategy], discovery_policy: Optional[AbstractDiscoveryPolicy], - validation_policies: Optional[Dict[str, AbstractSchemaValidationPolicy]], - parsers: Optional[Dict[str, FileTypeParser]], + validation_policies: Mapping[str, AbstractSchemaValidationPolicy], + parsers: Mapping[str, FileTypeParser], stream_reader: Optional[AbstractFileBasedStreamReader], - expected_check_error: Tuple[Optional[Exception], Optional[str]], + expected_check_error: Tuple[Optional[Type[Exception]], Optional[str]], expected_discover_error: Tuple[Optional[Type[Exception]], Optional[str]], expected_read_error: Tuple[Optional[Type[Exception]], Optional[str]], incremental_scenario_config: Optional[IncrementalScenarioConfig], - file_write_options: Dict[str, Any], + file_write_options: Mapping[str, Any], max_history_size: int, ): self.name = name @@ -72,7 +72,7 @@ class TestScenario: self.incremental_scenario_config = incremental_scenario_config 
diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/scenario_builder.py b/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/scenario_builder.py
index 0477febc930..e4aea02f686 100644
--- a/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/scenario_builder.py
+++ b/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/scenario_builder.py
@@ -4,9 +4,9 @@
 
 from copy import deepcopy
 from dataclasses import dataclass, field
-from typing import Any, Dict, List, Mapping, Optional, Tuple, Type
+from typing import Any, List, Mapping, Optional, Tuple, Type
 
-from airbyte_cdk.models.airbyte_protocol import SyncMode
+from airbyte_cdk.models import SyncMode
 from airbyte_cdk.sources.file_based.discovery_policy import AbstractDiscoveryPolicy, DefaultDiscoveryPolicy
 from airbyte_cdk.sources.file_based.file_based_source import DEFAULT_MAX_HISTORY_SIZE, default_parsers
 from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
@@ -27,23 +27,23 @@ class TestScenario:
         self,
         name: str,
         config: Mapping[str, Any],
-        files: Dict[str, Any],
+        files: Mapping[str, Any],
         file_type: str,
-        expected_spec: Optional[Dict[str, Any]],
+        expected_spec: Optional[Mapping[str, Any]],
         expected_check_status: Optional[str],
-        expected_catalog: Optional[Dict[str, Any]],
-        expected_logs: Optional[Dict[str, Dict[str, Any]]],
-        expected_records: Optional[Dict[str, Any]],
+        expected_catalog: Optional[Mapping[str, Any]],
+        expected_logs: Optional[Mapping[str, Mapping[str, Any]]],
+        expected_records: List[Mapping[str, Any]],
         availability_strategy: Optional[AvailabilityStrategy],
         discovery_policy: Optional[AbstractDiscoveryPolicy],
-        validation_policies: Optional[Dict[str, AbstractSchemaValidationPolicy]],
-        parsers: Optional[Dict[str, FileTypeParser]],
+        validation_policies: Mapping[str, AbstractSchemaValidationPolicy],
+        parsers: Mapping[str, FileTypeParser],
         stream_reader: Optional[AbstractFileBasedStreamReader],
-        expected_check_error: Tuple[Optional[Exception], Optional[str]],
+        expected_check_error: Tuple[Optional[Type[Exception]], Optional[str]],
         expected_discover_error: Tuple[Optional[Type[Exception]], Optional[str]],
         expected_read_error: Tuple[Optional[Type[Exception]], Optional[str]],
         incremental_scenario_config: Optional[IncrementalScenarioConfig],
-        file_write_options: Dict[str, Any],
+        file_write_options: Mapping[str, Any],
         max_history_size: int,
     ):
         self.name = name
@@ -72,7 +72,7 @@ class TestScenario:
         self.incremental_scenario_config = incremental_scenario_config
         self.validate()
 
-    def validate(self):
+    def validate(self) -> None:
         assert self.name
         if not self.expected_catalog:
             return
@@ -80,10 +80,10 @@ class TestScenario:
         expected_streams = {s["name"] for s in self.expected_catalog["streams"]}
         assert expected_streams <= streams
 
-    def configured_catalog(self, sync_mode: SyncMode) -> Optional[Dict[str, Any]]:
+    def configured_catalog(self, sync_mode: SyncMode) -> Optional[Mapping[str, Any]]:
         if not self.expected_catalog:
-            return
-        catalog = {"streams": []}
+            return None
+        catalog: Mapping[str, Any] = {"streams": []}
         for stream in self.expected_catalog["streams"]:
             catalog["streams"].append(
                 {
@@ -95,7 +95,7 @@ class TestScenario:
 
         return catalog
 
-    def input_state(self) -> List[Dict[str, Any]]:
+    def input_state(self) -> List[Mapping[str, Any]]:
         if self.incremental_scenario_config:
             return self.incremental_scenario_config.input_state
         else:
@@ -103,112 +103,114 @@ class TestScenario:
 
 
 class TestScenarioBuilder:
-    def __init__(self):
+    def __init__(self) -> None:
         self._name = ""
-        self._config = {}
-        self._files = {}
-        self._file_type = None
-        self._expected_spec = None
-        self._expected_check_status = None
-        self._expected_catalog = {}
-        self._expected_logs = None
-        self._expected_records = {}
-        self._availability_strategy = None
-        self._discovery_policy = DefaultDiscoveryPolicy()
-        self._validation_policies = None
+        self._config: Mapping[str, Any] = {}
+        self._files: Mapping[str, Any] = {}
+        self._file_type: Optional[str] = None
+        self._expected_spec: Optional[Mapping[str, Any]] = None
+        self._expected_check_status: Optional[str] = None
+        self._expected_catalog: Mapping[str, Any] = {}
+        self._expected_logs: Optional[Mapping[str, Any]] = None
+        self._expected_records: List[Mapping[str, Any]] = []
+        self._availability_strategy: Optional[AvailabilityStrategy] = None
+        self._discovery_policy: AbstractDiscoveryPolicy = DefaultDiscoveryPolicy()
+        self._validation_policies: Optional[Mapping[str, AbstractSchemaValidationPolicy]] = None
         self._parsers = default_parsers
-        self._stream_reader = None
-        self._expected_check_error = None, None
-        self._expected_discover_error = None, None
-        self._expected_read_error = None, None
-        self._incremental_scenario_config = None
-        self._file_write_options = {}
+        self._stream_reader: Optional[AbstractFileBasedStreamReader] = None
+        self._expected_check_error: Tuple[Optional[Type[Exception]], Optional[str]] = None, None
+        self._expected_discover_error: Tuple[Optional[Type[Exception]], Optional[str]] = None, None
+        self._expected_read_error: Tuple[Optional[Type[Exception]], Optional[str]] = None, None
+        self._incremental_scenario_config: Optional[IncrementalScenarioConfig] = None
+        self._file_write_options: Mapping[str, Any] = {}
         self._max_history_size = DEFAULT_MAX_HISTORY_SIZE
 
-    def set_name(self, name: str):
+    def set_name(self, name: str) -> "TestScenarioBuilder":
         self._name = name
         return self
 
-    def set_config(self, config: Mapping[str, Any]):
+    def set_config(self, config: Mapping[str, Any]) -> "TestScenarioBuilder":
         self._config = config
         return self
 
-    def set_files(self, files: Dict[str, Any]):
+    def set_files(self, files: Mapping[str, Any]) -> "TestScenarioBuilder":
         self._files = files
         return self
 
-    def set_file_type(self, file_type: str):
+    def set_file_type(self, file_type: str) -> "TestScenarioBuilder":
         self._file_type = file_type
         return self
 
-    def set_expected_spec(self, expected_spec: Dict[str, Any]):
+    def set_expected_spec(self, expected_spec: Mapping[str, Any]) -> "TestScenarioBuilder":
         self._expected_spec = expected_spec
         return self
 
-    def set_expected_check_status(self, expected_check_status: str):
+    def set_expected_check_status(self, expected_check_status: str) -> "TestScenarioBuilder":
         self._expected_check_status = expected_check_status
         return self
 
-    def set_expected_catalog(self, expected_catalog: Dict[str, Any]):
+    def set_expected_catalog(self, expected_catalog: Mapping[str, Any]) -> "TestScenarioBuilder":
         self._expected_catalog = expected_catalog
         return self
 
-    def set_expected_logs(self, expected_logs: Dict[str, List[Dict[str, Any]]]):
+    def set_expected_logs(self, expected_logs: Mapping[str, List[Mapping[str, Any]]]) -> "TestScenarioBuilder":
         self._expected_logs = expected_logs
         return self
 
-    def set_expected_records(self, expected_records: Dict[str, Any]):
+    def set_expected_records(self, expected_records: List[Mapping[str, Any]]) -> "TestScenarioBuilder":
        self._expected_records = expected_records
         return self
 
-    def set_parsers(self, parsers: AbstractDiscoveryPolicy):
+    def set_parsers(self, parsers: Mapping[str, FileTypeParser]) -> "TestScenarioBuilder":
         self._parsers = parsers
         return self
 
-    def set_availability_strategy(self, availability_strategy: AvailabilityStrategy):
+    def set_availability_strategy(self, availability_strategy: AvailabilityStrategy) -> "TestScenarioBuilder":
         self._availability_strategy = availability_strategy
         return self
 
-    def set_discovery_policy(self, discovery_policy: AbstractDiscoveryPolicy):
+    def set_discovery_policy(self, discovery_policy: AbstractDiscoveryPolicy) -> "TestScenarioBuilder":
         self._discovery_policy = discovery_policy
         return self
 
-    def set_validation_policies(self, validation_policies: AbstractSchemaValidationPolicy):
+    def set_validation_policies(self, validation_policies: Mapping[str, AbstractSchemaValidationPolicy]) -> "TestScenarioBuilder":
         self._validation_policies = validation_policies
         return self
 
-    def set_stream_reader(self, stream_reader: AbstractFileBasedStreamReader):
+    def set_stream_reader(self, stream_reader: AbstractFileBasedStreamReader) -> "TestScenarioBuilder":
         self._stream_reader = stream_reader
         return self
 
-    def set_max_history_size(self, max_history_size: int):
+    def set_max_history_size(self, max_history_size: int) -> "TestScenarioBuilder":
         self._max_history_size = max_history_size
         return self
 
-    def set_incremental_scenario_config(self, incremental_scenario_config: IncrementalScenarioConfig):
+    def set_incremental_scenario_config(self, incremental_scenario_config: IncrementalScenarioConfig) -> "TestScenarioBuilder":
         self._incremental_scenario_config = incremental_scenario_config
         return self
 
-    def set_expected_check_error(self, error: Type[Exception], message: str):
+    def set_expected_check_error(self, error: Optional[Type[Exception]], message: str) -> "TestScenarioBuilder":
         self._expected_check_error = error, message
         return self
 
-    def set_expected_discover_error(self, error: Type[Exception], message: str):
+    def set_expected_discover_error(self, error: Type[Exception], message: str) -> "TestScenarioBuilder":
         self._expected_discover_error = error, message
         return self
 
-    def set_expected_read_error(self, error: Type[Exception], message: str):
+    def set_expected_read_error(self, error: Type[Exception], message: str) -> "TestScenarioBuilder":
         self._expected_read_error = error, message
         return self
 
-    def set_file_write_options(self, file_write_options: Dict[str, Any]):
+    def set_file_write_options(self, file_write_options: Mapping[str, Any]) -> "TestScenarioBuilder":
         self._file_write_options = file_write_options
         return self
 
-    def copy(self):
+    def copy(self) -> "TestScenarioBuilder":
         return deepcopy(self)
 
-    def build(self):
+    def build(self) -> TestScenario:
+        if self._file_type is None:
+            raise ValueError("file_type is not set")
         return TestScenario(
             self._name,
             self._config,
@@ -221,7 +223,7 @@ class TestScenarioBuilder:
             self._expected_records,
             self._availability_strategy,
             self._discovery_policy,
-            self._validation_policies,
+            self._validation_policies or {},
             self._parsers,
             self._stream_reader,
             self._expected_check_error,
diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/schema_validation_policies/test_default_schema_validation_policy.py b/airbyte-cdk/python/unit_tests/sources/file_based/schema_validation_policies/test_default_schema_validation_policy.py
index 2f716072d39..8e1a1376154 100644
--- a/airbyte-cdk/python/unit_tests/sources/file_based/schema_validation_policies/test_default_schema_validation_policy.py
+++ b/airbyte-cdk/python/unit_tests/sources/file_based/schema_validation_policies/test_default_schema_validation_policy.py
@@ -48,7 +48,7 @@ def test_record_passes_validation_policy(
     schema: Mapping[str, Any],
     validation_policy: str,
     expected_result: bool
-):
+) -> None:
     if validation_policy == "wait_for_discover" and expected_result is False:
         with pytest.raises(StopSyncPerValidationPolicy):
             DEFAULT_SCHEMA_VALIDATION_POLICIES[validation_policy].record_passes_validation_policy(record, schema)
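The quoted `-> "TestScenarioBuilder"` annotations in the builder above are forward references: inside the class body the class name is not yet bound, so it is written as a string for mypy to resolve later. A minimal sketch of the pattern (hypothetical class, not from the CDK):

    class Builder:
        def __init__(self) -> None:
            self._name = ""

        def set_name(self, name: str) -> "Builder":  # quoted: Builder is not fully defined yet
            self._name = name
            return self

On Python 3.11+ (or via typing_extensions), `typing.Self` expresses the same return type without the string form.
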
diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/stream/test_default_file_based_cursor.py b/airbyte-cdk/python/unit_tests/sources/file_based/stream/test_default_file_based_cursor.py
index ca764341c61..fae69e409ba 100644
--- a/airbyte-cdk/python/unit_tests/sources/file_based/stream/test_default_file_based_cursor.py
+++ b/airbyte-cdk/python/unit_tests/sources/file_based/stream/test_default_file_based_cursor.py
@@ -3,6 +3,7 @@
 #
 
 from datetime import datetime, timedelta
+from typing import Any, List, Mapping
 from unittest.mock import MagicMock
 
 import pytest
@@ -92,7 +93,7 @@ from freezegun import freeze_time
                      id="test_files_are_sorted_by_timestamp_and_by_name"),
     ],
 )
-def test_add_file(files_to_add, expected_start_time, expected_state_dict):
+def test_add_file(files_to_add: List[RemoteFile], expected_start_time: List[datetime], expected_state_dict: Mapping[str, Any]) -> None:
     cursor = DefaultFileBasedCursor(3, 3)
     assert cursor._compute_start_time() == datetime.min
 
@@ -148,7 +149,7 @@ def test_add_file(files_to_add, expected_start_time, expected_state_dict):
     ], 2, True,
        id="test_sync_more_files_than_history_size"),
 ])
-def test_get_files_to_sync(files, expected_files_to_sync, max_history_size, history_is_partial):
+def test_get_files_to_sync(files: List[RemoteFile], expected_files_to_sync: List[RemoteFile], max_history_size: int, history_is_partial: bool) -> None:
     logger = MagicMock()
     cursor = DefaultFileBasedCursor(max_history_size, 3)
 
@@ -161,7 +162,7 @@ def test_get_files_to_sync(files, expected_files_to_sync, max_history_size, history_is_partial):
 
 
 @freeze_time("2023-06-16T00:00:00Z")
-def test_only_recent_files_are_synced_if_history_is_full():
+def test_only_recent_files_are_synced_if_history_is_full() -> None:
     logger = MagicMock()
     cursor = DefaultFileBasedCursor(2, 3)
 
@@ -198,7 +199,7 @@ def test_only_recent_files_are_synced_if_history_is_full():
     pytest.param(timedelta(days=0), False, id="test_modified_at_is_equal"),
     pytest.param(timedelta(days=1), True, id="test_modified_at_is_more_recent"),
 ])
-def test_sync_file_already_present_in_history(modified_at_delta, should_sync_file):
+def test_sync_file_already_present_in_history(modified_at_delta: timedelta, should_sync_file: bool) -> None:
     logger = MagicMock()
     cursor = DefaultFileBasedCursor(2, 3)
     original_modified_at = datetime(2021, 1, 2)
@@ -233,7 +234,7 @@ def test_sync_file_already_present_in_history(modified_at_delta, should_sync_file):
     pytest.param("c.csv", datetime(2023, 6, 3), datetime(2023, 6, 3), True,
                  id="test_last_modified_is_equal_to_earliest_dt_in_history_and_lexicographically_greater"),
     ]
 )
-def test_should_sync_file(file_name, last_modified, earliest_dt_in_history, should_sync_file):
+def test_should_sync_file(file_name: str, last_modified: datetime, earliest_dt_in_history: datetime, should_sync_file: bool) -> None:
     logger = MagicMock()
     cursor = DefaultFileBasedCursor(1, 3)
 
@@ -244,12 +245,12 @@ def test_should_sync_file(file_name, last_modified, earliest_dt_in_history, should_sync_file):
     assert bool(list(cursor.get_files_to_sync([RemoteFile(uri=file_name, last_modified=last_modified, file_type="csv")], logger))) == should_sync_file
 
 
-def test_set_initial_state_no_history():
+def test_set_initial_state_no_history() -> None:
     cursor = DefaultFileBasedCursor(1, 3)
     cursor.set_initial_state({})
 
 
-def test_instantiate_with_negative_values():
+def test_instantiate_with_negative_values() -> None:
     with pytest.raises(ValueError):
         DefaultFileBasedCursor(-1, 3)
diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/test_file_based_stream_reader.py b/airbyte-cdk/python/unit_tests/sources/file_based/test_file_based_stream_reader.py
index 1010415f52b..23562a2c786 100644
--- a/airbyte-cdk/python/unit_tests/sources/file_based/test_file_based_stream_reader.py
+++ b/airbyte-cdk/python/unit_tests/sources/file_based/test_file_based_stream_reader.py
@@ -2,6 +2,8 @@
 # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 #
 
+from typing import List, Set
+
 import pytest
 from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
 from unit_tests.sources.file_based.helpers import make_remote_files
@@ -76,6 +78,6 @@ FILES = make_remote_files(FILEPATHS)
         pytest.param(["a/*.csv", "a/b/*.csv"], {"a/b.csv", "a/c.csv", "a/b/c.csv"}, {"a", "a/b"}, id="a/*.csv,a/b/*.csv"),
     ],
 )
-def test_globs_and_prefixes_from_globs(globs, expected_matches, expected_path_prefixes):
+def test_globs_and_prefixes_from_globs(globs: List[str], expected_matches: Set[str], expected_path_prefixes: Set[str]) -> None:
     assert set([f.uri for f in reader.filter_files_by_globs(FILES, globs)]) == expected_matches
     assert set(reader.get_prefixes_from_globs(globs)) == expected_path_prefixes
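The test-signature changes in the two files above all follow the same recipe: each parameter gets the type that the `pytest.param` tuples supply, and the return type is `-> None`. The return annotation matters because mypy skips the bodies of unannotated functions by default, so adding it is what brings the test bodies under type checking. A small sketch of the shape (hypothetical test, not from this suite):

    import pytest

    @pytest.mark.parametrize("value, expected", [(1, True), (0, False)])
    def test_truthiness(value: int, expected: bool) -> None:
        assert bool(value) == expected
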
diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/test_schema_helpers.py b/airbyte-cdk/python/unit_tests/sources/file_based/test_schema_helpers.py
index 72dfe6b719e..625fdb23cf8 100644
--- a/airbyte-cdk/python/unit_tests/sources/file_based/test_schema_helpers.py
+++ b/airbyte-cdk/python/unit_tests/sources/file_based/test_schema_helpers.py
@@ -2,11 +2,17 @@
 # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 #
 
-from typing import Any, Mapping
+from typing import Any, Mapping, Optional
 
 import pytest
 from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError, SchemaInferenceError
-from airbyte_cdk.sources.file_based.schema_helpers import ComparableType, conforms_to_schema, merge_schemas, type_mapping_to_jsonschema
+from airbyte_cdk.sources.file_based.schema_helpers import (
+    ComparableType,
+    SchemaType,
+    conforms_to_schema,
+    merge_schemas,
+    type_mapping_to_jsonschema,
+)
 
 COMPLETE_CONFORMING_RECORD = {
     "null_field": None,
@@ -203,11 +209,11 @@ def test_conforms_to_schema(
     record: Mapping[str, Any],
     schema: Mapping[str, Any],
     expected_result: bool
-):
+) -> None:
     assert conforms_to_schema(record, schema) == expected_result
 
 
-def test_comparable_types():
+def test_comparable_types() -> None:
     assert ComparableType.OBJECT > ComparableType.STRING
     assert ComparableType.STRING > ComparableType.NUMBER
     assert ComparableType.NUMBER > ComparableType.INTEGER
@@ -237,7 +243,7 @@ def test_comparable_types():
         pytest.param({"a": {"type": "invalid_type"}}, {"b": {"type": "integer"}}, None, id="invalid-type"),
     ]
 )
-def test_merge_schemas(schema1, schema2, expected_result):
+def test_merge_schemas(schema1: SchemaType, schema2: SchemaType, expected_result: Optional[SchemaType]) -> None:
     if expected_result is not None:
         assert merge_schemas(schema1, schema2) == expected_result
     else:
@@ -336,7 +342,7 @@ def test_merge_schemas(schema1, schema2, expected_result):
         ),
     ],
 )
-def test_type_mapping_to_jsonschema(type_mapping, expected_schema, expected_exc_msg):
+def test_type_mapping_to_jsonschema(type_mapping: Mapping[str, Any], expected_schema: Optional[Mapping[str, Any]], expected_exc_msg: Optional[str]) -> None:
     if expected_exc_msg:
         with pytest.raises(ConfigValidationError) as exc:
             type_mapping_to_jsonschema(type_mapping)
diff --git a/buildSrc/src/main/groovy/airbyte-python.gradle b/buildSrc/src/main/groovy/airbyte-python.gradle
index 2a348cf5ef8..b7884f661ee 100644
--- a/buildSrc/src/main/groovy/airbyte-python.gradle
+++ b/buildSrc/src/main/groovy/airbyte-python.gradle
@@ -90,7 +90,7 @@ class AirbytePythonPlugin implements Plugin {
         // and thus there is the wrapper "pyproject-flake8" for this
         pip 'pyproject-flake8:0.0.1a2'
         pip 'black:22.3.0'
-        pip 'mypy:0.930'
+        pip 'mypy:1.4.1'
         pip 'isort:5.6.4'
         pip 'pytest:6.2.5'
         pip 'coverage[toml]:6.3.1'
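The jump from mypy 0.930 to 1.4.1 in the Gradle build is a large one: among other tightenings, mypy dropped implicit `Optional` for defaulted parameters in 0.990, which is consistent with the many `Optional[...]` annotations spelled out throughout this PR. A sketch of the kind of signature the newer pin starts rejecting (hypothetical function names):

    from typing import Optional

    def load(path: str = None) -> None:  # rejected by mypy 1.4.1: implicit Optional default
        ...

    def load_fixed(path: Optional[str] = None) -> None:  # accepted
        ...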