CDK: add support for streams with state attribute (#9746)

* add support for streams with state attribute
* fix pre-commit and format
* update state attribute docs and logic
* added IncrementalMixin

Co-authored-by: Eugene Kulak <kulak.eugene@gmail.com>
This commit is contained in:
Eugene Kulak
2022-02-16 22:20:33 +02:00
committed by GitHub
parent d6747abd8f
commit d173ce741f
11 changed files with 656 additions and 277 deletions


@@ -24,9 +24,9 @@ Optionally, we can provide additional inputs to customize requests:
Backoff policy options:

* `retry_factor` Specifies the factor for the exponential backoff policy \(by default 5\)
* `max_retries` Specifies the maximum number of retries for the backoff policy \(by default 5\)
* `raise_on_http_errors` If set to `False`, allows opting out of raising exceptions on HTTP error codes \(by default `True`\)
There are many other customizable options - you can find them in the [`airbyte_cdk.sources.streams.http.HttpStream`](https://github.com/airbytehq/airbyte/blob/master/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py) class.
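To build intuition for how `retry_factor` and `max_retries` interact, here is an illustrative sketch of an exponential backoff schedule. The function name and the exact formula are our assumptions for illustration, not the CDK's actual implementation, which lives in `HttpStream`:

```python
def backoff_delays(retry_factor: float = 5, max_retries: int = 5) -> list:
    """Illustrative exponential backoff: the n-th retry waits retry_factor * 2**n seconds."""
    return [retry_factor * (2 ** attempt) for attempt in range(max_retries)]

# With the defaults, the waits grow as 5, 10, 20, 40, 80 seconds.
```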
@@ -37,9 +37,9 @@ Let's begin by pulling data for the last day's rates by using the `/latest` endp
```python
class ExchangeRates(HttpStream):
url_base = "https://api.exchangeratesapi.io/"
primary_key = None
def __init__(self, base: str, **kwargs):
super().__init__()
self.base = base
@@ -85,7 +85,7 @@ This may look big, but that's just because there are lots of \(unused, for now\)
Let's also pass the `base` parameter input by the user to the stream class:
```python
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
auth = NoAuth()
return [ExchangeRates(authenticator=auth, base=config['base'])]
```
@@ -110,7 +110,14 @@ We theoretically _could_ stop here and call it a connector. But let's give addin
## Adding incremental sync
To add incremental sync, we'll do a few things:
1. Pass the `start_date` param input by the user into the stream.
2. Declare the stream's `cursor_field`.
3. Declare a `_cursor_value` attribute on the stream to hold the state value.
4. Add `IncrementalMixin` to the stream's ancestors and implement the `state` getter and setter.
5. Implement the `stream_slices` method.
6. Update the `path` method to specify the date to pull exchange rates for.
7. Update the configured catalog to use `incremental` sync when we're testing the stream.
We'll describe what each of these methods do below. Before we begin, it may help to familiarize yourself with how incremental sync works in Airbyte by reading the [docs on incremental](https://docs.airbyte.io/architecture/connections/incremental-append).
@@ -132,7 +139,7 @@ Let's also add this parameter to the constructor and declare the `cursor_field`:
from datetime import datetime, timedelta
class ExchangeRates(HttpStream, IncrementalMixin):
url_base = "https://api.exchangeratesapi.io/"
cursor_field = "date"
primary_key = "date"
@@ -141,24 +148,38 @@ class ExchangeRates(HttpStream):
super().__init__()
self.base = base
self.start_date = start_date
self._cursor_value = None
```
Declaring the `cursor_field` informs the framework that this stream now supports incremental sync. The next time you run `python main_dev.py discover --config sample_files/config.json` you'll find that the `supported_sync_modes` field now also contains `incremental`.
But we're not quite done with supporting incremental, we have to actually emit state! We'll structure our state object very simply: it will be a `dict` whose single key is `'date'` and value is the date of the last day we synced data from. For example, `{'date': '2021-04-26'}` indicates the connector previously read data up until April 26th and therefore shouldn't re-read anything before April 26th.
Let's do this by implementing the getter and setter for the `state` inside the `ExchangeRates` class.
```python
@property
def state(self) -> Mapping[str, Any]:
    if self._cursor_value:
        return {self.cursor_field: self._cursor_value.strftime('%Y-%m-%d')}
    else:
        return {self.cursor_field: self.start_date.strftime('%Y-%m-%d')}

@state.setter
def state(self, value: Mapping[str, Any]):
    self._cursor_value = datetime.strptime(value[self.cursor_field], '%Y-%m-%d')
```
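To see the getter/setter pattern in isolation, here is a minimal self-contained stand-in that mirrors the state logic above. The `StatefulStream` class is hypothetical, not part of the CDK:

```python
from datetime import datetime
from typing import Any, Mapping


class StatefulStream:
    # Hypothetical minimal stand-in for a stream with a date cursor.
    cursor_field = "date"

    def __init__(self, start_date: datetime):
        self.start_date = start_date
        self._cursor_value = None

    @property
    def state(self) -> Mapping[str, Any]:
        # Before any record is read, fall back to the configured start date.
        if self._cursor_value:
            return {self.cursor_field: self._cursor_value.strftime('%Y-%m-%d')}
        return {self.cursor_field: self.start_date.strftime('%Y-%m-%d')}

    @state.setter
    def state(self, value: Mapping[str, Any]):
        self._cursor_value = datetime.strptime(value[self.cursor_field], '%Y-%m-%d')
```

Reading `stream.state` yields the saved checkpoint, and assigning to it restores the cursor at the start of a sync.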
Next, update the internal `_cursor_value` inside the `read_records` method:
```python
def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
    for record in super().read_records(*args, **kwargs):
        if self._cursor_value:
            latest_record_date = datetime.strptime(record[self.cursor_field], '%Y-%m-%d')
            self._cursor_value = max(self._cursor_value, latest_record_date)
        yield record
```
This implementation compares the date from the latest record with the date in the current state and takes the maximum as the "new" state object.
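The cursor-advancing comparison can be sketched on its own. `latest_cursor` below is a hypothetical helper for illustration, not a CDK API:

```python
from datetime import datetime


def latest_cursor(cursor, records, cursor_field='date'):
    # Illustrative: walk the records and keep the maximum date seen,
    # starting from the current cursor (which may be None on a first sync).
    for record in records:
        record_date = datetime.strptime(record[cursor_field], '%Y-%m-%d')
        cursor = max(cursor, record_date) if cursor else record_date
    return cursor
```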
@@ -166,20 +187,19 @@ This implementation compares the date from the latest record with the date in th
We'll implement the `stream_slices` method to return a list of the dates for which we should pull data based on the stream state if it exists:
```python
def _chunk_date_range(self, start_date: datetime) -> List[Mapping[str, Any]]:
"""
Returns a list of each day between the start date and now.
The return value is a list of dicts {'date': date_string}.
"""
dates = []
while start_date < datetime.now():
        dates.append({self.cursor_field: start_date.strftime('%Y-%m-%d')})
start_date += timedelta(days=1)
return dates
def stream_slices(self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None) -> Iterable[Optional[Mapping[str, Any]]]:
    start_date = datetime.strptime(stream_state[self.cursor_field], '%Y-%m-%d') if stream_state and self.cursor_field in stream_state else self.start_date
return self._chunk_date_range(start_date)
```
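The slicing behaviour can be exercised standalone. This sketch takes an explicit end date instead of calling `datetime.now()` so the output is deterministic; the function name is ours, for illustration:

```python
from datetime import datetime, timedelta


def chunk_date_range(start_date, end_date, cursor_field='date'):
    # One slice per day from start_date up to, but not including, end_date.
    dates = []
    while start_date < end_date:
        dates.append({cursor_field: start_date.strftime('%Y-%m-%d')})
        start_date += timedelta(days=1)
    return dates
```

Each dict in the returned list becomes one `stream_slice`, so a sync that starts three days back makes three daily requests.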
@@ -222,3 +242,4 @@ You should see that only the record from the last date is being synced! This is
With that, we've implemented incremental sync for our connector!