1
0
mirror of synced 2026-01-08 12:03:02 -05:00
Files
airbyte/airbyte-cdk/python/airbyte_cdk/sources/source.py
Brian Lai 037e8ed1a9 fix cdk bug to send legacy format if connector overrides read() (#16566)
* fix cdk bug to send legacy format if connector overrides read()

* fix comment

* update changelog and setup.py
2022-09-09 21:09:50 -04:00

80 lines
3.7 KiB
Python

#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
import json
import logging
from abc import ABC, abstractmethod
from typing import Any, Generic, Iterable, List, Mapping, MutableMapping, TypeVar, Union
from airbyte_cdk.connector import BaseConnector, DefaultConnectorMixin, TConfig
from airbyte_cdk.models import AirbyteCatalog, AirbyteMessage, AirbyteStateMessage, AirbyteStateType, ConfiguredAirbyteCatalog
# Type of the state object a source works with: returned by BaseSource.read_state() and accepted by BaseSource.read().
TState = TypeVar("TState")
# Type of the catalog object a source works with: returned by BaseSource.read_catalog() and accepted by BaseSource.read().
TCatalog = TypeVar("TCatalog")
class BaseSource(BaseConnector[TConfig], ABC, Generic[TConfig, TState, TCatalog]):
    """
    Abstract interface of a source connector, generic over its config (TConfig), state (TState) and catalog (TCatalog) types.

    Concrete sources must implement all four methods below.
    """

    @abstractmethod
    def read_state(self, state_path: str) -> TState:
        """
        Loads the state of a previous sync.

        :param state_path: The filepath to where the state is located
        :return: The state, in whatever representation the concrete source uses for TState
        """

    @abstractmethod
    def read_catalog(self, catalog_path: str) -> TCatalog:
        """
        Loads the catalog the sync should run against.

        :param catalog_path: The filepath to where the catalog is located
        :return: The catalog, in whatever representation the concrete source uses for TCatalog
        """

    @abstractmethod
    def read(
        self,
        logger: logging.Logger,
        config: TConfig,
        catalog: TCatalog,
        state: TState = None,
    ) -> Iterable[AirbyteMessage]:
        """
        Reads the source with the given configuration, catalog, and state.

        :param logger: Logger made available to the implementation
        :param config: The configuration of the source
        :param catalog: The catalog to read
        :param state: The state to read with; defaults to None when omitted
        :return: A generator of the AirbyteMessages produced by the read
        """

    @abstractmethod
    def discover(self, logger: logging.Logger, config: TConfig) -> AirbyteCatalog:
        """
        Describes the streams and fields available in this integration.

        For example, given valid credentials to a Postgres database, returns an Airbyte catalog where each postgres table is a
        stream, and each table column is a field.

        :param logger: Logger made available to the implementation
        :param config: The configuration of the source
        :return: An AirbyteCatalog representing the available streams and fields
        """
class Source(
    DefaultConnectorMixin,
    BaseSource[Mapping[str, Any], Union[List[AirbyteStateMessage], MutableMapping[str, Any]], ConfiguredAirbyteCatalog],
    ABC,
):
    """
    Base class for sources whose config is a mapping, whose catalog is a ConfiguredAirbyteCatalog, and whose state is either a
    list of AirbyteStateMessages or a legacy JSON object.
    """

    # can be overridden to change an input state
    def read_state(self, state_path: str) -> Union[List[AirbyteStateMessage], MutableMapping[str, Any]]:
        """
        Retrieves the input state of a sync by reading from the specified JSON file. Incoming state can be deserialized into either
        a JSON object for legacy state input or as a list of AirbyteStateMessages for the per-stream state format.

        The result is a list of AirbyteStateMessage(s), with one exception: when the incoming state is a legacy JSON object and
        the concrete connector class itself overrides read(), the JSON object is returned unchanged.

        :param state_path: The filepath to where the stream states are located. A falsy value means there is no input state.
        :return: The complete stream state based on the connector's previous sync, or an empty list when no path is given or the
            file holds an empty JSON value
        :raises ValueError: If a per-stream state entry has none of its stream, data or global_ attributes set
        """
        if not state_path:
            return []

        # Use a context manager so the file handle is closed deterministically instead of being left to the garbage collector.
        with open(state_path, "r") as state_file:
            state_obj = json.load(state_file)

        if not state_obj:
            return []

        # A JSON array is the per-stream format: one serialized AirbyteStateMessage per entry.
        if isinstance(state_obj, list):
            parsed_state_messages = []
            for state in state_obj:
                parsed_message = AirbyteStateMessage.parse_obj(state)
                # Note: the "state field" mentioned in the error message corresponds to the `data` attribute checked here.
                if not parsed_message.stream and not parsed_message.data and not parsed_message.global_:
                    raise ValueError("AirbyteStateMessage should contain either a stream, global, or state field")
                parsed_state_messages.append(parsed_message)
            return parsed_state_messages

        # Existing connectors that override read() might not be able to interpret the new state format. We temporarily
        # send state in the old format for these connectors, but once all have been upgraded, this block can be removed
        # vars(self.__class__) checks if the current class directly overrides the read() function
        if "read" in vars(self.__class__):
            return state_obj
        return [AirbyteStateMessage(type=AirbyteStateType.LEGACY, data=state_obj)]

    # can be overridden to change an input catalog
    def read_catalog(self, catalog_path: str) -> ConfiguredAirbyteCatalog:
        """
        Reads the catalog file and parses it into a ConfiguredAirbyteCatalog.

        The file is loaded through self.read_config(), which is not defined in this class (presumably inherited from the
        connector base classes).

        :param catalog_path: The filepath to where the configured catalog is located
        :return: The parsed ConfiguredAirbyteCatalog
        """
        return ConfiguredAirbyteCatalog.parse_obj(self.read_config(catalog_path))