1
0
mirror of synced 2026-02-03 10:02:09 -05:00
Files
airbyte/airbyte-cdk/python/airbyte_cdk/sources/embedded/runner.py
Joe Reuter 1ee4c04203 CDK: Embedded reader utils (#28873)
* relax pydantic dep

* Automated Commit - Format and Process Resources Changes

* wip

* wrap up base integration

* add init file

* introduce CDK runner and improve error message

* make state param optional

* update protocol models

* review comments

* always run incremental if possible

* fix

---------

Co-authored-by: flash1293 <flash1293@users.noreply.github.com>
2023-08-03 12:02:31 +02:00

35 lines
1.2 KiB
Python

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
from abc import ABC, abstractmethod
from typing import Generic, Iterable, Optional
from airbyte_cdk.connector import TConfig
from airbyte_cdk.models import AirbyteCatalog, AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.sources.source import Source
class SourceRunner(ABC, Generic[TConfig]):
@abstractmethod
def discover(self, config: TConfig) -> AirbyteCatalog:
pass
@abstractmethod
def read(self, config: TConfig, catalog: ConfiguredAirbyteCatalog, state: Optional[AirbyteStateMessage]) -> Iterable[AirbyteMessage]:
pass
class CDKRunner(SourceRunner[TConfig]):
def __init__(self, source: Source, name: str):
self._source = source
self._logger = logging.getLogger(name)
def discover(self, config: TConfig) -> AirbyteCatalog:
return self._source.discover(self._logger, config)
def read(self, config: TConfig, catalog: ConfiguredAirbyteCatalog, state: Optional[AirbyteStateMessage]) -> Iterable[AirbyteMessage]:
return self._source.read(self._logger, config, catalog, state=[state] if state else [])