1
0
mirror of synced 2026-01-08 12:03:02 -05:00
Files
airbyte/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py
Brian Lai 7bff12aea5 [#3078] [CDK] Add support for enabling debug from command line and some basic general debug logs (#14521)
* allow for command line debug option and basic debug statements + declarative

* feedback from pr comments

* fix some tests w/ req/res mixed up and fixing logging tests

* formatting

* pr feedback: cleaning up traces in logger.py and update docs with debug configuration

* remove unneeded trace logger test

* remove extra print statement
2022-07-13 18:01:07 -04:00

46 lines
2.0 KiB
Python

#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
import json
import logging
from typing import Any, List, Mapping
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.parsers.factory import DeclarativeComponentFactory
from airbyte_cdk.sources.declarative.parsers.yaml_parser import YamlParser
from airbyte_cdk.sources.streams import Stream
class YamlDeclarativeSource(DeclarativeSource):
def __init__(self, path_to_yaml):
self.logger = logging.getLogger(f"airbyte.{self.name}")
self.logger.setLevel(logging.DEBUG)
self._factory = DeclarativeComponentFactory()
self._source_config = self._read_and_parse_yaml_file(path_to_yaml)
@property
def connection_checker(self) -> ConnectionChecker:
check = self._source_config["check"]
if "class_name" not in check:
check["class_name"] = "airbyte_cdk.sources.declarative.checks.check_stream.CheckStream"
return self._factory.create_component(check, dict())(source=self)
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
stream_configs = self._source_config["streams"]
for s in stream_configs:
if "class_name" not in s:
s["class_name"] = "airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream"
return [self._factory.create_component(stream_config, config)() for stream_config in self._source_config["streams"]]
def _read_and_parse_yaml_file(self, path_to_yaml_file):
with open(path_to_yaml_file, "r") as f:
config_content = f.read()
parsed_config = YamlParser().parse(config_content)
self.logger.debug(
"parsed YAML into declarative source",
extra={"path_to_yaml_file": path_to_yaml_file, "source_name": self.name, "parsed_config": json.dumps(parsed_config)},
)
return parsed_config