Add support for monthly and yearly incremental updates (#18861)
Co-authored-by: Alexandre Girard <alexandre@airbyte.io>
This commit is contained in:
committed by
GitHub
parent
7e66d8198e
commit
aa5da75d81
@@ -5,6 +5,7 @@
|
||||
import datetime
|
||||
import re
|
||||
from dataclasses import InitVar, dataclass, field
|
||||
from dateutil.relativedelta import relativedelta
|
||||
from typing import Any, Iterable, Mapping, Optional, Union
|
||||
|
||||
from airbyte_cdk.models import SyncMode
|
||||
@@ -30,10 +31,12 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin):
|
||||
`"<number><unit>"`
|
||||
|
||||
where unit can be one of
|
||||
- years, y
|
||||
- months, m
|
||||
- weeks, w
|
||||
- days, d
|
||||
|
||||
For example, "1d" will produce windows of 1 day, and 2weeks windows of 2 weeks.
|
||||
For example, "1d" will produce windows of 1 day, and "2w" windows of 2 weeks.
|
||||
|
||||
The timestamp format accepts the same format codes as datetime.strptime, which are
|
||||
all the format codes required by the 1989 C standard.
|
||||
@@ -68,7 +71,7 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin):
|
||||
stream_state_field_end: Optional[str] = None
|
||||
lookback_window: Optional[Union[InterpolatedString, str]] = None
|
||||
|
||||
timedelta_regex = re.compile(r"((?P<weeks>[\.\d]+?)w)?" r"((?P<days>[\.\d]+?)d)?$")
|
||||
timedelta_regex = re.compile(r"((?P<years>[\.\d]+?)y)?" r"((?P<months>[\.\d]+?)m)?" r"((?P<weeks>[\.\d]+?)w)?" r"((?P<days>[\.\d]+?)d)?$")
|
||||
|
||||
def __post_init__(self, options: Mapping[str, Any]):
|
||||
if not isinstance(self.start_datetime, MinMaxDatetime):
|
||||
@@ -188,14 +191,14 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin):
|
||||
Parse a duration string (e.g. 1y2m or 2w3d) into a relativedelta object.
|
||||
Modified from virhilo's answer at https://stackoverflow.com/a/4628148/851699
|
||||
:param time_str: A string identifying a duration (e.g. 1y2m or 2w3d). Supported units are y (years), m (months), w (weeks) and d (days).
|
||||
:return datetime.timedelta: A datetime.timedelta object
|
||||
:return relativedelta: A relativedelta object
|
||||
"""
|
||||
parts = cls.timedelta_regex.match(time_str)
|
||||
|
||||
assert parts is not None
|
||||
|
||||
time_params = {name: float(param) for name, param in parts.groupdict().items() if param}
|
||||
return datetime.timedelta(**time_params)
|
||||
return relativedelta(**time_params)
|
||||
|
||||
def get_request_params(
|
||||
self,
|
||||
|
||||
@@ -52,6 +52,7 @@ setup(
|
||||
"jsonref~=0.2",
|
||||
"pendulum",
|
||||
"pydantic~=1.9.2",
|
||||
"python-dateutil",
|
||||
"PyYAML~=5.4",
|
||||
"requests",
|
||||
"requests_cache",
|
||||
|
||||
@@ -70,6 +70,56 @@ def mock_datetime_now(monkeypatch):
|
||||
{"start_time": "2021-01-09T00:00:00.000000+0000", "end_time": "2021-01-10T00:00:00.000000+0000"},
|
||||
],
|
||||
),
|
||||
(
|
||||
"test_1_week",
|
||||
None,
|
||||
MinMaxDatetime(datetime="{{ config['start_date'] }}", options={}),
|
||||
MinMaxDatetime(datetime="2021-02-10T00:00:00.000000+0000", options={}),
|
||||
"1w",
|
||||
cursor_field,
|
||||
None,
|
||||
datetime_format,
|
||||
[
|
||||
{"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-01-07T00:00:00.000000+0000"},
|
||||
{"start_time": "2021-01-08T00:00:00.000000+0000", "end_time": "2021-01-14T00:00:00.000000+0000"},
|
||||
{"start_time": "2021-01-15T00:00:00.000000+0000", "end_time": "2021-01-21T00:00:00.000000+0000"},
|
||||
{"start_time": "2021-01-22T00:00:00.000000+0000", "end_time": "2021-01-28T00:00:00.000000+0000"},
|
||||
{"start_time": "2021-01-29T00:00:00.000000+0000", "end_time": "2021-02-04T00:00:00.000000+0000"},
|
||||
{"start_time": "2021-02-05T00:00:00.000000+0000", "end_time": "2021-02-10T00:00:00.000000+0000"},
|
||||
],
|
||||
),
|
||||
(
|
||||
"test_1_month",
|
||||
None,
|
||||
MinMaxDatetime(datetime="{{ config['start_date'] }}", options={}),
|
||||
MinMaxDatetime(datetime="2021-06-10T00:00:00.000000+0000", options={}),
|
||||
"1m",
|
||||
cursor_field,
|
||||
None,
|
||||
datetime_format,
|
||||
[
|
||||
{"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-01-31T00:00:00.000000+0000"},
|
||||
{"start_time": "2021-02-01T00:00:00.000000+0000", "end_time": "2021-02-28T00:00:00.000000+0000"},
|
||||
{"start_time": "2021-03-01T00:00:00.000000+0000", "end_time": "2021-03-31T00:00:00.000000+0000"},
|
||||
{"start_time": "2021-04-01T00:00:00.000000+0000", "end_time": "2021-04-30T00:00:00.000000+0000"},
|
||||
{"start_time": "2021-05-01T00:00:00.000000+0000", "end_time": "2021-05-31T00:00:00.000000+0000"},
|
||||
{"start_time": "2021-06-01T00:00:00.000000+0000", "end_time": "2021-06-10T00:00:00.000000+0000"},
|
||||
],
|
||||
),
|
||||
(
|
||||
"test_1_year",
|
||||
None,
|
||||
MinMaxDatetime(datetime="{{ config['start_date'] }}", options={}),
|
||||
MinMaxDatetime(datetime="2022-06-10T00:00:00.000000+0000", options={}),
|
||||
"1y",
|
||||
cursor_field,
|
||||
None,
|
||||
datetime_format,
|
||||
[
|
||||
{"start_time": "2021-01-01T00:00:00.000000+0000", "end_time": "2021-12-31T00:00:00.000000+0000"},
|
||||
{"start_time": "2022-01-01T00:00:00.000000+0000", "end_time": "2022-01-01T00:00:00.000000+0000"},
|
||||
],
|
||||
),
|
||||
(
|
||||
"test_from_stream_state",
|
||||
{"date": "2021-01-05T00:00:00.000000+0000"},
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
#
|
||||
|
||||
import datetime
|
||||
from dateutil.relativedelta import relativedelta
|
||||
from typing import List, Optional, Union
|
||||
|
||||
import pytest
|
||||
@@ -261,7 +262,7 @@ def test_datetime_stream_slicer():
|
||||
assert stream_slicer.start_datetime.datetime.string == "{{ config['start_time'] }}"
|
||||
assert stream_slicer.start_datetime.min_datetime.string == "{{ config['start_time'] + day_delta(2) }}"
|
||||
assert stream_slicer.end_datetime.datetime.string == "{{ config['end_time'] }}"
|
||||
assert stream_slicer._step == datetime.timedelta(days=10)
|
||||
assert stream_slicer._step == relativedelta(days=10)
|
||||
assert stream_slicer.cursor_field.string == "created"
|
||||
assert stream_slicer.lookback_window.string == "5d"
|
||||
assert stream_slicer.start_time_option.inject_into == RequestOptionType.request_parameter
|
||||
|
||||
Reference in New Issue
Block a user