Step 5: Incremental Reads
We now have a working implementation of a connector reading the latest exchange rates for a given currency. In this section, we'll update the source to read historical data instead of only reading the latest exchange rates.
According to the API documentation, we can read the exchange rate for a specific date by querying the "/exchangerates_data/{date}" endpoint instead of "/exchangerates_data/latest".
We'll now add a start_date property to the connector.
First, we'll update the spec block in source_exchange_rates_tutorial/manifest.yaml:
spec:
  documentation_url: https://docs.airbyte.io/integrations/sources/exchangeratesapi
  connection_specification:
    $schema: http://json-schema.org/draft-07/schema#
    title: exchangeratesapi.io Source Spec
    type: object
    required:
      - start_date
      - access_key
      - base
    additionalProperties: true
    properties:
      start_date:
        type: string
        description: Start getting data from that date.
        pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}$
        examples:
          - YYYY-MM-DD
      access_key:
        type: string
        description: >-
          Your API Access Key. See <a
          href="https://exchangeratesapi.io/documentation/">here</a>. The key is
          case sensitive.
        airbyte_secret: true
      base:
        type: string
        description: >-
          ISO reference currency. See <a
          href="https://www.ecb.europa.eu/stats/policy_and_exchange_rates/euro_reference_exchange_rates/html/index.en.html">here</a>.
        examples:
          - EUR
          - USD
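The `pattern` on start_date only checks the shape of the string, so a value such as "2022-02-30" passes it even though it is not a real date (and the `YYYY-MM-DD` example is a placeholder that does not match the pattern itself). The sketch below, using a hypothetical helper `is_valid_start_date` that is not part of Airbyte, shows the difference between the pattern check and a real calendar check.

```python
import re
from datetime import datetime

# The same regular expression as the `pattern` in the spec above.
START_DATE_PATTERN = re.compile(r"^[0-9]{4}-[0-9]{2}-[0-9]{2}$")


def is_valid_start_date(value: str) -> bool:
    """Return True if `value` matches the spec pattern AND is a real calendar date."""
    if not START_DATE_PATTERN.match(value):
        return False
    try:
        datetime.strptime(value, "%Y-%m-%d")
    except ValueError:
        # e.g. "2022-02-30" has the right shape but is not a valid date
        return False
    return True


print(is_valid_start_date("2022-07-26"))  # True
print(is_valid_start_date("2022-02-30"))  # False: passes the pattern, fails the calendar check
print(is_valid_start_date("YYYY-MM-DD"))  # False
```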
Then we'll set the start_date to last week in our connection config. Add a start_date field to secrets/config.json so that the file looks like this:
{
  "access_key": "<your_access_key>",
  "start_date": "2022-07-26",
  "base": "USD"
}
where start_date should be the date 7 days in the past.
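Because the start date has to be recomputed whenever you revisit the tutorial, a small helper script can generate the config for you. This is a minimal sketch; the script (for example a hypothetical `make_config.py`) and the `build_config` function are illustrative names, not part of the tutorial's files.

```python
import json
from datetime import date, timedelta


def build_config(access_key: str, base: str, today: date, days_back: int = 7) -> dict:
    """Return a connection config whose start_date is `days_back` days before `today`."""
    start = today - timedelta(days=days_back)
    return {
        "access_key": access_key,
        # date.isoformat() yields YYYY-MM-DD, which satisfies the spec's pattern.
        "start_date": start.isoformat(),
        "base": base,
    }


if __name__ == "__main__":
    # Print the config; redirect stdout to secrets/config.json to save it.
    print(json.dumps(build_config("<your_access_key>", "USD", date.today()), indent=2))
```

For example, `python make_config.py > secrets/config.json`, then replace the placeholder access key with your own.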
Next, we'll update the path in the connector definition to point to /{{ config['start_date'] }}.
Note that we set a default value because the check operation does not know the start_date; in that case, we default to hitting /exchangerates_data/latest:
definitions:
  <...>
  rates_stream:
    $ref: "#/definitions/base_stream"
    $parameters:
      name: "rates"
      primary_key: "date"
      path: "/exchangerates_data/{{config['start_date'] or 'latest'}}"
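The `or 'latest'` in the path template is what provides the default. The CDK evaluates the template with Jinja; the hypothetical `rates_path` function below only reproduces the same fallback in plain Python so the behavior is easy to check. It is an illustration, not the CDK's interpolation code.

```python
def rates_path(config: dict) -> str:
    """Mirror "/exchangerates_data/{{config['start_date'] or 'latest'}}" in plain Python.

    A missing or empty start_date falls back to "latest".
    """
    return f"/exchangerates_data/{config.get('start_date') or 'latest'}"


print(rates_path({"start_date": "2022-07-26"}))  # /exchangerates_data/2022-07-26
print(rates_path({}))                            # /exchangerates_data/latest
```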
You can test these changes by executing the read operation:
python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json
In the output record, you should see that we now read historical data instead of the latest exchange rate. For example:
"historical": true, "base": "USD", "date": "2022-07-18"
The connector will now always read data for the start date, which is not exactly what we want.
Instead, we would like to iterate over all the dates between the start_date and today and read data for each day.
We can do this by adding a DatetimeBasedCursor to the connector definition and updating the path to point to the stream slice's start_time.
More details on incremental syncs can be found in the incremental syncs documentation.
Let's first define a datetime cursor in the definitions section of the connector definition:
definitions:
  datetime_cursor:
    type: "DatetimeBasedCursor"
    start_datetime:
      datetime: "{{ config['start_date'] }}"
      datetime_format: "%Y-%m-%d"
    end_datetime:
      datetime: "{{ now_utc() }}"
      datetime_format: "%Y-%m-%d %H:%M:%S.%f+00:00"
    step: "P1D"
    datetime_format: "%Y-%m-%d"
    cursor_granularity: "P1D"
    cursor_field: "date"
and refer to it in the stream.
This will generate time windows from the start time until the end time, where each window is exactly one day.
The start time is defined in the config file, while the end time is defined by the now_utc() macro, which evaluates to the current date and time in UTC at runtime. See the section on string interpolation for more details.
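To make the windowing concrete, here is a simplified sketch of how a one-day step could split the range into slices. It assumes each window ends one `cursor_granularity` before the next `step` begins and that slices carry `start_time` and `end_time` keys (the key the path template reads later in this section). It ignores time of day, timezones, and lookback windows, and `daily_slices` is a hypothetical helper, not the DatetimeBasedCursor implementation.

```python
from datetime import date, timedelta
from typing import Dict, List

# Hypothetical constants mirroring step: "P1D" and cursor_granularity: "P1D".
STEP = timedelta(days=1)
GRANULARITY = timedelta(days=1)


def daily_slices(start: date, end: date) -> List[Dict[str, str]]:
    """Split the closed range [start, end] into consecutive windows of length STEP."""
    slices: List[Dict[str, str]] = []
    current = start
    while current <= end:
        # Assumed rule: a window ends one granularity before the next step begins,
        # so with STEP == GRANULARITY every window is a single day.
        window_end = min(current + STEP - GRANULARITY, end)
        slices.append(
            {
                "start_time": current.strftime("%Y-%m-%d"),  # datetime_format: "%Y-%m-%d"
                "end_time": window_end.strftime("%Y-%m-%d"),
            }
        )
        # The next window starts one granularity after this one ends.
        current = window_end + GRANULARITY
    return slices


for stream_slice in daily_slices(date(2022, 7, 26), date(2022, 8, 2)):
    print(stream_slice)
```

With a start date seven days before the end date, this produces eight one-day windows, which lines up with the "Read 8 records" log shown later in this section.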
We'll also update the base stream to use the datetime cursor:
definitions:
  <...>
  base_stream:
    <...>
    incremental_sync:
      $ref: "#/definitions/datetime_cursor"
Finally, we'll update the path to point to the stream slice's start_time:
definitions:
  <...>
  rates_stream:
    $ref: "#/definitions/base_stream"
    $parameters:
      name: "rates"
      primary_key: "date"
      path: "/exchangerates_data/{{stream_slice['start_time'] or 'latest'}}"
The full connector definition in ./source_exchange_rates_tutorial/manifest.yaml should now look like this:
version: "0.1.0"
definitions:
  selector:
    extractor:
      field_path: [ ]
  requester:
    url_base: "https://api.apilayer.com"
    http_method: "GET"
    authenticator:
      type: ApiKeyAuthenticator
      header: "apikey"
      api_token: "{{ config['access_key'] }}"
    request_parameters:
      base: "{{ config['base'] }}"
  datetime_cursor:
    type: "DatetimeBasedCursor"
    start_datetime:
      datetime: "{{ config['start_date'] }}"
      datetime_format: "%Y-%m-%d"
    end_datetime:
      datetime: "{{ now_utc() }}"
      datetime_format: "%Y-%m-%d %H:%M:%S.%f+00:00"
    step: "P1D"
    datetime_format: "%Y-%m-%d"
    cursor_granularity: "P1D"
    cursor_field: "date"
  retriever:
    record_selector:
      $ref: "#/definitions/selector"
    paginator:
      type: NoPagination
    requester:
      $ref: "#/definitions/requester"
  base_stream:
    incremental_sync:
      $ref: "#/definitions/datetime_cursor"
    retriever:
      $ref: "#/definitions/retriever"
  rates_stream:
    $ref: "#/definitions/base_stream"
    $parameters:
      name: "rates"
      primary_key: "date"
      path: "/exchangerates_data/{{stream_slice['start_time'] or 'latest'}}"
streams:
  - "#/definitions/rates_stream"
check:
  stream_names:
    - "rates"
spec:
  documentation_url: https://docs.airbyte.io/integrations/sources/exchangeratesapi
  connection_specification:
    $schema: http://json-schema.org/draft-07/schema#
    title: exchangeratesapi.io Source Spec
    type: object
    required:
      - start_date
      - access_key
      - base
    additionalProperties: true
    properties:
      start_date:
        type: string
        description: Start getting data from that date.
        pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}$
        examples:
          - YYYY-MM-DD
      access_key:
        type: string
        description: >-
          Your API Access Key. See <a
          href="https://exchangeratesapi.io/documentation/">here</a>. The key is
          case sensitive.
        airbyte_secret: true
      base:
        type: string
        description: >-
          ISO reference currency. See <a
          href="https://www.ecb.europa.eu/stats/policy_and_exchange_rates/euro_reference_exchange_rates/html/index.en.html">here</a>.
        examples:
          - EUR
          - USD
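To see how the requester pieces combine for a single slice, the sketch below assembles (but does not send) the GET request: url_base plus the interpolated path, the `base` request parameter as a query string, and the API key in the `apikey` header. It uses Python's standard `urllib` purely to illustrate the resulting URL and header; `build_request` is a hypothetical helper, and the CDK builds and sends its requests differently.

```python
from urllib.parse import urlencode
from urllib.request import Request

URL_BASE = "https://api.apilayer.com"  # requester.url_base


def build_request(config: dict, stream_slice: dict) -> Request:
    """Assemble (without sending) the GET request described by the manifest for one slice."""
    # Same fallback as the path template: no start_time means "latest".
    date_segment = stream_slice.get("start_time") or "latest"
    path = f"/exchangerates_data/{date_segment}"
    query = urlencode({"base": config["base"]})  # request_parameters: base
    return Request(
        f"{URL_BASE}{path}?{query}",
        headers={"apikey": config["access_key"]},  # ApiKeyAuthenticator header
        method="GET",
    )


config = {"access_key": "<your_access_key>", "start_date": "2022-07-26", "base": "USD"}
req = build_request(config, {"start_time": "2022-07-26", "end_time": "2022-07-26"})
print(req.get_method(), req.full_url)
# GET https://api.apilayer.com/exchangerates_data/2022-07-26?base=USD
```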
Running the read operation will now read all data for all days between start_date and now:
python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json
The operation should now output more than one record:
{"type": "LOG", "log": {"level": "INFO", "message": "Read 8 records from rates stream"}}
Supporting incremental syncs
Instead of always reading data for all dates, we would like the connector to only read data for dates we haven't read yet.
This can be achieved by updating the catalog to run in incremental mode (integration_tests/configured_catalog.json):
{
  "streams": [
    {
      "stream": {
        "name": "rates",
        "json_schema": {},
        "supported_sync_modes": [
          "full_refresh",
          "incremental"
        ]
      },
      "sync_mode": "incremental",
      "destination_sync_mode": "overwrite"
    }
  ]
}
In addition to records, the read operation now also outputs state messages:
{"type": "STATE", "state": {"data": {"rates": {"date": "2022-07-15"}}}}
In your output, the date ("2022-07-15" in this example) will be today's date.
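The state is keyed by the cursor_field ("date") configured on the datetime cursor. A simplified way to picture it, assuming the state is just the largest date value seen so far and never moves backwards (the CDK's actual bookkeeping may differ), is the hypothetical `next_state` helper below.

```python
from typing import Dict, Iterable, Optional


def next_state(
    records: Iterable[dict], previous: Optional[Dict[str, str]] = None
) -> Dict[str, str]:
    """Return {"date": latest date seen}, never moving behind `previous`."""
    latest = (previous or {}).get("date")
    for record in records:
        # ISO-8601 dates (YYYY-MM-DD) compare correctly as plain strings.
        if latest is None or record["date"] > latest:
            latest = record["date"]
    return {"date": latest} if latest is not None else {}


records = [{"date": "2022-07-14"}, {"date": "2022-07-15"}]
print(next_state(records))  # {'date': '2022-07-15'}
```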
We can simulate incremental syncs by creating a state file containing the last state produced by the read operation.
source-exchange-rates-tutorial/integration_tests/sample_state.json:
{
  "rates": {
    "date": "2022-07-15"
  }
}
Running the read operation will now only read data for dates later than the given state:
python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json --state integration_tests/sample_state.json
There shouldn't be any data read if the state is today's date:
{"type": "LOG", "log": {"level": "INFO", "message": "Setting state of rates stream to {'date': '2022-07-15'}"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Read 0 records from rates stream"}}
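The behavior described above (read only dates strictly later than the saved state, and nothing when the state is already today) can be modeled with the small sketch below. `dates_to_read` is a hypothetical helper that follows the described behavior; it is not the CDK's cursor code.

```python
from datetime import date, timedelta
from typing import List, Optional


def dates_to_read(start_date: date, today: date, state_date: Optional[date]) -> List[date]:
    """Dates an incremental read would request: strictly after the state, up to today."""
    first = start_date
    if state_date is not None:
        # Skip everything up to and including the date already recorded in the state.
        first = max(start_date, state_date + timedelta(days=1))
    count = (today - first).days + 1
    return [first + timedelta(days=i) for i in range(max(count, 0))]


print(dates_to_read(date(2022, 7, 8), date(2022, 7, 15), date(2022, 7, 15)))  # []
```

Without a state, the same call with `state_date=None` returns all eight dates from 2022-07-08 to 2022-07-15.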
Next steps:
Next, we'll run the Connector Acceptance Tests suite to ensure the connector invariants are respected.