1
0
mirror of synced 2025-12-21 19:11:14 -05:00

Update CDK Tutorial to use new CDK. (#3358)

As the title suggests.

Some intricacies around how various directories have diverged.

The HTTP templates now use integration_tests to house catalogs and config files instead of sample_files. After this PR, everything should be updated to use integration_tests for all files except config files. Config files use secrets since they often contain secrets. Anything, besides actual implementations that have yet to be updated, not following this is my mistake.

Even though it diverges from the generated templates, I've decided to leave the tutorial code and the tutorial as using sample_files for now. I think it's more straightforward than telling people to create a new secrets directory. We can change this at a later date.

Confirmed the checked in http python tutorial code works by running ./gradlew :airbyte-integrations:connectors:source-python-http-tutorial:standardSourceTestFile.
This commit is contained in:
Davin Chia
2021-05-11 16:52:13 +08:00
committed by GitHub
parent 646c39c79c
commit 9a92da06b9
21 changed files with 79 additions and 128 deletions

View File

@@ -11,5 +11,5 @@ $ npm run generate
Select the `Python HTTP API Source` template and then input the name of your connector. For this walk-through we will refer to our source as `python-http-example`. The finalized source code for this tutorial can be found [here](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-python-http-tutorial).
The source we will build in this tutorial will pull data from the [Rates API](https://github.com/airbytehq/airbyte/tree/d940c78307f09f38198e50e54195052d762af944/docs/contributing-to-airbyte/tutorials/cdk-tutorial-alpha/ratesapi.io), a free and open API which documents historical exchange rates for fiat currencies.
The source we will build in this tutorial will pull data from the [Rates API](https://ratesapi.io/), a free and open API which documents historical exchange rates for fiat currencies.

View File

@@ -14,7 +14,7 @@ This step sets up the initial python environment. **All** subsequent `python` or
Let's verify everything is working as intended. Run:
```text
python main_dev.py spec
python main.py spec
```
You should see some output:
@@ -25,7 +25,7 @@ You should see some output:
We just ran Airbyte Protocol's `spec` command! We'll talk more about this later, but this is a simple sanity check to make sure everything is wired up correctly.
Note that the `main_dev.py` file is a simple script that makes it easy to run your connector. Its invocation format is `python main_dev.py <command> [args]`. See the module's generated `README.md` for the commands it supports.
Note that the `main.py` file is a simple script that makes it easy to run your connector. Its invocation format is `python main.py <command> [args]`. See the module's generated `README.md` for the commands it supports.
## Notes on iteration cycle
@@ -45,14 +45,14 @@ There are two ways we recommend iterating on a source. Consider using whichever
**Run the source using python**
You'll notice in your source's directory that there is a python file called `main_dev.py`. This file exists as convenience for development. You run it to test that your source works:
You'll notice in your source's directory that there is a python file called `main.py`. This file exists as convenience for development. You run it to test that your source works:
```text
# from airbyte-integrations/connectors/source-<name>
python main_dev.py spec
python main_dev.py check --config secrets/config.json
python main_dev.py discover --config secrets/config.json
python main_dev.py read --config secrets/config.json --catalog sample_files/configured_catalog.json
python main.py spec
python main.py check --config secrets/config.json
python main.py discover --config secrets/config.json
python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json
```
The nice thing about this approach is that you can iterate completely within python. The downside is that you are not quite running your source as it will actually be run by Airbyte. Specifically, you're not running it from within the docker container that will house it.

View File

@@ -42,17 +42,17 @@ Let's test out this implementation by creating two objects: a valid and an inval
```text
echo '{"start_date": "2021-04-01", "base": "USD"}' > sample_files/config.json
echo '{"start_date": "2021-04-01", "base": "BTC"}' > sample_files/invalid_config.json
python main_dev.py check --config sample_files/config.json
python main_dev.py check --config sample_files/invalid_config.json
python main.py check --config sample_files/config.json
python main.py check --config sample_files/invalid_config.json
```
You should see output like the following:
```text
> python main_dev.py check --config sample_files/config.json
> python main.py check --config sample_files/config.json
{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "SUCCEEDED"}}
> python main_dev.py check --config sample_files/invalid_config.json
> python main.py check --config sample_files/invalid_config.json
{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "FAILED", "message": "Input currency BTC is invalid. Please input one of the following currencies: {'DKK', 'USD', 'CZK', 'BGN', 'JPY'}"}}
```

View File

@@ -40,7 +40,7 @@ class ExchangeRates(HttpStream):
Note that this implementation is entirely empty -- we haven't actually done anything. We'll come back to this in the next step. But for now we just want to declare the schema of this stream. We'll declare this as a stream that the connector outputs by returning it from the `streams` method:
```python
from base_python.cdk.streams.auth.core import NoAuth
from airbyte_cdk.sources.streams.http.auth import NoAuth
class SourcePythonHttpTutorial(AbstractSource):
@@ -55,12 +55,12 @@ class SourcePythonHttpTutorial(AbstractSource):
return [ExchangeRates(authenticator=auth)]
```
Having created this stream in code, we'll put a file `exchange_rates.json` in the `schemas/` folder. You can download the JSON file describing the output schema [here](https://github.com/airbytehq/airbyte/tree/d940c78307f09f38198e50e54195052d762af944/docs/contributing-to-airbyte/tutorials/cdk-tutorial-alpha/http_api_source_assets/exchange_rates.json) for convenience and place it in `schemas/`.
Having created this stream in code, we'll put a file `exchange_rates.json` in the `schemas/` folder. You can download the JSON file describing the output schema [here](https://github.com/airbytehq/airbyte/blob/master/airbyte-cdk/python/docs/tutorials/http_api_source_assets/exchange_rates.json) for convenience and place it in `schemas/`.
With `.json` schema file in place, let's see if the connector can now find this schema and produce a valid catalog:
```text
python main_dev.py discover --config sample_files/config.json
python main.py discover --config sample_files/config.json
```
you should see some output like:

View File

@@ -2,7 +2,7 @@
Describing schemas is good and all, but at some point we have to start reading data! So let's get to work. But before, let's describe what we're about to do:
The `HttpStream` superclass, like described in the [concepts documentation](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/bases/base-python/README.md), is facilitating reading data from HTTP endpoints. It contains built-in functions or helpers for:
The `HttpStream` superclass, like described in the [concepts documentation](../../concepts/http-streams.md), is facilitating reading data from HTTP endpoints. It contains built-in functions or helpers for:
* authentication
* pagination
@@ -21,7 +21,7 @@ Optionally, we can provide additional inputs to customize requests:
* how to recognize rate limit errors, and how long to wait \(by default it retries 429 and 5XX errors using exponential backoff\)
* HTTP method and request body if applicable
There are many other customizable options - you can find them in the [`base_python.cdk.streams.http.HttpStream`](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/bases/base-python/base_python/cdk/streams/http.py) class.
There are many other customizable options - you can find them in the [`airbyte_cdk.sources.streams.http.HttpStream`](https://github.com/airbytehq/airbyte/blob/master/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py) class.
So in order to read data from the exchange rates API, we'll fill out the necessary information for the stream to do its work. First, we'll implement a basic read that just reads the last day's exchange rates, then we'll implement incremental sync using stream slicing.
@@ -30,7 +30,9 @@ Let's begin by pulling data for the last day's rates by using the `/latest` endp
```python
class ExchangeRates(HttpStream):
url_base = "https://api.ratesapi.io/"
primary_key = None
def __init__(self, base: str, **kwargs):
super().__init__()
self.base = base
@@ -83,10 +85,10 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
We're now ready to query the API!
To do this, we'll need a [ConfiguredCatalog](https://docs.airbyte.io/tutorials/tutorials/beginners-guide-to-catalog). We've prepared one [here](https://github.com/airbytehq/airbyte/tree/d940c78307f09f38198e50e54195052d762af944/docs/contributing-to-airbyte/tutorials/cdk-tutorial-alpha/http_api_source_assets/configured_catalog.json) -- download this and place it in `sample_files/configured_catalog.json`. Then run:
To do this, we'll need a [ConfiguredCatalog](https://docs.airbyte.io/tutorials/tutorials/beginners-guide-to-catalog). We've prepared one [here](https://github.com/airbytehq/airbyte/blob/master/airbyte-cdk/python/docs/tutorials/http_api_source_assets/configured_catalog.json) -- download this and place it in `sample_files/configured_catalog.json`. Then run:
```text
python main_dev.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json
python main.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json
```
you should see some output lines, one of which is a record from the API:
@@ -126,6 +128,7 @@ from datetime import datetime, timedelta
class ExchangeRates(HttpStream):
url_base = "https://api.ratesapi.io/"
cursor_field = "date"
primary_key = "date"
def __init__(self, base: str, start_date: datetime, **kwargs):
super().__init__()
@@ -173,7 +176,7 @@ We'll implement the `stream_slices` method to return a list of the dates for whi
return self._chunk_date_range(start_date)
```
Each slice will cause an HTTP request to be made to the API. We can then use the information present in the `stream_slice` parameter \(a single element from the list we constructed in `stream_slices` above\) to set other configurations for the outgoing request like `path` or `request_params`. For more info about stream slicing, see [the slicing docs](https://github.com/airbytehq/airbyte/tree/d940c78307f09f38198e50e54195052d762af944/docs/contributing-to-airbyte/tutorials/concepts/stream_slices.md).
Each slice will cause an HTTP request to be made to the API. We can then use the information present in the `stream_slice` parameter \(a single element from the list we constructed in `stream_slices` above\) to set other configurations for the outgoing request like `path` or `request_params`. For more info about stream slicing, see [the slicing docs](../../concepts/stream_slices.md).
In order to pull data for a specific date, the Exchange Rates API requires that we pass the date as the path component of the URL. Let's override the `path` method to achieve this:
@@ -195,17 +198,17 @@ We should now have a working implementation of incremental sync!
Let's try it out:
```text
python main_dev.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json
python main.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json
```
You should see a bunch of `RECORD` messages and `STATE` messages. To verify that incremental sync is working, pass the input state back to the connector and run it again:
```text
# Save the latest state to sample_files/state.json
python main_dev.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json | grep STATE | tail -n 1 | jq .state.data > sample_files/state.json
python main.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json | grep STATE | tail -n 1 | jq .state.data > sample_files/state.json
# Run a read operation with the latest state message
python main_dev.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json --state sample_files/state.json
python main.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json --state sample_files/state.json
```
You should see that only the record from the last date is being synced! This is acceptable behavior, since Airbyte requires at-least-once delivery of records, so repeating the last record twice is OK.