1
0
mirror of synced 2026-01-06 06:04:16 -05:00
Files
airbyte/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_stream.py
Alexandre Girard 150ab593f8 Low code connectors: core structure (#12850)
* checkout from alex/cac

* doc

* doc

* remove broken test

* rename

* rename file

* delete unused file

* rename

* abstract property

* isort

* update state

* Update comment

* remove incremental mixin

* delete comment

* update comments

* update comments

* remove no_state

* rename package

* pass parameters through kwargs

* update interface to pass source in interface

* update interface to pass source in interface

* rename to stream_slicer

* Low code connectors: string interpolation with jinja (#12852)

* checkout from alex/cac

* Add missing tests

* Add missing files

* missing file

* rename

* jinja dependency

* Add comment

* comment

* comment

* Revert "delete unused file"

This reverts commit 758e939367.

* delete unused field

* delete unused field

* rename

* pass kwargs directly

* isort

* Revert "isort"

This reverts commit 4a79223944.

* format

* decoder

* better error handling

* remove nostate

* isort

* delete dead code

* Update mapping type to [str, Any]

* add comment

* Add comment

* pass parameters through kwargs

* move test to right module

* Add missing test

* Use authbase instead of deprecated class

* leverage generator

* rename to declarative

* rename the classes too
2022-05-25 17:27:54 -07:00

81 lines
3.0 KiB
Python

#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.streams.core import Stream
class DeclarativeStream(Stream):
    """
    DeclarativeStream is a Stream that delegates most of its logic to its schema_loader and retriever.

    The stream name, primary key and cursor field are supplied at construction time and exposed
    unchanged through the corresponding properties. The state, the records and the stream slices
    come from the retriever; the JSON schema comes from the schema loader.
    """

    def __init__(
        self,
        name: str,
        primary_key: Optional[Union[str, List[str], List[List[str]]]],
        cursor_field: Union[str, List[str]],
        schema_loader: SchemaLoader,
        retriever: Retriever,
    ) -> None:
        """
        :param name: Stream name, returned as-is by the `name` property.
        :param primary_key: Primary key, returned as-is by the `primary_key` property.
        :param cursor_field: Cursor field, returned as-is by the `cursor_field` property.
        :param schema_loader: Object whose `get_json_schema()` provides this stream's JSON schema.
        :param retriever: Object that provides the state, the records and the stream slices.
        """
        self._name = name
        self._primary_key = primary_key
        self._cursor_field = cursor_field
        self._schema_loader = schema_loader
        self._retriever = retriever

    @property
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """
        :return: The primary key supplied at construction time.
        """
        return self._primary_key

    @property
    def name(self) -> str:
        """
        :return: The stream name supplied at construction time.
        """
        return self._name

    @property
    def state(self) -> MutableMapping[str, Any]:
        """
        :return: The state reported by the retriever's `get_state()`.
        """
        # NOTE(review): this property is read-only, no setter is defined in this class.
        # Confirm that no caller assigns to `stream.state`.
        return self._retriever.get_state()

    @property
    def cursor_field(self) -> Union[str, List[str]]:
        """
        :return: The cursor field supplied at construction time: either a single field name or a list of strings.
        """
        return self._cursor_field

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        """
        Read records by delegating to the retriever.

        :param sync_mode: Forwarded unchanged to the retriever.
        :param cursor_field: Forwarded unchanged to the retriever.
        :param stream_slice: Forwarded unchanged to the retriever.
        :param stream_state: Forwarded unchanged to the retriever.
        :return: Whatever the retriever's `read_records` returns.
        """
        # Arguments are forwarded positionally, in the same order as this method's signature.
        return self._retriever.read_records(sync_mode, cursor_field, stream_slice, stream_state)

    def get_json_schema(self) -> Mapping[str, Any]:
        """
        :return: The JSON schema for this stream, as returned by the schema loader's `get_json_schema()`.
        """
        # TODO show an example of using pydantic to define the JSON schema, or reading an OpenAPI spec
        return self._schema_loader.get_json_schema()

    def stream_slices(
        self,
        *,
        sync_mode: SyncMode,
        cursor_field: Optional[List[str]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """
        Compute the slices for this stream by delegating to the retriever.

        :param sync_mode: Forwarded to the retriever.
        :param cursor_field: Accepted but not forwarded to the retriever.
        :param stream_state: Forwarded to the retriever.
        :return: Whatever the retriever's `stream_slices` returns.
        """
        # this is not passing the cursor field because it is known at init time
        return self._retriever.stream_slices(sync_mode=sync_mode, stream_state=stream_state)