* [ISSUE #19410] remove request_options_provider from the … (#21403)
* [ISSUE #19410] (incomplete) remove request_options_provider from the manifest
* [ISSUE #19410] (incomplete) incomplete cleanup config_component_schema.json as well
* [ISSUE #19410] update source-monday
* [ISSUE #19410] code review
* [ISSUE #19410] formatting files
* [Low-Code CDK] Replace the $options keyword with $parameters (#21632)
* refactor flows and tests to use parameters instead of options
* update documentation to reflect the change from options to parameters
* create migration script to replace options with parameters in existing manifests
* update template to use parameters instead of options
* fix tests after rebasing from the branch
* address pr feedback and extra uses of options that I missed
* additional changes needed after rebasing from master
* migrate low-code connectors to use parameters instead of options
* 🚨🚨 [Low Code CDK] Update `*ref` format to `#/` (#21434)
* [Low-Code CDK] Remove JsonSchema type in favor of JsonSchemaFileLoader (#21832)
* fully deprecate JsonSchema in favor of JsonFileSchemaLoader
* remove usage in the legacy registry
* Update migration scripts according to manifest file rename (#21920)
* Issue 21866 remove legacy factory and validation flow (#21878)
* [ISSUE #21866] clean ManifestDeclarativeSource validation
* [ISSUE #21866] remove dataclasses-jsonschema
* [ISSUE #21866] code review
* [ISSUE-21866] flake8
* [ISSUE #21559] remove DefaultPaginator.url_base (#21823)
* [ISSUE #21559] remove DefaultPaginator.url_base
* [ISSUE #21559] code review
* [ISSUE #21559] update migration script
* [ISSUE #21559] code review
* [ISSUE #21559] update documentation
* [ISSUE #21559] run migration (#21824)
* [ISSUE #21559] remove DefaultPaginator.url_base (#21823)
* [ISSUE #21559] remove DefaultPaginator.url_base
* [ISSUE #21559] code review
* [ISSUE #21559] update migration script
* [ISSUE #21559] code review
* [ISSUE #21559] update documentation
* [ISSUE #21559] run migration (#21824)
* [ISSUE #21559] fix manifests
* [ISSUE #21926] setup server to allow for local tests (#21974)
* [Low Code CDK] remove checkpoint_interval from DeclarativeStream component (#22120)
* Issue #21576 rename dpathextractor fieldpointer (#21990)
* [ISSUE #21926] setup server to allow for local tests
* [ISSUE #21576] Rename DpathExtractor.field_pointer to field_path
* [ISSUE #21576] migration script
* [ISSUE #21576] update source-monday and source-pocket as well
* [ISSUE #21576] migration (#21997)
* [ISSUE #21576] code review
* Remove checkpoint_interval from source-prestashop manifest (#22141)
* replacing options with parameters for a few connectors I missed or were newly added
* [Low-Code CDK] Rremove stream_cursor_field from stream and derive it from stream_slicer (#22294)
* update schema to derive cursor_field from a stream slicer if it exists
* remove usage of stream_cursor_field on simple connector use cases
* fixing some of the more complex usage of stream_cursor_field that rely on cartesian product stream slicers
* fix documentation to replace references to stream_cursor_field
* Low Code CDK: Remove `name` and `primary_key` from non-DeclarativeStream components (#21891)
* fix eslint issues for webapp (#22462)
* 🪟 🔧 Connector Builder frontend fixes for low_code_cdk_to_beta (#22375)
* bump connector builder server to latest CDK version
* fix breaking CDK changes in connector builder FE
* [Low-Code CDK] Separate request path from RequestOption component (#22398)
* split apart path from RequestOption and fix usages and cleanup the code
* replace usage of path with RequestPath and get rid of default to RequestOption
* fix bug where stream_slice_field was used in outbound request instead of request_option field_name
* organize yaml schema names and update documentation for RequestOption and RequestPath
* clean up tests
* regenerate models
* [ISSUE #19961] refactor stream slices (#22225)
* [ISSUE #19961] add 'incremental' and partially remove CartesianProductStreamSlicer - Google PageSpeed Insights not working yet
* [ISSUE #19961] fixing Google PageSpeed Insights
* move incremental_sync field to the stream level and perform merging into one stream slicer at that level
* add tests to merging incremental and iterable into cartesian
* rewrite documentation to separate incremental sync and iterator concepts
* update documentation to use partition router and revise the tutorial to reflect the new changes to the components
* [ISSUE #19961] update code to newest CDK version and clean autogenerated files (#22670)
* [ISSUE #19961] rename stream_slicer to partition_router and update ma… (#22590)
* [ISSUE #19961] rename stream_slicer to partition_router and update manifests (for incremental_sync as well)
* [ISSUE 19961] rename CustomStreamSlicer (#22598)
* [ISSUE 19961] rename CustomStreamSlicer
* [ISSUE #19961] code review CustomStreamSlicer
* [ISSUE #19961] fix source_square incremental sync
* [ISSUE #19961] rename SingleSlice to SinglePartitionRouter (#22591)
* [ISSUE #19961] rename SingleSlice to SinglePartitionRouter
* remove SinglePartitionRouter from the schema
---------
Co-authored-by: brianjlai <brian.lai@airbyte.io>
* [ISSUE #19961] rename SubstreamSlicer to SubstreamPartitionRouter (#22596)
* [ISSUE #19961] TMP rename SubstreamSlicer to SubstreamPartitionRouter
* [ISSUE #19961] revert DatetimeStreamSlicer.stream_state_field_start and DatetimeStreamSlicer.stream_state_field_end
* [ISSUE #19961] rename ListStreamSlicer to ListPartitionRouter (#22593)
---------
Co-authored-by: brianjlai <brian.lai@airbyte.io>
* [ISSUE #19961] clean faulty merge
* [ISSUE #19961] rename DatetimeStreamSlicer (#22617)
* [ISSUE #19961] rename stream_slicer to partition_router and update manifests (for incremental_sync as well)
* [ISSUE 19961] rename CustomStreamSlicer (#22598)
* [ISSUE 19961] rename CustomStreamSlicer
* [ISSUE #19961] code review CustomStreamSlicer
* [ISSUE #19961] fix source_square incremental sync
* [ISSUE #19961] rename SingleSlice to SinglePartitionRouter (#22591)
* [ISSUE #19961] rename SingleSlice to SinglePartitionRouter
* remove SinglePartitionRouter from the schema
---------
Co-authored-by: brianjlai <brian.lai@airbyte.io>
* [ISSUE #19961] rename DatetimeStreamSlicer
* [ISSUE #19961] rename SubstreamSlicer to SubstreamPartitionRouter (#22596)
* [ISSUE #19961] TMP rename SubstreamSlicer to SubstreamPartitionRouter
* [ISSUE #19961] revert DatetimeStreamSlicer.stream_state_field_start and DatetimeStreamSlicer.stream_state_field_end
* [ISSUE #19961] rename ListStreamSlicer to ListPartitionRouter (#22593)
---------
Co-authored-by: brianjlai <brian.lai@airbyte.io>
* Update docs/connector-development/config-based/understanding-the-yaml-file/partition-router.md
Co-authored-by: Maxime Carbonneau-Leclerc <maxi297@users.noreply.github.com>
* Update docs/connector-development/config-based/understanding-the-yaml-file/partition-router.md
Co-authored-by: Maxime Carbonneau-Leclerc <maxi297@users.noreply.github.com>
* Update docs/connector-development/config-based/understanding-the-yaml-file/yaml-overview.md
Co-authored-by: Maxime Carbonneau-Leclerc <maxi297@users.noreply.github.com>
* Update docs/connector-development/config-based/understanding-the-yaml-file/partition-router.md
Co-authored-by: Maxime Carbonneau-Leclerc <maxi297@users.noreply.github.com>
* Update docs/connector-development/config-based/understanding-the-yaml-file/partition-router.md
Co-authored-by: Maxime Carbonneau-Leclerc <maxi297@users.noreply.github.com>
* Update docs/connector-development/config-based/understanding-the-yaml-file/partition-router.md
Co-authored-by: Maxime Carbonneau-Leclerc <maxi297@users.noreply.github.com>
* Update docs/connector-development/config-based/understanding-the-yaml-file/incremental-syncs.md
Co-authored-by: Maxime Carbonneau-Leclerc <maxi297@users.noreply.github.com>
* update docs
* [ISSUE #19961] clean unit tests files
* [ISSUE #19961] code review
---------
Co-authored-by: brianjlai <brian.lai@airbyte.io>
Co-authored-by: Brian Lai <51336873+brianjlai@users.noreply.github.com>
* [Low-Code CDK] Allow for children of custom components to specify parameters that are normally derived (#22379)
* Fix a bug where child components of a custom component cannot receive fields from other components
* add tests, documentation and commenting
* fix test from merge
* add better error message for nested initialization failures
* 🪟 🔧 Connector Builder frontend fixes for low_code_cdk_to_beta (#22880)
* restrict name to stream level
* remove checkpoint interval
* adjust logic for new request options
* refactor slicers
* wording
* review comments
* make oldest supported version explicit
* separate the frontend and connector builder changes from the low-code to beta release
* [Low-Code CDK] Add script to run low code unit tests and address issues with a few connectors (#23123)
* consolidate all the changes into a new PR after I messed up the merge on the side branch
* add set to allow this to be called externally if necessary later
* remove last few extra fields i found and fix docs links
* fix docs one more time
---------
Co-authored-by: Maxime Carbonneau-Leclerc <maxi297@users.noreply.github.com>
Co-authored-by: Catherine Noll <clnoll@users.noreply.github.com>
Co-authored-by: maxi297 <maxime@airbyte.io>
Co-authored-by: Lake Mossman <lake@airbyte.io>
Co-authored-by: Joe Reuter <joe@airbyte.io>
132 lines
5.6 KiB
Python
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.declarative.types import Config, StreamSlice
from airbyte_cdk.sources.streams.core import Stream


@dataclass
class DeclarativeStream(Stream):
    """
    DeclarativeStream is a Stream that delegates most of its logic to its schema_loader and retriever

    Attributes:
        name (str): stream name
        primary_key (Optional[Union[str, List[str], List[List[str]]]]): the primary key of the stream
        schema_loader (SchemaLoader): The schema loader
        retriever (Retriever): The retriever
        config (Config): The user-provided configuration as specified by the source's spec
        stream_cursor_field (Optional[Union[InterpolatedString, str]]): The cursor field
        transformations (List[RecordTransformation]): A list of transformations to be applied to each output record in the
            stream. Transformations are applied in the order in which they are defined.
    """

    retriever: Retriever
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    name: str
    primary_key: Optional[Union[str, List[str], List[List[str]]]]
    schema_loader: Optional[SchemaLoader] = None
    _name: str = field(init=False, repr=False, default="")
    _primary_key: str = field(init=False, repr=False, default="")
    _schema_loader: SchemaLoader = field(init=False, repr=False, default=None)
    stream_cursor_field: Optional[Union[InterpolatedString, str]] = None
    transformations: List[RecordTransformation] = None

    def __post_init__(self, parameters: Mapping[str, Any]):
        self.stream_cursor_field = InterpolatedString.create(self.stream_cursor_field, parameters=parameters)
        self.transformations = self.transformations or []
        self._schema_loader = self.schema_loader if self.schema_loader else DefaultSchemaLoader(config=self.config, parameters=parameters)

    @property
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        return self._primary_key

    @primary_key.setter
    def primary_key(self, value: str) -> None:
        # When no value is passed, the dataclass default is the property object itself; ignore it.
        if not isinstance(value, property):
            self._primary_key = value

    @property
    def name(self) -> str:
        """
        :return: Stream name. By default this is the implementing class name, but it can be overridden as needed.
        """
        return self._name

    @name.setter
    def name(self, value: str) -> None:
        # Same guard as primary_key: skip the property object used as the dataclass default.
        if not isinstance(value, property):
            self._name = value

    @property
    def state(self) -> MutableMapping[str, Any]:
        return self.retriever.state

    @state.setter
    def state(self, value: MutableMapping[str, Any]):
        """State setter, accept state serialized by state getter."""
        self.retriever.state = value

    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
        return self.state

    @property
    def cursor_field(self) -> Union[str, List[str]]:
        """
        Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
        :return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
        """
        cursor = self.stream_cursor_field.eval(self.config)
        return cursor if cursor else []

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[Mapping[str, Any]]:
        for record in self.retriever.read_records(sync_mode, cursor_field, stream_slice, stream_state):
            yield self._apply_transformations(record, self.config, stream_slice)

    def _apply_transformations(self, record: Mapping[str, Any], config: Config, stream_slice: StreamSlice):
        output_record = record
        for transformation in self.transformations:
            # Feed each transformation the output of the previous one so that transformations
            # returning a new record (rather than mutating in place) are chained in order.
            output_record = transformation.transform(output_record, config=config, stream_state=self.state, stream_slice=stream_slice)

        return output_record

    def get_json_schema(self) -> Mapping[str, Any]:
        """
        :return: A dict of the JSON schema representing this stream.

        The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property.
        Override as needed.
        """
        return self._schema_loader.get_json_schema()

    def stream_slices(
        self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """
        Override to define the slices for this stream. See the stream slicing section of the docs for more information.

        :param sync_mode:
        :param cursor_field:
        :param stream_state:
        :return:
        """
        # this is not passing the cursor field because it is known at init time
        return self.retriever.stream_slices(sync_mode=sync_mode, stream_state=stream_state)
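
The `name` and `primary_key` properties above pair a public dataclass field with a private backing field and guard their setters with `isinstance(value, property)`. The following is a minimal standalone sketch of that pattern using only the standard library. `NamedThing` is a hypothetical class invented for illustration; it is not part of `airbyte_cdk`.

```python
from dataclasses import dataclass, field


@dataclass
class NamedThing:
    """Hypothetical example class, not part of airbyte_cdk."""

    # Public field: shows up in the generated __init__ signature.
    name: str
    # Private backing field: excluded from __init__ and repr.
    _name: str = field(init=False, repr=False, default="")

    @property
    def name(self) -> str:
        return self._name

    @name.setter
    def name(self, value: str) -> None:
        # Defining the property rebinds the class attribute `name`, so the
        # dataclass machinery sees the property object as the field default.
        # If the caller omits `name`, __init__ assigns that property object,
        # which the guard below ignores, leaving the backing default in place.
        if not isinstance(value, property):
            self._name = value


thing = NamedThing(name="users")
print(thing.name)         # value routed through the setter into _name
print(NamedThing().name)  # falls back to the backing field's default
```

Without the `isinstance` guard, constructing the class with no `name` argument would store the property object itself in `_name`.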