1
0
mirror of synced 2026-02-03 10:02:09 -05:00
Files
airbyte/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py
Brian Lai b7113a2e5e [Low-Code CDK] Write the component schema and use it during manifest validation (#20422)
* handwritten low code manifest example components

* add MinMaxDatetime to jsonschema

* add a basic gradle command to generate manifest components

* Add auth components to handwritten component schema

- ApiKeyAuthenticator
- BasicHttpAuthenticator
- BearerAuthenticator
- DeclarativeOauth2Authenticator
- NoAuth

* Respect optional properties in DeclarativeOauth2Authenticator

* Fix `Dict[str, Any]` mapping in auth components

* add default error handler composite error handler and http response filter components

* [low code component schema] adding backoff strategies to schema

* [low code component schema] fix float types

* [low code component schema] add RecordFilter

* Remove `config` from auth components

* [low code component schema] add Interpolation (with pending question on 'type' not being defined)

* Add CartesianProductStreamSlicer & DatetimeStreamSlicer

* Add ListStreamSlicer, and fix nesting of DatetimeStreamSlicer

* [low code component schema] add InterpolatedRequestOptionsProvider

* Add slicer components, and fix a couple of components after reviewing output

* [low code component schema] adding transformations and adding type to interpolators

* adding spec and a few small tweaks

* Add DefaultSchemaLoader

* [low code component schema] attempt on custom class

* Add descriptions for auth components

* add RequestOption

* remove interpolated objects from the schema in favor of strings only

* a few schema fixes and adding some custom pagination and stream slicer

* [low code component schema] fix CustomBackoffStrategy

* Add CustomRecordExtractor

* add some description and add additional properties

* insert a transformer to hydrate default manifest components and perform validation against the handwritten schema

* [low code component schema] validating existing schemas

* [low code component schema] clean validation script

* add manifest transformer tests and a few tweaks to the schema

* Revert "[low code component schema] clean validation script"

This reverts commit 2408f41cf6.

* Revert "[low code component schema] validating existing schemas"

This reverts commit 9d39977815.

* [low code component schema] integrate validation script to gradle

* [low code component schema] updating validation script permissions

* remove a few model gen spike files and clean up comments

* default types should take parent type into account and a few schema changes

* add inline schema components and fix validation bugs

* add types to every component's literal enum and more little schema fixes

* missing component prefix

* pr feedback, renaming the schema and updating various docs

* fixing schema bugs from testing and new components

* last few small tweaks and fixes and fix the script

* bump the version before publish

* include the declarative_component_schema.yaml as package data so it's included when the package is published

* roll the version to publish

Co-authored-by: Catherine Noll <noll.catherine@gmail.com>
Co-authored-by: maxi297 <maxime@airbyte.io>
2022-12-19 14:42:34 -06:00

234 lines
11 KiB
Python

#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
import inspect
import json
import logging
import pkgutil
import typing
from dataclasses import dataclass, fields
from enum import Enum, EnumMeta
from typing import Any, Iterator, List, Mapping, MutableMapping, Union
import yaml
from airbyte_cdk.models import (
AirbyteConnectionStatus,
AirbyteMessage,
AirbyteStateMessage,
ConfiguredAirbyteCatalog,
ConnectorSpecification,
)
from airbyte_cdk.sources.declarative.checks import CheckStream
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.exceptions import InvalidConnectorDefinitionException
from airbyte_cdk.sources.declarative.parsers.factory import DeclarativeComponentFactory
from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import ManifestComponentTransformer
from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ManifestReferenceResolver
from airbyte_cdk.sources.declarative.types import ConnectionDefinition
from airbyte_cdk.sources.streams.core import Stream
from dataclasses_jsonschema import JsonSchemaMixin
from jsonschema.exceptions import ValidationError
from jsonschema.validators import validate
# Dataclass describing the top-level shape of a declarative source: a version, a check and a list of streams.
# ManifestDeclarativeSource calls ConcreteDeclarativeSource.json_schema() (provided through JsonSchemaMixin) to obtain
# the schema it validates manifests against, and uses this class as the root of generate_schema().
# NOTE(review): documented with comments instead of a docstring so that the class' __doc__ is left untouched, in case
# the JsonSchemaMixin schema generation reads it -- confirm against dataclasses_jsonschema.
@dataclass
class ConcreteDeclarativeSource(JsonSchemaMixin):
    # Value of the manifest's top-level "version" entry
    version: str
    # Component stored under the manifest's top-level "check" entry
    check: CheckStream
    # Components built from the manifest's top-level "streams" entry
    streams: List[DeclarativeStream]
class ManifestDeclarativeSource(DeclarativeSource):
    """Declarative source defined by a manifest of low-code components that define source connector behavior"""

    # Only these keys are accepted at the top level of a manifest; anything else is rejected in __init__
    VALID_TOP_LEVEL_FIELDS = {"check", "definitions", "schemas", "spec", "streams", "type", "version"}

    def __init__(self, source_config: ConnectionDefinition, debug: bool = False):
        """
        :param source_config(Mapping[str, Any]): The manifest of low-code components that describe the source connector
        :param debug(bool): True if debug mode is enabled
        :raises ValidationError: if the manifest fails either of the schema validations done by _validate_source()
        :raises InvalidConnectorDefinitionException: if the manifest contains keys outside of VALID_TOP_LEVEL_FIELDS
        """
        self.logger = logging.getLogger(f"airbyte.{self.name}")

        evaluated_manifest = {}
        resolved_source_config = ManifestReferenceResolver().preprocess_manifest(source_config, evaluated_manifest, "")
        self._source_config = resolved_source_config
        self._debug = debug
        self._factory = DeclarativeComponentFactory()

        self._validate_source()

        # Stopgap to protect the top-level namespace until it's validated through the schema
        unknown_fields = [key for key in self._source_config if key not in self.VALID_TOP_LEVEL_FIELDS]
        if unknown_fields:
            raise InvalidConnectorDefinitionException(f"Found unknown top-level fields: {unknown_fields}")

    @property
    def connection_checker(self) -> ConnectionChecker:
        """
        Build the connection checker from the manifest's "check" block.

        The block is defaulted to the "CheckStream" type when it does not declare one. Note that the default is written
        back into the stored manifest.
        """
        check = self._source_config["check"]
        if "type" not in check:
            check["type"] = "CheckStream"
        return self._factory.create_component(check, dict())(source=self)

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        Instantiate every stream declared in the manifest.

        :param config: The connector configuration handed to the component factory for each stream
        :return: The list of created streams
        """
        self._emit_manifest_debug_message(extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)})

        source_streams = [self._factory.create_component(stream_config, config, True)() for stream_config in self._stream_configs()]
        for stream in source_streams:
            # make sure the log level is always applied to the stream's logger
            self._apply_log_level_to_stream_logger(self.logger, stream)
        return source_streams

    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
        """
        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
        in the project root.
        """
        self._configure_logger_level(logger)
        self._emit_manifest_debug_message(extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)})

        spec = self._source_config.get("spec")
        if not spec:
            # No (or empty) spec block in the manifest: defer to the parent implementation
            return super().spec(logger)

        if "type" not in spec:
            spec["type"] = "Spec"
        spec_component = self._factory.create_component(spec, dict())()
        return spec_component.generate_spec()

    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        """Apply the debug log level to the logger (when enabled) and delegate the check to the parent class"""
        self._configure_logger_level(logger)
        return super().check(logger, config)

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Union[List[AirbyteStateMessage], MutableMapping[str, Any], None] = None,
    ) -> Iterator[AirbyteMessage]:
        """Apply the debug log level to the logger (when enabled) and yield every message produced by the parent's read()"""
        self._configure_logger_level(logger)
        yield from super().read(logger, config, catalog, state)

    def _configure_logger_level(self, logger: logging.Logger):
        """
        Set the log level to logging.DEBUG if debug mode is enabled
        """
        if self._debug:
            logger.setLevel(logging.DEBUG)

    def _validate_source(self):
        """
        Validate the manifest twice: against the schema generated from ConcreteDeclarativeSource and against the
        handwritten schema shipped in declarative_component_schema.yaml.

        As a side effect, the stored manifest gets a "type" of "DeclarativeSource" when it does not declare one.

        :raises ValidationError: if either validation fails; the underlying jsonschema error is chained as the cause
        :raises FileNotFoundError: if the component schema file cannot be read from the airbyte_cdk package
        """
        # Validates the connector manifest against the schema auto-generated from the low-code backend
        full_config = {}
        if "version" in self._source_config:
            full_config["version"] = self._source_config["version"]
        full_config["check"] = self._source_config["check"]
        streams = [self._factory.create_component(stream_config, {}, False)() for stream_config in self._stream_configs()]
        if streams:
            full_config["streams"] = streams
        declarative_source_schema = ConcreteDeclarativeSource.json_schema()

        try:
            validate(full_config, declarative_source_schema)
        except ValidationError as e:
            raise ValidationError("Validation against auto-generated schema failed") from e

        # Validates the connector manifest against the low-code component json schema
        manifest = self._source_config
        if "type" not in manifest:
            manifest["type"] = "DeclarativeSource"
        manifest_transformer = ManifestComponentTransformer()
        propagated_manifest = manifest_transformer.propagate_types_and_options("", manifest, {})

        try:
            raw_component_schema = pkgutil.get_data("airbyte_cdk", "sources/declarative/declarative_component_schema.yaml")
        except FileNotFoundError as e:
            raise FileNotFoundError(f"Failed to read manifest component json schema required for validation: {e}") from e
        if raw_component_schema is None:
            # pkgutil.get_data() returns None instead of raising when the package cannot be located or its loader
            # does not support get_data(); surface that as the same error instead of failing inside the yaml parser
            raise FileNotFoundError(
                "Failed to read manifest component json schema required for validation: "
                "sources/declarative/declarative_component_schema.yaml could not be loaded from package airbyte_cdk"
            )
        declarative_component_schema = yaml.load(raw_component_schema, Loader=yaml.SafeLoader)

        try:
            validate(propagated_manifest, declarative_component_schema)
        except ValidationError as e:
            raise ValidationError("Validation against json schema defined in declarative_component_schema.yaml schema failed") from e

    def _stream_configs(self):
        """
        Return the manifest's "streams" entries (an empty list when the key is absent), defaulting the "type" of each
        entry to "DeclarativeStream". The defaults are written into the stored manifest entries themselves.
        """
        stream_configs = self._source_config.get("streams", [])
        for s in stream_configs:
            if "type" not in s:
                s["type"] = "DeclarativeStream"
        return stream_configs

    @staticmethod
    def generate_schema() -> str:
        """
        Expand the interfaces of ConcreteDeclarativeSource and return the resulting json schema serialized as a string
        """
        expanded_source_manifest = ManifestDeclarativeSource.expand_schema_interfaces(ConcreteDeclarativeSource, {})
        expanded_schema = expanded_source_manifest.json_schema()
        return json.dumps(expanded_schema, cls=SchemaEncoder)

    @staticmethod
    def expand_schema_interfaces(expand_class: type, visited: dict) -> type:
        """
        Recursive function that takes in class type that will have its interface fields unpacked and expanded and then recursively
        attempt the same expansion on all the class' underlying fields that are declarative component. It also performs expansion
        with respect to interfaces that are contained within generic data types.
        :param expand_class: The declarative component class that will have its interface fields expanded
        :param visited: cache used to store a record of already visited declarative classes that have already been seen
        :return: The expanded declarative component
        """

        # Recursive base case to stop recursion if we have already expanded an interface in case of cyclical components
        # like CompositeErrorHandler
        if expand_class.__name__ in visited:
            return visited[expand_class.__name__]
        visited[expand_class.__name__] = expand_class

        next_classes = []
        class_fields = fields(expand_class)
        for field in class_fields:
            unpacked_field_types = DeclarativeComponentFactory.unpack(field.type)
            # The class is modified in place: its annotation for the field is replaced by the unpacked type
            expand_class.__annotations__[field.name] = unpacked_field_types
            next_classes.extend(ManifestDeclarativeSource._get_next_expand_classes(field.type))
        for next_class in next_classes:
            ManifestDeclarativeSource.expand_schema_interfaces(next_class, visited)
        return expand_class

    @staticmethod
    def _get_next_expand_classes(field_type) -> List[type]:
        """
        Parses through a given field type and assembles a list of all underlying declarative components. For a concrete declarative class
        it will return itself. For a declarative interface it will return its subclasses. For declarative components in a generic type
        it will return the unpacked classes. Any non-declarative types will be skipped.
        :param field_type: A field type that may be a declarative class, or a List/Union generic wrapping such classes
        :return: The list of declarative classes found in the field type, empty if there are none
        """
        generic_type = typing.get_origin(field_type)
        if generic_type is None:
            # We can only continue parsing declarative that inherit from the JsonSchemaMixin class because it is used
            # to generate the final json schema
            if inspect.isclass(field_type) and issubclass(field_type, JsonSchemaMixin) and not isinstance(field_type, EnumMeta):
                subclasses = field_type.__subclasses__()
                if subclasses:
                    return subclasses
                else:
                    return [field_type]
        elif generic_type == list or generic_type == Union:
            next_classes = []
            for underlying_type in typing.get_args(field_type):
                next_classes.extend(ManifestDeclarativeSource._get_next_expand_classes(underlying_type))
            return next_classes
        return []

    def _emit_manifest_debug_message(self, extra_args: dict):
        """Log a debug message on the source's logger, attaching extra_args as the log record's extra fields"""
        self.logger.debug("declarative source created from manifest", extra=extra_args)
class SchemaEncoder(json.JSONEncoder):
    """JSON encoder that serializes `property` objects and `Enum` members as their `str()` representation"""

    def default(self, obj):
        """
        Fallback invoked by the encoder for objects it cannot serialize natively.

        :param obj: The object that could not be serialized
        :return: `str(obj)` for properties and enum members
        :raises TypeError: raised by the base encoder for every other unsupported type
        """
        stringified_types = (property, Enum)
        if not isinstance(obj, stringified_types):
            # Anything else is left to the standard encoder, which rejects it
            return super().default(obj)
        return str(obj)