[docs] improve docs on incremental sync around cursor granularity (#50862)
Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com>
@@ -10,7 +10,7 @@ When a stream is read incrementally, a state message will be output by the conne

## DatetimeBasedCursor

-The `DatetimeBasedCursor` is used to read records from the underlying data source (e.g: an API) according to a specified datetime range. This time range is partitioned into time windows according to the `step`. For example, if you have `start_time=2022-01-01T00:00:00`, `end_time=2022-01-05T00:00:00` and `step=P1D`, the following partitions will be created:
+The `DatetimeBasedCursor` is used to read records from the underlying data source (e.g., an API) according to a specified datetime range. This time range is partitioned into time windows according to the `step`. For example, if you have `start_time=2022-01-01T00:00:00`, `end_time=2022-01-05T12:00:00`, `step=P1D` and `cursor_granularity=PT1S`, the following partitions will be created:

| Start               | End                 |
| ------------------- | ------------------- |
@@ -18,108 +18,24 @@ The `DatetimeBasedCursor` is used to read records from the underlying data sourc
| 2022-01-02T00:00:00 | 2022-01-02T23:59:59 |
| 2022-01-03T00:00:00 | 2022-01-03T23:59:59 |
| 2022-01-04T00:00:00 | 2022-01-04T23:59:59 |
-| 2022-01-05T00:00:00 | 2022-01-05T00:00:00 |
+| 2022-01-05T00:00:00 | 2022-01-05T12:00:00 |

During the sync, records are read from the API according to these time windows and the `cursor_field` indicates where the datetime value is stored on a record. This cursor is progressed as these partitions of records are successfully transmitted to the destination.

Upon a successful sync, the final stream state will be the datetime of the last record emitted. On the subsequent sync, the connector will fetch records whose cursor value is that datetime or later.
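
For illustration, a minimal sketch of this record-to-state relationship, assuming a hypothetical record whose `cursor_field` is `updated_at` (all values are made up):

```yaml
# Last record emitted during the sync (hypothetical)
record: { "id": 1234, "updated_at": "2022-01-03T04:05:06" }
# Stream state persisted after the successful sync; the next sync
# fetches records whose cursor value is on or after this datetime
state: { "updated_at": "2022-01-03T04:05:06" }
```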

-Schema:
+Refer to the schema for both [`DatetimeBasedCursor`](reference.md#/definitions/DatetimeBasedCursor) and [`MinMaxDatetime`](reference.md#/definitions/MinMaxDatetime) in the YAML reference for more details.

```yaml
DatetimeBasedCursor:
  description: Cursor to provide incremental capabilities over datetime
  type: object
  required:
    - type
    - cursor_field
    - end_datetime
    - datetime_format
    - cursor_granularity
    - start_datetime
    - step
  properties:
    type:
      type: string
      enum: [DatetimeBasedCursor]
    cursor_field:
      description: The location of the value on a record that will be used as a bookmark during sync
      type: string
    datetime_format:
      description: The format of the datetime
      type: string
    cursor_granularity:
      description: Smallest increment the datetime_format has (ISO 8601 duration) that is used to ensure the start of a slice does not overlap with the end of the previous one
      type: string
    end_datetime:
      description: The datetime that determines the last record that should be synced
      anyOf:
        - type: string
        - "$ref": "#/definitions/MinMaxDatetime"
    start_datetime:
      description: The datetime that determines the earliest record that should be synced
      anyOf:
        - type: string
        - "$ref": "#/definitions/MinMaxDatetime"
    step:
      description: The size of the time window (ISO 8601 duration)
      type: string
    end_time_option:
      description: Request option for end time
      "$ref": "#/definitions/RequestOption"
    lookback_window:
      description: How many days before start_datetime to read data for (ISO 8601 duration)
      type: string
    global_substream_cursor:
      title: Whether to store cursor as one value instead of per partition
      description: If the parent stream has thousands of partitions, it can be more efficient to store the cursor as one value instead of per partition. A lookback window should be used to avoid missing records that were added during the sync.
      type: boolean
      default: false
    start_time_option:
      description: Request option for start time
      "$ref": "#/definitions/RequestOption"
    partition_field_end:
      description: Partition end time field
      type: string
    partition_field_start:
      description: Partition start time field
      type: string
    $parameters:
      type: object
      additionalProperties: true
MinMaxDatetime:
  description: Compares the provided date against optional minimum or maximum times. The max_datetime serves as the ceiling and will be returned when datetime exceeds it. The min_datetime serves as the floor.
  type: object
  required:
    - type
    - datetime
  properties:
    type:
      type: string
      enum: [MinMaxDatetime]
    datetime:
      type: string
    datetime_format:
      type: string
      default: ""
    max_datetime:
      type: string
    min_datetime:
      type: string
    $parameters:
      type: object
      additionalProperties: true
```

Example:

```yaml
incremental_sync:
  type: DatetimeBasedCursor
-  start_datetime: "2022-01-01T00:00:00.000000+0000"
-  end_datetime: "2022-01-05T00:00:00.000000+0000"
-  datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z"
-  cursor_granularity: "PT0.000001S"
+  start_datetime: "2022-01-01T00:00:00"
+  end_datetime: "2022-01-05T12:00:00"
+  datetime_format: "%Y-%m-%dT%H:%M:%S"
+  cursor_granularity: "PT1S"
  step: "P1D"
```
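
For illustration, the same example can be extended with `start_time_option` and `end_time_option` (see the `RequestOption` definition in the YAML reference) so that each window's bounds are sent as query parameters. This is a hedged sketch: the `from-date`/`to-date` parameter names and the `updated_at` cursor field are assumptions for a hypothetical API:

```yaml
incremental_sync:
  type: DatetimeBasedCursor
  cursor_field: "updated_at"        # hypothetical cursor field on the record
  datetime_format: "%Y-%m-%dT%H:%M:%S"
  cursor_granularity: "PT1S"
  start_datetime: "2022-01-01T00:00:00"
  end_datetime: "2022-01-05T12:00:00"
  step: "P1D"
  start_time_option:
    type: RequestOption
    inject_into: request_parameter  # inject as a query parameter
    field_name: "from-date"         # assumed parameter name
  end_time_option:
    type: RequestOption
    inject_into: request_parameter
    field_name: "to-date"           # assumed parameter name
```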
@@ -148,7 +64,7 @@ The stream partitions' field names can be customized through the `partition_fiel
The `datetime_format` can be used to specify the format of the start and end time. It is [RFC3339](https://datatracker.ietf.org/doc/html/rfc3339#section-5.6) by default.
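
As a sketch of a non-default format, assuming a hypothetical API whose cursor field holds epoch seconds (`%s` is among the format codes listed in the YAML reference):

```yaml
incremental_sync:
  type: DatetimeBasedCursor
  cursor_field: "updated"          # hypothetical field holding epoch seconds
  datetime_format: "%s"            # record cursor values are epoch seconds
  cursor_granularity: "PT1S"
  start_datetime:
    type: MinMaxDatetime
    datetime: "1640995200"         # 2022-01-01T00:00:00Z
    datetime_format: "%s"
  end_datetime:
    type: MinMaxDatetime
    datetime: "1641340800"         # 2022-01-05T00:00:00Z
    datetime_format: "%s"
  step: "P1D"
```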
The Stream's state will be derived by reading the record's `cursor_field`.
-If the `cursor_field` is `updated_at`, and the record is `{"id": 1234, "created": "2021-02-02T00:00:00.000000+0000"}`, then the state after reading that record is `"updated_at": "2021-02-02T00:00:00.000000+0000"`. [^1]
+If the `cursor_field` is `updated_at`, and the record is `{"id": 1234, "updated_at": "2021-02-02T00:00:00.000000+0000"}`, then the state after reading that record is `"updated_at": "2021-02-02T00:00:00.000000+0000"`. [^1]

Note that all durations are expressed as [ISO 8601 durations](https://en.wikipedia.org/wiki/ISO_8601#Durations).
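
For example, the duration values used on this page, as a quick reference:

```yaml
# ISO 8601 duration syntax: P[n]Y[n]M[n]D for date parts, T-prefixed parts for times
step: "P1D"                          # one-day time windows
# step: "P10D"                       # ten-day windows
# step: "P1M"                        # one-month windows
cursor_granularity: "PT1S"           # one second
# cursor_granularity: "PT0.000001S"  # one microsecond
```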
@@ -233,3 +149,4 @@ Choose the option that best fits your data structure and sync requirements to op
## More readings
- [Incremental reads](../../cdk-python/incremental-stream.md)
+- Many of the concepts discussed here are described in the [No-Code Connector Builder docs](../../connector-builder-ui/incremental-sync.md) as well, with more examples.

@@ -22,10 +22,10 @@ In the builder UI, these things are specified like this:
- The "Cursor field" is the property in the record that defines the date and time when the record got changed. It's used to decide which records are synced already and which records are "new"
- The "Datetime format" specifies the format the cursor field is using to specify date and time. Check out the [YAML reference](/connector-development/config-based/understanding-the-yaml-file/reference#/definitions/DatetimeBasedCursor) for a full list of supported formats.
-- "API time filtering capabilities" specifies if the API allows filtering by start and end datetime or whether it's a "feed" of data going from newest to oldest records. See the "Incremental sync without time filtering" section below for details.
+- "API time filtering capabilities" specifies if the API allows filtering by start and end datetime or whether it's a "feed" of data going from newest to oldest records. See the ["Incremental sync without time filtering"](#incremental-sync-without-time-filtering) section below for details.
- The "Start datetime" is the initial start date of the time range to fetch records for. When doing incremental syncs, the second sync will overwrite this date with the last record that got synced so far.
- The "End datetime" is the end date of the time range to fetch records for. In most cases it's set to the current date and time when the sync is started to sync all changes that happened so far.
-- The "Inject start/end time into outgoing HTTP request" defines how to request records that got changed in the time range to sync. In most cases the start and end time is added as a query parameter or body parameter
+- The "Inject start/end time into outgoing HTTP request" defines how to request records that got changed in the time range to sync. In most cases the start and end time is added as a query parameter or body parameter.
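
Under the hood, these builder fields correspond to the `DatetimeBasedCursor` component of the generated YAML manifest. A hedged sketch of what such a configuration could look like (field values are illustrative, and the exact manifest the builder generates may differ):

```yaml
incremental_sync:
  type: DatetimeBasedCursor
  cursor_field: "webPublicationDate"      # "Cursor field"
  datetime_format: "%Y-%m-%dT%H:%M:%SZ"   # "Datetime format"
  start_datetime:
    type: MinMaxDatetime
    datetime: "{{ config['start_date'] }}"  # "Start datetime" from the user config
  end_datetime:
    type: MinMaxDatetime
    datetime: "{{ now_utc().strftime('%Y-%m-%dT%H:%M:%SZ') }}"  # "End datetime": now
  start_time_option:
    type: RequestOption
    inject_into: request_parameter        # "Inject start time": query parameter
    field_name: "from-date"
  end_time_option:
    type: RequestOption
    inject_into: request_parameter        # "Inject end time": query parameter
    field_name: "to-date"
```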
## Example
@@ -64,7 +64,7 @@ Setting the start date in the "Testing values" to a date in the past like **2023
curl 'https://content.guardianapis.com/search?from-date=<b>2023-04-09T00:00:00Z</b>&to-date={`now`}'
</pre>

-The most recent encountered date will be saved as part of the connection - when the next sync is running, it picks up from that date as the new start date. Let's assume the last ecountered article looked like this:
+The most recent encountered date will be saved as the [*state*](../../understanding-airbyte/airbyte-protocol.md#state--checkpointing) of the connection - when the next sync is running, it picks up from that cutoff date as the new start date. Let's assume the last encountered article looked like this:

<pre>
{`{
@@ -108,22 +108,22 @@ The description above is sufficient for a lot of APIs. However there are some mo
### Split up interval

-When incremental syncs are enabled and "Split up interval" is set, the connector is not fetching all records since the cutoff date at once - instead it's splitting up the time range between the cutoff date and the desired end date into intervals based on the "Step" configuration expressed as [ISO 8601 duration](https://en.wikipedia.org/wiki/ISO_8601#Durations).
+When incremental syncs are enabled and "Split Up Interval" is set, the connector does not fetch all records since the cutoff date at once - instead it splits up the time range between the cutoff date and the desired end date into intervals based on the "Step" configuration, expressed as an [ISO 8601 duration](https://en.wikipedia.org/wiki/ISO_8601#Durations).
The "cursor granularity" also needs to be set to an ISO 8601 duration - it represents the smallest possible time unit the API supports to filter records by. It's used to ensure the start of a interval does not overlap with the end of the previous one.
|
||||
The "Cursor Granularity" also needs to be set to an ISO 8601 duration - it represents the smallest possible time unit the API supports to filter records by. It's used to ensure the start of a interval does not overlap with the end of the previous one.
|
||||
|
||||

-For example if the "Step" is set to 10 days (`P10D`) and the "Cursor granularity" set to second (`PT1S`) for the Guardian articles stream described above and a longer time range, then the following requests will be performed:
+For example, if the "Step" is set to 10 days (`P10D`) and the "Cursor Granularity" is set to one second (`PT1S`) for the Guardian articles stream described above and a longer time range, then the following requests will be performed:

<pre>
-curl 'https://content.guardianapis.com/search?from-date=<b>2023-01-01T00:00:00Z</b>&to-date=<b>2023-01-10T00:00:00Z</b>'{`\n`}
-curl 'https://content.guardianapis.com/search?from-date=<b>2023-01-10T00:00:00Z</b>&to-date=<b>2023-01-20T00:00:00Z</b>'{`\n`}
-curl 'https://content.guardianapis.com/search?from-date=<b>2023-01-20T00:00:00Z</b>&to-date=<b>2023-01-30T00:00:00Z</b>'{`\n`}
+curl 'https://content.guardianapis.com/search?from-date=<b>2023-01-01T00:00:00Z</b>&to-date=<b>2023-01-09T23:59:59Z</b>'{`\n`}
+curl 'https://content.guardianapis.com/search?from-date=<b>2023-01-10T00:00:00Z</b>&to-date=<b>2023-01-19T23:59:59Z</b>'{`\n`}
+curl 'https://content.guardianapis.com/search?from-date=<b>2023-01-20T00:00:00Z</b>&to-date=<b>2023-01-29T23:59:59Z</b>'{`\n`}
...
</pre>

-After an interval is processed, the cursor value of the last record will be saved as part of the connection as the new cutoff date.
+After an interval is processed, the cursor value of the last record will be saved as part of the connection as the new cutoff date, as described in the [example above](#example).

-If left unset, the connector will not split up the time range at all but will instead just request all records for the entire target time range. This configuration works for all connectors, but there are two reasons to change it:
+If "Split Up Interval" is left unset, the connector will not split up the time range at all but will instead just request all records for the entire target time range. This configuration works for all connectors, but there are two reasons to change it:

- **To protect a connection against intermittent failures** - if the "Step" size is a day, the cutoff date is saved after all records associated with a day are processed. If a sync fails halfway through because the API, the Airbyte system, the destination or the network between these components has a failure, then at most one day's worth of data needs to be resynced. However, a smaller step size might cause more requests to the API and more load on the system. Which step size is optimal depends on the expected amount of data and the load characteristics of the API, but for a lot of applications the default of one month is a good starting point.
- **The API requires the connector to fetch data in pre-specified chunks** - for example the [Exchange Rates API](https://exchangeratesapi.io/documentation/) makes the date to fetch data for part of the URL path and only allows fetching data for a single day at a time, as sketched below.
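
As a hedged sketch of that second case, here is a one-day step where each window's start date is interpolated into the URL path (the `stream_interval` interpolation variable is part of the low-code framework; the URL shape, field names, and config keys here are assumptions for illustration, not the actual Exchange Rates connector):

```yaml
retriever:
  type: SimpleRetriever
  requester:
    type: HttpRequester
    url_base: "https://api.exchangeratesapi.io/v1/"
    # one request per day; the date comes from the current time window
    path: "{{ stream_interval.start_time }}"
incremental_sync:
  type: DatetimeBasedCursor
  cursor_field: "date"             # hypothetical cursor field on the record
  datetime_format: "%Y-%m-%d"
  cursor_granularity: "P1D"
  step: "P1D"                      # the API only serves one day per request
  start_datetime: "{{ config['start_date'] }}"
  end_datetime: "{{ today_utc() }}"
```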