
chore: hard-deprecate connector generator script (#55788)

Natik Gadzhi
2025-03-31 07:21:14 -07:00
committed by GitHub
parent 0b2544fe22
commit 46359769b8
65 changed files with 4292 additions and 12344 deletions

View File

@@ -1,6 +0,0 @@
# Connector templates
This directory contains templates used to bootstrap developing new connectors, as well as a generator module which generates code using the templates as input.
See the `generator/` directory to get started writing a new connector.
Other directories contain templates used to bootstrap a connector.

View File

@@ -1,37 +0,0 @@
# See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference)
# for more information about how to configure these tests
connector_image: {{ connectorImage }}
acceptance_tests:
spec:
tests:
- spec_path: "{{ specPath }}"
connection:
tests:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
discovery:
tests:
- config_path: "secrets/config.json"
basic_read:
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams: []
# TODO uncomment this block to specify that the tests should assert the connector outputs the records provided in the input file
# expect_records:
# path: "integration_tests/expected_records.jsonl"
# exact_order: no
incremental:
bypass_reason: "This connector does not implement incremental sync"
# TODO uncomment this block if your connector implements incremental sync:
# tests:
# - config_path: "secrets/config.json"
# configured_catalog_path: "integration_tests/configured_catalog.json"
# future_state:
# future_state_path: "integration_tests/abnormal_state.json"
full_refresh:
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"

View File

@@ -1,103 +0,0 @@
# {{capitalCase name}} Destination
This is the repository for the {{capitalCase name}} destination connector, written in Python.
For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.com/integrations/destinations/{{dashCase name}}).
## Local development
### Prerequisites
* Python (`^3.10`)
* Poetry (`^1.7`) - installation instructions [here](https://python-poetry.org/docs/#installation)
### Installing the connector
From this connector directory, run:
```bash
poetry install --with dev
```
#### Create credentials
**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.com/integrations/destinations/{{dashCase name}})
to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `destination_{{snakeCase name}}/spec.json` file.
Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information.
See `integration_tests/sample_config.json` for a sample config file.
**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `destination {{dashCase name}} test creds`
and place them into `secrets/config.json`.
### Locally running the connector
```
poetry run destination-{{dashCase name}} spec
poetry run destination-{{dashCase name}} check --config secrets/config.json
poetry run destination-{{dashCase name}} write --config secrets/config.json --catalog sample_files/configured_catalog.json
```
### Running tests
To run tests locally, from the connector directory run:
```
poetry run pytest tests
```
### Building the docker image
1. Install [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md)
2. Run the following command to build the docker image:
```bash
airbyte-ci connectors --name=destination-{{dashCase name}} build
```
An image will be available on your host with the tag `airbyte/destination-{{dashCase name}}:dev`.
### Running as a docker container
Run any of the connector commands as follows:
```
docker run --rm airbyte/destination-{{dashCase name}}:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-{{dashCase name}}:dev check --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-{{dashCase name}}:dev write --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```
### Running our CI test suite
You can run our full test suite locally using [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md):
```bash
airbyte-ci connectors --name=destination-{{dashCase name}} test
```
### Customizing Acceptance Tests
Customize the `acceptance-test-config.yml` file to configure acceptance tests. See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference) for more information.
If your connector needs to create or destroy resources for use during acceptance tests, create fixtures for them and place them inside `integration_tests/acceptance.py`.
### Dependency Management
All of your dependencies should be managed via Poetry.
To add a new dependency, run:
```bash
poetry add <package-name>
```
Please commit the changes to `pyproject.toml` and `poetry.lock` files.
## Publishing a new version of the connector
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing our test suite: `airbyte-ci connectors --name=destination-{{dashCase name}} test`
2. Bump the connector version (please follow [semantic versioning for connectors](https://docs.airbyte.com/contributing-to-airbyte/resources/pull-requests-handbook/#semantic-versioning-for-connectors)):
- bump the `dockerImageTag` value in `metadata.yaml`
- bump the `version` value in `pyproject.toml`
3. Make sure the `metadata.yaml` content is up to date.
4. Make sure the connector documentation and its changelog are up to date (`docs/integrations/destinations/{{dashCase name}}.md`).
5. Create a Pull Request: use [our PR naming conventions](https://docs.airbyte.com/contributing-to-airbyte/resources/pull-requests-handbook/#pull-request-title-convention).
6. Pat yourself on the back for being an awesome contributor.
7. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.
8. Once your PR is merged, the new version of the connector will be automatically published to Docker Hub and our connector registry.

View File

@@ -1,8 +0,0 @@
#
# Copyright (c) {{currentYear}} Airbyte, Inc., all rights reserved.
#
from .destination import Destination{{properCase name}}
__all__ = ["Destination{{properCase name}}"]

View File

@@ -1,53 +0,0 @@
#
# Copyright (c) {{currentYear}} Airbyte, Inc., all rights reserved.
#
import logging
from typing import Any, Iterable, Mapping
from airbyte_cdk.destinations import Destination
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, Status
class Destination{{properCase name}}(Destination):
def write(
self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
) -> Iterable[AirbyteMessage]:
"""
TODO
Reads the input stream of messages, config, and catalog to write data to the destination.
This method returns an iterable (typically a generator of AirbyteMessages via yield) containing state messages received
in the input message stream. Outputting a state message means that every AirbyteRecordMessage which came before it has been
successfully persisted to the destination. This is used to ensure fault tolerance: if a sync fails before fully completing,
the next sync is given the last state message output from this method as its starting point.
:param config: dict of JSON configuration matching the configuration declared in spec.json
:param configured_catalog: The Configured Catalog describing the schema of the data being received and how it should be persisted in the
destination
:param input_messages: The stream of input messages received from the source
:return: Iterable of AirbyteStateMessages wrapped in AirbyteMessage structs
"""
pass
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
"""
Tests if the input configuration can be used to successfully connect to the destination with the needed permissions
e.g: if a provided API token or password can be used to connect and write to the destination.
:param logger: Logging object to display debug/info/error to the logs
(logs will not be accessible via airbyte UI if they are not passed to this logger)
:param config: Json object containing the configuration of this destination, content of this json is as specified in
the properties of the spec.json file
:return: AirbyteConnectionStatus indicating a Success or Failure
"""
try:
# TODO
return AirbyteConnectionStatus(status=Status.SUCCEEDED)
except Exception as e:
return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {repr(e)}")

View File

@@ -1,13 +0,0 @@
#
# Copyright (c) {{currentYear}} Airbyte, Inc., all rights reserved.
#
import sys
from .destination import Destination{{properCase name}}
def run():
destination = Destination{{properCase name}}()
destination.run(sys.argv[1:])

View File

@@ -1,20 +0,0 @@
{
"documentationUrl": "https://docs.airbyte.com/integrations/destinations/{{kebabCase name}}",
"supported_destination_sync_modes": [
"TODO, available options are: 'overwrite', 'append', and 'append_dedup'"
],
"supportsIncremental": true,
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Destination {{capitalCase name}}",
"type": "object",
"required": ["TODO -- fix me!"],
"additionalProperties": false,
"properties": {
"TODO": {
"type": "string",
"description": "FIX ME"
}
}
}
}

View File

@@ -1,8 +0,0 @@
#
# Copyright (c) {{currentYear}} Airbyte, Inc., all rights reserved.
#
def integration_test():
# TODO write integration tests
pass

View File

@@ -1,11 +0,0 @@
#
# Copyright (c) {{currentYear}} Airbyte, Inc., all rights reserved.
#
import sys
from destination_{{snakeCase name}} import Destination{{pascalCase name}}
if __name__ == "__main__":
Destination{{pascalCase name}}().run(sys.argv[1:])

View File

@@ -1,30 +0,0 @@
data:
allowedHosts:
hosts:
- TODO # Please change to the hostname of the source.
registries:
oss:
enabled: true
cloud:
enabled: false
connectorBuildOptions:
# Please update to the latest version of the connector base image.
# Please use the full address with sha256 hash to guarantee build reproducibility.
# https://hub.docker.com/r/airbyte/python-connector-base
baseImage: docker.io/airbyte/python-connector-base:1.2.0@sha256:c22a9d97464b69d6ef01898edf3f8612dc11614f05a84984451dde195f337db9
connectorSubtype: database
connectorType: destination
definitionId: {{generateDefinitionId}}
dockerImageTag: 0.1.0
dockerRepository: airbyte/destination-{{dashCase name}}
githubIssueLabel: destination-{{dashCase name}}
icon: {{dashCase name}}.svg
license: MIT
name: {{capitalCase name}}
releaseDate: TODO
releaseStage: alpha
supportLevel: community
documentationUrl: https://docs.airbyte.com/integrations/destinations/{{dashCase name}}
tags:
- language:python
metadataSpecVersion: "1.0"

View File

@@ -1,28 +0,0 @@
[build-system]
requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"
[tool.poetry]
version = "0.1.0"
name = "destination-{{dashCase name}}"
description = "Destination implementation for {{dashCase name}}."
authors = [ "Airbyte <contact@airbyte.io>",]
license = "MIT"
readme = "README.md"
documentation = "https://docs.airbyte.com/integrations/destinations/{{dashCase name}}"
homepage = "https://airbyte.com"
repository = "https://github.com/airbytehq/airbyte"
packages = [ { include = "destination_{{snakeCase name}}" }, {include = "main.py" } ]
[tool.poetry.dependencies]
python = "^3.10,<3.12"
airbyte-cdk = "^6.33.0"
[tool.poetry.scripts]
destination-{{dashCase name}} = "destination_{{snakeCase name}}.run:run"
[tool.poetry.group.dev.dependencies]
requests-mock = "*"
pytest-mock = "*"
pytest = "*"

View File

@@ -1,3 +0,0 @@
{
"fix-me": "TODO populate with needed configuration for integration tests or delete this file and any references to it. The schema of this file should match what is in your spec.yaml"
}

View File

@@ -1,7 +0,0 @@
#
# Copyright (c) {{currentYear}} Airbyte, Inc., all rights reserved.
#
def test_example_method():
assert True

View File

@@ -1,29 +0,0 @@
# dependencies
/node_modules
/.pnp
.pnp.js
# testing
/coverage
# production
/build
# misc
.DS_Store
.env.local
.env.development.local
.env.test.local
.env.production.local
npm-debug.log*
yarn-debug.log*
yarn-error.log*
*.iml
/.idea
.npmrc
.env
.env.development
.env.production

View File

@@ -1,18 +0,0 @@
FROM node:16-alpine
ARG UID
ARG GID
ENV ENV_UID $UID
ENV ENV_GID $GID
ENV DOCS_DIR "/airbyte/docs/integrations"
RUN mkdir -p /airbyte
WORKDIR /airbyte/airbyte-integrations/connector-templates/generator
CMD npm install --silent --no-update-notifier && echo "INSTALL DONE" && \
npm run generate "$package_desc" "$package_name" && \
LAST_CREATED_CONNECTOR=$(ls -td /airbyte/airbyte-integrations/connectors/* | head -n 1) && \
echo "chowning generated directory: $LAST_CREATED_CONNECTOR" && \
chown -R $ENV_UID:$ENV_GID $LAST_CREATED_CONNECTOR/* && \
echo "chowning docs directory: $DOCS_DIR" && \
chown -R $ENV_UID:$ENV_GID $DOCS_DIR/*

View File

@@ -1,48 +0,0 @@
# Connector generator
This module generates code to bootstrap your connector development.
## Getting started
### Using NPM
```bash
npm install
npm run generate
```
### Using Docker
If you don't want to install `npm` you can run the generator using Docker:
```
./generate.sh
```
## Contributions
### Testing connector templates
To test that the templates generate valid code, we follow a slightly non-obvious strategy. Since the templates
themselves do not contain valid Java/Python/etc. syntax, we can't build them directly.
At the same time, due to the way Gradle works (where phase 1 is "discovering" all the projects that need to be
built and phase 2 is running the build), it's not very ergonomic to have one Gradle task generate a module
from each template, build it in the same build lifecycle, then remove it.
So we use the following strategy:
1. Locally, generate an empty connector using the generator module (call the generated connector something like `java-jdbc-scaffolding`)
1. Check the generated module into source control
Then, [in CI](https://github.com/airbytehq/airbyte/blob/master/.github/workflows/gradle.yml), we test two invariants:
1. There is no diff between the checked-in module and a module generated using the latest version of the templater
1. The checked-in module builds successfully
Together, these two invariants guarantee that the templates produce a valid module.
The way this is performed is as follows:
1. [In CI](https://github.com/airbytehq/airbyte/blob/master/.github/workflows/gradle.yml) we trigger the task `:airbyte-integrations:connector-templates:generator:generateScaffolds`. This task deletes the checked-in `java-jdbc-scaffolding`, then generates a fresh instance of the module with the same name `java-jdbc-scaffolding`.
1. We run a `git diff`. If there is a diff, we fail the build (this means the latest version of the templates produces code which has not been manually reviewed by someone and checked in intentionally). Steps 1 & 2 test the first invariant.
1. Separately, in `settings.gradle`, the `java-jdbc-scaffolding` module is registered as a java submodule. This causes it to be built as part of the normal build cycle triggered in CI. If the generated code does not compile for whatever reason, the build will fail on building the `java-jdbc-scaffolding` module.

View File

@@ -1,41 +0,0 @@
#!/usr/bin/env bash
error_handler() {
echo "While trying to generate a connector, an error occurred on line $1 of generate.sh and the process aborted early. This is probably a bug."
}
trap 'error_handler $LINENO' ERR
set -e
# Ensure the script always runs from this directory, because that's how docker build contexts work
cd "$(dirname "${0}")" || exit 1
# Make sure docker is running before trying
if ! docker ps; then
echo "docker is not running, this script requires docker to be up"
echo "please start up the docker daemon!"
exit 1
fi
_UID=$(id -u)
_GID=$(id -g)
# Remove the container if it already exists
echo "Removing previous generator if it exists..."
docker container rm -f airbyte-connector-bootstrap >/dev/null 2>&1
# Build image for container from Dockerfile
# Specify the host system user UID and GID to chown the generated files to the host system user.
# This is done because all files generated in a container with mounted folders have root ownership
echo "Building generator docker image..."
docker build --build-arg UID="$_UID" --build-arg GID="$_GID" . -t airbyte/connector-bootstrap
# Run the container and mount the airbyte folder
if [ $# -eq 2 ]; then
echo "2 arguments supplied: 1=$1 2=$2"
docker run --name airbyte-connector-bootstrap --user "$_UID:$_GID" -e HOME=/tmp -e package_desc="$1" -e package_name="$2" -e CI=$CI -v "$(pwd)/../../../.":/airbyte airbyte/connector-bootstrap
else
echo "Running generator..."
docker run --rm -it --name airbyte-connector-bootstrap --user "$_UID:$_GID" -e HOME=/tmp -e CI=$CI -v "$(pwd)/../../../.":/airbyte airbyte/connector-bootstrap
fi
echo "Finished running generator"

File diff suppressed because it is too large

View File

@@ -1,16 +0,0 @@
{
"name": "airbyte-connector-generator",
"version": "0.1.0",
"private": true,
"scripts": {
"generate": "plop"
},
"devDependencies": {
"capital-case": "^1.0.4",
"change-case": "^4.1.2",
"handlebars": "^4.7.7",
"plop": "^3.0.5",
"set-value": ">=4.0.1",
"uuid": "^8.3.2"
}
}

View File

@@ -1,148 +0,0 @@
"use strict";
const path = require("path");
const uuid = require("uuid");
const capitalCase = require("capital-case");
const changeCase = require("change-case");
const getSuccessMessage = function (
connectorName,
outputPath,
additionalMessage
) {
return `
🚀 🚀 🚀 🚀 🚀 🚀
Success!
Your ${connectorName} connector has been created at ${path.resolve(
outputPath
)}.
Follow the TODOs in the generated module to implement your connector.
Questions, comments, or concerns? Let us know in our connector development forum:
https://discuss.airbyte.io/c/connector-development/16
We're always happy to provide any support!
${additionalMessage || ""}
`;
};
module.exports = function (plop) {
const connectorAcceptanceTestFilesInputRoot =
"../connector_acceptance_test_files";
const pythonSourceInputRoot = "../source-python";
const pythonDestinationInputRoot = "../destination-python";
const outputDir = "../../connectors";
const pythonSourceOutputRoot = `${outputDir}/source-{{dashCase name}}`;
const pythonDestinationOutputRoot = `${outputDir}/destination-{{dashCase name}}`;
const sourceConnectorImagePrefix = "airbyte/source-";
const sourceConnectorImageTag = "dev";
const defaultSpecPathFolderPrefix = "source_";
const specFileName = "spec.yaml";
plop.setHelper("capitalCase", function (name) {
return capitalCase.capitalCase(name);
});
plop.setHelper("currentYear", function () {
return new Date().getFullYear();
});
plop.setHelper("generateDefinitionId", function () {
// if the env var CI is set then return a fixed FAKE uuid so that the tests are deterministic
if (process.env.CI) {
return "FAKE-UUID-0000-0000-000000000000";
}
return uuid.v4().toLowerCase();
});
plop.setHelper("connectorImage", function () {
let suffix = "";
if (typeof this.connectorImageNameSuffix !== "undefined") {
suffix = this.connectorImageNameSuffix;
}
return `${sourceConnectorImagePrefix}${changeCase.paramCase(this.name)}${suffix}:${sourceConnectorImageTag}`;
});
plop.setHelper("specPath", function () {
let suffix = "";
if (typeof this.specPathFolderSuffix !== "undefined") {
suffix = this.specPathFolderSuffix;
}
let inSubFolder = true;
if (typeof this.inSubFolder !== "undefined") {
inSubFolder = this.inSubFolder;
}
if (inSubFolder) {
return `${defaultSpecPathFolderPrefix}${changeCase.snakeCase(
this.name
)}${suffix}/${specFileName}`;
} else {
return specFileName;
}
});
plop.setActionType("emitSuccess", function (answers, config, plopApi) {
console.log(
getSuccessMessage(
answers.name,
plopApi.renderString(config.outputPath, answers),
config.message
)
);
});
plop.setGenerator("Python CDK Destination", {
description: "Generate a destination connector based on Python CDK.",
prompts: [
{ type: "input", name: "name", message: "Connector name e.g: redis" },
],
actions: [
{
abortOnFail: true,
type: "addMany",
destination: pythonDestinationOutputRoot,
base: pythonDestinationInputRoot,
templateFiles: `${pythonDestinationInputRoot}/**/**`,
},
{ type: "emitSuccess", outputPath: pythonDestinationOutputRoot },
],
});
plop.setGenerator("Python CDK Source", {
description:
"Generate a source connector based on Python CDK.",
prompts: [
{
type: "input",
name: "name",
message: 'Source name e.g: "google-analytics"',
},
],
actions: [
{
abortOnFail: true,
type: "addMany",
destination: pythonSourceOutputRoot,
base: pythonSourceInputRoot,
templateFiles: `${pythonSourceInputRoot}/**/**`,
},
// common acceptance tests
{
abortOnFail: true,
type: "addMany",
destination: pythonSourceOutputRoot,
base: connectorAcceptanceTestFilesInputRoot,
templateFiles: `${connectorAcceptanceTestFilesInputRoot}/**/**`,
},
{ type: "emitSuccess", outputPath: pythonSourceOutputRoot },
],
});
};

View File

@@ -1,105 +0,0 @@
# {{capitalCase name}} Source
This is the repository for the {{capitalCase name}} source connector, written in Python.
For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.com/integrations/sources/{{dashCase name}}).
## Local development
### Prerequisites
* Python (`^3.10`)
* Poetry (`^1.7`) - installation instructions [here](https://python-poetry.org/docs/#installation)
### Installing the connector
From this connector directory, run:
```bash
poetry install --with dev
```
### Create credentials
**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.com/integrations/sources/{{dashCase name}})
to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `src/source_{{snakeCase name}}/spec.yaml` file.
Note that any directory named `secrets` is gitignored across the entire Airbyte repo, so there is no danger of accidentally checking in sensitive information.
See `sample_files/sample_config.json` for a sample config file.
### Locally running the connector
```
poetry run source-{{dashCase name}} spec
poetry run source-{{dashCase name}} check --config secrets/config.json
poetry run source-{{dashCase name}} discover --config secrets/config.json
poetry run source-{{dashCase name}} read --config secrets/config.json --catalog sample_files/configured_catalog.json
```
### Running tests
To run tests locally, from the connector directory run:
```
poetry run pytest tests
```
### Building the docker image
1. Install [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md)
2. Run the following command to build the docker image:
```bash
airbyte-ci connectors --name=source-{{dashCase name}} build
```
An image will be available on your host with the tag `airbyte/source-{{dashCase name}}:dev`.
### Running as a docker container
Run any of the connector commands as follows:
```
docker run --rm airbyte/source-{{dashCase name}}:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-{{dashCase name}}:dev check --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-{{dashCase name}}:dev discover --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-{{dashCase name}}:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```
### Running our CI test suite
You can run our full test suite locally using [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md):
```bash
airbyte-ci connectors --name=source-{{dashCase name}} test
```
### Customizing Acceptance Tests
Customize the `acceptance-test-config.yml` file to configure acceptance tests. See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference) for more information.
If your connector needs to create or destroy resources for use during acceptance tests, create fixtures for them and place them inside `integration_tests/acceptance.py`.
### Dependency Management
All of your dependencies should be managed via Poetry.
To add a new dependency, run:
```bash
poetry add <package-name>
```
Please commit the changes to `pyproject.toml` and `poetry.lock` files.
## Publishing a new version of the connector
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing our test suite: `airbyte-ci connectors --name=source-{{dashCase name}} test`
2. Bump the connector version (please follow [semantic versioning for connectors](https://docs.airbyte.com/contributing-to-airbyte/resources/pull-requests-handbook/#semantic-versioning-for-connectors)):
- bump the `dockerImageTag` value in `metadata.yaml`
- bump the `version` value in `pyproject.toml`
3. Make sure the `metadata.yaml` content is up to date.
4. Make sure the connector documentation and its changelog are up to date (`docs/integrations/sources/{{dashCase name}}.md`).
5. Create a Pull Request: use [our PR naming conventions](https://docs.airbyte.com/contributing-to-airbyte/resources/pull-requests-handbook/#pull-request-title-convention).
6. Pat yourself on the back for being an awesome contributor.
7. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.
8. Once your PR is merged, the new version of the connector will be automatically published to Docker Hub and our connector registry.

View File

@@ -1,3 +0,0 @@
#
# Copyright (c) {{currentYear}} Airbyte, Inc., all rights reserved.
#

View File

@@ -1,5 +0,0 @@
{
"todo-stream-name": {
"todo-field-name": "todo-abnormal-value"
}
}

View File

@@ -1,16 +0,0 @@
#
# Copyright (c) {{currentYear}} Airbyte, Inc., all rights reserved.
#
import pytest
pytest_plugins = ("connector_acceptance_test.plugin",)
@pytest.fixture(scope="session", autouse=True)
def connector_setup():
"""This fixture is a placeholder for external resources that acceptance test might require."""
# TODO: set up test dependencies if needed; otherwise remove the TODO comments
yield
# TODO: clean up test dependencies

View File

@@ -1,22 +0,0 @@
{
"streams": [
{
"stream": {
"name": "customers",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "employees",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
}
]
}

View File

@@ -1,3 +0,0 @@
{
"todo-wrong-field": "this should be an incomplete config file, used in standard tests"
}

View File

@@ -1,5 +0,0 @@
{
"todo-stream-name": {
"todo-field-name": "value"
}
}

View File

@@ -1,8 +0,0 @@
#
# Copyright (c) {{currentYear}} Airbyte, Inc., all rights reserved.
#
from source_{{snakeCase name}}.run import run
if __name__ == "__main__":
run()

View File

@@ -1,34 +0,0 @@
data:
allowedHosts:
hosts:
- TODO # Please change to the hostname of the source.
registries:
oss:
enabled: true
cloud:
enabled: false
remoteRegistries:
pypi:
enabled: true
packageName: airbyte-source-{{dashCase name}}
connectorBuildOptions:
# Please update to the latest version of the connector base image.
# https://hub.docker.com/r/airbyte/python-connector-base
# Please use the full address with sha256 hash to guarantee build reproducibility.
baseImage: docker.io/airbyte/python-connector-base:1.2.0@sha256:c22a9d97464b69d6ef01898edf3f8612dc11614f05a84984451dde195f337db9
connectorSubtype: api
connectorType: source
definitionId: {{generateDefinitionId}}
dockerImageTag: 0.1.0
dockerRepository: airbyte/source-{{dashCase name}}
githubIssueLabel: source-{{dashCase name}}
icon: {{dashCase name}}.svg
license: MIT
name: {{capitalCase name}}
releaseDate: TODO
supportLevel: community
releaseStage: alpha
documentationUrl: https://docs.airbyte.com/integrations/sources/{{dashCase name}}
tags:
- language:python
metadataSpecVersion: "1.0"

View File

@@ -1,28 +0,0 @@
[build-system]
requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"
[tool.poetry]
version = "0.1.0"
name = "source-{{dashCase name}}"
description = "Source implementation for {{dashCase name}}."
authors = [ "Airbyte <contact@airbyte.io>",]
license = "MIT"
readme = "README.md"
documentation = "https://docs.airbyte.com/integrations/sources/{{dashCase name}}"
homepage = "https://airbyte.com"
repository = "https://github.com/airbytehq/airbyte"
packages = [ { include = "source_{{snakeCase name}}" }, {include = "main.py" } ]
[tool.poetry.dependencies]
python = "^3.10,<3.12"
airbyte-cdk = "^6.33.0"
[tool.poetry.scripts]
source-{{dashCase name}} = "source_{{snakeCase name}}.run:run"
[tool.poetry.group.dev.dependencies]
requests-mock = "*"
pytest-mock = "*"
pytest = "*"

View File

@@ -1,3 +0,0 @@
{
"fix-me": "TODO populate with needed configuration for integration tests or delete this file and any references to it. The schema of this file should match what is in your spec.yaml"
}

View File

@@ -1,8 +0,0 @@
#
# Copyright (c) {{currentYear}} Airbyte, Inc., all rights reserved.
#
from .source import Source{{properCase name}}
__all__ = ["Source{{properCase name}}"]

View File

@@ -1,13 +0,0 @@
#
# Copyright (c) {{currentYear}} Airbyte, Inc., all rights reserved.
#
import sys
from airbyte_cdk.entrypoint import launch
from .source import Source{{properCase name}}
def run():
source = Source{{properCase name}}()
launch(source, sys.argv[1:])

View File

@@ -1,30 +0,0 @@
# TODO: Define your stream schemas
Your connector must describe the schema of each stream it can output using [JSONSchema](https://json-schema.org).
The simplest way to do this is to describe the schema of your streams using one `.json` file per stream. You can also dynamically generate the schema of your stream in code, or you can combine both approaches: start with a `.json` file and dynamically add properties to it.
The schema of a stream is the return value of `Stream.get_json_schema`.
## Static schemas
By default, `Stream.get_json_schema` reads a `.json` file in the `schemas/` directory whose name is equal to the value of the `Stream.name` property. In turn `Stream.name` by default returns the name of the class in snake case. Therefore, if you have a class `class EmployeeBenefits(HttpStream)` the default behavior will look for a file called `schemas/employee_benefits.json`. You can override any of these behaviors as you need.
Important note: any objects referenced via `$ref` should be placed in the `shared/` directory in their own `.json` files.
## Dynamic schemas
If you'd rather define your schema in code, override `Stream.get_json_schema` in your stream class to return a `dict` describing the schema using [JSONSchema](https://json-schema.org).
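For instance, a minimal sketch (the property name here is illustrative, not part of the template):
```
def get_json_schema(self):
    # Build the stream's JSONSchema entirely in code.
    return {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "properties": {
            "id": {"type": ["null", "string"]},
        },
    }
```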
## Dynamically modifying static schemas
Override `Stream.get_json_schema` to run the default behavior, edit the returned value, then return the edited value:
```
def get_json_schema(self):
schema = super().get_json_schema()
schema['dynamically_determined_property'] = "property"
return schema
```
Delete this file once you're done. Or don't. Up to you :)

View File

@@ -1,16 +0,0 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": ["null", "string"]
},
"name": {
"type": ["null", "string"]
},
"signup_date": {
"type": ["null", "string"],
"format": "date-time"
}
}
}

View File

@@ -1,19 +0,0 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": ["null", "string"]
},
"name": {
"type": ["null", "string"]
},
"years_of_service": {
"type": ["null", "integer"]
},
"start_date": {
"type": ["null", "string"],
"format": "date-time"
}
}
}

View File

@@ -1,206 +0,0 @@
#
# Copyright (c) {{currentYear}} Airbyte, Inc., all rights reserved.
#
from abc import ABC
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
import requests
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
"""
TODO: Most comments in this class are instructive and should be deleted after the source is implemented.
This file provides a stubbed example of how to use the Airbyte CDK to develop a source connector which supports both full refresh and
incremental syncs from an HTTP API.
The various TODOs are both implementation hints and steps - fulfilling all the TODOs should be sufficient to implement one basic and one incremental
stream from a source. This pattern is the same one used by Airbyte internally to implement connectors.
The approach here is not authoritative, and devs are free to use their own judgement.
There are additional required TODOs in the files within the integration_tests folder and the spec.yaml file.
"""
# Basic full refresh stream
class {{properCase name}}Stream(HttpStream, ABC):
"""
TODO remove this comment
This class represents a stream output by the connector.
This is an abstract base class meant to contain all the common functionality at the API level e.g: the API base URL, pagination strategy,
parsing responses, etc.
Each stream should extend this class (or another abstract subclass of it) to specify behavior unique to that stream.
Typically for REST APIs each stream corresponds to a resource in the API. For example if the API
contains the endpoints
- GET v1/customers
- GET v1/employees
then you should have three classes:
`class {{properCase name}}Stream(HttpStream, ABC)` which is the current class
`class Customers({{properCase name}}Stream)` contains behavior to pull data for customers using v1/customers
`class Employees({{properCase name}}Stream)` contains behavior to pull data for employees using v1/employees
If some streams implement incremental sync, it is typical to create another class
`class Incremental{{properCase name}}Stream({{properCase name}}Stream, ABC)` then have concrete stream implementations extend it. An example
is provided below.
See the reference docs for the full list of configurable options.
"""
# TODO: Fill in the url base. Required.
url_base = "https://example-api.com/v1/"
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
"""
TODO: Override this method to define a pagination strategy. If you will not be using pagination, no action is required - just return None.
This method should return a Mapping (e.g: dict) containing whatever information required to make paginated requests. This dict is passed
to most other methods in this class to help you form headers, request bodies, query params, etc.
For example, if the API accepts a 'page' parameter to determine which page of the result to return, and a response from the API contains a
'page' number, then this method should probably return a dict {'page': response.json()['page'] + 1} to increment the page count by 1.
The request_params method should then read the input next_page_token and set the 'page' param to next_page_token['page'].
:param response: the most recent response from the API
:return If there is another page in the result, a mapping (e.g: dict) containing information needed to query the next page in the response.
If there are no more pages in the result, return None.
"""
return None
def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
"""
TODO: Override this method to define any query parameters to be set. Remove this method if you don't need to define request params.
Usually contains common params e.g. pagination size etc.
"""
return {}
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
"""
TODO: Override this method to define how a response is parsed.
:return an iterable containing each record in the response
"""
yield {}
class Customers({{properCase name}}Stream):
"""
TODO: Change class name to match the table/data source this stream corresponds to.
"""
# TODO: Fill in the primary key. Required. This is usually a unique field in the stream, like an ID or a timestamp.
primary_key = "customer_id"
def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
"""
TODO: Override this method to define the path this stream corresponds to. E.g. if the url is https://example-api.com/v1/customers then this
should return "customers". Required.
"""
return "customers"
# Basic incremental stream
class Incremental{{properCase name}}Stream({{properCase name}}Stream, ABC):
"""
TODO fill in details of this class to implement functionality related to incremental syncs for your connector.
If you do not need to implement incremental sync for any streams, remove this class.
"""
# TODO: Fill in to checkpoint stream reads after N records. This prevents re-reading of data if the stream fails for any reason.
state_checkpoint_interval = None
@property
def cursor_field(self) -> str:
"""
TODO
Override to return the cursor field used by this stream e.g: an API entity might always use created_at as the cursor field. This is
usually id or date based. This field's presence tells the framework this is an incremental stream. Required for incremental.
:return str: The name of the cursor field.
"""
return []
def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
"""
Override to determine the latest state after reading the latest record. This typically compares the cursor_field from the latest record and
the current state and picks the most recent cursor. This is how a stream's state is determined. Required for incremental.
"""
return {}
class Employees(Incremental{{properCase name}}Stream):
"""
TODO: Change class name to match the table/data source this stream corresponds to.
"""
# TODO: Fill in the cursor_field. Required.
cursor_field = "start_date"
# TODO: Fill in the primary key. Required. This is usually a unique field in the stream, like an ID or a timestamp.
primary_key = "employee_id"
def path(self, **kwargs) -> str:
"""
TODO: Override this method to define the path this stream corresponds to. E.g. if the url is https://example-api.com/v1/employees then this should
return "employees". Required.
"""
return "employees"
def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
"""
TODO: Optionally override this method to define this stream's slices. If slicing is not needed, delete this method.
Slices control when state is saved. Specifically, state is saved after a slice has been fully read.
This is useful if the API offers reads by groups or filters, and can be paired with the state object to make reads efficient. See the "concepts"
section of the docs for more information.
The function is called before reading any records in a stream. It returns an Iterable of dicts, each containing the
necessary data to craft a request for a slice. The stream state is usually referenced to determine what slices need to be created.
This means that data in a slice is usually closely related to a stream's cursor_field and stream_state.
An HTTP request is made for each returned slice. The same slice can be accessed in the path, request_params and request_header functions to help
craft that specific request.
For example, if https://example-api.com/v1/employees offers a date query param that returns data for that particular day, one way to implement
this would be to consult the stream state object for the last synced date, then return a slice containing each date from the last synced date
till now. The request_params function would then grab the date from the stream_slice and make it part of the request by injecting it into
the date query param.
"""
raise NotImplementedError("Implement stream slices or delete this method!")
# Source
class Source{{properCase name}}(AbstractSource):
def check_connection(self, logger, config) -> Tuple[bool, any]:
"""
TODO: Implement a connection check to validate that the user-provided config can be used to connect to the underlying API
See https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232
for an example.
:param config: the user-input config object conforming to the connector's spec.yaml
:param logger: logger object
:return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
"""
return True, None
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""
TODO: Replace the streams below with your own streams.
:param config: A Mapping of the user input configuration as defined in the connector spec.
"""
# TODO remove the authenticator if not required.
auth = TokenAuthenticator(token="api_key") # Oauth2Authenticator is also available if you need oauth support
return [Customers(authenticator=auth), Employees(authenticator=auth)]

View File

@@ -1,12 +0,0 @@
documentationUrl: https://docsurl.com
connectionSpecification:
$schema: http://json-schema.org/draft-07/schema#
title: {{capitalCase name}} Spec
type: object
required:
- TODO
properties:
# 'TODO: This schema defines the configuration required for the source. This usually involves metadata such as database and/or authentication information.':
TODO:
type: string
description: describe me

View File

@@ -1,3 +0,0 @@
#
# Copyright (c) {{currentYear}} Airbyte, Inc., all rights reserved.
#

View File

@@ -1,59 +0,0 @@
#
# Copyright (c) {{currentYear}} Airbyte, Inc., all rights reserved.
#
from airbyte_cdk.models import SyncMode
from pytest import fixture
from source_{{snakeCase name}}.source import Incremental{{properCase name}}Stream
@fixture
def patch_incremental_base_class(mocker):
# Mock abstract methods to enable instantiating abstract class
mocker.patch.object(Incremental{{properCase name}}Stream, "path", "v0/example_endpoint")
mocker.patch.object(Incremental{{properCase name}}Stream, "primary_key", "test_primary_key")
mocker.patch.object(Incremental{{properCase name}}Stream, "__abstractmethods__", set())
def test_cursor_field(patch_incremental_base_class):
stream = Incremental{{properCase name}}Stream()
# TODO: replace this with your expected cursor field
expected_cursor_field = []
assert stream.cursor_field == expected_cursor_field
def test_get_updated_state(patch_incremental_base_class):
stream = Incremental{{properCase name}}Stream()
# TODO: replace this with your input parameters
inputs = {"current_stream_state": None, "latest_record": None}
# TODO: replace this with your expected updated stream state
expected_state = {}
assert stream.get_updated_state(**inputs) == expected_state
def test_stream_slices(patch_incremental_base_class):
stream = Incremental{{properCase name}}Stream()
# TODO: replace this with your input parameters
inputs = {"sync_mode": SyncMode.incremental, "cursor_field": [], "stream_state": {}}
# TODO: replace this with your expected stream slices list
expected_stream_slice = [{}]
assert stream.stream_slices(**inputs) == expected_stream_slice
def test_supports_incremental(patch_incremental_base_class, mocker):
mocker.patch.object(Incremental{{properCase name}}Stream, "cursor_field", "dummy_field")
stream = Incremental{{properCase name}}Stream()
assert stream.supports_incremental
def test_source_defined_cursor(patch_incremental_base_class):
stream = Incremental{{properCase name}}Stream()
assert stream.source_defined_cursor
def test_stream_checkpoint_interval(patch_incremental_base_class):
stream = Incremental{{properCase name}}Stream()
# TODO: replace this with your expected checkpoint interval
expected_checkpoint_interval = None
assert stream.state_checkpoint_interval == expected_checkpoint_interval

View File

@@ -1,22 +0,0 @@
#
# Copyright (c) {{currentYear}} Airbyte, Inc., all rights reserved.
#
from unittest.mock import MagicMock
from source_{{snakeCase name}}.source import Source{{properCase name}}
def test_check_connection(mocker):
source = Source{{properCase name}}()
logger_mock, config_mock = MagicMock(), MagicMock()
assert source.check_connection(logger_mock, config_mock) == (True, None)
def test_streams(mocker):
source = Source{{properCase name}}()
config_mock = MagicMock()
streams = source.streams(config_mock)
# TODO: replace this with your streams number
expected_streams_number = 2
assert len(streams) == expected_streams_number

View File

@@ -1,83 +0,0 @@
#
# Copyright (c) {{currentYear}} Airbyte, Inc., all rights reserved.
#
from http import HTTPStatus
from unittest.mock import MagicMock
import pytest
from source_{{snakeCase name}}.source import {{properCase name}}Stream
@pytest.fixture
def patch_base_class(mocker):
# Mock abstract methods to enable instantiating abstract class
mocker.patch.object({{properCase name}}Stream, "path", "v0/example_endpoint")
mocker.patch.object({{properCase name}}Stream, "primary_key", "test_primary_key")
mocker.patch.object({{properCase name}}Stream, "__abstractmethods__", set())
def test_request_params(patch_base_class):
stream = {{properCase name}}Stream()
# TODO: replace this with your input parameters
inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None}
# TODO: replace this with your expected request parameters
expected_params = {}
assert stream.request_params(**inputs) == expected_params
def test_next_page_token(patch_base_class):
stream = {{properCase name}}Stream()
# TODO: replace this with your input parameters
inputs = {"response": MagicMock()}
# TODO: replace this with your expected next page token
expected_token = None
assert stream.next_page_token(**inputs) == expected_token
def test_parse_response(patch_base_class):
stream = {{properCase name}}Stream()
# TODO: replace this with your input parameters
inputs = {"response": MagicMock()}
# TODO: replace this with your expected parsed object
expected_parsed_object = {}
assert next(stream.parse_response(**inputs)) == expected_parsed_object
def test_request_headers(patch_base_class):
stream = {{properCase name}}Stream()
# TODO: replace this with your input parameters
inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None}
# TODO: replace this with your expected request headers
expected_headers = {}
assert stream.request_headers(**inputs) == expected_headers
def test_http_method(patch_base_class):
stream = {{properCase name}}Stream()
# TODO: replace this with your expected http request method
expected_method = "GET"
assert stream.http_method == expected_method
@pytest.mark.parametrize(
("http_status", "should_retry"),
[
(HTTPStatus.OK, False),
(HTTPStatus.BAD_REQUEST, False),
(HTTPStatus.TOO_MANY_REQUESTS, True),
(HTTPStatus.INTERNAL_SERVER_ERROR, True),
],
)
def test_should_retry(patch_base_class, http_status, should_retry):
response_mock = MagicMock()
response_mock.status_code = http_status
stream = {{properCase name}}Stream()
assert stream.should_retry(response_mock) == should_retry
def test_backoff_time(patch_base_class):
response_mock = MagicMock()
stream = {{properCase name}}Stream()
expected_backoff_time = None
assert stream.backoff_time(response_mock) == expected_backoff_time

View File

@@ -1,23 +1,5 @@
# Connector Development Kit
:::info
Over the next few months, the project will only accept connector contributions that are made using the
[Low-Code CDK](https://docs.airbyte.com/connector-development/config-based/low-code-cdk-overview) or the
[Connector Builder](https://docs.airbyte.com/connector-development/connector-builder-ui/overview).
New pull requests made with the Python CDK will be closed, but we will inquire to understand why it wasn't done with
Low-Code/Connector Builder so we can address missing features. This decision is aimed at improving maintenance and
providing a larger catalog with high-quality connectors.
You can continue to use the Python CDK to build connectors to help your company or projects.
:::
:::info
Developer updates will be announced via
[#help-connector-development](https://airbytehq.slack.com/archives/C027KKE4BCZ) Slack channel. If you are using the
CDK, please join to stay up to date on changes and issues.
:::
:::info
This section is for the Python CDK. See our
[community-maintained CDKs section](../README.md#community-maintained-cdks) if you want to write connectors in other
@@ -39,18 +21,9 @@ inquire further!
## Getting Started
Generate an empty connector using the code generator. First clone the Airbyte repository, then from the repository
root run
In most cases, you won't need to use the CDK directly, and should start building connectors in Connector Builder, an IDE that is powered by the Airbyte Python CDK. If you do need customization beyond what it offers, you can do so by using `airbyte_cdk` as a dependency in your Python project.
```bash
cd airbyte-integrations/connector-templates/generator
./generate.sh
```
Next, find all `TODO`s in the generated project directory. They're accompanied by comments explaining what you'll
need to do in order to implement your connector. Upon completing all TODOs properly, you should have a functioning connector.
Additionally, you can follow [this tutorial](../tutorials/custom-python-connector/0-getting-started.md) for a complete walkthrough of creating an HTTP connector using the Airbyte CDK.
[Airbyte CDK reference documentation](https://airbytehq.github.io/airbyte-python-cdk/airbyte_cdk.html) is published automatically with each new CDK release. The rest of this document explains the most basic concepts applicable to any Airbyte API connector.
### Concepts & Documentation
@@ -68,64 +41,8 @@ Having trouble figuring out how to write a `stream_slices` function or aren't su
#### Practical Tips
Airbyte recommends using the CDK template generator to develop with the CDK. The template generator creates all the required scaffolding, with convenient TODOs, allowing developers to truly focus on implementing the API.
For tips on useful Python knowledge, see the [Python Concepts](python-concepts.md) page.
A complete walkthrough for implementing an HTTP source connector is available in [this tutorial](../tutorials/custom-python-connector/0-getting-started.md).
### Example Connectors
**HTTP Connectors**:
- [Stripe](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py)
- [Slack](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-slack/source_slack/source.py)
**Simple Python connectors using the barebones `Source` abstraction**:
- [Google Sheets](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-google-sheets/source_google_sheets/source.py)
- [Mailchimp](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/source.py)
## Contributing
### First time setup
We assume `python` points to Python 3.9 or higher.
Set up a virtual env:
```bash
python -m venv .venv
source .venv/bin/activate
pip install -e ".[tests]" # [tests] installs test-only dependencies
```
#### Iteration
- Iterate on the code locally
- Run tests via `pytest -s unit_tests`
- Perform static type checks using `mypy airbyte_cdk`. `MyPy` configuration is in `.mypy.ini`.
- The `type_check_and_test.sh` script bundles both type checking and testing in one convenient command. Feel free to use it!
#### Debugging
While developing your connector, you can print detailed debug information during a sync by specifying the `--debug` flag. This allows you to get a better picture of what is happening during each step of your sync.
```bash
python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json --debug
```
In addition to preset CDK debug statements, you can also add your own statements to emit debug information specific to your connector:
```python
self.logger.debug("your debug message here", extra={"debug_field": self.value})
```
#### Testing
All tests are located in the `unit_tests` directory. Run `pytest --cov=airbyte_cdk unit_tests/` to run them. This also presents a test coverage report.
#### Publishing a new version to PyPi
1. Open a PR
2. Once it is approved and merged, an Airbyte member must run the `Publish CDK Manually` workflow using `release-type=major|minor|patch` and setting the changelog message.
We welcome all contributions to the Airbyte Python CDK! The CONTRIBUTING.md in the [`airbytehq/airbyte-python-cdk` GitHub repository](https://github.com/airbytehq/airbyte-python-cdk) is the best place for an up-to-date guide on getting started.

View File

@@ -1,58 +0,0 @@
# Python Concepts
The Airbyte CDK makes use of various not-so-obvious Python concepts. You might want to revisit these concepts as you implement your connector:
## Abstract Classes [ABCs \(AbstractBaseClasses\)](https://docs.python.org/3/library/abc.html) and [abstractmethods](https://docs.python.org/3/library/abc.html#abc.abstractmethod)
You'll want a strong understanding of these as the central API concepts require extending and using them.
## [Keyword Arguments](https://realpython.com/python-kwargs-and-args/).
You'll often see this referred to as `**kwargs` in the code.
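For instance, a minimal sketch:
```python
def read(source, **kwargs):
    # Any extra keyword arguments are collected into the dict `kwargs`.
    print(kwargs)

read("my-source", stream_state={}, cursor_field="updated_at")
# {'stream_state': {}, 'cursor_field': 'updated_at'}
```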
## [Properties](https://www.freecodecamp.org/news/python-property-decorator/)
Note that there are two ways of defining properties: statically and dynamically.
### Statically
```python
from abc import ABC, abstractmethod

class Employee(ABC):
    @property
    @abstractmethod
    def job_title(self):
        """Returns this employee's job title."""

class Pilot(Employee):
    job_title = "pilot"
```
Notice how statically defining properties in this manner looks the same as defining variables. You can then reference this property as follows:
```python
pilot = Pilot()
print(pilot.job_title)  # pilot
```
### Dynamically
You can also run arbitrary code to get the value of a property. For example:
```python
from abc import ABC, abstractmethod

class Employee(ABC):
    @property
    @abstractmethod
    def job_title(self):
        """Returns this employee's job title."""

class Pilot(Employee):
    @property
    def job_title(self):
        # You can run any arbitrary code and return its result
        return "pilot"
```
## [Generators](https://wiki.python.org/moin/Generators)
Generators are basically iterators over arbitrary source data. They are handy because their syntax is extremely concise, and they feel just like any other list or collection when working with them in code.
If you see `yield` anywhere in the code -- that's a generator at work.
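For instance, a minimal sketch:
```python
def even_numbers(limit):
    # Yields even numbers below `limit`, one at a time.
    for n in range(0, limit, 2):
        yield n

print(list(even_numbers(10)))  # [0, 2, 4, 6, 8]
```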

View File

@@ -7,4 +7,4 @@
2. Select the records from the response
3. Repeat for as long as the paginator points to a next page
[connector-flow](../assets/connector-flow.png)
![connector-flow](../assets/connector-flow.png)

View File

@@ -1,6 +1,6 @@
# Low-code connector development
Airbytes low-code framework enables you to build source connectors for REST APIs via a [connector builder UI](https://docs.airbyte.com/connector-development/connector-builder-ui/overview) or by modifying boilerplate YAML files via terminal or text editor.
Airbyte's low-code framework enables you to build source connectors for REST APIs via a [connector builder UI](../connector-builder-ui/overview.md) or by modifying boilerplate YAML files via terminal or text editor.
:::info
Developer updates will be announced via our [#help-connector-development Slack channel](https://airbytehq.slack.com/archives/C027KKE4BCZ). If you are using the CDK, please join to stay up to date on changes and issues.
@@ -75,7 +75,7 @@ To use the low-code framework to build an REST API Source connector:
5. Specify stream schemas
6. Add the connector to the Airbyte platform
For a step-by-step tutorial, refer to the [Getting Started tutorial](./tutorial/0-getting-started.md) or the [video tutorial](https://youtu.be/i7VSL2bDvmw)
For a step-by-step tutorial, refer to the [Getting Started with the Connector Builder](../connector-builder-ui/tutorial.mdx) or the [video tutorial](https://youtu.be/i7VSL2bDvmw)
## Connector Builder UI
@@ -107,7 +107,7 @@ The following table describes the components of the YAML file:
| `definitions` | Describes the objects to be reused in the YAML connector |
| `streams` | Lists the streams of the source |
| `check` | Describes how to test the connection to the source by trying to read a record from a specified list of streams and failing if no records could be read |
| `spec` | A [connector specification](https://docs.airbyte.com/understanding-airbyte/airbyte-protocol#actor-specification) which describes the required and optional parameters which can be input by the end user to configure this connector |
| `spec` | A [connector specification](../../understanding-airbyte/airbyte-protocol#actor-specification) which describes the required and optional parameters which can be input by the end user to configure this connector |
:::tip
Streams define the schema of the data to sync, as well as how to read it from the underlying API source. A stream generally corresponds to a resource within the API. They are analogous to tables for a relational database source.
@@ -131,17 +131,6 @@ For each stream, configure the following components:
For a deep dive into each of the components, refer to [Understanding the YAML file](./understanding-the-yaml-file/yaml-overview.md) or the [full YAML Schema definition](https://github.com/airbytehq/airbyte-python-cdk/blob/main/airbyte_cdk/sources/declarative/declarative_component_schema.yaml)
## Tutorial
This section is a tutorial that will guide you through the end-to-end process of implementing a low-code connector.
0. [Getting started](tutorial/0-getting-started.md)
1. [Creating a source](tutorial/1-create-source.md)
2. [Installing dependencies](tutorial/2-install-dependencies.md)
3. [Connecting to the API](tutorial/3-connecting-to-the-API-source.md)
4. [Reading data](tutorial/4-reading-data.md)
5. [Incremental reads](tutorial/5-incremental-reads.md)
6. [Testing](tutorial/6-testing.md)
## Sample connectors

View File

@@ -1,49 +0,0 @@
# Getting Started
## Summary
Throughout this tutorial, we'll walk you through the creation of an Airbyte source to read and extract data from an HTTP API.
We'll build a connector reading data from the Exchange Rates API, but the steps apply to other HTTP APIs you might be interested in integrating with.
The API documentation can be found [here](https://apilayer.com/marketplace/exchangerates_data-api).
In this tutorial, we will read data from the following endpoints:
- `Latest Rates Endpoint`
- `Historical Rates Endpoint`
Our end goal is to implement a `Source` with a single `Stream` containing exchange rates going from a base currency to many other currencies.
The output schema of our stream will look like the following:
```json
{
"base": "USD",
"date": "2022-07-15",
"rates": {
"CAD": 1.28,
"EUR": 0.98
}
}
```
## Exchange Rates API Setup
Before we get started, you'll need to generate an API access key for the Exchange Rates API.
This can be done by signing up for the Free tier plan on [Exchange Rates Data API](https://apilayer.com/marketplace/exchangerates_data-api), not [Exchange Rates API](https://exchangeratesapi.io/):
1. Visit https://apilayer.com/ and click "Sign In" at the top
2. Complete the sign-up process, choosing the free tier
3. Once you're signed in, visit https://apilayer.com/marketplace/exchangerates_data-api and click "Subscribe" for free
4. On the top right, you'll see your API key
## Requirements
- An Exchange Rates API key
- Python >= 3.10
- [Poetry](https://python-poetry.org/)
- Docker must be running
- [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md#L1) CLI
## Next Steps
Next, we'll [create a Source using the connector generator.](1-create-source.md)

View File

@@ -1,33 +0,0 @@
# Step 1: Generate the source connector project locally
Let's start by cloning the Airbyte repository:
```bash
git clone git@github.com:airbytehq/airbyte.git
cd airbyte
```
Airbyte provides a code generator which bootstraps the scaffolding for our connector.
```bash
cd airbyte-integrations/connector-templates/generator
./generate.sh
```
This will bring up an interactive helper application. Use the arrow keys to pick a template from the list. Select the `Low-code Source` template and then input the name of your connector. The application will create a new directory in `airbyte/airbyte-integrations/connectors/` with the name of your new connector.
The generator will create a new module for your connector with the name `source-<connector-name>`.
```
Configuration Based Source
Source name: exchange-rates-tutorial
```
For this walkthrough, we'll refer to our source as `exchange-rates-tutorial`.
## Next steps
Next, [we'll install dependencies required to run the connector](2-install-dependencies.md)
## More readings
- [Connector generator](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connector-templates/generator/README.md)

View File

@@ -1,37 +0,0 @@
# Step 2: Install dependencies
```bash
cd ../../connectors/source-exchange-rates-tutorial
poetry install
```
These steps create an initial Python environment and install the dependencies required to run an API Source connector.
Let's verify everything works as expected by running the Airbyte `spec` operation:
```bash
poetry run source-exchange-rates-tutorial spec
```
You should see an output similar to the one below:
```
{"type": "SPEC", "spec": {"documentationUrl": "https://docsurl.com", "connectionSpecification": {"$schema": "http://json-schema.org/draft-07/schema#", "title": "Python Http Tutorial Spec", "type": "object", "required": ["TODO"], "additionalProperties": false, "properties": {"TODO: This schema defines the configuration required for the source. This usually involves metadata such as database and/or authentication information.": {"type": "string", "description": "describe me"}}}}}
```
This is a simple sanity check to make sure everything is wired up correctly.
More details on the `spec` operation can be found in [Basic Concepts](https://docs.airbyte.com/connector-development/cdk-python/basic-concepts) and [Defining Stream Schemas](https://docs.airbyte.com/connector-development/cdk-python/schemas).
For now, note that the `main.py` file is a convenience wrapper to help run the connector.
Its invocation format is `python main.py <command> [args]`.
The module's generated `README.md` contains more details on the supported commands.
## Next steps
Next, we'll [connect to the API source](3-connecting-to-the-API-source.md)
## More readings
- [Basic Concepts](https://docs.airbyte.com/connector-development/cdk-python/basic-concepts)
- [Defining Stream Schemas](https://docs.airbyte.com/connector-development/cdk-python/schemas)
- The module's generated `README.md` contains more details on the supported commands.

View File

@@ -1,224 +0,0 @@
# Step 3: Connecting to the API
We're now ready to start implementing the connector.
Over the course of this tutorial, we'll be editing a few files that were generated by the code generator:
- `source-exchange-rates-tutorial/source_exchange_rates_tutorial/manifest.yaml`: This is the connector manifest. It describes how the data should be read from the API source, as well as what inputs can be used to configure the connector.
- `source-exchange-rates-tutorial/integration_tests/configured_catalog.json`: This is the connector's [catalog](../../../understanding-airbyte/beginners-guide-to-catalog.md). It describes what data is available in a source.
- `source-exchange-rates-tutorial/integration_tests/sample_state.json`: This is a sample state object to be used to test [incremental syncs](../../cdk-python/incremental-stream.md).
We'll also be creating the following files:
- `source-exchange-rates-tutorial/secrets/config.json`: This is the configuration file we'll be using to test the connector. Its schema should match the schema defined in the spec file.
- `source-exchange-rates-tutorial/secrets/invalid_config.json`: This is an invalid configuration file we'll be using to test the connector. Its schema should match the schema defined in the spec file.
- `source_exchange_rates_tutorial/schemas/rates.json`: This is the [schema definition](../../cdk-python/schemas.md) for the stream we'll implement.
## Updating the connector spec and config
Let's populate the specification (`spec`) and the configuration (`secrets/config.json`) so the connector can access the access key and base currency.
1. We'll add these properties to the `spec` block in the `source-exchange-rates-tutorial/source_exchange_rates_tutorial/manifest.yaml`
```yaml
spec:
documentation_url: https://docs.airbyte.com/integrations/sources/exchangeratesapi
connection_specification:
$schema: http://json-schema.org/draft-07/schema#
title: exchangeratesapi.io Source Spec
type: object
required:
- access_key
- base
additionalProperties: true
properties:
access_key:
type: string
description: >-
Your API Access Key. See <a
href="https://exchangeratesapi.io/documentation/">here</a>. The key is
case sensitive.
airbyte_secret: true
base:
type: string
description: >-
ISO reference currency. See <a
href="https://www.ecb.europa.eu/stats/policy_and_exchange_rates/euro_reference_exchange_rates/html/index.en.html">here</a>.
examples:
- EUR
- USD
```
2. We also need to fill in the connection config in the `secrets/config.json`
Because of the sensitive nature of the access key, we recommend storing this config in the `secrets` directory because it is ignored by git.
```bash
echo '{"access_key": "<your_access_key>", "base": "USD"}' > secrets/config.json
```
## Updating the connector definition
Next, we'll update the connector definition (`source-exchange-rates-tutorial/source_exchange_rates_tutorial/manifest.yaml`). It was generated by the code generation script.
More details on the connector definition file can be found in the [overview](../low-code-cdk-overview.md) and [connection definition](../understanding-the-yaml-file/yaml-overview.md) sections.
Let's fill out these TODOs with the information found in the [Exchange Rates API docs](https://apilayer.com/marketplace/exchangerates_data-api).
1. We'll first set the API's base url. According to the API documentation, the base url is `"https://api.apilayer.com"`.
```yaml
definitions:
<...>
requester:
url_base: "https://api.apilayer.com"
```
2. Then, let's rename the stream from `customers` to `rates`, update the primary key to `date`, and set the path to "/exchangerates_data/latest" as per the API's documentation. This path is specific to the stream, so we'll set it within the `rates_stream` definition
```yaml
rates_stream:
$ref: "#/definitions/base_stream"
$parameters:
name: "rates"
primary_key: "date"
path: "/exchangerates_data/latest"
```
We'll also update the reference in the `streams` block
```yaml
streams:
- "#/definitions/rates_stream"
```
3. Update the references in the `check` block
```yaml
check:
stream_names:
- "rates"
```
Adding the reference in the `check` block tells the `check` operation to use that stream to test the connection.
4. Next, we'll set up the authentication.
The Exchange Rates API requires an access key to be passed as a header named "apikey".
This can be done using an `ApiKeyAuthenticator`, which we'll configure to point to the config's `access_key` field.
```yaml
definitions:
<...>
requester:
url_base: "https://api.apilayer.com"
http_method: "GET"
authenticator:
type: ApiKeyAuthenticator
header: "apikey"
api_token: "{{ config['access_key'] }}"
```
5. According to the ExchangeRatesApi documentation, we can specify the base currency of interest in a request parameter. Let's assume the user will configure this via the connector configuration in a parameter called `base`; we'll pass the value input by the user as a request parameter:
```yaml
definitions:
<...>
requester:
<...>
request_options_provider:
request_parameters:
base: "{{ config['base'] }}"
```
The full connector definition should now look like
```yaml
version: "0.1.0"
definitions:
selector:
extractor:
field_path: []
requester:
url_base: "https://api.apilayer.com"
http_method: "GET"
authenticator:
type: ApiKeyAuthenticator
header: "apikey"
api_token: "{{ config['access_key'] }}"
request_options_provider:
request_parameters:
base: "{{ config['base'] }}"
retriever:
record_selector:
$ref: "#/definitions/selector"
paginator:
type: NoPagination
requester:
$ref: "#/definitions/requester"
base_stream:
retriever:
$ref: "#/definitions/retriever"
rates_stream:
$ref: "#/definitions/base_stream"
$parameters:
name: "rates"
primary_key: "date"
path: "/exchangerates_data/latest"
streams:
- "#/definitions/rates_stream"
check:
stream_names:
- "rates"
spec:
documentation_url: https://docs.airbyte.com/integrations/sources/exchangeratesapi
connection_specification:
$schema: http://json-schema.org/draft-07/schema#
title: exchangeratesapi.io Source Spec
type: object
required:
- access_key
- base
additionalProperties: true
properties:
access_key:
type: string
description: >-
Your API Access Key. See <a
href="https://exchangeratesapi.io/documentation/">here</a>. The key is
case sensitive.
airbyte_secret: true
base:
type: string
description: >-
ISO reference currency. See <a
href="https://www.ecb.europa.eu/stats/policy_and_exchange_rates/euro_reference_exchange_rates/html/index.en.html">here</a>.
examples:
- EUR
- USD
```
We can now run the `check` operation, which verifies the connector can connect to the API source.
```bash
poetry run source-exchange-rates-tutorial check --config secrets/config.json
```
which should now succeed with logs similar to:
```
{"type": "LOG", "log": {"level": "INFO", "message": "Check succeeded"}}
{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "SUCCEEDED"}}
```
## Next steps
Next, we'll [extract the records from the response](4-reading-data.md)
## More readings
- [Config-based connectors overview](../low-code-cdk-overview.md)
- [Authentication](../understanding-the-yaml-file/authentication.md)
- [Request options providers](../understanding-the-yaml-file/request-options.md)
- [Schema definition](../../cdk-python/schemas.md)
- [Connector specification reference](../../connector-specification-reference.md)
- [Beginner's guide to catalog](../../../understanding-airbyte/beginners-guide-to-catalog.md)

View File

@@ -1,70 +0,0 @@
# Step 4: Reading data
Now that we're able to authenticate to the source API, we'll want to select data from the HTTP responses.
Let's first add the stream to the configured catalog in `source-exchange-rates-tutorial/integration_tests/configured_catalog.json`
```json
{
"streams": [
{
"stream": {
"name": "rates",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}
```
The configured catalog declares the sync modes supported by the stream (full refresh or incremental).
See the [catalog guide](https://docs.airbyte.com/understanding-airbyte/beginners-guide-to-catalog) for more information.
Let's define the stream schema in `source-exchange-rates-tutorial/source_exchange_rates_tutorial/schemas/rates.json`
You can download the JSON file describing the output schema with all currencies [here](./exchange_rates_schema.json) for convenience and place it in `schemas/`.
```bash
curl https://raw.githubusercontent.com/airbytehq/airbyte/master/docs/connector-development/config-based/tutorial/exchange_rates_schema.json > source_exchange_rates_tutorial/schemas/rates.json
```
We can also delete the boilerplate schema files
```bash
rm source_exchange_rates_tutorial/schemas/customers.json
rm source_exchange_rates_tutorial/schemas/employees.json
```
As an alternative to storing the stream's data schema in the `schemas/` directory, we can store it inline in the YAML file by including the optional `schema_loader` key and the associated schema in the entry for each stream. More information on how to define a stream's schema in the YAML file can be found [here](../understanding-the-yaml-file/yaml-overview.md).
Reading from the source can be done by running the `read` operation
```bash
poetry run source-exchange-rates-tutorial read --config secrets/config.json --catalog integration_tests/configured_catalog.json
```
The logs should show that 1 record was read from the stream.
```
{"type": "LOG", "log": {"level": "INFO", "message": "Read 1 records from rates stream"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing rates"}}
```
The `--debug` flag can be set to print out debug information, including the outgoing request and its associated response:
```bash
poetry run source-exchange-rates-tutorial read --config secrets/config.json --catalog integration_tests/configured_catalog.json --debug
```
## Next steps
We now have a working implementation of a connector reading the latest exchange rates for a given currency.
However, we're limited to reading only the latest exchange rate value.
Next, we'll [enhance the connector to read data for a given date, which will enable us to backfill the stream with historical data](5-incremental-reads.md).
## More readings
- [Record selector](../understanding-the-yaml-file/record-selector.md)
- [Catalog guide](https://docs.airbyte.com/understanding-airbyte/beginners-guide-to-catalog)

View File

@@ -1,314 +0,0 @@
# Step 5: Incremental Reads
We now have a working implementation of a connector reading the latest exchange rates for a given currency.
In this section, we'll update the source to read historical data instead of only reading the latest exchange rates.
According to the API documentation, we can read the exchange rate for a specific date by querying the `"/exchangerates_data/{date}"` endpoint instead of `"/exchangerates_data/latest"`.
We'll now add a `start_date` property to the connector.
First we'll update the spec block in `source_exchange_rates_tutorial/manifest.yaml`
```yaml
spec:
documentation_url: https://docs.airbyte.com/integrations/sources/exchangeratesapi
connection_specification:
$schema: http://json-schema.org/draft-07/schema#
title: exchangeratesapi.io Source Spec
type: object
required:
- start_date
- access_key
- base
additionalProperties: true
properties:
start_date:
type: string
description: Start getting data from that date.
pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}$
examples:
- YYYY-MM-DD
access_key:
type: string
description: >-
Your API Access Key. See <a
href="https://exchangeratesapi.io/documentation/">here</a>. The key is
case sensitive.
airbyte_secret: true
base:
type: string
description: >-
ISO reference currency. See <a
href="https://www.ecb.europa.eu/stats/policy_and_exchange_rates/euro_reference_exchange_rates/html/index.en.html">here</a>.
examples:
- EUR
- USD
```
Then we'll set the `start_date` to last week in our connection config by adding a `start_date` field to `secrets/config.json`.
The file should look like this:
```json
{
"access_key": "<your_access_key>",
"start_date": "2022-07-26",
"base": "USD"
}
```
where the start date should be 7 days in the past.
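If you'd like to compute that date rather than type it by hand, a quick Python one-liner works (a convenience only, not part of the connector):
```python
from datetime import date, timedelta

# Print a date 7 days in the past, in the YYYY-MM-DD format the spec expects
print((date.today() - timedelta(days=7)).isoformat())
```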
And we'll update the `path` in the connector definition to include `{{ config['start_date'] }}`.
Note that we are setting a default value because the `check` operation does not know the `start_date`; we'll default to hitting `/exchangerates_data/latest`:
```yaml
definitions:
<...>
rates_stream:
$ref: "#/definitions/base_stream"
$parameters:
name: "rates"
primary_key: "date"
path: "/exchangerates_data/{{config['start_date'] or 'latest'}}"
```
You can test these changes by executing the `read` operation:
```bash
poetry run source-exchange-rates-tutorial read --config secrets/config.json --catalog integration_tests/configured_catalog.json
```
By reading the output record, you should see that we read historical data instead of the latest exchange rate.
For example:
> "historical": true, "base": "USD", "date": "2022-07-18"
The connector will now always read data for the start date, which is not exactly what we want.
Instead, we would like to iterate over all the dates between the `start_date` and today and read data for each day.
We can do this by adding a `DatetimeBasedCursor` to the connector definition and updating the `path` to point to the stream slice's `start_time`.
More details on incremental syncs can be found [here](../understanding-the-yaml-file/incremental-syncs.md).
Let's first define a datetime cursor at the top level of the connector definition:
```yaml
definitions:
datetime_cursor:
type: "DatetimeBasedCursor"
start_datetime:
datetime: "{{ config['start_date'] }}"
datetime_format: "%Y-%m-%d"
end_datetime:
datetime: "{{ now_utc() }}"
datetime_format: "%Y-%m-%d %H:%M:%S.%f+00:00"
step: "P1D"
datetime_format: "%Y-%m-%d"
cursor_granularity: "P1D"
cursor_field: "date"
```
and refer to it in the stream.
This will generate time windows from the start time until the end time, where each window is exactly one day.
The start time is defined in the config file, while the end time is defined by the `now_utc()` macro, which will evaluate to the current date in the current timezone at runtime. See the section on [string interpolation](../advanced-topics/string-interpolation.md) for more details.
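To build intuition for the slices this produces, the windows behave roughly like this sketch (illustrative Python only, not how the CDK implements it):
```python
from datetime import date, timedelta

def daily_windows(start: date, end: date):
    """Yield one time window per day, mimicking step: P1D with cursor_granularity: P1D."""
    current = start
    while current <= end:
        yield {"start_time": current.isoformat(), "end_time": current.isoformat()}
        current += timedelta(days=1)

for window in daily_windows(date(2022, 7, 26), date(2022, 7, 28)):
    print(window)
# {'start_time': '2022-07-26', 'end_time': '2022-07-26'}
# {'start_time': '2022-07-27', 'end_time': '2022-07-27'}
# {'start_time': '2022-07-28', 'end_time': '2022-07-28'}
```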
```yaml
definitions:
<...>
rates_stream:
$ref: "#/definitions/base_stream"
$parameters:
name: "rates"
primary_key: "date"
path: "/exchangerates_data/{{config['start_date'] or 'latest'}}"
```
We'll also update the base stream to use the datetime cursor:
```yaml
definitions:
<...>
base_stream:
<...>
incremental_sync:
$ref: "#/definitions/datetime_cursor"
```
Finally, we'll update the path to point to the `stream_slice`'s start_time
```yaml
definitions:
<...>
rates_stream:
$ref: "#/definitions/base_stream"
$parameters:
name: "rates"
primary_key: "date"
path: "/exchangerates_data/{{stream_slice['start_time'] or 'latest'}}"
```
The full connector definition should now look like `./source_exchange_rates_tutorial/manifest.yaml`:
```yaml
version: "0.1.0"
definitions:
selector:
extractor:
field_path: []
requester:
url_base: "https://api.apilayer.com"
http_method: "GET"
authenticator:
type: ApiKeyAuthenticator
header: "apikey"
api_token: "{{ config['access_key'] }}"
request_options_provider:
request_parameters:
base: "{{ config['base'] }}"
datetime_cursor:
type: "DatetimeBasedCursor"
start_datetime:
datetime: "{{ config['start_date'] }}"
datetime_format: "%Y-%m-%d"
end_datetime:
datetime: "{{ now_utc() }}"
datetime_format: "%Y-%m-%d %H:%M:%S.%f+00:00"
step: "P1D"
datetime_format: "%Y-%m-%d"
cursor_granularity: "P1D"
cursor_field: "date"
retriever:
record_selector:
$ref: "#/definitions/selector"
paginator:
type: NoPagination
requester:
$ref: "#/definitions/requester"
base_stream:
incremental_sync:
$ref: "#/definitions/datetime_cursor"
retriever:
$ref: "#/definitions/retriever"
rates_stream:
$ref: "#/definitions/base_stream"
$parameters:
name: "rates"
primary_key: "date"
path: "/exchangerates_data/{{stream_slice['start_time'] or 'latest'}}"
streams:
- "#/definitions/rates_stream"
check:
stream_names:
- "rates"
spec:
documentation_url: https://docs.airbyte.com/integrations/sources/exchangeratesapi
connection_specification:
$schema: http://json-schema.org/draft-07/schema#
title: exchangeratesapi.io Source Spec
type: object
required:
- start_date
- access_key
- base
additionalProperties: true
properties:
start_date:
type: string
description: Start getting data from that date.
pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}$
examples:
- YYYY-MM-DD
access_key:
type: string
description: >-
Your API Access Key. See <a
href="https://exchangeratesapi.io/documentation/">here</a>. The key is
case sensitive.
airbyte_secret: true
base:
type: string
description: >-
ISO reference currency. See <a
href="https://www.ecb.europa.eu/stats/policy_and_exchange_rates/euro_reference_exchange_rates/html/index.en.html">here</a>.
examples:
- EUR
- USD
```
Running the `read` operation will now read all data for all days between start_date and now:
```bash
poetry run source-exchange-rates-tutorial read --config secrets/config.json --catalog integration_tests/configured_catalog.json
```
The operation should now output more than one record:
```
{"type": "LOG", "log": {"level": "INFO", "message": "Read 8 records from rates stream"}}
```
## Supporting incremental syncs
Instead of always reading data for all dates, we would like the connector to only read data for dates we haven't read yet.
This can be achieved by updating the catalog to run in incremental mode (`integration_tests/configured_catalog.json`):
```json
{
"streams": [
{
"stream": {
"name": "rates",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite"
}
]
}
```
In addition to records, the `read` operation now also outputs state messages:
```
{"type": "STATE", "state": {"data": {"rates": {"date": "2022-07-15"}}}}
```
Where the date ("2022-07-15") should be replaced by today's date.
We can simulate incremental syncs by creating a state file containing the last state produced by the `read` operation.
`source-exchange-rates-tutorial/integration_tests/sample_state.json`:
```json
{
"rates": {
"date": "2022-07-15"
}
}
```
Running the `read` operation will now only read data for dates later than the given state:
```bash
poetry run source-exchange-rates-tutorial read --config secrets/config.json --catalog integration_tests/configured_catalog.json --state integration_tests/sample_state.json
```
There shouldn't be any data read if the state is today's date:
```
{"type": "LOG", "log": {"level": "INFO", "message": "Setting state of rates stream to {'date': '2022-07-15'}"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Read 0 records from rates stream"}}
```
## Next steps:
Next, we'll run the [Connector Acceptance Tests suite to ensure the connector invariants are respected](6-testing.md).
## More readings
- [Incremental syncs](../understanding-the-yaml-file/incremental-syncs.md)
- [Partition routers](../understanding-the-yaml-file/partition-router.md)
- [Stream slices](../../cdk-python/stream-slices.md)

View File

@@ -1,48 +0,0 @@
# Step 6: Testing
We should make sure the connector respects the Airbyte specifications before we start using it in production.
This can be done by executing the Connector Acceptance Tests.
These tests will assert the most basic functionalities work as expected and are configured in `acceptance-test-config.yml`.
Before running the tests, we'll create an invalid config to make sure the `check` operation fails if the credentials are wrong, and an abnormal state to verify how the connector behaves when given one.
Update `integration_tests/invalid_config.json` with this content
```json
{
"access_key": "<invalid_key>",
"start_date": "2022-07-21",
"base": "USD"
}
```
and `integration_tests/abnormal_state.json` with
```json
{
"rates": {
"date": "2999-12-31"
}
}
```
You can run the [acceptance tests](https://github.com/airbytehq/airbyte/blob/master/docs/connector-development/testing-connectors/connector-acceptance-tests-reference.md#L1) with the following commands using [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md#L1):
```bash
airbyte-ci connectors --use-remote-secrets=false --name source-exchange-rates-tutorial test --only-step=acceptance
```
## Next steps:
Next, we'll add the connector to the [Airbyte platform](https://docs.airbyte.com/operator-guides/using-custom-connectors).
## Read more:
- [Error handling](../understanding-the-yaml-file/error-handling.md)
- [Pagination](../understanding-the-yaml-file/pagination.md)
- [Testing connectors](../../testing-connectors/README.md)
- [Contribution guide](../../../contributing-to-airbyte/README.md)
- [Greenhouse source](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-greenhouse)
- [Sendgrid source](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sendgrid)
- [Sentry source](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sentry)

View File

@@ -1,119 +0,0 @@
{
"type": "object",
"required": ["base", "date", "rates"],
"properties": {
"access_key": {
"type": "string"
},
"base": {
"type": "string"
},
"date": {
"type": "string"
},
"rates": {
"type": "object",
"properties": {
"CAD": {
"type": ["null", "number"]
},
"HKD": {
"type": ["null", "number"]
},
"ISK": {
"type": ["null", "number"]
},
"PHP": {
"type": ["null", "number"]
},
"DKK": {
"type": ["null", "number"]
},
"HUF": {
"type": ["null", "number"]
},
"CZK": {
"type": ["null", "number"]
},
"GBP": {
"type": ["null", "number"]
},
"RON": {
"type": ["null", "number"]
},
"SEK": {
"type": ["null", "number"]
},
"IDR": {
"type": ["null", "number"]
},
"INR": {
"type": ["null", "number"]
},
"BRL": {
"type": ["null", "number"]
},
"RUB": {
"type": ["null", "number"]
},
"HRK": {
"type": ["null", "number"]
},
"JPY": {
"type": ["null", "number"]
},
"THB": {
"type": ["null", "number"]
},
"CHF": {
"type": ["null", "number"]
},
"EUR": {
"type": ["null", "number"]
},
"MYR": {
"type": ["null", "number"]
},
"BGN": {
"type": ["null", "number"]
},
"TRY": {
"type": ["null", "number"]
},
"CNY": {
"type": ["null", "number"]
},
"NOK": {
"type": ["null", "number"]
},
"NZD": {
"type": ["null", "number"]
},
"ZAR": {
"type": ["null", "number"]
},
"USD": {
"type": ["null", "number"]
},
"MXN": {
"type": ["null", "number"]
},
"SGD": {
"type": ["null", "number"]
},
"AUD": {
"type": ["null", "number"]
},
"ILS": {
"type": ["null", "number"]
},
"KRW": {
"type": ["null", "number"]
},
"PLN": {
"type": ["null", "number"]
}
}
}
}
}

View File

@@ -22,7 +22,7 @@ Docker and Java with the versions listed in the
### Creating a destination
- Step 1: Create the destination using the template generator
- Step 1: Create the destination using one of the other connectors as an example
- Step 2: Build the newly generated destination
- Step 3: Implement `spec` to define the configuration required to run the connector
- Step 4: Implement `check` to provide a way to validate configurations provided to the connector
@@ -46,18 +46,9 @@ questions you have, or ask us on [slack](https://slack.airbyte.io).
## Explaining Each Step
### Step 1: Create the destination using the template
### Step 1: Create the destination
Airbyte provides a code generator which bootstraps the scaffolding for our connector.
```bash
$ cd airbyte-integrations/connector-templates/generator # assumes you are starting from the root of the Airbyte project.
$ ./generate.sh
```
Select the `Java Destination` template and then input the name of your connector. We'll refer to the
destination as `<name>-destination` in this tutorial, but you should replace `<name>` with the
actual name you used for your connector e.g: `BigQueryDestination` or `bigquery-destination`.
Use `destination-s3` as an example and copy over the relevant build system pieces.
### Step 2: Build the newly generated destination

View File

@@ -1,21 +0,0 @@
{
"streams": [
{
"stream": {
"name": "pokemon",
"json_schema": {
"$schema": "http://json-schema.org/draft-04/schema#",
"type": "object",
"properties": {
"pokemon_name": {
"type": "string"
}
}
},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}

View File

@@ -1,271 +0,0 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": ["null", "integer"]
},
"name": {
"type": ["null", "string"]
},
"base_experience": {
"type": ["null", "integer"]
},
"height": {
"type": ["null", "integer"]
},
"is_default ": {
"type": ["null", "boolean"]
},
"order": {
"type": ["null", "integer"]
},
"weight": {
"type": ["null", "integer"]
},
"abilities": {
"type": ["null", "array"],
"items": {
"type": ["null", "object"],
"properties": {
"is_hidden": {
"type": ["null", "boolean"]
},
"slot": {
"type": ["null", "integer"]
},
"ability": {
"type": ["null", "object"],
"properties": {
"name": {
"type": ["null", "string"]
},
"url": {
"type": ["null", "string"]
}
}
}
}
}
},
"forms": {
"type": ["null", "array"],
"items": {
"type": ["null", "object"],
"properties": {
"name": {
"type": ["null", "string"]
},
"url": {
"type": ["null", "string"]
}
}
}
},
"game_indices": {
"type": ["null", "array"],
"items": {
"type": ["null", "object"],
"properties": {
"game_index": {
"type": ["null", "integer"]
},
"version": {
"type": ["null", "object"],
"properties": {
"name": {
"type": ["null", "string"]
},
"url": {
"type": ["null", "string"]
}
}
}
}
}
},
"held_items": {
"type": ["null", "array"],
"items": {
"type": ["null", "object"],
"properties": {
"item": {
"type": ["null", "object"],
"properties": {
"name": {
"type": ["null", "string"]
},
"url": {
"type": ["null", "string"]
}
}
},
"version_details": {
"type": ["null", "array"],
"items": {
"type": ["null", "object"],
"properties": {
"version": {
"type": ["null", "object"],
"properties": {
"name": {
"type": ["null", "string"]
},
"url": {
"type": ["null", "string"]
}
}
},
"rarity": {
"type": ["null", "string"]
}
}
}
}
}
}
},
"location_area_encounters": {
"type": ["null", "string"]
},
"moves": {
"type": ["null", "array"],
"items": {
"type": ["null", "object"],
"properties": {
"move": {
"type": ["null", "object"],
"properties": {
"name": {
"type": ["null", "string"]
},
"url": {
"type": ["null", "string"]
}
}
},
"version_group_details": {
"type": ["null", "array"],
"items": {
"type": ["null", "object"],
"properties": {
"move_learn_method": {
"type": ["null", "object"],
"properties": {
"name": {
"type": ["null", "string"]
},
"url": {
"type": ["null", "string"]
}
}
},
"version_group": {
"type": ["null", "object"],
"properties": {
"name": {
"type": ["null", "string"]
},
"url": {
"type": ["null", "string"]
}
}
},
"level_learned_at": {
"type": ["null", "integer"]
}
}
}
}
}
}
},
"sprites": {
"type": ["null", "object"],
"properties": {
"front_default": {
"type": ["null", "string"]
},
"front_shiny": {
"type": ["null", "string"]
},
"front_female": {
"type": ["null", "string"]
},
"front_shiny_female": {
"type": ["null", "string"]
},
"back_default": {
"type": ["null", "string"]
},
"back_shiny": {
"type": ["null", "string"]
},
"back_female": {
"type": ["null", "string"]
},
"back_shiny_female": {
"type": ["null", "string"]
}
}
},
"species": {
"type": ["null", "object"],
"properties": {
"name": {
"type": ["null", "string"]
},
"url": {
"type": ["null", "string"]
}
}
},
"stats": {
"type": ["null", "array"],
"items": {
"type": ["null", "object"],
"properties": {
"stat": {
"type": ["null", "object"],
"properties": {
"name": {
"type": ["null", "string"]
},
"url": {
"type": ["null", "string"]
}
}
},
"effort": {
"type": ["null", "integer"]
},
"base_stat": {
"type": ["null", "integer"]
}
}
}
},
"types": {
"type": ["null", "array"],
"items": {
"type": ["null", "object"],
"properties": {
"slot": {
"type": ["null", "integer"]
},
"type": {
"type": ["null", "object"],
"properties": {
"name": {
"type": ["null", "string"]
},
"url": {
"type": ["null", "string"]
}
}
}
}
}
}
}
}

View File

@@ -1,307 +0,0 @@
# Python CDK Speedrun: Creating a Source
## CDK Speedrun (HTTP API Source Creation Any Route)
This is a blazing fast guide to building an HTTP source connector. Think of it as the TL;DR version
of [this tutorial](custom-python-connector/0-getting-started.md).
If you are a visual learner and want to see a video version of this guide going over each part in
detail, check it out below.
[A speedy CDK overview](https://www.youtube.com/watch?v=kJ3hLoNfz_E).
## Dependencies
1. Python >= 3.9
2. [Poetry](https://python-poetry.org/)
3. Docker
#### Generate the Template
```bash
# clone the repo if you haven't already
# git clone --depth 1 https://github.com/airbytehq/airbyte/
# cd airbyte # start from repo root
cd airbyte-integrations/connector-templates/generator
./generate.sh
```
Select the `Python CDK Source` and name it `python-http-example`.
#### Create Dev Environment
```bash
cd ../../connectors/source-python-http-example
poetry install
```
### Define Connector Inputs
```bash
cd source_python_http_example
```
We're working with the PokeAPI, so we need to define our input schema to reflect that. Open the
`spec.yaml` file here and replace it with:
```yaml
documentationUrl: https://docs.airbyte.com/integrations/sources/pokeapi
connectionSpecification:
$schema: http://json-schema.org/draft-07/schema#
title: Pokeapi Spec
type: object
required:
- pokemon_name
properties:
pokemon_name:
type: string
description: Pokemon requested from the API.
pattern: ^[a-z0-9_\-]+$
examples:
- ditto
- luxray
- snorlax
```
As you can see, we have one required input in our schema: `pokemon_name`.
Normally, input schemas will contain information such as API keys and client secrets that need to
get passed down to all endpoints or streams.
Ok, let's write a function that checks the inputs we just defined. Nuke the `source.py` file. Now
add this code to it. For a crucial time skip, we're going to define all the imports we need in the
future here. Also note that your `AbstractSource` class name must be a camel-cased version of the
name you gave in the generation phase. In our case, this is `SourcePythonHttpExample`.
```python
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple

import requests
import logging

from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream

from . import pokemon_list

logger = logging.getLogger("airbyte")


class SourcePythonHttpExample(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, Any]:
        logger.info("Checking Pokemon API connection...")
        input_pokemon = config["pokemon_name"]
        if input_pokemon not in pokemon_list.POKEMON_LIST:
            result = f"Input Pokemon {input_pokemon} is invalid. Please check your spelling and input a valid Pokemon."
            logger.info(f"PokeAPI connection failed: {result}")
            return False, result
        else:
            logger.info(f"PokeAPI connection success: {input_pokemon} is a valid Pokemon")
            return True, None

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        return [Pokemon(pokemon_name=config["pokemon_name"])]
```
Create a new file called `pokemon_list.py` at the same level. This will handle input validation for
us so that we don't input invalid Pokemon. Let's start with a very limited list - any Pokemon not
included in this list will get rejected.
```python
"""
pokemon_list.py includes a list of all known pokemon for config validation in source.py.
"""
POKEMON_LIST = [
"bulbasaur",
"charizard",
"wartortle",
"pikachu",
"crobat",
]
```
Test it.
```bash
cd ..
mkdir sample_files
echo '{"pokemon_name": "pikachu"}' > sample_files/config.json
echo '{"pokemon_name": "chikapu"}' > sample_files/invalid_config.json
poetry run source-python-http-example check --config sample_files/config.json
poetry run source-python-http-example check --config sample_files/invalid_config.json
```
Expected output:
```bash
> poetry run source-python-http-example check --config sample_files/config.json
{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "SUCCEEDED"}}
> poetry run source-python-http-example check --config sample_files/invalid_config.json
{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "FAILED", "message": "'Input Pokemon chikapu is invalid. Please check your spelling our input a valid Pokemon.'"}}
```
### Define your Stream
In your `source.py` file, add this `Pokemon` class. This stream represents an endpoint you want to
hit, which in our case, is the single [Pokemon endpoint](https://pokeapi.co/docs/v2#pokemon).
```python
class Pokemon(HttpStream):
    url_base = "https://pokeapi.co/api/v2/"
    # Set this as a noop.
    primary_key = None

    def __init__(self, pokemon_name: str, **kwargs):
        super().__init__(**kwargs)
        self.pokemon_name = pokemon_name

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        # The API does not offer pagination, so we return None to indicate there are no more pages in the response
        return None

    def path(
        self,
    ) -> str:
        return ""  # TODO

    def parse_response(
        self,
    ) -> Iterable[Mapping]:
        return None  # TODO
```
Now download [this file](./cdk-speedrun-assets/pokemon.json). Name it `pokemon.json` and place it in
`/source_python_http_example/schemas`.
This file defines your output schema for every endpoint that you want to implement. Normally, this
will likely be the most time-consuming section of the connector development process, as it requires
defining the output of the endpoint exactly. This is really important, as Airbyte needs to have
clear expectations for what the stream will output. Note that the stream's name must be
consistent between the JSON schema file and the `HttpStream` class: `pokemon.json` and
`Pokemon`, respectively, in this case. Learn more about schema creation
[here](https://docs.airbyte.com/connector-development/cdk-python/full-refresh-stream#defining-the-streams-schema).
Test your discover function. You should receive a fairly large JSON object in return.
```bash
poetry run source-python-http-example discover --config sample_files/config.json
```
Note that our discover function uses the `pokemon_name` config variable, which is passed into the
`Pokemon` stream when we set it in the `__init__` function.
### Reading Data from the Source
Update your `Pokemon` class to implement the required functions as follows:
```python
class Pokemon(HttpStream):
    url_base = "https://pokeapi.co/api/v2/"
    # Set this as a noop.
    primary_key = None

    def __init__(self, pokemon_name: str, **kwargs):
        super().__init__(**kwargs)
        # Here's where we set the variable from our input to pass it down to the source.
        self.pokemon_name = pokemon_name

    def path(self, **kwargs) -> str:
        pokemon_name = self.pokemon_name
        # This defines the path to the endpoint that we want to hit.
        return f"pokemon/{pokemon_name}"

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        # The API requires that we include the Pokemon name as a query param so we do that in this method.
        return {"pokemon_name": self.pokemon_name}

    def parse_response(
        self,
        response: requests.Response,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> Iterable[Mapping]:
        # The response is a simple JSON whose schema matches our stream's schema exactly,
        # so we just return a list containing the response.
        return [response.json()]

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        # While the PokeAPI does offer pagination, we will only ever retrieve one Pokemon with this implementation,
        # so we just return None to indicate that there will never be any more pages in the response.
        return None
```
We now need a catalog that defines all of our streams. We only have one stream: `Pokemon`. Download
that file [here](./cdk-speedrun-assets/configured_catalog_pokeapi.json). Place it in `/sample_files`
named `configured_catalog.json`. In short, this is where we tell Airbyte which streams/endpoints
the connector supports and in which sync modes Airbyte can run them. Learn more about the AirbyteCatalog
[here](https://docs.airbyte.com/understanding-airbyte/beginners-guide-to-catalog) and learn more
about sync modes [here](https://docs.airbyte.com/understanding-airbyte/connections#sync-modes).
Let's read some data.
```bash
poetry run source-python-http-example read --config sample_files/config.json --catalog sample_files/configured_catalog.json
```
If all goes well, containerize it so you can use it in the UI:
**Option A: Building the docker image with `airbyte-ci`**
This is the preferred method for building and testing connectors.
If you want to open source your connector we encourage you to use our
[`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md)
tool to build your connector. It will not use a Dockerfile but will build the connector image from
our
[base image](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/base_images/README.md)
and use our internal build logic to build an image from your Python connector code.
Running `airbyte-ci connectors --name source-<source-name> build` will build your connector image.
Once the command is done, you will find your connector image in your local docker host:
`airbyte/source-<source-name>:dev`.
**Option B: Building the docker image with a Dockerfile**
If you don't want to rely on `airbyte-ci` to build your connector, you can build the docker image
using your own Dockerfile. This method is not preferred, and is not supported for certified
connectors.
Create a `Dockerfile` in the root of your connector directory. The `Dockerfile` should look
something like this:
```Dockerfile
FROM airbyte/python-connector-base:1.1.0
COPY . ./airbyte/integration_code
RUN pip install ./airbyte/integration_code
# The entrypoint and default env vars are already set in the base image
# ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
# ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
```
Please use this as an example. This is not optimized.
Build your image:
```bash
docker build . -t airbyte/source-example-python:dev
```
You're done. Stop the clock :)
## Further reading
If you have enjoyed the above example and would like to explore the Python CDK in even more detail,
you may be interested in looking at
[how to build a connector to extract data from the Webflow API](https://airbyte.com/tutorials/extract-data-from-the-webflow-api).

View File

@@ -7,36 +7,34 @@ git clone git@github.com:airbytehq/airbyte.git
cd airbyte
```
Use the Airbyte-provided code generator, which bootstraps the scaffolding for our connector:
Next, you will want to create a new connector.
## Initialize connector project
```bash
cd airbyte-integrations/connector-templates/generator
./generate.sh
git clone git@github.com:airbytehq/airbyte.git
cd airbyte
# Make a directory for a new connector and navigate to it
mkdir airbyte-integrations/connectors/source-exchange-rates-tutorial
cd airbyte-integrations/connectors/source-exchange-rates-tutorial
# Initialize a project, follow Poetry prompts, and then add airbyte-cdk as a dependency.
poetry init
poetry add airbyte-cdk
```
Select `Python CDK Source` and set the name to `survey-monkey-demo`
For this walkthrough, we'll refer to our source as `exchange-rates-tutorial`.
Next, change your working directory to the new connector module. Also, change the airbyte-cdk version
in `pyproject.toml` to the one used for this tutorial.
## Add Connector Metadata file
```bash
cd ../../connectors/source-survey-monkey-demo
```
Each Airbyte connector needs to have a valid `metadata.yaml` file in the root of the connector directory. [Here is the `metadata.yaml` format documentation](../../../connector-development/connector-metadata-file.md).
Then create an initial Python environment and install the dependencies required to run an API Source connector:
## Implement connector entrypoint scripts
```bash
poetry lock
poetry install --with dev
```
Airbyte connectors are expected to be able to run `spec`, `check`, `discover`, and `read` commands. You can use the `run.py` file in existing Airbyte connectors as an example of how to implement them.
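As a rough sketch, a minimal entrypoint might look like the following, assuming the CDK's `launch` helper; `SourceSurveyMonkeyDemo` is an illustrative class name, not something generated for you:
```python
import sys

from airbyte_cdk.entrypoint import launch

from .source import SourceSurveyMonkeyDemo  # illustrative class name


def run():
    # launch() parses the CLI arguments and dispatches spec/check/discover/read to the source
    source = SourceSurveyMonkeyDemo()
    launch(source, sys.argv[1:])
```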
Let's verify the unit tests pass
```bash
poetry run pytest unit_tests
```
And verify that the `check` operation fails as expected:
## Running operations
```bash
poetry run source-survey-monkey-demo check --config secrets/config.json

View File

@@ -1,335 +0,0 @@
# Adding Incremental Sync to a Source
## Overview
This tutorial will assume that you already have a working source. If you do not, feel free to refer
to the [Building a Toy Connector](build-a-connector-the-hard-way.md) tutorial. This tutorial will
build directly off the example from that article. We will also assume that you have a basic
understanding of how Airbyte's Incremental-Append replication strategy works. We have a brief
explanation of it [here](../../../using-airbyte/core-concepts/sync-modes/incremental-append.md).
## Update Catalog in `discover`
First we need to identify a given stream in the Source as supporting incremental. This information
is declared in the catalog that the `discover` method returns. You will notice that the stream object
contains a field called `supported_sync_modes`. If we are adding incremental to an existing stream,
we just need to add `"incremental"` to that array. This tells Airbyte that this stream can be
synced in an incremental fashion. In practice, this will mean that in the UI, a user will have the
ability to configure this type of sync.
In the example we used in the Toy Connector tutorial, the `discover` method would now look like
this. Note that `"incremental"` has been added to the `supported_sync_modes` array. We also set
`source_defined_cursor` to `True` and `default_cursor_field` to `["date"]` to declare that the
Source knows what field to use for the cursor, in this case the date field, and does not require
user input. Nothing else has changed.
```python
def discover():
    catalog = {
        "streams": [{
            "name": "stock_prices",
            "supported_sync_modes": ["full_refresh", "incremental"],
            "source_defined_cursor": True,
            "default_cursor_field": ["date"],
            "json_schema": {
                "properties": {
                    "date": {
                        "type": "string"
                    },
                    "price": {
                        "type": "number"
                    },
                    "stock_ticker": {
                        "type": "string"
                    }
                }
            }
        }]
    }
    airbyte_message = {"type": "CATALOG", "catalog": catalog}
    print(json.dumps(airbyte_message))
```
Also, create a file called `incremental_configured_catalog.json` with the following content:
```json
{
"streams": [
{
"stream": {
"name": "stock_prices",
"supported_sync_modes": [
"full_refresh",
"incremental"
],
"json_schema": {
"properties": {
"date": {
"type": "string"
},
"price": {
"type": "number"
},
"stock_ticker": {
"type": "string"
}
}
}
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite"
}
]
}
```
## Update `read`
Next we will adapt the `read` method that we wrote previously. We need to change three things.
First, we need to pass it information about what data was replicated in the previous sync. In
Airbyte this is called a `state` object. The structure of the state object is determined by the
Source. This means that each Source can construct a state object that makes sense to it and does not
need to worry about adhering to any other convention. That being said, a pretty typical structure
for a state object is a map of stream name to the last value in the cursor field for that stream.
In this case we might choose something like this:
```json
{
"stock_prices": {
"date": "2020-02-01"
}
}
```
The second change we need to make to the `read` method is to use the state object so that we only
emit new records.
Lastly, we need to emit an updated state object, so that the next time this Source runs we do not
resend messages that we have already sent.
Here's what our updated `read` method would look like.
```python
def read(config, catalog, state):
    # Assert required configuration was provided
    if "api_key" not in config or "stock_ticker" not in config:
        log_error("Input config must contain the properties 'api_key' and 'stock_ticker'")
        sys.exit(1)

    # Find the stock_prices stream if it is present in the input catalog
    stock_prices_stream = None
    for configured_stream in catalog["streams"]:
        if configured_stream["stream"]["name"] == "stock_prices":
            stock_prices_stream = configured_stream

    if stock_prices_stream is None:
        log_error("No streams selected")
        return

    # By default we fetch stock prices for the 7 day period ending with today
    today = date.today()
    cursor_value = today.strftime("%Y-%m-%d")
    from_day = (today - timedelta(days=7)).strftime("%Y-%m-%d")

    # In case of incremental sync, state should contain the last date when we fetched stock prices
    if stock_prices_stream["sync_mode"] == "incremental":
        if state and "stock_prices" in state and state["stock_prices"].get("date"):
            from_date = datetime.strptime(state["stock_prices"].get("date"), "%Y-%m-%d")
            from_day = (from_date + timedelta(days=1)).strftime("%Y-%m-%d")

    # If the state indicates that we have already run the sync up to cursor_value, we can skip the sync
    if cursor_value > from_day:
        # If we've made it this far, all the configuration is good and we can pull the market data
        response = _call_api(ticker=config["stock_ticker"], token=config["api_key"], from_day=from_day, to_day=cursor_value)
        if response.status_code != 200:
            # In a real scenario we'd handle this error better :)
            log_error("Failure occurred when calling Polygon.io API")
            sys.exit(1)
        else:
            # Stock prices are returned sorted by date in ascending order
            # We want to output them one by one as AirbyteMessages
            response_json = response.json()
            if response_json["resultsCount"] > 0:
                results = response_json["results"]
                for result in results:
                    data = {"date": datetime.fromtimestamp(result["t"]/1000, tz=timezone.utc).strftime("%Y-%m-%d"), "stock_ticker": config["stock_ticker"], "price": result["c"]}
                    record = {"stream": "stock_prices", "data": data, "emitted_at": int(datetime.now().timestamp()) * 1000}
                    output_message = {"type": "RECORD", "record": record}
                    print(json.dumps(output_message))

                # We update the cursor as we print out the data, so that next time sync starts where we stopped printing out results
                if stock_prices_stream["sync_mode"] == "incremental":
                    cursor_value = datetime.fromtimestamp(results[len(results)-1]["t"]/1000, tz=timezone.utc).strftime("%Y-%m-%d")

    # Emit new state message.
    if stock_prices_stream["sync_mode"] == "incremental":
        output_message = {"type": "STATE", "state": {"data": {"stock_prices": {"date": cursor_value}}}}
        print(json.dumps(output_message))
```
That code requires adding a new library import in the `source.py` file:
```python
from datetime import timezone
```
We will also need to parse the `state` argument in the `run` method. To do that, we will modify
the code that calls the `read` method from the `run` method:
```python
elif command == "read":
    config = read_json(get_input_file_path(parsed_args.config))
    configured_catalog = read_json(get_input_file_path(parsed_args.catalog))
    state = None
    if parsed_args.state:
        state = read_json(get_input_file_path(parsed_args.state))
    read(config, configured_catalog, state)
```
Finally, we need to pass more arguments to our `_call_api` method in order to fetch only new prices
for incremental sync:
```python
def _call_api(ticker, token, from_day, to_day):
    return requests.get(f"https://api.polygon.io/v2/aggs/ticker/{ticker}/range/1/day/{from_day}/{to_day}?sort=asc&limit=120&apiKey={token}")
```
You will notice that in order to test these changes you need a `state` object. If you run an
incremental sync without passing a state object, the new code will output a state object that you
can use with the next sync. If you run this:
```bash
python source.py read --config secrets/valid_config.json --catalog incremental_configured_catalog.json
```
The output will look like the following:
```bash
{"type": "RECORD", "record": {"stream": "stock_prices", "data": {"date": "2022-03-07", "stock_ticker": "TSLA", "price": 804.58}, "emitted_at": 1647294277000}}
{"type": "RECORD", "record": {"stream": "stock_prices", "data": {"date": "2022-03-08", "stock_ticker": "TSLA", "price": 824.4}, "emitted_at": 1647294277000}}
{"type": "RECORD", "record": {"stream": "stock_prices", "data": {"date": "2022-03-09", "stock_ticker": "TSLA", "price": 858.97}, "emitted_at": 1647294277000}}
{"type": "RECORD", "record": {"stream": "stock_prices", "data": {"date": "2022-03-10", "stock_ticker": "TSLA", "price": 838.3}, "emitted_at": 1647294277000}}
{"type": "RECORD", "record": {"stream": "stock_prices", "data": {"date": "2022-03-11", "stock_ticker": "TSLA", "price": 795.35}, "emitted_at": 1647294277000}}
{"type": "STATE", "state": {"data": {"stock_prices": {"date": "2022-03-11"}}}}
```
Notice that the last line of output is the state object. Copy the state object:
```json
{ "stock_prices": { "date": "2022-03-11" } }
```
and paste it into a new file (e.g., `state.json`). Now you can run an incremental sync:
```bash
python source.py read --config secrets/valid_config.json --catalog incremental_configured_catalog.json --state state.json
```
## Run the incremental tests
The
[Connector Acceptance Test (CAT) suite](../../testing-connectors/connector-acceptance-tests-reference)
also includes test cases to ensure that incremental mode is working correctly.
To enable these tests, modify the existing `acceptance-test-config.yml` by adding the following:
```yaml
incremental:
- config_path: "secrets/valid_config.json"
configured_catalog_path: "incremental_configured_catalog.json"
future_state_path: "abnormal_state.json"
```
Your full `acceptance-test-config.yml` should look something like this:
```yaml
# See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference)
# for more information about how to configure these tests
connector_image: airbyte/source-stock-ticker-api:dev
tests:
  spec:
    - spec_path: "spec.json"
      config_path: "secrets/valid_config.json"
  connection:
    - config_path: "secrets/valid_config.json"
      status: "succeed"
    - config_path: "secrets/invalid_config.json"
      status: "failed"
  discovery:
    - config_path: "secrets/valid_config.json"
  basic_read:
    - config_path: "secrets/valid_config.json"
      configured_catalog_path: "fullrefresh_configured_catalog.json"
      empty_streams: []
  full_refresh:
    - config_path: "secrets/valid_config.json"
      configured_catalog_path: "fullrefresh_configured_catalog.json"
  incremental:
    - config_path: "secrets/valid_config.json"
      configured_catalog_path: "incremental_configured_catalog.json"
      future_state_path: "abnormal_state.json"
```
You will also need to create an `abnormal_state.json` file with a date in the future, which should
not produce any records:
```json
{"stock_prices": {"date": "2121-01-01"}}
```
And lastly, update the `check` function in `source.py` so that its `_call_api` call passes the new `from_day` and `to_day` parameters (make sure `timedelta` is imported from `datetime`):
```python
def check(config):
    # Validate input configuration by attempting to get the daily closing prices of the input stock ticker
    response = _call_api(
        ticker=config["stock_ticker"],
        token=config["api_key"],
        from_day=datetime.now().date() - timedelta(days=1),
        to_day=datetime.now().date(),
    )
    if response.status_code == 200:
        result = {"status": "SUCCEEDED"}
    elif response.status_code == 403:
        # HTTP code 403 means authorization failed so the API key is incorrect
        result = {"status": "FAILED", "message": "API Key is incorrect."}
    else:
        result = {"status": "FAILED", "message": "Input configuration is incorrect. Please verify the input stock ticker and API key."}

    output_message = {"type": "CONNECTION_STATUS", "connectionStatus": result}
    print(json.dumps(output_message))
```
Run the tests once again:
```bash
./acceptance-test-docker.sh
```
And finally, you should see a successful test summary:
```
collecting ...
test_core.py ✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓✓ 86% ████████▋
test_full_refresh.py ✓ 91% █████████▏
test_incremental.py ✓✓ 100% ██████████
Results (8.90s):
22 passed
```
That's all you need to do to add incremental functionality to the stock ticker Source.
You can build the new version of your connector by running:
```bash
airbyte-ci connectors --name source-stock-ticker-api build
```
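If you want a quick sanity check of the freshly built image before reconfiguring anything, you can invoke it directly (the `dev` tag matches the `connector_image` in `acceptance-test-config.yml`):
```bash
docker run --rm airbyte/source-stock-ticker-api:dev spec
```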
Bonus points: go to the Airbyte UI and reconfigure the connection to use incremental sync.
Incremental sync requires more configuration than full refresh, so your implementation may
deviate slightly depending on whether your cursor field is source-defined or user-defined. If you
think you are running into one of those cases, check out our
[incremental](/using-airbyte/core-concepts/sync-modes/incremental-append.md) documentation for more
information on the different types of configuration.

View File

@@ -5,7 +5,7 @@
The PokéAPI is primarily used as a tutorial and educational resource, as it requires zero dependencies. Learn how Airbyte and this connector works with these tutorials:
- [Airbyte Quickstart: An Introduction to Deploying and Syncing](../../using-airbyte/getting-started/readme.md)
- [Airbyte CDK Speedrun: A Quick Primer on Building Source Connectors](../../connector-development/tutorials/cdk-speedrun.md)
- [Using Connector Builder and the low-code CDK](../../connector-development/connector-builder-ui/overview.md)
- [How to Build ETL Sources in Under 30 Minutes: A Video Tutorial](https://www.youtube.com/watch?v=kJ3hLoNfz_E&t=13s&ab_channel=Airbyte)
## Features

docusaurus/pnpm-lock.yaml (generated): 8130 changed lines
File diff suppressed because it is too large

View File

@@ -1,9 +1,6 @@
const fs = require("fs");
const path = require("path");
const {
parseMarkdownContentTitle,
parseMarkdownFile,
} = require("@docusaurus/utils");
const { parseMarkdownContentTitle, parseMarkdownFile } = require("@docusaurus/utils");
const connectorsDocsRoot = "../docs/integrations";
const sourcesDocs = `${connectorsDocsRoot}/sources`;
@@ -20,7 +17,7 @@ function getFilenamesInDir(prefix, dir, excludes) {
fileName.endsWith("-migrations.md") ||
fileName.endsWith(".js") ||
fileName === "low-code.md"
)
),
)
.map((fileName) => fileName.replace(".md", ""))
.filter((fileName) => excludes.indexOf(fileName.toLowerCase()) === -1)
@@ -28,20 +25,20 @@ function getFilenamesInDir(prefix, dir, excludes) {
// Get the first header of the markdown document
try {
const filePath = path.join(dir, `${filename}.md`);
const fileContent = fs.readFileSync(filePath, 'utf8');
const firstLine = fileContent.split('\n').find(line => line.trim().startsWith('# '));
const contentTitle = firstLine ? firstLine.replace(/^#\s*/, '').trim() : filename;
const fileContent = fs.readFileSync(filePath, "utf8");
const firstLine = fileContent.split("\n").find((line) => line.trim().startsWith("# "));
const contentTitle = firstLine ? firstLine.replace(/^#\s*/, "").trim() : filename;
return {
type: 'doc',
type: "doc",
id: prefix + filename,
label: contentTitle || filename
label: contentTitle || filename,
};
} catch (error) {
console.warn(`Warning: Using filename as title for ${path.join(prefix, filename)}`);
return {
type: 'doc',
type: "doc",
id: prefix + filename,
label: filename
label: filename,
};
}
@@ -89,10 +86,9 @@ function getDestinationConnectors() {
}
function getEnterpriseConnectors() {
return getFilenamesInDir(
"integrations/enterprise-connectors/",
enterpriseConnectorDocs, ["readme"]
);
return getFilenamesInDir("integrations/enterprise-connectors/", enterpriseConnectorDocs, [
"readme",
]);
}
const sourcePostgres = {
@@ -252,19 +248,6 @@ const buildAConnector = {
type: "doc",
id: "connector-development/config-based/low-code-cdk-overview",
},
{
type: "category",
label: "Tutorial",
items: [
"connector-development/config-based/tutorial/getting-started",
"connector-development/config-based/tutorial/create-source",
"connector-development/config-based/tutorial/install-dependencies",
"connector-development/config-based/tutorial/connecting-to-the-API-source",
"connector-development/config-based/tutorial/reading-data",
"connector-development/config-based/tutorial/incremental-reads",
"connector-development/config-based/tutorial/testing",
],
},
{
type: "category",
label: "Understanding the YAML file",
@@ -306,7 +289,7 @@ const buildAConnector = {
"connector-development/config-based/advanced-topics/parameters",
"connector-development/config-based/advanced-topics/references",
"connector-development/config-based/advanced-topics/string-interpolation",
]
],
},
],
},
@@ -325,14 +308,12 @@ const buildAConnector = {
"connector-development/cdk-python/resumable-full-refresh-stream",
"connector-development/cdk-python/incremental-stream",
"connector-development/cdk-python/http-streams",
"connector-development/cdk-python/python-concepts",
"connector-development/cdk-python/migration-to-base-image",
"connector-development/cdk-python/stream-slices",
{
type: "category",
label: "Tutorials",
items: [
"connector-development/tutorials/cdk-speedrun",
"connector-development/tutorials/custom-python-connector/getting-started",
{
type: "category",
label: "Python CDK: Creating a Python Source",
@@ -359,9 +340,7 @@ const buildAConnector = {
type: "doc",
id: "connector-development/testing-connectors/README",
},
items: [
"connector-development/testing-connectors/connector-acceptance-tests-reference",
],
items: ["connector-development/testing-connectors/connector-acceptance-tests-reference"],
},
"connector-development/connector-specification-reference",
"connector-development/partner-certified-destinations",
@@ -396,8 +375,8 @@ const connectorCatalog = {
sourceMssql,
...getSourceConnectors(),
].sort((itemA, itemB) => {
const labelA = itemA?.label || '';
const labelB = itemB?.label || '';
const labelA = itemA?.label || "";
const labelB = itemB?.label || "";
return labelA.localeCompare(labelB);
}),
},
@@ -408,15 +387,13 @@ const connectorCatalog = {
type: "doc",
id: "integrations/destinations/README",
},
items: [
destinationS3,
destinationPostgres,
...getDestinationConnectors(),
].sort((itemA, itemB) => {
const labelA = itemA?.label || '';
const labelB = itemB?.label || '';
return labelA.localeCompare(labelB);
}),
items: [destinationS3, destinationPostgres, ...getDestinationConnectors()].sort(
(itemA, itemB) => {
const labelA = itemA?.label || "";
const labelB = itemB?.label || "";
return labelA.localeCompare(labelB);
},
),
},
{
type: "doc",
@@ -651,8 +628,8 @@ module.exports = {
id: "integrations/enterprise-connectors/README",
},
items: [...getEnterpriseConnectors()].sort((itemA, itemB) => {
const labelA = itemA?.label || '';
const labelB = itemB?.label || '';
const labelA = itemA?.label || "";
const labelB = itemB?.label || "";
return labelA.localeCompare(labelB);
}),
},
@@ -674,10 +651,7 @@ module.exports = {
type: "doc",
id: "operator-guides/configuring-airbyte",
},
items: [
"operator-guides/configuring-connector-resources",
"operator-guides/telemetry",
],
items: ["operator-guides/configuring-connector-resources", "operator-guides/telemetry"],
},
{
@@ -708,7 +682,7 @@ module.exports = {
items: [
{
type: "doc",
id: "access-management/role-mapping"
id: "access-management/role-mapping",
},
],
},
@@ -791,7 +765,8 @@ module.exports = {
label: "Release Notes",
link: {
type: "generated-index",
description: "We release new self-managed versions of Airbyte regularly. Airbyte Cloud customers always have the latest enhancements.",
description:
"We release new self-managed versions of Airbyte regularly. Airbyte Cloud customers always have the latest enhancements.",
},
items: [
"release_notes/v-1.5",
@@ -805,7 +780,8 @@ module.exports = {
label: "Historical release notes",
link: {
type: "generated-index",
description: "Historical release notes from before Airbyte 1.0 are preserved here for posterity."
description:
"Historical release notes from before Airbyte 1.0 are preserved here for posterity.",
},
items: [
"release_notes/aug_2024",
@@ -834,7 +810,7 @@ module.exports = {
"release_notes/september_2022",
"release_notes/august_2022",
"release_notes/july_2022",
]
],
},
],
},