
Proofread CDK Tutorial docs. (#3085)

Author: Davin Chia
Date: 2021-04-28 00:36:15 +08:00
Committed by: GitHub
Parent: fbe712a190
Commit: 52df90b458


@@ -23,7 +23,7 @@ All the commands below assume that `python` points to a version of python >=3.7.
* Step 7: Use the connector in Airbyte
* Step 8: Write unit tests or integration tests
-Each step of the Creating a Source checklist is explained in more detail below. We also mention how you can submit the connector to ship by default with Airbyte at the end of the tutorial.
+Each step of the Creating a Source checklist is explained in more detail below. We also mention how you can submit the connector to be included with the general Airbyte release at the end of the tutorial.
## Explaining Each Step
@@ -38,7 +38,7 @@ $ npm install
$ npm run generate
```
-Select the `Python HTTP CDK Source` template and then input the name of your connector. For this walk through we will refer to our source as `python-http-example`. The finalized source code for this tutorial can be found [here](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-python-http-tutorial).
+Select the `Python HTTP CDK Source` template and then input the name of your connector. For this walk-through we will refer to our source as `python-http-example`. The finalized source code for this tutorial can be found [here](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-python-http-tutorial).
The source we will build in this tutorial will pull data from the [Rates API](ratesapi.io), a free and open API which
documents historical exchange rates for fiat currencies.
@@ -66,14 +66,14 @@ You should see some output:
{"type": "SPEC", "spec": {"documentationUrl": "https://docsurl.com", "connectionSpecification": {"$schema": "http://json-schema.org/draft-07/schema#", "title": "Python Http Tutorial Spec", "type": "object", "required": ["TODO"], "additionalProperties": false, "properties": {"TODO: This schema defines the configuration required for the source. This usually involves metadata such as database and/or authentication information.": {"type": "string", "description": "describe me"}}}}}
```
-We just ran the `spec` command of the Airbyte Protocol! We'll talk more about this later, but this is a simple sanity check to make sure everything is wired up correctly.
+We just ran Airbyte Protocol's `spec` command! We'll talk more about this later, but this is a simple sanity check to make sure everything is wired up correctly.
Note that the `main_dev.py` file is a simple script that makes it easy to run your connector. Its invocation format is `python main_dev.py <command> [args]`. See the module's generated `README.md` for the commands it supports.
### Notes on iteration cycle
#### Dependencies
-Python dependencies for your source should be declared in `airbyte-integrations/connectors/source-<source-name>/setup.py` in the `install_requires` field. You will notice that a couple of Airbyte dependencies are already declared there. Do not remove these; they give your source access to the helper interface that is provided by the generator.
+Python dependencies for your source should be declared in `airbyte-integrations/connectors/source-<source-name>/setup.py` in the `install_requires` field. You will notice that a couple of Airbyte dependencies are already declared there. Do not remove these; they give your source access to the helper interfaces provided by the generator.
You may notice that there is a `requirements.txt` in your source's directory as well. Don't edit this. It is autogenerated and used to provide Airbyte dependencies. All your dependencies should be declared in `setup.py`.
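For orientation, here is a sketch of what the relevant part of a generated `setup.py` might look like; the package name and the extra dependency are placeholders, and the generated file's own Airbyte dependency pins are authoritative:

```python
# setup.py (sketch -- the generator's actual contents may differ)
from setuptools import find_packages, setup

setup(
    name="source_python_http_example",
    packages=find_packages(),
    install_requires=[
        # Airbyte helper dependencies declared by the generator -- do not remove.
        # (exact names and pins come from the generated file)
        "requests",  # an example of a dependency you might add yourself
    ],
)
```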
@@ -81,7 +81,7 @@ You may notice that there is a `requirements.txt` in your source's directory as
The commands we ran above created a [Python virtual environment](https://docs.python.org/3/tutorial/venv.html) for your source. If you want your IDE to auto complete and resolve dependencies properly, point it at the virtual env `airbyte-integrations/connectors/source-<source-name>/.venv`. Also anytime you change the dependencies in the `setup.py` make sure to re-run `pip install -r requirements.txt`.
#### Iterating on your implementation
-Everyone develops differently but here are 2 ways that we recommend iterating on a source. Consider using whichever one matches your style.
+There are two ways we recommend iterating on a source. Consider using whichever one matches your style.
**Run the source using python**
@@ -95,7 +95,7 @@ python main_dev.py discover --config secrets/config.json
python main_dev.py read --config secrets/config.json --catalog sample_files/configured_catalog.json
```
-The nice thing about this approach is that you can iterate completely within in python. The downside is that you are not quite running your source as it will actually be run by Airbyte. Specifically you're not running it from within the docker container that will house it.
+The nice thing about this approach is that you can iterate completely within python. The downside is that you are not quite running your source as it will actually be run by Airbyte. Specifically, you're not running it from within the docker container that will house it.
**Run the source using docker**
@@ -112,15 +112,15 @@ docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-python-http-example:de
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/sample_files:/sample_files airbyte/source-python-http-example:dev read --config /secrets/config.json --catalog /sample_files/configured_catalog.json
```
-Note: Each time you make a change to your implementation you need to re-build the connector image. `docker build . -t airbyte/source-<name>:dev`. This ensures the new python code is added into the docker container.
+Note: Each time you make a change to your implementation you need to re-build the connector image via `docker build . -t airbyte/source-<name>:dev`. This ensures the new python code is added into the docker container.
-The nice thing about this approach is that you are running your source exactly as it will be run by Airbyte. The tradeoff is that iteration is slightly slower, because you need to re-build the connector between each change.
+The nice thing about this approach is that you are running your source exactly as it will be run by Airbyte. The tradeoff is iteration is slightly slower, as the connector is re-built between each change.
### Step 3: Define the inputs required by your connector
-Each connector contains declares the inputs it needs to read data from the underlying data source. In the Airbyte Protocol terminology, this is is the `spec` operation.
+Each connector declares the inputs it needs to read data from the underlying data source. This is the Airbyte Protocol's `spec` operation.
-The simplest way to implement this is by creating a `.json` file in `source_<name>/spec.json` which describes your connector's inputs according to the [ConnectorSpecification](https://github.com/airbytehq/airbyte/blob/master/airbyte-protocol/models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml) schema. This is a good place to start when developing your source. Using JsonSchema, define what the inputs are \(e.g. username and password\). Here's [an example](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-freshdesk/source_freshdesk/spec.json) of what the `spec.json` looks like for the Freshdesk API source.
+The simplest way to implement this is by creating a `.json` file in `source_<name>/spec.json` which describes your connector's inputs according to the [ConnectorSpecification](https://github.com/airbytehq/airbyte/blob/master/airbyte-protocol/models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml#L211) schema. This is a good place to start when developing your source. Using JsonSchema, define what the inputs are \(e.g. username and password\). Here's [an example](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-freshdesk/source_freshdesk/spec.json) of what the `spec.json` looks like for the Freshdesk API source.
For more details on what the spec is, you can read about the Airbyte Protocol [here](../architecture/airbyte-specification.md).
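As a concrete sketch, a `spec.json` for this tutorial's two inputs (`base` and `start_date`, introduced below) might look roughly like the following; the descriptions here are illustrative, and the committed file in the tutorial source is authoritative:

```json
{
  "documentationUrl": "https://docs.airbyte.io/integrations",
  "connectionSpecification": {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Python Http Tutorial Spec",
    "type": "object",
    "required": ["base", "start_date"],
    "additionalProperties": false,
    "properties": {
      "base": {
        "type": "string",
        "description": "ISO reference currency to query rates against, e.g. USD"
      },
      "start_date": {
        "type": "string",
        "description": "Date to start syncing rates from, in YYYY-MM-DD format"
      }
    }
  }
}
```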
@@ -161,8 +161,7 @@ In addition to metadata, we define two inputs:
### Step 4: Implement connection checking
The second operation in the Airbyte Protocol that we'll implement is the `check` operation.
-This operation verifies that the input configuration supplied by the user can be used to connect to the underlying data source. Note that this user-supplied configuration has the values described in the `spec.json` filled in. In other words if the `spec.json` said that the source requires a `username` and `password` the config object might be `{ "username": "airbyte", "password": "password123" }`. You should then implement something It returns a json object that reports, given the credentials in the config, whether we were able to connect to the source.
+This operation verifies that the input configuration supplied by the user can be used to connect to the underlying data source. Note that this user-supplied configuration has the values described in the `spec.json` filled in. In other words if the `spec.json` said that the source requires a `username` and `password` the config object might be `{ "username": "airbyte", "password": "password123" }`. You should then implement something that returns a json object reporting, given the credentials in the config, whether we were able to connect to the source.
In our case, this is a fairly trivial check since the API requires no credentials. Instead, let's verify that the user-input `base` currency is a legitimate currency. In `source.py` we'll find the following autogenerated source:
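(The autogenerated stub itself is collapsed in this diff.) A sketch of the kind of `check_connection` implementation described here, validating `base` against a hard-coded list, might look like the following; the set of accepted currencies below is an illustrative subset, not the API's full list:

```python
def check_connection(self, logger, config) -> Tuple[bool, any]:
    # Sketch: accept only base currencies we know the API supports
    # (illustrative subset -- the tutorial's committed list is authoritative).
    accepted_currencies = {"USD", "JPY", "BGN", "CZK", "DKK"}
    if config["base"] not in accepted_currencies:
        return False, f"Input currency {config['base']} is invalid. Please input one of the following currencies: {accepted_currencies}"
    return True, None
```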
@@ -222,7 +221,7 @@ While developing, we recommend storing configs which contain secrets in `secrets
The `discover` method of the Airbyte Protocol returns an `AirbyteCatalog`: an object which declares all the streams output by a connector and their schemas. It also declares the sync modes supported by the stream (full refresh or incremental). See the [catalog tutorial](https://docs.airbyte.io/tutorials/beginners-guide-to-catalog) for more information.
-When using the Airbyte CDK, this is very simple to do. For each stream in our connector we'll need to:
+This is a simple task with the Airbyte CDK. For each stream in our connector we'll need to:
1. Create a python `class` in `source.py` which extends `HttpStream`
2. Place a `<stream_name>.json` file in the `source_<name>/schemas/` directory. The name of the file should be the snake_case name of the stream whose schema it describes, and its contents should be the JsonSchema describing the output from that stream.
@@ -265,8 +264,8 @@ class SourcePythonHttpTutorial(AbstractSource):
...
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
-# NoAuth just means there is no authentication required for this API. It's only included for completeness
-# of the example, but if you don't need authentication, you don't need to pass an authenticator at all.
+# NoAuth just means there is no authentication required for this API and is included for completeness.
+# Skip passing an authenticator if no authentication is required.
# Other authenticators are available for API token-based auth and Oauth2.
auth = NoAuth()
return [ExchangeRates(authenticator=auth)]
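# For a token-protected API you would swap in the CDK's token-based helper
# instead (sketch; "api_key" is a hypothetical input declared in spec.json):
#   auth = TokenAuthenticator(token=config["api_key"])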
@@ -286,7 +285,7 @@ you should see some output like:
{"type": "CATALOG", "catalog": {"streams": [{"name": "exchange_rates", "json_schema": {"$schema": "http://json-schema.org/draft-04/schema#", "type": "object", "properties": {"base": {"type": "string"}, "rates": {"type": "object", "properties": {"GBP": {"type": "number"}, "HKD": {"type": "number"}, "IDR": {"type": "number"}, "PHP": {"type": "number"}, "LVL": {"type": "number"}, "INR": {"type": "number"}, "CHF": {"type": "number"}, "MXN": {"type": "number"}, "SGD": {"type": "number"}, "CZK": {"type": "number"}, "THB": {"type": "number"}, "BGN": {"type": "number"}, "EUR": {"type": "number"}, "MYR": {"type": "number"}, "NOK": {"type": "number"}, "CNY": {"type": "number"}, "HRK": {"type": "number"}, "PLN": {"type": "number"}, "LTL": {"type": "number"}, "TRY": {"type": "number"}, "ZAR": {"type": "number"}, "CAD": {"type": "number"}, "BRL": {"type": "number"}, "RON": {"type": "number"}, "DKK": {"type": "number"}, "NZD": {"type": "number"}, "EEK": {"type": "number"}, "JPY": {"type": "number"}, "RUB": {"type": "number"}, "KRW": {"type": "number"}, "USD": {"type": "number"}, "AUD": {"type": "number"}, "HUF": {"type": "number"}, "SEK": {"type": "number"}}}, "date": {"type": "string"}}}, "supported_sync_modes": ["full_refresh"]}]}}
```
-it's that simple! Now the connector knows how to declare the schema of the stream in your connector. Our source is simple so we're only declaring one stream, but the principle is exactly the same if you had many streams.
+It's that simple! Now the connector knows how to declare your connector's stream's schema. We declare only one stream since our source is simple, but the principle is exactly the same if you had many streams.
You can also dynamically define schemas, but that's beyond the scope of this tutorial. See the [schema docs](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/bases/base-python/docs/schemas.md) for more information.
@@ -361,10 +360,10 @@ class ExchangeRates(HttpStream):
```
This may look big, but that's just because there are lots of (unused, for now) parameters in these methods (those can be hidden with Python's `**kwargs`, but don't worry about it for now). Really we just added a few lines of "significant" code:
-0. Added a constructor `__init__` which stores the `base` currency to query for.
-1. `return {'base': self.base}` to add the `?base=<base-value>` query parameter to the request based on the `base` input by the user
-2. `return [response.json()]` to parse the response from the API to match the schema of our schema `.json` file
-3. `return "latest"` to indicate that we want to hit the `/latest` endpoint of the API to get the latest exchange rate data.
+1. Added a constructor `__init__` which stores the `base` currency to query for.
+2. `return {'base': self.base}` to add the `?base=<base-value>` query parameter to the request based on the `base` input by the user.
+3. `return [response.json()]` to parse the response from the API to match the schema of our schema `.json` file.
+4. `return "latest"` to indicate that we want to hit the `/latest` endpoint of the API to get the latest exchange rate data.
Let's also pass the `base` parameter input by the user to the stream class:
@@ -390,16 +389,16 @@ you should see some output lines, one of which is a record from the API:
There we have it - a stream which reads data in just a few lines of code!
-We theoretically _could_ stop here and call it a connector. But let's add incremental sync before we do that.
+We theoretically _could_ stop here and call it a connector. But let's give adding incremental sync a shot.
#### Adding incremental sync
To add incremental sync, we'll do a few things:
-1. Pass the `start_date` param input by the user into the stream
-2. Declare the stream's `cursor_field`
-3. Implement the `get_updated_state` method
-4. Implement the `stream_slices` method
-5. Update the `path` method to specify the date to pull exchange rates for
-6. Update the configured catalog to use `incremental` sync when we're testing the stream
+1. Pass the `start_date` param input by the user into the stream.
+2. Declare the stream's `cursor_field`.
+3. Implement the `get_updated_state` method.
+4. Implement the `stream_slices` method.
+5. Update the `path` method to specify the date to pull exchange rates for.
+6. Update the configured catalog to use `incremental` sync when we're testing the stream.
We'll describe what each of these methods do below. Before we begin, it may help to familiarize yourself with how incremental sync works in Airbyte by reading the [docs on incremental](https://docs.airbyte.io/architecture/connections/incremental-append).
@@ -415,7 +414,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
return [ExchangeRates(authenticator=auth, base=config['base'], start_date=start_date)]
```
-let's also add this parameter to the constructor and declare the `cursor_field`:
+Let's also add this parameter to the constructor and declare the `cursor_field`:
```python
from datetime import datetime, timedelta
@@ -431,11 +430,11 @@ class ExchangeRates(HttpStream):
self.start_date = start_date
```
-declaring the `cursor_field` informs the framework that this stream now supports incremental sync. The next time you run `python main_dev.py discover --config sample_files/config.json` you'll find that the `supported_sync_modes` field now also contains `incremental`.
+Declaring the `cursor_field` informs the framework that this stream now supports incremental sync. The next time you run `python main_dev.py discover --config sample_files/config.json` you'll find that the `supported_sync_modes` field now also contains `incremental`.
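That declaration is a single class attribute (a sketch; the committed stream class is authoritative):

```python
class ExchangeRates(HttpStream):
    cursor_field = "date"  # the record field Airbyte uses to track sync progress
    ...
```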
-But we're not quite done with supporting incremental, we have to actually emit state! We'll structure our state object very simply: it will be a `dict` whose single key is `'date'` and value is the date of the last day we synced data from e.g: `{'date': '2021-04-26'}` indicates the connector previously read data up until April 26th and therefore shouldn't re-read anything before April 26th.
+But we're not quite done with supporting incremental, we have to actually emit state! We'll structure our state object very simply: it will be a `dict` whose single key is `'date'` and value is the date of the last day we synced data from. For example, `{'date': '2021-04-26'}` indicates the connector previously read data up until April 26th and therefore shouldn't re-read anything before April 26th.
-Let's do this by implementing the `get_updated_state` method inside the `ExchangeRates` class.
+Let's do this by implementing the `get_updated_state` method inside the `ExchangeRates` class.
```python
def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, any]:
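    # Sketch of a body consistent with the {'date': ...} state object described
    # above (the tutorial's committed implementation is authoritative): keep
    # whichever is later, the stored date or the latest record's date.
    if current_stream_state is not None and 'date' in current_stream_state:
        current_parsed_date = datetime.strptime(current_stream_state['date'], '%Y-%m-%d')
        latest_record_date = datetime.strptime(latest_record['date'], '%Y-%m-%d')
        return {'date': max(current_parsed_date, latest_record_date).strftime('%Y-%m-%d')}
    else:
        return {'date': self.start_date.strftime('%Y-%m-%d')}
```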
@@ -479,20 +478,22 @@ def path(self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str
return stream_slice['date']
```
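The `stream_slices` implementation is collapsed in this hunk; a sketch consistent with the `path` above, emitting one `{'date': ...}` slice per day from the last synced date (or `start_date`) up to now, might look like this:

```python
def stream_slices(self, sync_mode, cursor_field=None, stream_state=None):
    # Sketch: one slice per day, so each request pulls a single day's rates.
    start_date = datetime.strptime(stream_state['date'], '%Y-%m-%d') if stream_state and 'date' in stream_state else self.start_date
    dates = []
    while start_date < datetime.now():
        dates.append({'date': start_date.strftime('%Y-%m-%d')})
        start_date += timedelta(days=1)
    return dates
```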
-With these changes, your implementation should look like the file [here](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-python-http-tutorial/source_python_http_tutorial/source.py)
+With these changes, your implementation should look like the file [here](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-python-http-tutorial/source_python_http_tutorial/source.py).
-Last thing we need to do is change the `sync_mode` field in the `sample_files/configured_catalog.json` to `incremental`:
+The last thing we need to do is change the `sync_mode` field in the `sample_files/configured_catalog.json` to `incremental`:
```
"sync_mode": "incremental",
```
-We should now have a working implementation of incremental sync! Let's try it out:
+We should now have a working implementation of incremental sync!
+
+Let's try it out:
```
python main_dev.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json
```
-You should a bunch of `RECORD` messages and `STATE` messages. To verify that incremental sync is working, pass the input state back to the connector and run it again:
+You should see a bunch of `RECORD` messages and `STATE` messages. To verify that incremental sync is working, pass the input state back to the connector and run it again:
```
# Save the latest state to sample_files/state.json
python main_dev.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json | grep STATE | tail -n 1 | jq .state.data > sample_files/state.json
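
# Then re-run the read, passing the saved state back in via the read
# command's --state argument (sketch of the verification step)
python main_dev.py read --config sample_files/config.json --catalog sample_files/configured_catalog.json --state sample_files/state.json
```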
@@ -508,11 +509,11 @@ With that, we've implemented incremental sync for our connector!
### Step 7: Use the connector in Airbyte
To use your connector in your own installation of Airbyte, build the docker image for your container by running `docker build . -t airbyte/source-python-http-example:dev`. Then, follow the instructions from the [building a toy source tutorial](https://docs.airbyte.io/tutorials/toy-connector#use-the-connector-in-the-airbyte-ui) for using the connector in the Airbyte UI, replacing the name as appropriate.
-Note: your built docker image must be accessible to the `docker` daemon running on the Airbyte node. If you're doing this tutorial locally, then the instructions here are sufficient. Otherwise you may need to push your Docker image to Dockerhub.
+Note: your built docker image must be accessible to the `docker` daemon running on the Airbyte node. If you're doing this tutorial locally, these instructions are sufficient. Otherwise you may need to push your Docker image to Dockerhub.
### Step 8: Test your connector
#### Unit Tests
-Add any relevant unit tests to the `unit_tests` directory. Unit tests should _not_ depend on any secrets.
+Add any relevant unit tests to the `unit_tests` directory. Unit tests should **not** depend on any secrets.
You can run the tests using `python -m pytest -s unit_tests`
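For example, a minimal secret-free test might look like this sketch (a hypothetical `unit_tests/test_source.py`; module and class names taken from this tutorial's source):

```python
from source_python_http_tutorial.source import SourcePythonHttpTutorial

def test_check_connection_rejects_unknown_currency():
    # No secrets needed: the Rates API source takes only a base currency.
    source = SourcePythonHttpTutorial()
    ok, _error = source.check_connection(logger=None, config={"base": "NOT_A_CURRENCY"})
    assert not ok
```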