1
0
mirror of synced 2025-12-25 11:06:55 -05:00

Refactor python sources (#1331)

* Make ConfigContainer internal to SingerSource only
This commit is contained in:
Christophe Duong
2020-12-16 17:52:17 +01:00
committed by GitHub
parent 4eccc60171
commit e92e9aa8c4
27 changed files with 327 additions and 338 deletions

View File

@@ -22,13 +22,14 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
from typing import Generator, Type
import json
from typing import Dict, Generator, Type
import airbyte_protocol
from airbyte_protocol import AirbyteCatalog, AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, Status
from .client import BaseClient
from .integration import ConfigContainer, Source
from .integration import Source
from .logger import AirbyteLogger
@@ -37,22 +38,21 @@ class BaseSource(Source):
client_class: Type[BaseClient] = None
def _get_client(self, config_container: ConfigContainer):
def _get_client(self, config: json):
"""Construct client"""
config = config_container.rendered_config
client = self.client_class(**config)
return client
def discover(self, logger: AirbyteLogger, config_container: ConfigContainer) -> AirbyteCatalog:
def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog:
"""Discover streams"""
client = self._get_client(config_container)
client = self._get_client(config)
return AirbyteCatalog(streams=[stream for stream in client.streams])
def check(self, logger: AirbyteLogger, config_container: ConfigContainer) -> AirbyteConnectionStatus:
def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus:
"""Check connection"""
client = self._get_client(config_container)
client = self._get_client(config)
alive, error = client.health_check()
if not alive:
return AirbyteConnectionStatus(status=Status.FAILED, message=str(error))
@@ -60,12 +60,9 @@ class BaseSource(Source):
return AirbyteConnectionStatus(status=Status.SUCCEEDED)
def read(
self, logger: AirbyteLogger, config_container: ConfigContainer, catalog_path, state=None
self, logger: AirbyteLogger, config: json, catalog: ConfiguredAirbyteCatalog, state: Dict[str, any]
) -> Generator[AirbyteMessage, None, None]:
client = self._get_client(config_container)
config = self.read_config(catalog_path)
catalog = ConfiguredAirbyteCatalog.parse_obj(config)
client = self._get_client(config)
logger.info(f"Starting syncing {self.__class__.__name__}")
for configured_stream in catalog.streams: