
[low-code connectors] Add request options and state to stream slicers (#14552)

* comment

* comment

* comments

* fix

* test for instantiating chain retrier

* fix parsing

* cleanup

* fix

* reset

* never raise on http error

* remove print

* comment

* comment

* comment

* comment

* remove prints

* add declarative stream to registry

* start working on limit paginator

* support for offset pagination

* tests

* move limit value

* extract request option

* boilerplate

* page increment

* delete offset paginator

* update conditional paginator

* refactor and fix test

* fix test

* small fix

* Delete dead code

* Add docstrings

* quick fix

* exponential backoff

* fix test

* fix

* delete unused properties

* fix

* missing unit tests

* uppercase

* docstrings

* rename to success

* compare full request instead of just url

* rename module

* rename test file

* rename interface

* rename default retrier

* rename to compositeerrorhandler

* fix missing renames

* move action to filter

* str -> minmaxdatetime

* small fixes

* plural

* add example

* handle header variations

* also fix wait time from

* allow using a regex to extract the value

* group()

* docstring

* add docs

* update comment

* docstrings

* fix tests

* rename param

* cleanup stop_condition

* cleanup

* Add examples

* interpolated pagination strategy

* dont need duplicate class

* docstrings

* more docstrings

* docstrings

* fix tests

* first pass at substream

* seems to work for a single stream

* can also be defined in requester with stream_state

* tmp update

* update comment

* Update airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* version: Update Parquet library to latest release (#14502)

The upstream Parquet library that is currently pinned for use in the S3 destination plugin is over a year old. The current version is generating invalid schemas for date-time with time-zone fields, which appears to be addressed in the `1.12.3` release of the library in commit c72862b613.

* merge

* 🎉 Source Github: improve schema for stream `pull_request_commits`: added "null" (#14613)

Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>

* Docs: Fixed broken links (#14622)

* fixing broken links

* more broken links

* source-hubspot: change mention of Mailchimp to HubSpot in doc (#14620)

* Helm Chart: Add external temporal option (#14597)

* conflict env configmap and chart lock

* reverting lock

* add eof lines and documentation on values yaml

* conflict json file

* rollback json

* solve conflict

* correct minio with new version

Co-authored-by: Guy Feldman <gfeldman@86labs.com>

* 🎉 Add YAML format to source-file reader (#14588)

* Add yaml reader

* Update docs

* Bumpversion of connector

* bump docs

* Update pyarrow dependency

* Upgrade pandas dependency

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>

* 🎉 Source Okta: add GroupMembers stream (#14380)

* add Group_Members stream to okta source

- Group_Members returns a list of users with the same schema as the Users stream.
- Create a shared users schema; both the group_members and users schemas reference it.
- Add Group_Members stream to source connector

* add tests and fix logs schema

- fix the test error "None is not one of enums": even though the enum type includes both string and null, the error comes from the json schema validator
ddb87afad8/jsonschema/_validators.py (L279-L285)
- change group_members to use id as the cursor field since `filter` is not supported in the query string
- fix the abnormal state test on the logs stream: when since is abnormally large, until has to be defined with an equal or larger value
- remove the logs stream from the full sync test, because two full syncs always have a gap -- at least a new log about the users or groups api.

* last polish before submitting the PR

- bump docker version
- update changelog
- add the right abnormal value for logs stream
- correct the sample catalog

* address comments:

- improve comments for until parameter under the logs stream
- add use_cache on groupMembers

* add use_cache to Group_Members

* change configured_catalog to test

* auto-bump connector version

Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>

* split test files

* renames

* missing unit test

* add missing unit tests

* rename

* assert isinstance

* start extracting to their own files

* use final instead of classmethod

* assert we retry 429 errors

* Add log

* replace asserts with valueexceptions

* delete superfluous print statement

* only accept minmaxdatetime

* fix factory so we don't need to union everything with strings

* get class_name from type

* remove from class types registry

* process error handlers one at a time

* sort

* delete print statement

* comment

* comment

* format

* delete unused file

* comment

* interpolatedboolean

* comment

* not optional

* not optional

* unit tests

* fix request body data

* add test

* move file to right module

* update

* reset to master

* format

* rename to pass_by

* rename to page size

* fix

* fix some tests

* reset

* fix

* fix some of the tests

* fix test

* fix more tests

* all tests pass

* path is not optional

* reset

* reset

* reset

* delete print

* remove prints

* delete duplicate method

* add test

* fix body data

* delete extra newlines

* move to subpackage

* fix imports

* handle str body data

* simplify

* Update tests

* filter dates before stream state

* Revert "Update tests"

This reverts commit c0808c8009.

* update

* fix test

* state management

* add test

* delete dead code

* update cursor

* update cursor cartesian

* delete unused state class

* fix

* missing test

* update cursor substreams

* missing test

* fix typing

* fix typing

* delete unused field

* delete unused method

* update datetime stream slice

* cleanup

* assert

* request options

* request option cartesian

* assert when passing by path

* request options for substreams

* always return a map

* pass stream_state

* refactor and almost done fixing tests

* fix tests

* rename to inject_into

* only accept enum

* delete conditional paginator

* only return body data

* missing test

* update docstrings

* update docstrings

* update comment

* rename

* tests

* class_name -> type

* improve interface

* fix some of the tests

* fix more of the tests

* fix tests

* reset

* reset

* Revert "reset"

This reverts commit eb9a918a09.

* remove extra argument

* docstring

* update

* delete unused file

* reset

* reset

* rename

* fix timewindow

* create InterpolatedString

* helper method

* assert on request option

* better asserts

* format

* docstrings

* docstrings

* remove optional from type hint

* Update airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* inherit from request options provider

* inherit from request options provider

* remove optional from type hint

* remove extra parameter

* none check

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
Co-authored-by: Tobias Macey <tmacey@boundlessnotions.com>
Co-authored-by: Serhii Chvaliuk <grubberr@gmail.com>
Co-authored-by: Amruta Ranade <11484018+Amruta-Ranade@users.noreply.github.com>
Co-authored-by: Bas Beelen <bjgbeelen@gmail.com>
Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com>
Co-authored-by: Guy Feldman <gfeldman@86labs.com>
Co-authored-by: Christophe Duong <christophe.duong@gmail.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
Co-authored-by: Yiyang Li <yiyangli2010@gmail.com>
Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>
Author: Alexandre Girard
Date: 2022-07-27 15:30:49 -07:00 (committed by GitHub)
Parent: fd09c325de
Commit: 44ec661b5a
23 changed files with 1112 additions and 406 deletions

View File

@@ -27,6 +27,7 @@ from airbyte_cdk.sources.declarative.schema.json_schema import JsonSchema
from airbyte_cdk.sources.declarative.stream_slicers.cartesian_product_stream_slicer import CartesianProductStreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.substream_slicer import SubstreamSlicer
from airbyte_cdk.sources.declarative.transformations import RemoveFields
from airbyte_cdk.sources.declarative.transformations.add_fields import AddFields
@@ -59,4 +60,5 @@ CLASS_TYPES_REGISTRY: Mapping[str, Type] = {
"RecordSelector": RecordSelector,
"RemoveFields": RemoveFields,
"SimpleRetriever": SimpleRetriever,
"SubstreamSlicer": SubstreamSlicer,
}
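As a rough illustration, this registry maps declarative type names from the connector's YAML config to Python classes. A lookup might reduce to the following sketch (the resolve_component_class helper is hypothetical; only the CLASS_TYPES_REGISTRY mapping above is from the commit):

def resolve_component_class(type_name: str):
    # Look up a declarative component's class by its type name (sketch only)
    try:
        return CLASS_TYPES_REGISTRY[type_name]
    except KeyError:
        raise ValueError(f"Unknown declarative component type: {type_name}")

slicer_cls = resolve_component_class("SubstreamSlicer")  # -> SubstreamSlicer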

View File

@@ -30,10 +30,9 @@ from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
from airbyte_cdk.sources.declarative.schema.json_schema import JsonSchema
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.declarative.states.dict_state import DictState
from airbyte_cdk.sources.declarative.states.state import State
from airbyte_cdk.sources.declarative.stream_slicers.single_slice import SingleSlice
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.substream_slicer import ParentStreamConfig
from airbyte_cdk.sources.streams.core import Stream
"""
@@ -55,8 +54,8 @@ DEFAULT_IMPLEMENTATIONS_REGISTRY: Mapping[Type, Type] = {
RequestOptionsProvider: InterpolatedRequestOptionsProvider,
Requester: HttpRequester,
Retriever: SimpleRetriever,
ParentStreamConfig: ParentStreamConfig,
SchemaLoader: JsonSchema,
State: DictState,
Stream: DeclarativeStream,
StreamSlicer: SingleSlice,
}

View File

@@ -127,6 +127,10 @@ class LimitPaginator(Paginator):
def request_body_json(self) -> Mapping[str, Any]:
return self._get_request_options(RequestOptionType.body_json)
def request_kwargs(self) -> Mapping[str, Any]:
# Never update kwargs
return {}
def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]:
options = {}
if self._page_token_option.inject_into == option_type:

View File

@@ -28,5 +28,9 @@ class NoPagination(Paginator):
def request_body_json(self) -> Mapping[str, Any]:
return {}
def request_kwargs(self) -> Mapping[str, Any]:
# Never update kwargs
return {}
def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Mapping[str, Any]:
return {}

View File

@@ -2,13 +2,14 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from abc import ABC, abstractmethod
from abc import abstractmethod
from typing import Any, List, Mapping, Optional
import requests
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider
class Paginator(ABC):
class Paginator(RequestOptionsProvider):
"""
Defines the token to use to fetch the next page of records from the API.
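A minimal sketch of the pattern this change enables: a paginator that also acts as a request-options provider, feeding its page token back into the request. The class and field names below are hypothetical stand-ins, not the CDK's:

class PageTokenPaginator:
    """Toy paginator that doubles as a request-options provider (sketch)."""

    def __init__(self):
        self._token = None

    def next_page_token(self, response_json):
        # Remember the token from the response so request_params can inject it
        self._token = response_json.get("next")
        return {"next_page_token": self._token} if self._token else None

    def request_params(self, **kwargs):
        return {"page_token": self._token} if self._token else {}

    def request_kwargs(self, **kwargs):
        # Never update kwargs, matching the implementations in this commit
        return {}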

View File

@@ -20,14 +20,13 @@ class RequestOptionsProvider(ABC):
"""
@abstractmethod
def request_params(
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None
) -> MutableMapping[str, Any]:
def request_params(self, **kwargs) -> MutableMapping[str, Any]:
"""
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
"""
pass
@abstractmethod
def request_headers(

View File

@@ -13,8 +13,6 @@ from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
from airbyte_cdk.sources.declarative.requesters.requester import Requester
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.states.dict_state import DictState
from airbyte_cdk.sources.declarative.states.state import State
from airbyte_cdk.sources.declarative.stream_slicers.single_slice import SingleSlice
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
@@ -42,7 +40,6 @@ class SimpleRetriever(Retriever, HttpStream):
record_selector: HttpSelector,
paginator: Optional[Paginator] = None,
stream_slicer: Optional[StreamSlicer] = SingleSlice(),
state: Optional[State] = None,
):
"""
:param name: The stream's name
@@ -59,8 +56,7 @@ class SimpleRetriever(Retriever, HttpStream):
self._requester = requester
self._record_selector = record_selector
super().__init__(self._requester.get_authenticator())
self._iterator = stream_slicer
self._state: State = (state or DictState()).deep_copy()
self._stream_slicer = stream_slicer
self._last_response = None
self._last_records = None
@@ -300,12 +296,14 @@ class SimpleRetriever(Retriever, HttpStream):
stream_state: Optional[StreamState] = None,
) -> Iterable[Mapping[str, Any]]:
# Warning: use self.state instead of the stream_state passed as argument!
stream_slice = stream_slice or {} # None-check
records_generator = HttpStream.read_records(self, sync_mode, cursor_field, stream_slice, self.state)
for r in records_generator:
self._state.update_state(stream_slice=stream_slice, stream_state=self.state, last_response=self._last_response, last_record=r)
self._stream_slicer.update_cursor(stream_slice, last_record=r)
yield r
else:
self._state.update_state(stream_slice=stream_slice, stream_state=self.state, last_reponse=self._last_response)
last_record = self._last_records[-1] if self._last_records else None
self._stream_slicer.update_cursor(stream_slice, last_record=last_record)
yield from []
def stream_slices(
@@ -320,13 +318,13 @@ class SimpleRetriever(Retriever, HttpStream):
:return:
"""
# Warning: use self.state instead of the stream_state passed as argument!
return self._iterator.stream_slices(sync_mode, self.state)
return self._stream_slicer.stream_slices(sync_mode, self.state)
@property
def state(self) -> StreamState:
return self._state.get_stream_state()
def state(self) -> MutableMapping[str, Any]:
return self._stream_slicer.get_stream_state()
@state.setter
def state(self, value: StreamState):
"""State setter, accept state serialized by state getter."""
self._state.set_state(value)
self._stream_slicer.update_cursor(value)
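The retriever no longer owns a separate State object; state is read from, and seeded into, the stream slicer. A simplified sketch of the delegation (shapes assumed):

class RetrieverSketch:
    def __init__(self, stream_slicer):
        self._stream_slicer = stream_slicer

    @property
    def state(self):
        # State is read straight from the slicer's cursor
        return self._stream_slicer.get_stream_state()

    @state.setter
    def state(self, value):
        # Setting state seeds the slicer's cursor
        self._stream_slicer.update_cursor(value)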

View File

@@ -1,3 +0,0 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

View File

@@ -1,67 +0,0 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from enum import Enum
from typing import Mapping
from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation
from airbyte_cdk.sources.declarative.states.state import State
def _get_max(*, name, val, other_state):
other_val = other_state.get(name)
if other_val:
return max(val, other_val)
else:
return val
class StateType(Enum):
STR = str
INT = int
class DictState(State):
stream_state_field = "stream_state"
def __init__(self, initial_mapping: Mapping[str, str] = None, config=None):
if initial_mapping is None:
initial_mapping = dict()
if config is None:
config = dict()
self._templates_to_evaluate = initial_mapping
self._interpolator = JinjaInterpolation()
self._context = dict()
self._config = config
def set_state(self, state):
self._context[self.stream_state_field] = state
def update_state(self, **kwargs):
stream_state = kwargs.get(self.stream_state_field)
prev_stream_state = self.get_stream_state() or stream_state
self._context.update(**kwargs)
self._context[self.stream_state_field] = self._compute_state(prev_stream_state)
def get_state(self, state_field):
return self._context.get(state_field, {})
def get_stream_state(self):
return self.get_state(self.stream_state_field)
def _compute_state(self, prev_state):
updated_state = {
self._interpolator.eval(name, self._config): self._interpolator.eval(value, self._config, **self._context)
for name, value in self._templates_to_evaluate.items()
}
updated_state = {name: value for name, value in updated_state.items() if value}
if prev_state:
next_state = {name: _get_max(name=name, val=value, other_state=prev_state) for name, value in updated_state.items()}
else:
next_state = updated_state
self._context[self.stream_state_field] = next_state
return next_state

View File

@@ -1,19 +0,0 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
import copy
from abc import ABC, abstractmethod
class State(ABC):
@abstractmethod
def update_state(self, **kwargs):
pass
@abstractmethod
def get_stream_state(self):
pass
def deep_copy(self):
return copy.deepcopy(self)

View File

@@ -4,7 +4,7 @@
import itertools
from collections import ChainMap
from typing import Any, Iterable, List, Mapping
from typing import Any, Iterable, List, Mapping, Optional
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
@@ -28,8 +28,34 @@ class CartesianProductStreamSlicer(StreamSlicer):
"""
def __init__(self, stream_slicers: List[StreamSlicer]):
"""
:param stream_slicers: Underlying stream slicers. The RequestOptions (e.g: Request headers, parameters, etc..) returned by this slicer are the combination of the RequestOptions of its input slicers. If there are conflicts e.g: two slicers define the same header or request param, the conflict is resolved by taking the value from the first slicer, where ordering is determined by the order in which slicers were input to this composite slicer.
"""
self._stream_slicers = stream_slicers
def update_cursor(self, stream_slice: Mapping[str, Any], last_record: Optional[Mapping[str, Any]] = None):
for slicer in self._stream_slicers:
slicer.update_cursor(stream_slice, last_record)
def request_params(self) -> Mapping[str, Any]:
return dict(ChainMap(*[s.request_params() for s in self._stream_slicers]))
def request_headers(self) -> Mapping[str, Any]:
return dict(ChainMap(*[s.request_headers() for s in self._stream_slicers]))
def request_body_data(self) -> Mapping[str, Any]:
return dict(ChainMap(*[s.request_body_data() for s in self._stream_slicers]))
def request_body_json(self) -> Optional[Mapping]:
return dict(ChainMap(*[s.request_body_json() for s in self._stream_slicers]))
def request_kwargs(self) -> Mapping[str, Any]:
# Never update kwargs
return {}
def get_stream_state(self) -> Mapping[str, Any]:
return dict(ChainMap(*[slicer.get_stream_state() for slicer in self._stream_slicers]))
def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
sub_slices = (s.stream_slices(sync_mode, stream_state) for s in self._stream_slicers)
return (ChainMap(*a) for a in itertools.product(*sub_slices))
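The first-slicer-wins conflict resolution described in the docstring falls out of ChainMap's lookup order; a standalone sketch with hypothetical values:

from collections import ChainMap

params_a = {"owner": "customer"}            # first slicer wins on conflicts
params_b = {"owner": "store", "page": "1"}  # later slicers only fill in new keys

merged = dict(ChainMap(params_a, params_b))
assert merged == {"owner": "customer", "page": "1"}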

View File

@@ -6,45 +6,77 @@ import datetime
import re
from typing import Any, Iterable, Mapping, Optional
import dateutil
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.declarative.types import Config
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState
class DatetimeStreamSlicer(StreamSlicer):
timedelta_regex = re.compile(
r"((?P<weeks>[\.\d]+?)w)?"
r"((?P<days>[\.\d]+?)d)?"
r"((?P<hours>[\.\d]+?)h)?"
r"((?P<minutes>[\.\d]+?)m)?"
r"((?P<seconds>[\.\d]+?)s)?"
r"((?P<microseconds>[\.\d]+?)ms)?"
r"((?P<milliseconds>[\.\d]+?)us)?$"
)
"""
Slices the stream over a datetime range.
Given a start time, end time, a step function, and an optional lookback window,
the stream slicer will partition the date range from start time - lookback window to end time.
The step function is defined as a string of the form:
`"<number><unit>"`
where unit can be one of
- weeks, w
- days, d
For example, "1d" will produce windows of 1 day, and "2w" windows of 2 weeks.
"""
timedelta_regex = re.compile(r"((?P<weeks>[\.\d]+?)w)?" r"((?P<days>[\.\d]+?)d)?$")
# FIXME: start_time, end_time, and step should be datetime and timedelta?
# FIXME: timezone should be declarative?
def __init__(
self,
start_datetime: MinMaxDatetime,
end_datetime: MinMaxDatetime,
step: str,
cursor_value: InterpolatedString,
cursor_field: InterpolatedString,
datetime_format: str,
config: Config,
start_time_option: Optional[RequestOption] = None,
end_time_option: Optional[RequestOption] = None,
stream_state_field_start: Optional[str] = None,
stream_state_field_end: Optional[str] = None,
lookback_window: Optional[InterpolatedString] = None,
):
"""
:param start_datetime:
:param end_datetime:
:param step: size of the timewindow
:param cursor_field: record's cursor field
:param datetime_format: format of the datetime
:param config: connection config
:param start_time_option: request option for start time
:param end_time_option: request option for end time
:param stream_state_field_start: stream slice start time field
:param stream_state_field_end: stream slice end time field
:param lookback_window: how many days before start_datetime to read data for
"""
self._timezone = datetime.timezone.utc
self._interpolation = JinjaInterpolation()
self._datetime_format = datetime_format
self._start_datetime = start_datetime
self._end_datetime = end_datetime
self._step = self._parse_timedelta(step)
self._config = config
self._cursor_value = cursor_value
self._cursor_field = InterpolatedString.create(cursor_field)
self._start_time_option = start_time_option
self._end_time_option = end_time_option
self._stream_slice_field_start = InterpolatedString.create(stream_state_field_start or "start_date")
self._stream_slice_field_end = InterpolatedString.create(stream_state_field_end or "end_date")
self._cursor = None # tracks current datetime
self._cursor_end = None # tracks end of current stream slice
self._lookback_window = lookback_window
# If datetime format is not specified then start/end datetime should inherit it from the stream slicer
@@ -53,29 +85,85 @@ class DatetimeStreamSlicer(StreamSlicer):
if not self._end_datetime.datetime_format:
self._end_datetime.datetime_format = self._datetime_format
if self._start_time_option and self._start_time_option.inject_into == RequestOptionType.path:
raise ValueError("Start time cannot be passed by path")
if self._end_time_option and self._end_time_option.inject_into == RequestOptionType.path:
raise ValueError("End time cannot be passed by path")
def get_stream_state(self) -> StreamState:
return {self._cursor_field.eval(self._config): self._cursor} if self._cursor else {}
def update_cursor(self, stream_slice: StreamSlice, last_record: Optional[Record] = None):
"""
Update the cursor value to the max datetime between the last record, the start of the stream_slice, and the current cursor value.
Update the cursor_end value with the stream_slice's end time.
:param stream_slice: current stream slice
:param last_record: last record read
:return: None
"""
stream_slice_value = stream_slice.get(self._cursor_field.eval(self._config))
stream_slice_value_end = stream_slice.get(self._stream_slice_field_end.eval(self._config))
last_record_value = last_record.get(self._cursor_field.eval(self._config)) if last_record else None
cursor = None
if stream_slice_value and last_record_value:
cursor = max(stream_slice_value, last_record_value)
elif stream_slice_value:
cursor = stream_slice_value
else:
cursor = last_record_value
if self._cursor and cursor:
self._cursor = max(cursor, self._cursor)
elif cursor:
self._cursor = cursor
if self._stream_slice_field_end:
self._cursor_end = stream_slice_value_end
def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
# Evaluate and compare start_date, end_date, and cursor_value based on configs and runtime state
"""
Partition the daterange into slices of size = step.
The start of the window is the minimum datetime between start_datetime - lookback_window and the stream_state's datetime
The end of the window is the minimum datetime between the start of the window and end_datetime.
:param sync_mode:
:param stream_state: current stream state. If set, the start_date will be the day following the stream_state.
:return:
"""
stream_state = stream_state or {}
kwargs = {"stream_state": stream_state}
end_datetime = min(self._end_datetime.get_datetime(self._config, **kwargs), datetime.datetime.now(tz=datetime.timezone.utc))
lookback_delta = self._parse_timedelta(self._lookback_window.eval(self._config, **kwargs) if self._lookback_window else "0d")
start_datetime = self._start_datetime.get_datetime(self._config, **kwargs) - lookback_delta
start_datetime = min(start_datetime, end_datetime)
if self._cursor_value and self._cursor_value.eval(self._config, **kwargs):
cursor_datetime = self.parse_date(self._cursor_value.eval(self._config, **kwargs))
if self._cursor_field.eval(self._config, stream_state=stream_state) in stream_state:
cursor_datetime = self.parse_date(stream_state[self._cursor_field.eval(self._config)])
else:
cursor_datetime = start_datetime
start_datetime = max(cursor_datetime, start_datetime)
if not self._is_start_date_valid(start_datetime, end_datetime):
end_datetime = start_datetime
return self._partition_daterange(start_datetime, end_datetime, self._step)
start_datetime = max(cursor_datetime, start_datetime)
state_date = self.parse_date(stream_state.get(self._cursor_field.eval(self._config, stream_state=stream_state)))
if state_date:
# If the input_state's date is greater than start_datetime, the start of the time window is the state's next day
next_date = state_date + datetime.timedelta(days=1)
start_datetime = max(start_datetime, next_date)
dates = self._partition_daterange(start_datetime, end_datetime, self._step)
return dates
def _format_datetime(self, dt: datetime.datetime):
if self._datetime_format == "timestamp":
return dt.timestamp()
else:
return dt.strftime(self._datetime_format)
def _partition_daterange(self, start, end, step: datetime.timedelta):
start_field = self._stream_slice_field_start.eval(self._config)
end_field = self._stream_slice_field_end.eval(self._config)
dates = []
while start <= end:
end_date = self._get_date(start + step - datetime.timedelta(days=1), end, min)
dates.append({"start_date": start.strftime(self._datetime_format), "end_date": end_date.strftime(self._datetime_format)})
dates.append({start_field: self._format_datetime(start), end_field: self._format_datetime(end_date)})
start += step
return dates
@@ -88,7 +176,7 @@ class DatetimeStreamSlicer(StreamSlicer):
if self.is_int(date):
return datetime.datetime.fromtimestamp(int(date)).replace(tzinfo=self._timezone)
else:
return datetime.datetime.strptime(date, self._datetime_format).replace(tzinfo=self._timezone)
return dateutil.parser.parse(date).replace(tzinfo=self._timezone)
elif isinstance(date, int):
return datetime.datetime.fromtimestamp(int(date)).replace(tzinfo=self._timezone)
return date
@@ -115,6 +203,27 @@ class DatetimeStreamSlicer(StreamSlicer):
time_params = {name: float(param) for name, param in parts.groupdict().items() if param}
return datetime.timedelta(**time_params)
@staticmethod
def _is_start_date_valid(start_date: datetime, end_date: datetime) -> bool:
return start_date <= end_date
def request_params(self) -> Mapping[str, Any]:
return self._get_request_options(RequestOptionType.request_parameter)
def request_headers(self) -> Mapping[str, Any]:
return self._get_request_options(RequestOptionType.header)
def request_body_data(self) -> Mapping[str, Any]:
return self._get_request_options(RequestOptionType.body_data)
def request_body_json(self) -> Mapping[str, Any]:
return self._get_request_options(RequestOptionType.body_json)
def request_kwargs(self) -> Mapping[str, Any]:
# Never update kwargs
return {}
def _get_request_options(self, option_type):
options = {}
if self._start_time_option and self._start_time_option.inject_into == option_type:
if self._cursor:
options[self._start_time_option.field_name] = self._cursor
if self._end_time_option and self._end_time_option.inject_into == option_type:
options[self._end_time_option.field_name] = self._cursor_end
return options
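A standalone sketch of the windowing performed by _partition_daterange: each slice spans `step`, ends one day before the next slice starts, and is clamped to the overall end date (the helper below and its date formats are assumptions, not the CDK code):

import datetime

def partition_daterange(start, end, step):
    # Build inclusive windows of size `step`, clamped to `end`
    slices = []
    while start <= end:
        window_end = min(start + step - datetime.timedelta(days=1), end)
        slices.append({"start_date": start.date().isoformat(),
                       "end_date": window_end.date().isoformat()})
        start += step
    return slices

partition_daterange(datetime.datetime(2021, 1, 1),
                    datetime.datetime(2021, 1, 10),
                    datetime.timedelta(days=2))
# -> [{'start_date': '2021-01-01', 'end_date': '2021-01-02'}, ...,
#     {'start_date': '2021-01-09', 'end_date': '2021-01-10'}]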

View File

@@ -3,12 +3,13 @@
#
import ast
from typing import Any, Iterable, List, Mapping, Union
from typing import Any, Iterable, List, Mapping, Optional, Union
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.declarative.types import Config
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState
class ListStreamSlicer(StreamSlicer):
@@ -17,13 +18,61 @@ class ListStreamSlicer(StreamSlicer):
If slice_values is a string, then evaluate it as literal and assert the resulting literal is a list
"""
def __init__(self, slice_values: Union[str, List[str]], slice_definition: Mapping[str, Any], config: Config):
def __init__(
self,
slice_values: Union[str, List[str]],
cursor_field: Union[InterpolatedString, str],
config: Config,
request_option: Optional[RequestOption] = None,
):
"""
:param slice_values: The values to iterate over
:param cursor_field: The name of the cursor field
:param config: The user-provided configuration as specified by the source's spec
:param request_option: The request option to configure the HTTP request
"""
if isinstance(slice_values, str):
slice_values = ast.literal_eval(slice_values)
assert isinstance(slice_values, list)
self._interpolation = InterpolatedMapping(slice_definition)
if isinstance(cursor_field, str):
cursor_field = InterpolatedString(cursor_field)
self._cursor_field = cursor_field
self._slice_values = slice_values
self._config = config
self._cursor = None
self._request_option = request_option
if request_option and request_option.inject_into == RequestOptionType.path:
raise ValueError("Slice value cannot be injected in the path")
def update_cursor(self, stream_slice: StreamSlice, last_record: Optional[Record] = None):
slice_value = stream_slice.get(self._cursor_field.eval(self._config))
if slice_value and slice_value in self._slice_values:
self._cursor = slice_value
def get_stream_state(self) -> StreamState:
return {self._cursor_field.eval(self._config): self._cursor} if self._cursor else {}
def request_params(self) -> Mapping[str, Any]:
return self._get_request_option(RequestOptionType.request_parameter)
def request_headers(self) -> Mapping[str, Any]:
return self._get_request_option(RequestOptionType.header)
def request_body_data(self) -> Mapping[str, Any]:
return self._get_request_option(RequestOptionType.body_data)
def request_body_json(self) -> Mapping[str, Any]:
return self._get_request_option(RequestOptionType.body_json)
def request_kwargs(self) -> Mapping[str, Any]:
# Never update kwargs
return {}
def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
return [self._interpolation.eval(self._config, slice_value=slice_value) for slice_value in self._slice_values]
return [{self._cursor_field.eval(self._config): slice_value} for slice_value in self._slice_values]
def _get_request_option(self, request_option_type: RequestOptionType):
if self._request_option and self._request_option.inject_into == request_option_type:
return {self._request_option.field_name: self._cursor}
else:
return {}
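The routing in _get_request_option reduces to a single comparison. A self-contained sketch with stand-in option types (not the CDK's RequestOption classes):

from dataclasses import dataclass
from enum import Enum

class OptionType(Enum):  # stand-in for RequestOptionType
    request_parameter = "request_parameter"
    header = "header"

@dataclass
class Option:  # stand-in for RequestOption
    inject_into: OptionType
    field_name: str

def get_request_option(option, cursor, option_type):
    # Inject the cursor only into the request part the option targets
    if option and option.inject_into == option_type:
        return {option.field_name: cursor}
    return {}

opt = Option(OptionType.request_parameter, "owner_resource")
assert get_request_option(opt, "customer", OptionType.request_parameter) == {"owner_resource": "customer"}
assert get_request_option(opt, "customer", OptionType.header) == {}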

View File

@@ -2,15 +2,37 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from typing import Any, Iterable, Mapping
from typing import Any, Iterable, Mapping, Optional
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
class SingleSlice(StreamSlicer):
def __init__(self, **kwargs):
"""Stream slicer returning only a single stream slice"""
def update_cursor(self, stream_slice: StreamSlice, last_record: Optional[Record] = None):
pass
def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
def get_stream_state(self) -> StreamState:
return {}
def request_params(self) -> Mapping[str, Any]:
return {}
def request_headers(self) -> Mapping[str, Any]:
return {}
def request_body_data(self) -> Mapping[str, Any]:
return {}
def request_body_json(self) -> Mapping[str, Any]:
return {}
def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[StreamSlice]:
return [dict()]
def request_kwargs(self) -> Mapping[str, Any]:
# Never update kwargs
return {}

View File

@@ -2,13 +2,43 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from abc import ABC, abstractmethod
from typing import Any, Iterable, Mapping, Optional
from abc import abstractmethod
from typing import Iterable, Optional
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
class StreamSlicer(ABC):
class StreamSlicer(RequestOptionsProvider):
"""
Slices the stream into a subset of records.
Slices enable state checkpointing and data retrieval parallelization.
The stream slicer keeps track of the cursor state as a dict of cursor_field -> cursor_value
See the stream slicing section of the docs for more information.
"""
@abstractmethod
def stream_slices(self, sync_mode: SyncMode, stream_state: Optional[Mapping[str, Any]]) -> Iterable[Mapping[str, Any]]:
pass
def stream_slices(self, sync_mode: SyncMode, stream_state: StreamState) -> Iterable[StreamSlice]:
"""
Defines stream slices
:param sync_mode: The sync mode used to read data
:param stream_state: The current stream state
:return: List of stream slices
"""
@abstractmethod
def update_cursor(self, stream_slice: StreamSlice, last_record: Optional[Record] = None):
"""
State setter, accept state serialized by state getter.
:param stream_slice: Current stream_slice
:param last_record: Last record read from the source
"""
@abstractmethod
def get_stream_state(self) -> StreamState:
"""Returns the current stream state"""

View File

@@ -2,31 +2,94 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from typing import Any, Iterable, List, Mapping
from dataclasses import dataclass
from typing import Any, Iterable, List, Mapping, Optional
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
from airbyte_cdk.sources.declarative.states.dict_state import DictState
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
from airbyte_cdk.sources.streams.core import Stream
@dataclass
class ParentStreamConfig:
"""
Describes how to create a stream slice from a parent stream
stream: The stream to read records from
parent_key: The key of the parent stream's records that will be the stream slice key
stream_slice_field: The stream slice key
request_option: How to inject the slice value on an outgoing HTTP request
"""
stream: Stream
parent_key: str
stream_slice_field: str
request_option: Optional[RequestOption] = None
class SubstreamSlicer(StreamSlicer):
"""
Stream slicer that iterates over the parent's stream slices and records and emits slices by interpolating the slice_definition mapping
Will populate the state with `parent_stream_slice` and `parent_record` so they can be accessed by other components
"""
def __init__(self, parent_streams: List[Stream], state: DictState, slice_definition: Mapping[str, Any]):
self._parent_streams = parent_streams
self._state = state
self._interpolation = InterpolatedMapping(slice_definition)
def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
def __init__(
self,
parent_streams_configs: List[ParentStreamConfig],
):
"""
Iterate over each parent stream.
:param parent_streams_configs: parent streams to iterate over and their config
"""
if not parent_streams_configs:
raise ValueError("SubstreamSlicer needs at least 1 parent stream")
self._parent_stream_configs = parent_streams_configs
self._cursor = None
def update_cursor(self, stream_slice: StreamSlice, last_record: Optional[Record] = None):
cursor = {}
for parent_stream_config in self._parent_stream_configs:
slice_value = stream_slice.get(parent_stream_config.stream_slice_field)
if slice_value:
cursor.update({parent_stream_config.stream_slice_field: slice_value})
self._cursor = cursor
def request_params(self) -> Mapping[str, Any]:
return self._get_request_option(RequestOptionType.request_parameter)
def request_headers(self) -> Mapping[str, Any]:
return self._get_request_option(RequestOptionType.header)
def request_body_data(self) -> Mapping[str, Any]:
return self._get_request_option(RequestOptionType.body_data)
def request_body_json(self) -> Optional[Mapping]:
return self._get_request_option(RequestOptionType.body_json)
def request_kwargs(self) -> Mapping[str, Any]:
# Never update kwargs
return {}
def _get_request_option(self, option_type: RequestOptionType):
params = {}
for parent_config in self._parent_stream_configs:
if parent_config.request_option and parent_config.request_option.inject_into == option_type:
key = parent_config.stream_slice_field
value = self._cursor.get(key)
if value:
params.update({key: value})
return params
def get_stream_state(self) -> StreamState:
return self._cursor if self._cursor else {}
def stream_slices(self, sync_mode: SyncMode, stream_state: StreamState) -> Iterable[StreamSlice]:
"""
Iterate over each parent stream's record and create a StreamSlice for each record.
For each stream, iterate over its stream_slices.
For each stream slice, iterate over each records.
For each stream slice, iterate over each record.
yield a stream slice for each such record.
If a parent slice contains no record, emit a slice with parent_record=None.
@@ -36,29 +99,24 @@ class SubstreamSlicer(StreamSlicer):
- parent_record: mapping representing the parent record
- parent_stream_name: string representing the parent stream name
"""
if not self._parent_streams:
if not self._parent_stream_configs:
yield from []
else:
for parent_stream in self._parent_streams:
for parent_stream_config in self._parent_stream_configs:
parent_stream = parent_stream_config.stream
parent_field = parent_stream_config.parent_key
stream_state_field = parent_stream_config.stream_slice_field
for parent_stream_slice in parent_stream.stream_slices(sync_mode=sync_mode, cursor_field=None, stream_state=stream_state):
self._state.update_state(parent_stream_slice=parent_stream_slice)
self._state.update_state(parent_record=None)
empty_parent_slice = True
parent_slice = parent_stream_slice.get("slice")
for parent_record in parent_stream.read_records(
sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=parent_stream_slice, stream_state=None
):
empty_parent_slice = False
slice_definition = self._get_slice_definition(parent_stream_slice, parent_record, parent_stream.name)
self._state.update_state(parent_record=parent_record)
yield slice_definition
stream_state_value = parent_record.get(parent_field)
yield {stream_state_field: stream_state_value, "parent_slice": parent_slice}
# If the parent slice contains no records,
# yield a slice definition with parent_record==None
if empty_parent_slice:
slice_definition = self._get_slice_definition(parent_stream_slice, None, parent_stream.name)
yield slice_definition
def _get_slice_definition(self, parent_stream_slice, parent_record, parent_stream_name):
return self._interpolation.eval(
None, parent_stream_slice=parent_stream_slice, parent_record=parent_record, parent_stream_name=parent_stream_name
)
stream_state_value = parent_stream_slice.get(parent_field)
yield {stream_state_field: stream_state_value, "parent_slice": parent_slice}
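How ParentStreamConfig drives slice generation, reduced to plain data (stand-in names, no CDK imports; a sketch of the loop above, not the actual implementation):

from dataclasses import dataclass

@dataclass
class ParentConfigSketch:
    parent_key: str          # field read off each parent record
    stream_slice_field: str  # key the value is emitted under in the slice

def slices_from_parent(parent_records, cfg, parent_slice=None):
    for record in parent_records:
        yield {cfg.stream_slice_field: record.get(cfg.parent_key),
               "parent_slice": parent_slice}

cfg = ParentConfigSketch(parent_key="id", stream_slice_field="repository_id")
list(slices_from_parent([{"id": 1}, {"id": 2}], cfg))
# -> [{'repository_id': 1, 'parent_slice': None},
#     {'repository_id': 2, 'parent_slice': None}]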

View File

@@ -1,85 +0,0 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from airbyte_cdk.sources.declarative.states.dict_state import DictState
config = {"name": "date"}
name = "{{ config['name'] }}"
value = "{{ last_record['updated_at'] }}"
dict_mapping = {
name: value,
}
def test_empty_state_is_none():
state = DictState(dict_mapping, config)
initial_state = state.get_stream_state()
expected_state = {}
assert expected_state == initial_state
def test_update_initial_state():
state = DictState(dict_mapping, config)
stream_slice = None
stream_state = None
last_response = {"data": {"id": "1234", "updated_at": "2021-01-01"}, "last_refresh": "2020-01-01"}
last_record = {"id": "1234", "updated_at": "2021-01-01"}
state.update_state(stream_slice=stream_slice, stream_state=stream_state, last_response=last_response, last_record=last_record)
actual_state = state.get_stream_state()
expected_state = {"date": "2021-01-01"}
assert expected_state == actual_state
def test_update_state_with_recent_cursor():
state = DictState(dict_mapping, config)
stream_slice = None
stream_state = {"date": "2020-12-31"}
last_response = {"data": {"id": "1234", "updated_at": "2021-01-01"}, "last_refresh": "2020-01-01"}
last_record = {"id": "1234", "updated_at": "2021-01-01"}
state.update_state(stream_slice=stream_slice, stream_state=stream_state, last_response=last_response, last_record=last_record)
actual_state = state.get_stream_state()
expected_state = {"date": "2021-01-01"}
assert expected_state == actual_state
def test_update_state_with_old_cursor():
state = DictState(dict_mapping, config)
stream_slice = None
stream_state = {"date": "2021-01-02"}
last_response = {"data": {"id": "1234", "updated_at": "2021-01-01"}, "last_refresh": "2020-01-01"}
last_record = {"id": "1234", "updated_at": "2021-01-01"}
state.update_state(stream_slice=stream_slice, stream_state=stream_state, last_response=last_response, last_record=last_record)
actual_state = state.get_stream_state()
expected_state = {"date": "2021-01-02"}
assert expected_state == actual_state
def test_update_state_with_older_state():
state = DictState(dict_mapping, config)
stream_slice = None
stream_state = {"date": "2021-01-02"}
last_response = {"data": {"id": "1234", "updated_at": "2021-01-02"}, "last_refresh": "2020-01-01"}
last_record = {"id": "1234", "updated_at": "2021-01-02"}
state.update_state(stream_slice=stream_slice, stream_state=stream_state, last_response=last_response, last_record=last_record)
actual_state = state.get_stream_state()
expected_state = {"date": "2021-01-02"}
out_of_order_response = {"data": {"id": "1234", "updated_at": "2021-01-02"}, "last_refresh": "2020-01-01"}
out_of_order_record = {"id": "1234", "updated_at": "2021-01-01"}
state.update_state(
stream_slice=stream_slice, stream_state=stream_state, last_response=out_of_order_response, last_record=out_of_order_record
)
assert expected_state == actual_state
def test_state_is_a_timestamp():
state = DictState(dict_mapping, config)
stream_slice = None
stream_state = {"date": 12345}
last_response = {"data": {"id": "1234", "updated_at": 123456}, "last_refresh": "2020-01-01"}
last_record = {"id": "1234", "updated_at": 123456}
state.update_state(stream_slice=stream_slice, stream_state=stream_state, last_response=last_response, last_record=last_record)
actual_state = state.get_stream_state()
expected_state = {"date": 123456}
assert expected_state == actual_state

View File

@@ -17,9 +17,10 @@ from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRe
primary_key = "pk"
records = [{"id": 1}, {"id": 2}]
config = {}
def test_simple_retriever():
def test_simple_retriever_full():
requester = MagicMock()
request_params = {"param": "value"}
requester.request_params.return_value = request_params
@@ -38,9 +39,8 @@ def test_simple_retriever():
response = requests.Response()
state = MagicMock()
underlying_state = {"date": "2021-01-01"}
state.get_stream_state.return_value = underlying_state
iterator.get_stream_state.return_value = underlying_state
url_base = "https://airbyte.io"
requester.get_url_base.return_value = url_base
@@ -69,12 +69,8 @@ def test_simple_retriever():
paginator=paginator,
record_selector=record_selector,
stream_slicer=iterator,
state=state,
)
# hack because we clone the state...
retriever._state = state
assert retriever.primary_key == primary_key
assert retriever.url_base == url_base
assert retriever.path() == path

View File

@@ -6,6 +6,7 @@ import pytest as pytest
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
from airbyte_cdk.sources.declarative.stream_slicers.cartesian_product_stream_slicer import CartesianProductStreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer
@@ -16,14 +17,14 @@ from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import Li
[
(
"test_single_stream_slicer",
[ListStreamSlicer(["customer", "store", "subscription"], {"owner_resource": "{{ slice_value }}"}, None)],
[ListStreamSlicer(["customer", "store", "subscription"], "owner_resource", None)],
[{"owner_resource": "customer"}, {"owner_resource": "store"}, {"owner_resource": "subscription"}],
),
(
"test_two_stream_slicers",
[
ListStreamSlicer(["customer", "store", "subscription"], {"owner_resource": "{{ slice_value }}"}, None),
ListStreamSlicer(["A", "B"], {"letter": "{{ slice_value }}"}, None),
ListStreamSlicer(["customer", "store", "subscription"], "owner_resource", None),
ListStreamSlicer(["A", "B"], "letter", None),
],
[
{"owner_resource": "customer", "letter": "A"},
@@ -37,7 +38,7 @@ from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import Li
(
"test_list_and_datetime",
[
ListStreamSlicer(["customer", "store", "subscription"], {"owner_resource": "{{ slice_value }}"}, None),
ListStreamSlicer(["customer", "store", "subscription"], "owner_resource", None),
DatetimeStreamSlicer(
MinMaxDatetime(datetime="2021-01-01", datetime_format="%Y-%m-%d"),
MinMaxDatetime(datetime="2021-01-03", datetime_format="%Y-%m-%d"),
@@ -65,3 +66,97 @@ def test_substream_slicer(test_name, stream_slicers, expected_slices):
slicer = CartesianProductStreamSlicer(stream_slicers)
slices = [s for s in slicer.stream_slices(SyncMode.incremental, stream_state=None)]
assert slices == expected_slices
@pytest.mark.parametrize(
"test_name, stream_slice, expected_state",
[
("test_update_cursor_no_state_no_record", {}, {}),
("test_update_cursor_partial_state", {"owner_resource": "customer"}, {"owner_resource": "customer"}),
(
"test_update_cursor_full_state",
{"owner_resource": "customer", "date": "2021-01-01"},
{"owner_resource": "customer", "date": "2021-01-01"},
),
],
)
def test_update_cursor(test_name, stream_slice, expected_state):
stream_slicers = [
ListStreamSlicer(["customer", "store", "subscription"], "owner_resource", None),
DatetimeStreamSlicer(
MinMaxDatetime(datetime="2021-01-01", datetime_format="%Y-%m-%d"),
MinMaxDatetime(datetime="2021-01-03", datetime_format="%Y-%m-%d"),
"1d",
InterpolatedString("date"),
"%Y-%m-%d",
None,
),
]
slicer = CartesianProductStreamSlicer(stream_slicers)
slicer.update_cursor(stream_slice, None)
updated_state = slicer.get_stream_state()
assert expected_state == updated_state
@pytest.mark.parametrize(
"test_name, stream_1_request_option, stream_2_request_option, expected_req_params, expected_headers,expected_body_json, expected_body_data",
[
(
"test_param_header",
RequestOption(RequestOptionType.request_parameter, "owner"),
RequestOption(RequestOptionType.header, "repo"),
{"owner": "customer"},
{"repo": "airbyte"},
{},
{},
),
(
"test_header_header",
RequestOption(RequestOptionType.header, "owner"),
RequestOption(RequestOptionType.header, "repo"),
{},
{"owner": "customer", "repo": "airbyte"},
{},
{},
),
(
"test_body_data",
RequestOption(RequestOptionType.body_data, "owner"),
RequestOption(RequestOptionType.body_data, "repo"),
{},
{},
{},
{"owner": "customer", "repo": "airbyte"},
),
(
"test_body_json",
RequestOption(RequestOptionType.body_json, "owner"),
RequestOption(RequestOptionType.body_json, "repo"),
{},
{},
{"owner": "customer", "repo": "airbyte"},
{},
),
],
)
def test_request_option(
test_name,
stream_1_request_option,
stream_2_request_option,
expected_req_params,
expected_headers,
expected_body_json,
expected_body_data,
):
slicer = CartesianProductStreamSlicer(
[
ListStreamSlicer(["customer", "store", "subscription"], "owner_resource", None, stream_1_request_option),
ListStreamSlicer(["airbyte", "airbyte-cloud"], "repository", None, stream_2_request_option),
]
)
slicer.update_cursor({"owner_resource": "customer", "repository": "airbyte"}, None)
assert expected_req_params == slicer.request_params()
assert expected_headers == slicer.request_headers()
assert expected_body_json == slicer.request_body_json()
assert expected_body_data == slicer.request_body_data()

View File

@@ -9,16 +9,17 @@ import pytest
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer
datetime_format = "%Y-%m-%d"
datetime_format = "%Y-%m-%dT%H:%M:%S.%f%z"
FAKE_NOW = datetime.datetime(2022, 1, 1, tzinfo=datetime.timezone.utc)
config = {"start_date": "2021-01-01"}
config = {"start_date": "2021-01-01T00:00:00.000000+0000", "start_date_ymd": "2021-01-01"}
end_date_now = InterpolatedString(
"{{ today_utc() }}",
)
cursor_value = InterpolatedString("{{ stream_state['date'] }}")
cursor_field = "created"
timezone = datetime.timezone.utc
@@ -30,161 +31,185 @@ def mock_datetime_now(monkeypatch):
@pytest.mark.parametrize(
"test_name, stream_state, start, end, step, cursor, lookback_window, expected_slices",
"test_name, stream_state, start, end, step, cursor_field, lookback_window, datetime_format, expected_slices",
[
(
"test_1_day",
None,
MinMaxDatetime("{{ config['start_date'] }}", datetime_format=datetime_format),
MinMaxDatetime("2021-01-10", datetime_format=datetime_format),
MinMaxDatetime("{{ config['start_date'] }}"),
MinMaxDatetime("2021-01-10T00:00:00.000000+0000"),
"1d",
cursor_value,
cursor_field,
None,
datetime_format,
[
{"start_date": "2021-01-01", "end_date": "2021-01-01"},
{"start_date": "2021-01-02", "end_date": "2021-01-02"},
{"start_date": "2021-01-03", "end_date": "2021-01-03"},
{"start_date": "2021-01-04", "end_date": "2021-01-04"},
{"start_date": "2021-01-05", "end_date": "2021-01-05"},
{"start_date": "2021-01-06", "end_date": "2021-01-06"},
{"start_date": "2021-01-07", "end_date": "2021-01-07"},
{"start_date": "2021-01-08", "end_date": "2021-01-08"},
{"start_date": "2021-01-09", "end_date": "2021-01-09"},
{"start_date": "2021-01-10", "end_date": "2021-01-10"},
{"start_date": "2021-01-01T00:00:00.000000+0000", "end_date": "2021-01-01T00:00:00.000000+0000"},
{"start_date": "2021-01-02T00:00:00.000000+0000", "end_date": "2021-01-02T00:00:00.000000+0000"},
{"start_date": "2021-01-03T00:00:00.000000+0000", "end_date": "2021-01-03T00:00:00.000000+0000"},
{"start_date": "2021-01-04T00:00:00.000000+0000", "end_date": "2021-01-04T00:00:00.000000+0000"},
{"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"},
{"start_date": "2021-01-06T00:00:00.000000+0000", "end_date": "2021-01-06T00:00:00.000000+0000"},
{"start_date": "2021-01-07T00:00:00.000000+0000", "end_date": "2021-01-07T00:00:00.000000+0000"},
{"start_date": "2021-01-08T00:00:00.000000+0000", "end_date": "2021-01-08T00:00:00.000000+0000"},
{"start_date": "2021-01-09T00:00:00.000000+0000", "end_date": "2021-01-09T00:00:00.000000+0000"},
{"start_date": "2021-01-10T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"},
],
),
(
"test_2_day",
None,
MinMaxDatetime("{{ config['start_date'] }}", datetime_format=datetime_format),
MinMaxDatetime("2021-01-10", datetime_format=datetime_format),
MinMaxDatetime("{{ config['start_date'] }}"),
MinMaxDatetime("2021-01-10T00:00:00.000000+0000"),
"2d",
cursor_value,
cursor_field,
None,
datetime_format,
[
{"start_date": "2021-01-01", "end_date": "2021-01-02"},
{"start_date": "2021-01-03", "end_date": "2021-01-04"},
{"start_date": "2021-01-05", "end_date": "2021-01-06"},
{"start_date": "2021-01-07", "end_date": "2021-01-08"},
{"start_date": "2021-01-09", "end_date": "2021-01-10"},
{"start_date": "2021-01-01T00:00:00.000000+0000", "end_date": "2021-01-02T00:00:00.000000+0000"},
{"start_date": "2021-01-03T00:00:00.000000+0000", "end_date": "2021-01-04T00:00:00.000000+0000"},
{"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-06T00:00:00.000000+0000"},
{"start_date": "2021-01-07T00:00:00.000000+0000", "end_date": "2021-01-08T00:00:00.000000+0000"},
{"start_date": "2021-01-09T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"},
],
),
(
"test_from_stream_state",
{"date": "2021-01-05"},
MinMaxDatetime("{{ stream_state['date'] }}", datetime_format=datetime_format),
MinMaxDatetime("2021-01-10", datetime_format=datetime_format),
{"date": "2021-01-05T00:00:00.000000+0000"},
MinMaxDatetime("{{ stream_state['date'] }}"),
MinMaxDatetime("2021-01-10T00:00:00.000000+0000"),
"1d",
cursor_value,
cursor_field,
None,
datetime_format,
[
# FIXME: should this include 2021-01-05?
{"start_date": "2021-01-05", "end_date": "2021-01-05"},
{"start_date": "2021-01-06", "end_date": "2021-01-06"},
{"start_date": "2021-01-07", "end_date": "2021-01-07"},
{"start_date": "2021-01-08", "end_date": "2021-01-08"},
{"start_date": "2021-01-09", "end_date": "2021-01-09"},
{"start_date": "2021-01-10", "end_date": "2021-01-10"},
{"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"},
{"start_date": "2021-01-06T00:00:00.000000+0000", "end_date": "2021-01-06T00:00:00.000000+0000"},
{"start_date": "2021-01-07T00:00:00.000000+0000", "end_date": "2021-01-07T00:00:00.000000+0000"},
{"start_date": "2021-01-08T00:00:00.000000+0000", "end_date": "2021-01-08T00:00:00.000000+0000"},
{"start_date": "2021-01-09T00:00:00.000000+0000", "end_date": "2021-01-09T00:00:00.000000+0000"},
{"start_date": "2021-01-10T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"},
],
),
(
"test_12_day",
None,
MinMaxDatetime("{{ config['start_date'] }}", datetime_format=datetime_format),
MinMaxDatetime("2021-01-10", datetime_format=datetime_format),
MinMaxDatetime("{{ config['start_date'] }}"),
MinMaxDatetime("2021-01-10T00:00:00.000000+0000"),
"12d",
cursor_value,
cursor_field,
None,
datetime_format,
[
{"start_date": "2021-01-01", "end_date": "2021-01-10"},
{"start_date": "2021-01-01T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"},
],
),
(
"test_end_date_greater_than_now",
None,
MinMaxDatetime("2021-12-28", datetime_format=datetime_format),
MinMaxDatetime(f"{(FAKE_NOW + datetime.timedelta(days=1)).strftime(datetime_format)}", datetime_format=datetime_format),
MinMaxDatetime("2021-12-28T00:00:00.000000+0000"),
MinMaxDatetime(f"{(FAKE_NOW + datetime.timedelta(days=1)).strftime(datetime_format)}"),
"1d",
cursor_value,
cursor_field,
None,
datetime_format,
[
{"start_date": "2021-12-28", "end_date": "2021-12-28"},
{"start_date": "2021-12-29", "end_date": "2021-12-29"},
{"start_date": "2021-12-30", "end_date": "2021-12-30"},
{"start_date": "2021-12-31", "end_date": "2021-12-31"},
{"start_date": "2022-01-01", "end_date": "2022-01-01"},
{"start_date": "2021-12-28T00:00:00.000000+0000", "end_date": "2021-12-28T00:00:00.000000+0000"},
{"start_date": "2021-12-29T00:00:00.000000+0000", "end_date": "2021-12-29T00:00:00.000000+0000"},
{"start_date": "2021-12-30T00:00:00.000000+0000", "end_date": "2021-12-30T00:00:00.000000+0000"},
{"start_date": "2021-12-31T00:00:00.000000+0000", "end_date": "2021-12-31T00:00:00.000000+0000"},
{"start_date": "2022-01-01T00:00:00.000000+0000", "end_date": "2022-01-01T00:00:00.000000+0000"},
],
),
(
"test_start_date_greater_than_end_date",
{"date": "2021-01-05"},
MinMaxDatetime("2021-01-10", datetime_format=datetime_format),
MinMaxDatetime("{{ stream_state['date'] }}", datetime_format=datetime_format),
"1d",
cursor_value,
None,
MinMaxDatetime("2021-01-10T00:00:00.000000+0000"),
MinMaxDatetime("2021-01-05T00:00:00.000000+0000"),
"1d",
cursor_field,
None,
datetime_format,
[
{"start_date": "2021-01-05", "end_date": "2021-01-05"},
{"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"},
],
),
(
"test_cursor_date_greater_than_start_date",
{"date": "2021-01-05"},
MinMaxDatetime("{{ stream_state['date'] }}", datetime_format=datetime_format),
MinMaxDatetime("2021-01-10", datetime_format=datetime_format),
{"date": "2021-01-05T00:00:00.000000+0000"},
MinMaxDatetime("{{ stream_state['date'] }}"),
MinMaxDatetime("2021-01-10T00:00:00.000000+0000"),
"1d",
InterpolatedString("{{ stream_state['date'] }}"),
None,
datetime_format,
[
{"start_date": "2021-01-05", "end_date": "2021-01-05"},
{"start_date": "2021-01-06", "end_date": "2021-01-06"},
{"start_date": "2021-01-07", "end_date": "2021-01-07"},
{"start_date": "2021-01-08", "end_date": "2021-01-08"},
{"start_date": "2021-01-09", "end_date": "2021-01-09"},
{"start_date": "2021-01-10", "end_date": "2021-01-10"},
{"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"},
{"start_date": "2021-01-06T00:00:00.000000+0000", "end_date": "2021-01-06T00:00:00.000000+0000"},
{"start_date": "2021-01-07T00:00:00.000000+0000", "end_date": "2021-01-07T00:00:00.000000+0000"},
{"start_date": "2021-01-08T00:00:00.000000+0000", "end_date": "2021-01-08T00:00:00.000000+0000"},
{"start_date": "2021-01-09T00:00:00.000000+0000", "end_date": "2021-01-09T00:00:00.000000+0000"},
{"start_date": "2021-01-10T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"},
],
),
(
"test_cursor_date_greater_than_start_date_multiday_step",
{cursor_field: "2021-01-05T00:00:00.000000+0000"},
MinMaxDatetime("2021-01-03T00:00:00.000000+0000"),
MinMaxDatetime("2021-01-10T00:00:00.000000+0000"),
"2d",
cursor_field,
None,
datetime_format,
[
{"start_date": "2021-01-06T00:00:00.000000+0000", "end_date": "2021-01-07T00:00:00.000000+0000"},
{"start_date": "2021-01-08T00:00:00.000000+0000", "end_date": "2021-01-09T00:00:00.000000+0000"},
{"start_date": "2021-01-10T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"},
],
),
(
"test_start_date_less_than_min_date",
{"date": "2021-01-05"},
MinMaxDatetime("{{ config['start_date'] }}", min_datetime="{{ stream_state['date'] }}", datetime_format=datetime_format),
MinMaxDatetime("2021-01-10", datetime_format=datetime_format),
{"date": "2021-01-05T00:00:00.000000+0000"},
MinMaxDatetime("{{ config['start_date'] }}", min_datetime="{{ stream_state['date'] }}"),
MinMaxDatetime("2021-01-10T00:00:00.000000+0000"),
"1d",
InterpolatedString("{{ stream_state['date'] }}"),
None,
datetime_format,
[
{"start_date": "2021-01-05", "end_date": "2021-01-05"},
{"start_date": "2021-01-06", "end_date": "2021-01-06"},
{"start_date": "2021-01-07", "end_date": "2021-01-07"},
{"start_date": "2021-01-08", "end_date": "2021-01-08"},
{"start_date": "2021-01-09", "end_date": "2021-01-09"},
{"start_date": "2021-01-10", "end_date": "2021-01-10"},
{"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"},
{"start_date": "2021-01-06T00:00:00.000000+0000", "end_date": "2021-01-06T00:00:00.000000+0000"},
{"start_date": "2021-01-07T00:00:00.000000+0000", "end_date": "2021-01-07T00:00:00.000000+0000"},
{"start_date": "2021-01-08T00:00:00.000000+0000", "end_date": "2021-01-08T00:00:00.000000+0000"},
{"start_date": "2021-01-09T00:00:00.000000+0000", "end_date": "2021-01-09T00:00:00.000000+0000"},
{"start_date": "2021-01-10T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"},
],
),
(
"test_end_date_greater_than_max_date",
{"date": "2021-01-05"},
MinMaxDatetime("{{ config['start_date'] }}", datetime_format=datetime_format),
MinMaxDatetime("2021-01-10", max_datetime="{{ stream_state['date'] }}", datetime_format=datetime_format),
{"date": "2021-01-05T00:00:00.000000+0000"},
MinMaxDatetime("{{ config['start_date'] }}"),
MinMaxDatetime("2021-01-10T00:00:00.000000+0000", max_datetime="{{ stream_state['date'] }}"),
"1d",
cursor_field,
None,
datetime_format,
[
{"start_date": "2021-01-01", "end_date": "2021-01-01"},
{"start_date": "2021-01-02", "end_date": "2021-01-02"},
{"start_date": "2021-01-03", "end_date": "2021-01-03"},
{"start_date": "2021-01-04", "end_date": "2021-01-04"},
{"start_date": "2021-01-05", "end_date": "2021-01-05"},
{"start_date": "2021-01-01T00:00:00.000000+0000", "end_date": "2021-01-01T00:00:00.000000+0000"},
{"start_date": "2021-01-02T00:00:00.000000+0000", "end_date": "2021-01-02T00:00:00.000000+0000"},
{"start_date": "2021-01-03T00:00:00.000000+0000", "end_date": "2021-01-03T00:00:00.000000+0000"},
{"start_date": "2021-01-04T00:00:00.000000+0000", "end_date": "2021-01-04T00:00:00.000000+0000"},
{"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"},
],
),
(
"test_start_end_min_max_inherits_datetime_format_from_stream_slicer",
{"date": "2021-01-05"},
MinMaxDatetime("{{ config['start_date'] }}"),
MinMaxDatetime("{{ config['start_date_ymd'] }}"),
MinMaxDatetime("2021-01-10", max_datetime="{{ stream_state['date'] }}"),
"1d",
cursor_field,
None,
"%Y-%m-%d",
[
{"start_date": "2021-01-01", "end_date": "2021-01-01"},
{"start_date": "2021-01-02", "end_date": "2021-01-02"},
@@ -197,46 +222,67 @@ def mock_datetime_now(monkeypatch):
"test_with_lookback_window_from_start_date",
{"date": "2021-01-05"},
MinMaxDatetime("{{ config['start_date'] }}"),
MinMaxDatetime("2021-01-10", max_datetime="{{ stream_state['date'] }}"),
MinMaxDatetime("2021-01-10", max_datetime="{{ stream_state['date'] }}", datetime_format="%Y-%m-%d"),
"1d",
cursor_field,
"3d",
datetime_format,
[
{"start_date": "2020-12-29", "end_date": "2020-12-29"},
{"start_date": "2020-12-30", "end_date": "2020-12-30"},
{"start_date": "2020-12-31", "end_date": "2020-12-31"},
{"start_date": "2021-01-01", "end_date": "2021-01-01"},
{"start_date": "2021-01-02", "end_date": "2021-01-02"},
{"start_date": "2021-01-03", "end_date": "2021-01-03"},
{"start_date": "2021-01-04", "end_date": "2021-01-04"},
{"start_date": "2021-01-05", "end_date": "2021-01-05"},
{"start_date": "2020-12-29T00:00:00.000000+0000", "end_date": "2020-12-29T00:00:00.000000+0000"},
{"start_date": "2020-12-30T00:00:00.000000+0000", "end_date": "2020-12-30T00:00:00.000000+0000"},
{"start_date": "2020-12-31T00:00:00.000000+0000", "end_date": "2020-12-31T00:00:00.000000+0000"},
{"start_date": "2021-01-01T00:00:00.000000+0000", "end_date": "2021-01-01T00:00:00.000000+0000"},
{"start_date": "2021-01-02T00:00:00.000000+0000", "end_date": "2021-01-02T00:00:00.000000+0000"},
{"start_date": "2021-01-03T00:00:00.000000+0000", "end_date": "2021-01-03T00:00:00.000000+0000"},
{"start_date": "2021-01-04T00:00:00.000000+0000", "end_date": "2021-01-04T00:00:00.000000+0000"},
{"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"},
],
),
(
"test_with_lookback_window_defaults_to_0d",
{"date": "2021-01-05"},
MinMaxDatetime("{{ config['start_date'] }}"),
MinMaxDatetime("2021-01-10", max_datetime="{{ stream_state['date'] }}"),
MinMaxDatetime("2021-01-10", max_datetime="{{ stream_state['date'] }}", datetime_format="%Y-%m-%d"),
"1d",
cursor_field,
"{{ config['does_not_exist'] }}",
datetime_format,
[
{"start_date": "2021-01-01", "end_date": "2021-01-01"},
{"start_date": "2021-01-02", "end_date": "2021-01-02"},
{"start_date": "2021-01-03", "end_date": "2021-01-03"},
{"start_date": "2021-01-04", "end_date": "2021-01-04"},
{"start_date": "2021-01-05", "end_date": "2021-01-05"},
{"start_date": "2021-01-01T00:00:00.000000+0000", "end_date": "2021-01-01T00:00:00.000000+0000"},
{"start_date": "2021-01-02T00:00:00.000000+0000", "end_date": "2021-01-02T00:00:00.000000+0000"},
{"start_date": "2021-01-03T00:00:00.000000+0000", "end_date": "2021-01-03T00:00:00.000000+0000"},
{"start_date": "2021-01-04T00:00:00.000000+0000", "end_date": "2021-01-04T00:00:00.000000+0000"},
{"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"},
],
),
(
"test_start_is_after_stream_state",
{cursor_field: "2021-01-05T00:00:00.000000+0000"},
MinMaxDatetime("2021-01-01T00:00:00.000000+0000"),
MinMaxDatetime("2021-01-10T00:00:00.000000+0000"),
"1d",
cursor_field,
None,
datetime_format,
[
{"start_date": "2021-01-06T00:00:00.000000+0000", "end_date": "2021-01-06T00:00:00.000000+0000"},
{"start_date": "2021-01-07T00:00:00.000000+0000", "end_date": "2021-01-07T00:00:00.000000+0000"},
{"start_date": "2021-01-08T00:00:00.000000+0000", "end_date": "2021-01-08T00:00:00.000000+0000"},
{"start_date": "2021-01-09T00:00:00.000000+0000", "end_date": "2021-01-09T00:00:00.000000+0000"},
{"start_date": "2021-01-10T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"},
],
),
],
)
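# Each case supplies stream state, start/end boundaries, a step, a cursor field, an optional
# lookback window, and the datetime format used to render the expected slice boundaries.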
def test_stream_slices(
mock_datetime_now, test_name, stream_state, start, end, step, cursor_field, lookback_window, datetime_format, expected_slices
):
lookback_window = InterpolatedString(lookback_window) if lookback_window else None
slicer = DatetimeStreamSlicer(
start_datetime=start,
end_datetime=end,
step=step,
cursor_field=cursor_field,
datetime_format=datetime_format,
lookback_window=lookback_window,
config=config,
@@ -246,5 +292,156 @@ def test_stream_slices(mock_datetime_now, test_name, stream_state, start, end, c
assert expected_slices == stream_slices
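# update_cursor should retain the most recent datetime seen across the previous cursor,
# the stream slice, and the last record.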
@pytest.mark.parametrize(
"test_name, previous_cursor, stream_slice, last_record, expected_state",
[
("test_update_cursor_no_state_no_record", None, {}, None, {}),
(
"test_update_cursor_with_state_no_record",
None,
{cursor_field: "2021-01-02T00:00:00.000000+0000"},
None,
{cursor_field: "2021-01-02T00:00:00.000000+0000"},
),
(
"test_update_cursor_with_state_equals_record",
None,
{cursor_field: "2021-01-02T00:00:00.000000+0000"},
{cursor_field: "2021-01-02T00:00:00.000000+0000"},
{cursor_field: "2021-01-02T00:00:00.000000+0000"},
),
(
"test_update_cursor_with_state_greater_than_record",
None,
{cursor_field: "2021-01-03T00:00:00.000000+0000"},
{cursor_field: "2021-01-02T00:00:00.000000+0000"},
{cursor_field: "2021-01-03T00:00:00.000000+0000"},
),
(
"test_update_cursor_with_state_less_than_record",
None,
{cursor_field: "2021-01-02T00:00:00.000000+0000"},
{cursor_field: "2021-01-03T00:00:00.000000+0000"},
{cursor_field: "2021-01-03T00:00:00.000000+0000"},
),
(
"test_update_cursor_with_state_less_than_previous_cursor",
"2021-01-03T00:00:00.000000+0000",
{cursor_field: "2021-01-02T00:00:00.000000+0000"},
{},
{cursor_field: "2021-01-03T00:00:00.000000+0000"},
),
],
)
def test_update_cursor(test_name, previous_cursor, stream_slice, last_record, expected_state):
slicer = DatetimeStreamSlicer(
start_datetime=MinMaxDatetime("2021-01-01T00:00:00.000000+0000"),
end_datetime=MinMaxDatetime("2021-01-10T00:00:00.000000+0000"),
step="1d",
cursor_field=InterpolatedString(cursor_field),
datetime_format=datetime_format,
lookback_window=InterpolatedString("0d"),
config=config,
)
slicer._cursor = previous_cursor
slicer.update_cursor(stream_slice, last_record)
updated_state = slicer.get_stream_state()
assert expected_state == updated_state
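# The slicer injects the current slice boundaries into the outgoing request as directed by
# each RequestOption; injecting into the path is rejected with a ValueError.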
@pytest.mark.parametrize(
"test_name, inject_into, field_name, expected_req_params, expected_headers, expected_body_json, expected_body_data",
[
("test_start_time_inject_into_none", None, None, {}, {}, {}, {}),
(
"test_start_time_passed_by_req_param",
RequestOptionType.request_parameter,
"start_time",
{"start_time": "2021-01-02T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"},
{},
{},
{},
),
(
"test_start_time_inject_into_header",
RequestOptionType.header,
"start_time",
{},
{"start_time": "2021-01-02T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"},
{},
{},
),
(
"test_start_time_inject_intoy_body_json",
RequestOptionType.body_json,
"start_time",
{},
{},
{"start_time": "2021-01-02T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"},
{},
),
(
"test_start_time_inject_into_body_data",
RequestOptionType.body_data,
"start_time",
{},
{},
{},
{"start_time": "2021-01-02T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"},
),
(
"test_start_time_inject_into_path",
RequestOptionType.path,
"start_time",
{},
{},
{},
{"start_time": "2021-01-02T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"},
),
],
)
def test_request_option(test_name, inject_into, field_name, expected_req_params, expected_headers, expected_body_json, expected_body_data):
if inject_into == RequestOptionType.path:
start_request_option = RequestOption(inject_into)
end_request_option = RequestOption(inject_into)
try:
DatetimeStreamSlicer(
start_datetime=MinMaxDatetime("2021-01-01T00:00:00.000000+0000"),
end_datetime=MinMaxDatetime("2021-01-10T00:00:00.000000+0000"),
step="1d",
cursor_field=InterpolatedString(cursor_field),
datetime_format=datetime_format,
lookback_window=InterpolatedString("0d"),
start_time_option=start_request_option,
end_time_option=end_request_option,
config=config,
)
assert False
except ValueError:
return
else:
start_request_option = RequestOption(inject_into, field_name) if inject_into else None
end_request_option = RequestOption(inject_into, "endtime") if inject_into else None
slicer = DatetimeStreamSlicer(
start_datetime=MinMaxDatetime("2021-01-01T00:00:00.000000+0000"),
end_datetime=MinMaxDatetime("2021-01-10T00:00:00.000000+0000"),
step="1d",
cursor_field=InterpolatedString(cursor_field),
datetime_format=datetime_format,
lookback_window=InterpolatedString("0d"),
start_time_option=start_request_option,
end_time_option=end_request_option,
config=config,
)
stream_slice = {cursor_field: "2021-01-02T00:00:00.000000+0000", "end_date": "2021-01-04T00:00:00.000000+0000"}
slicer.update_cursor(stream_slice)
assert expected_req_params == slicer.request_params()
assert expected_headers == slicer.request_headers()
assert expected_body_json == slicer.request_body_json()
assert expected_body_data == slicer.request_body_data()
if __name__ == "__main__":
unittest.main()

View File

@@ -4,21 +4,101 @@
import pytest as pytest
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer
slice_values = ["customer", "store", "subscription"]
cursor_field = "owner_resource"
@pytest.mark.parametrize(
"test_name, slice_values, slice_definition, expected_slices",
"test_name, slice_values, cursor_field, expected_slices",
[
(
"test_single_element",
["customer", "store", "subscription"],
{"owner_resource": "{{ slice_value }}"},
"owner_resource",
[{"owner_resource": "customer"}, {"owner_resource": "store"}, {"owner_resource": "subscription"}],
),
(
"test_input_list_is_string",
'["customer", "store", "subscription"]',
"owner_resource",
[{"owner_resource": "customer"}, {"owner_resource": "store"}, {"owner_resource": "subscription"}],
),
],
)
def test_list_slicer(test_name, slice_values, cursor_field, expected_slices):
slicer = ListStreamSlicer(slice_values, cursor_field, config={})
slices = [s for s in slicer.stream_slices(SyncMode.incremental, stream_state=None)]
assert slices == expected_slices
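# The cursor only advances to values present in slice_values; an unknown value leaves the state empty.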
@pytest.mark.parametrize(
"test_name, stream_slice, last_record, expected_state",
[
("test_update_cursor_no_state_no_record", {}, None, {}),
("test_update_cursor_with_state_no_record", {"owner_resource": "customer"}, None, {"owner_resource": "customer"}),
("test_update_cursor_value_not_in_list", {"owner_resource": "invalid"}, None, {}),
],
)
def test_update_cursor(test_name, stream_slice, last_record, expected_state):
slicer = ListStreamSlicer(slice_values, cursor_field, config={})
slicer.update_cursor(stream_slice, last_record)
updated_state = slicer.get_stream_state()
assert expected_state == updated_state
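# A single RequestOption determines where the current slice value is injected;
# path injection raises a ValueError (covered below).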
@pytest.mark.parametrize(
"test_name, request_option, expected_req_params, expected_headers, expected_body_json, expected_body_data",
[
(
"test_inject_into_req_param",
RequestOption(RequestOptionType.request_parameter, "owner_resource"),
{"owner_resource": "customer"},
{},
{},
{},
),
("test_pass_by_header", RequestOption(RequestOptionType.header, "owner_resource"), {}, {"owner_resource": "customer"}, {}, {}),
(
"test_inject_into_body_json",
RequestOption(RequestOptionType.body_json, "owner_resource"),
{},
{},
{"owner_resource": "customer"},
{},
),
(
"test_inject_into_body_data",
RequestOption(RequestOptionType.body_data, "owner_resource"),
{},
{},
{},
{"owner_resource": "customer"},
),
(
"test_inject_into_path",
RequestOption(RequestOptionType.path),
{},
{},
{},
{"owner_resource": "customer"},
),
],
)
def test_request_option(test_name, request_option, expected_req_params, expected_headers, expected_body_json, expected_body_data):
if request_option.inject_into == RequestOptionType.path:
try:
ListStreamSlicer(slice_values, cursor_field, {}, request_option)
assert False
except ValueError:
return
slicer = ListStreamSlicer(slice_values, cursor_field, {}, request_option)
stream_slice = {cursor_field: "customer"}
slicer.update_cursor(stream_slice)
assert expected_req_params == slicer.request_params()
assert expected_headers == slicer.request_headers()
assert expected_body_json == slicer.request_body_json()
assert expected_body_data == slicer.request_body_data()

View File

@@ -6,8 +6,8 @@ from typing import Any, Iterable, List, Mapping, Optional, Union
import pytest as pytest
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
from airbyte_cdk.sources.declarative.stream_slicers.substream_slicer import ParentStreamConfig, SubstreamSlicer
from airbyte_cdk.sources.streams.core import Stream
parent_records = [{"id": 1, "data": "data1"}, {"id": 2, "data": "data2"}]
@@ -20,8 +20,6 @@ all_parent_data = data_first_parent_slice + data_second_parent_slice + data_thir
parent_slices = [{"slice": "first"}, {"slice": "second"}, {"slice": "third"}]
second_parent_stream_slice = [{"slice": "second_parent"}]
class MockStream(Stream):
def __init__(self, slices, records, name):
@@ -56,24 +54,22 @@ class MockStream(Stream):
@pytest.mark.parametrize(
"test_name, parent_streams, slice_definition, expected_slices",
"test_name, parent_stream_configs, expected_slices",
[
("test_no_parents", [], None),
(
"test_single_parent_slices_no_records",
[MockStream([{}], [], "first_stream")],
slice_definition,
[ParentStreamConfig(MockStream([{}], [], "first_stream"), "id", "first_stream_id")],
[{"first_stream_id": None, "parent_slice": None}],
),
(
"test_single_parent_slices_with_records",
[MockStream([{}], parent_records, "first_stream")],
slice_definition,
[ParentStreamConfig(MockStream([{}], parent_records, "first_stream"), "id", "first_stream_id")],
[{"first_stream_id": 1, "parent_slice": None}, {"first_stream_id": 2, "parent_slice": None}],
),
(
"test_with_parent_slices_and_records",
[MockStream(parent_slices, all_parent_data, "first_stream")],
slice_definition,
[ParentStreamConfig(MockStream(parent_slices, all_parent_data, "first_stream"), "id", "first_stream_id")],
[
{"parent_slice": "first", "first_stream_id": 0},
{"parent_slice": "first", "first_stream_id": 1},
@@ -84,10 +80,11 @@ class MockStream(Stream):
(
"test_multiple_parent_streams",
[
MockStream(parent_slices, data_first_parent_slice + data_second_parent_slice, "first_stream"),
MockStream(second_parent_stream_slice, more_records, "second_stream"),
ParentStreamConfig(
MockStream(parent_slices, data_first_parent_slice + data_second_parent_slice, "first_stream"), "id", "first_stream_id"
),
ParentStreamConfig(MockStream(second_parent_stream_slice, more_records, "second_stream"), "id", "second_stream_id"),
],
[
{"parent_slice": "first", "first_stream_id": 0},
{"parent_slice": "first", "first_stream_id": 1},
@@ -99,8 +96,132 @@ class MockStream(Stream):
),
],
)
def test_substream_slicer(test_name, parent_stream_configs, expected_slices):
if expected_slices is None:
try:
SubstreamSlicer(parent_stream_configs)
assert False
except ValueError:
return
slicer = SubstreamSlicer(parent_stream_configs)
slices = [s for s in slicer.stream_slices(SyncMode.incremental, stream_state=None)]
assert slices == expected_slices
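# Slice fields that do not match any configured stream_slice_field are ignored when updating state.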
@pytest.mark.parametrize(
"test_name, stream_slice, expected_state",
[
("test_update_cursor_no_state_no_record", {}, {}),
("test_update_cursor_with_state_single_parent", {"first_stream_id": "1234"}, {"first_stream_id": "1234"}),
("test_update_cursor_with_unknown_state_field", {"unknown_stream_id": "1234"}, {}),
(
"test_update_cursor_with_state_from_both_parents",
{"first_stream_id": "1234", "second_stream_id": "4567"},
{"first_stream_id": "1234", "second_stream_id": "4567"},
),
],
)
def test_update_cursor(test_name, stream_slice, expected_state):
parent_stream_name_to_config = [
ParentStreamConfig(
MockStream(parent_slices, data_first_parent_slice + data_second_parent_slice, "first_stream"), "id", "first_stream_id"
),
ParentStreamConfig(MockStream(second_parent_stream_slice, more_records, "second_stream"), "id", "second_stream_id"),
]
slicer = SubstreamSlicer(parent_stream_name_to_config)
slicer.update_cursor(stream_slice, None)
updated_state = slicer.get_stream_state()
assert expected_state == updated_state
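# Each parent stream carries its own RequestOption, so the parent ids can be injected
# into different parts of the request (e.g. one as a query parameter, one as a header).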
@pytest.mark.parametrize(
"test_name, parent_stream_request_options, expected_req_params, expected_headers, expected_body_json, expected_body_data",
[
(
"test_request_option_in_request_param",
[
RequestOption(RequestOptionType.request_parameter, "first_stream"),
RequestOption(RequestOptionType.request_parameter, "second_stream"),
],
{"first_stream_id": "1234", "second_stream_id": "4567"},
{},
{},
{},
),
(
"test_request_option_in_header",
[
RequestOption(RequestOptionType.header, "first_stream"),
RequestOption(RequestOptionType.header, "second_stream"),
],
{},
{"first_stream_id": "1234", "second_stream_id": "4567"},
{},
{},
),
(
"test_request_option_in_param_and_header",
[
RequestOption(RequestOptionType.request_parameter, "first_stream"),
RequestOption(RequestOptionType.header, "second_stream"),
],
{"first_stream_id": "1234"},
{"second_stream_id": "4567"},
{},
{},
),
(
"test_request_option_in_body_json",
[
RequestOption(RequestOptionType.body_json, "first_stream"),
RequestOption(RequestOptionType.body_json, "second_stream"),
],
{},
{},
{"first_stream_id": "1234", "second_stream_id": "4567"},
{},
),
(
"test_request_option_in_body_data",
[
RequestOption(RequestOptionType.body_data, "first_stream"),
RequestOption(RequestOptionType.body_data, "second_stream"),
],
{},
{},
{},
{"first_stream_id": "1234", "second_stream_id": "4567"},
),
],
)
def test_request_option(
test_name,
parent_stream_request_options,
expected_req_params,
expected_headers,
expected_body_json,
expected_body_data,
):
slicer = SubstreamSlicer(
[
ParentStreamConfig(
MockStream(parent_slices, data_first_parent_slice + data_second_parent_slice, "first_stream"),
"id",
"first_stream_id",
parent_stream_request_options[0],
),
ParentStreamConfig(
MockStream(second_parent_stream_slice, more_records, "second_stream"),
"id",
"second_stream_id",
parent_stream_request_options[1],
),
],
)
slicer.update_cursor({"first_stream_id": "1234", "second_stream_id": "4567"}, None)
assert expected_req_params == slicer.request_params()
assert expected_headers == slicer.request_headers()
assert expected_body_json == slicer.request_body_json()
assert expected_body_data == slicer.request_body_data()

View File

@@ -26,6 +26,7 @@ from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
from airbyte_cdk.sources.declarative.schema.json_schema import JsonSchema
from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer
from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
@@ -85,8 +86,7 @@ def test_list_based_stream_slicer_with_values_refd():
stream_slicer:
class_name: airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer.ListStreamSlicer
slice_values: "*ref(repositories)"
cursor_field: repository
"""
config = parser.parse(content)
stream_slicer = factory.create_component(config["stream_slicer"], input_config)()
@@ -96,14 +96,101 @@ def test_list_based_stream_slicer_with_values_refd():
def test_list_based_stream_slicer_with_values_defined_in_config():
content = """
stream_slicer:
type: ListStreamSlicer
slice_values: "{{config['repos']}}"
cursor_field: repository
request_option:
inject_into: header
field_name: repository
"""
config = parser.parse(content)
stream_slicer = factory.create_component(config["stream_slicer"], input_config)()
assert ["airbyte", "airbyte-cloud"] == stream_slicer._slice_values
assert stream_slicer._request_option._option_type == RequestOptionType.header
assert stream_slicer._request_option._field_name == "repository"
def test_create_substream_slicer():
content = """
schema_loader:
file_path: "./source_sendgrid/schemas/{{name}}.yaml"
retriever:
requester:
path: "/v3"
record_selector:
extractor:
transform: "_"
stream_A:
type: DeclarativeStream
options:
name: "A"
primary_key: "id"
retriever: "*ref(retriever)"
url_base: "https://airbyte.io"
schema_loader: "*ref(schema_loader)"
stream_B:
type: DeclarativeStream
options:
name: "B"
primary_key: "id"
retriever: "*ref(retriever)"
url_base: "https://airbyte.io"
schema_loader: "*ref(schema_loader)"
stream_slicer:
type: SubstreamSlicer
parent_streams_configs:
- stream: "*ref(stream_A)"
parent_key: id
stream_slice_field: repository_id
request_option:
inject_into: request_parameter
field_name: repository_id
- stream: "*ref(stream_B)"
parent_key: someid
stream_slice_field: word_id
"""
config = parser.parse(content)
stream_slicer = factory.create_component(config["stream_slicer"], input_config)()
parent_stream_configs = stream_slicer._parent_stream_configs
assert len(parent_stream_configs) == 2
assert isinstance(parent_stream_configs[0].stream, DeclarativeStream)
assert isinstance(parent_stream_configs[1].stream, DeclarativeStream)
assert stream_slicer._parent_stream_configs[0].parent_key == "id"
assert stream_slicer._parent_stream_configs[0].stream_slice_field == "repository_id"
assert stream_slicer._parent_stream_configs[0].request_option.inject_into == RequestOptionType.request_parameter
assert stream_slicer._parent_stream_configs[0].request_option._field_name == "repository_id"
assert stream_slicer._parent_stream_configs[1].parent_key == "someid"
assert stream_slicer._parent_stream_configs[1].stream_slice_field == "word_id"
assert stream_slicer._parent_stream_configs[1].request_option is None
def test_create_cartesian_stream_slicer():
content = """
stream_slicer_A:
type: ListStreamSlicer
slice_values: "{{config['repos']}}"
cursor_field: repository
stream_slicer_B:
type: ListStreamSlicer
slice_values:
- hello
- world
cursor_field: words
stream_slicer:
type: CartesianProductStreamSlicer
stream_slicers:
- "*ref(stream_slicer_A)"
- "*ref(stream_slicer_B)"
"""
config = parser.parse(content)
stream_slicer = factory.create_component(config["stream_slicer"], input_config)()
underlying_slicers = stream_slicer._stream_slicers
assert len(underlying_slicers) == 2
assert isinstance(underlying_slicers[0], ListStreamSlicer)
assert isinstance(underlying_slicers[1], ListStreamSlicer)
assert ["airbyte", "airbyte-cloud"] == underlying_slicers[0]._slice_values
assert ["hello", "world"] == underlying_slicers[1]._slice_values
def test_datetime_stream_slicer():
@@ -111,15 +198,18 @@ def test_datetime_stream_slicer():
stream_slicer:
type: DatetimeStreamSlicer
options:
datetime_format: "%Y-%m-%d"
datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z"
start_datetime:
type: MinMaxDatetime
datetime: "{{ config['start_time'] }}"
min_datetime: "{{ config['start_time'] + day_delta(2) }}"
end_datetime: "{{ config['end_time'] }}"
step: "10d"
cursor_value: "created"
cursor_field: "created"
lookback_window: "5d"
start_time_option:
inject_into: request_parameter
field_name: created[gte]
"""
config = parser.parse(content)
@@ -128,14 +218,16 @@ def test_datetime_stream_slicer():
assert stream_slicer._timezone == datetime.timezone.utc
assert type(stream_slicer._start_datetime) == MinMaxDatetime
assert type(stream_slicer._end_datetime) == MinMaxDatetime
assert stream_slicer._start_datetime._datetime_format == "%Y-%m-%d"
assert stream_slicer._start_datetime._datetime_format == "%Y-%m-%dT%H:%M:%S.%f%z"
assert stream_slicer._start_datetime._timezone == datetime.timezone.utc
assert stream_slicer._start_datetime._datetime_interpolator._string == "{{ config['start_time'] }}"
assert stream_slicer._start_datetime._min_datetime_interpolator._string == "{{ config['start_time'] + day_delta(2) }}"
assert stream_slicer._end_datetime._datetime_interpolator._string == "{{ config['end_time'] }}"
assert stream_slicer._step == datetime.timedelta(days=10)
assert stream_slicer._cursor_value._string == "created"
assert stream_slicer._cursor_field._string == "created"
assert stream_slicer._lookback_window._string == "5d"
assert stream_slicer._start_time_option.inject_into == RequestOptionType.request_parameter
assert stream_slicer._start_time_option._field_name == "created[gte]"
def test_full_config():
@@ -181,8 +273,6 @@ requester:
retriever:
class_name: "airbyte_cdk.sources.declarative.retrievers.simple_retriever.SimpleRetriever"
name: "{{ options['name'] }}"
stream_slicer:
class_name: airbyte_cdk.sources.declarative.stream_slicers.single_slice.SingleSlice
paginator: