1
0
mirror of synced 2025-12-25 02:09:19 -05:00

Source Drift: Migrate Python CDK to Low-code CDK (#29121)

This commit is contained in:
btkcodedev
2023-08-16 04:22:35 +05:30
committed by GitHub
parent d4d6366fd7
commit 7a0e40eeb0
28 changed files with 374 additions and 1156 deletions

View File

@@ -34,5 +34,5 @@ COPY source_drift ./source_drift
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
LABEL io.airbyte.version=0.2.7
LABEL io.airbyte.version=0.3.0
LABEL io.airbyte.name=airbyte/source-drift

View File

@@ -1,64 +1,27 @@
# Drift Source
# Drift Source
This is the repository for the Drift source connector, written in Python.
For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/sources/drift).
This is the repository for the Drift configuration based source connector.
For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.com/integrations/sources/drift).
## Local development
### Prerequisites
**To iterate on this connector, make sure to complete this prerequisites section.**
#### Minimum Python version required `= 3.7.0`
#### Build & Activate Virtual Environment and install dependencies
From this connector directory, create a virtual environment:
```
python -m venv .venv
```
This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your
development environment of choice. To activate it from the terminal, run:
```
source .venv/bin/activate
pip install -r requirements.txt
```
If you are in an IDE, follow your IDE's instructions to activate the virtualenv.
Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is
used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`.
If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything
should work as you expect.
#### Building via Gradle
From the Airbyte repository root, run:
You can also build the connector in Gradle. This is typically used in CI and not needed for your development workflow.
To build using Gradle, from the Airbyte repository root, run:
```
./gradlew :airbyte-integrations:connectors:source-drift:build
```
#### Create credentials
**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/sources/drift)
to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `source_drift/spec.json` file.
Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information.
See `sample_files/sample_config.json` for a sample config file.
**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.com/integrations/sources/drift)
to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `source_drift/spec.yaml` file.
Note that any directory named `secrets` is gitignored across the entire Airbyte repo, so there is no danger of accidentally checking in sensitive information.
See `integration_tests/sample_config.json` for a sample config file.
**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `source drift test creds`
and place them into `secrets/config.json`.
### Locally running the connector
```
python main.py spec
python main.py check --config secrets/config.json
python main.py discover --config secrets/config.json
python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json
```
### Unit Tests
To run unit tests locally, from the connector directory run:
```
python -m pytest unit_tests
```
### Locally running the connector docker image
#### Build
@@ -80,31 +43,40 @@ Then run any of the connector commands as follows:
docker run --rm airbyte/source-drift:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-drift:dev check --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-drift:dev discover --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/sample_files:/sample_files airbyte/source-drift:dev read --config /secrets/config.json --catalog /sample_files/configured_catalog.json
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-drift:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```
### Integration Tests
1. From the airbyte project root, run `./gradlew :airbyte-integrations:connectors:source-drift:integrationTest` to run the standard integration test suite.
1. To run additional integration tests, place your integration tests in a new directory `integration_tests` and run them with `python -m pytest -s integration_tests`.
Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named.
## Testing
#### Acceptance Tests
Customize `acceptance-test-config.yml` file to configure tests. See [Connector Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/connector-acceptance-tests-reference) for more information.
Customize `acceptance-test-config.yml` file to configure tests. See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference) for more information.
If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py.
To run your integration tests with acceptance tests, from the connector root, run
To run your integration tests with Docker, run:
```
docker build . --no-cache -t airbyte/source-drift:dev \
&& python -m pytest -p connector_acceptance_test.plugin
./acceptance-test-docker.sh
```
### Using gradle to run tests
All commands should be run from airbyte project root.
To run unit tests:
```
./gradlew :airbyte-integrations:connectors:source-drift:unitTest
```
To run acceptance and custom integration tests:
```
./gradlew :airbyte-integrations:connectors:source-drift:integrationTest
```
To run your integration tests with docker
## Dependency Management
All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development.
We split dependencies between two groups, dependencies that are:
* required for your connector to work need to go to `MAIN_REQUIREMENTS` list.
* required for the testing need to go to `TEST_REQUIREMENTS` list
### Publishing a new version of the connector
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing unit and integration tests
1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use SemVer).
1. Create a Pull Request
1. Pat yourself on the back for being an awesome contributor
1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master
1. Make sure your changes are passing unit and integration tests.
1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)).
1. Create a Pull Request.
1. Pat yourself on the back for being an awesome contributor.
1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.

View File

@@ -0,0 +1,3 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

View File

@@ -1,20 +1,42 @@
# See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference)
# for more information about how to configure these tests
connector_image: airbyte/source-drift:dev
tests:
acceptance_tests:
spec:
- spec_path: "source_drift/spec.json"
tests:
- spec_path: "source_drift/spec.yaml"
connection:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
tests:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
discovery:
- config_path: "secrets/config.json"
tests:
- config_path: "secrets/config.json"
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams: ["accounts", ]
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams:
- name: users
bypass_reason: "Sandbox account can't seed this stream"
- name: contacts
bypass_reason: "Sandbox account can't seed this stream"
- name: accounts
bypass_reason: "Sandbox account can't seed this stream"
# expect_records:
# path: "integration_tests/expected_records.jsonl"
# extra_fields: no
# exact_order: no
# extra_records: yes
incremental:
bypass_reason: "This connector does not implement incremental sync"
# TODO uncomment this block this block if your connector implements incremental sync:
# tests:
# - config_path: "secrets/config.json"
# configured_catalog_path: "integration_tests/configured_catalog.json"
# future_state:
# future_state_path: "integration_tests/abnormal_state.json"
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"

View File

@@ -1,2 +1,3 @@
#!/usr/bin/env sh
source "$(git rev-parse --show-toplevel)/airbyte-integrations/bases/connector-acceptance-test/acceptance-test-docker.sh"

View File

@@ -1,3 +1,3 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

View File

@@ -0,0 +1,5 @@
{
"todo-stream-name": {
"todo-field-name": "todo-abnormal-value"
}
}

View File

@@ -10,5 +10,4 @@ pytest_plugins = ("connector_acceptance_test.plugin",)
@pytest.fixture(scope="session", autouse=True)
def connector_setup():
"""This fixture is a placeholder for external resources that acceptance test might require."""
yield

View File

@@ -1,167 +0,0 @@
{
"streams": [
{
"name": "accounts",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"ownerId": {
"type": "integer"
},
"name": {
"type": "string"
},
"domain": {
"type": "string"
},
"accountId": {
"type": "string"
},
"customProperties": {
"type": "array",
"items": {
"type": "object",
"properties": {
"label": {
"type": "string"
},
"name": {
"type": "string"
},
"value": {},
"type": {
"type": "string"
}
}
}
},
"deleted": {
"type": "boolean"
},
"createDateTime": {
"type": "integer"
},
"updateDateTime": {
"type": "integer"
},
"targeted": {
"type": "boolean"
}
}
},
"supported_sync_modes": ["full_refresh"],
"source_defined_cursor": false
},
{
"name": "conversations",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": "integer"
},
"participants": {
"type": "array",
"items": {
"type": "integer"
}
},
"status": {
"type": "string",
"enum": ["open", "closed", "pending", "bulk_sent"]
},
"contactId": {
"type": "integer"
},
"inboxId": {
"type": "integer"
},
"createdAt": {
"type": "integer"
},
"updatedAt": {
"type": "integer"
},
"relatedPlaybookId": {
"type": ["null", "string"]
},
"conversationTags": {
"type": "array",
"items": {
"type": "object",
"properties": {
"color": {
"type": "string",
"description": "HEX value"
},
"name": {
"type": "string"
}
}
}
}
}
},
"supported_sync_modes": ["full_refresh"],
"source_defined_cursor": false
},
{
"name": "users",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": "integer"
},
"orgId": {
"type": "integer"
},
"name": {
"type": "string"
},
"alias": {
"type": "string"
},
"email": {
"type": "string"
},
"phone": {
"type": "string"
},
"locale": {
"type": "string"
},
"availability": {
"type": "string"
},
"role": {
"type": "string"
},
"timeZone": {
"type": "string"
},
"avatarUrl": {
"type": "string"
},
"verified": {
"type": "boolean"
},
"bot": {
"type": "boolean"
},
"createdAt": {
"type": "integer"
},
"updatedAt": {
"type": "integer"
}
}
},
"supported_sync_modes": ["full_refresh"],
"source_defined_cursor": false
}
]
}

View File

@@ -26,6 +26,24 @@
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "contacts",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "messages",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}

View File

@@ -1,27 +1,25 @@
data:
allowedHosts:
hosts:
- driftapi.com
- https://driftapi.com/
registries:
oss:
enabled: true
cloud:
enabled: false # hide Source Drift https://github.com/airbytehq/airbyte/issues/24270
connectorSubtype: api
connectorType: source
definitionId: 445831eb-78db-4b1f-8f1f-0d96ad8739e2
dockerImageTag: 0.2.7
dockerImageTag: 0.3.0
dockerRepository: airbyte/source-drift
githubIssueLabel: source-drift
icon: drift.svg
license: MIT
name: Drift
registries:
cloud:
enabled: false # hide Source Drift https://github.com/airbytehq/airbyte/issues/24270
oss:
enabled: true
releaseDate: 2023-08-10
releaseStage: alpha
supportLevel: community
documentationUrl: https://docs.airbyte.com/integrations/sources/drift
tags:
- language:python
ab_internal:
sl: 100
ql: 200
supportLevel: community
- language:lowcode
metadataSpecVersion: "1.0"

View File

@@ -1,2 +1,2 @@
# This file is autogenerated -- only edit if you know what you are doing. Use setup.py for declaring dependencies.
-e ../../bases/connector-acceptance-test
-e .

View File

@@ -1,161 +0,0 @@
{
"streams": [
{
"name": "accounts",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"ownerId": {
"type": "integer"
},
"name": {
"type": "string"
},
"domain": {
"type": "string"
},
"accountId": {
"type": "string"
},
"customProperties": {
"type": "array",
"items": {
"type": "object",
"properties": {
"label": {
"type": "string"
},
"name": {
"type": "string"
},
"value": {},
"type": {
"type": "string"
}
}
}
},
"deleted": {
"type": "boolean"
},
"createDateTime": {
"type": "integer"
},
"updateDateTime": {
"type": "integer"
},
"targeted": {
"type": "boolean"
}
}
}
},
{
"name": "conversations",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": "integer"
},
"participants": {
"type": "array",
"items": {
"type": "integer"
}
},
"status": {
"type": "string",
"enum": ["open", "closed", "pending", "bulk_sent"]
},
"contactId": {
"type": "integer"
},
"inboxId": {
"type": "integer"
},
"createdAt": {
"type": "integer"
},
"updatedAt": {
"type": "integer"
},
"relatedPlaybookId": {
"type": ["null", "string"]
},
"conversationTags": {
"type": "array",
"items": {
"type": "object",
"properties": {
"color": {
"type": "string",
"description": "HEX value"
},
"name": {
"type": "string"
}
}
}
}
}
}
},
{
"name": "users",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": "integer"
},
"orgId": {
"type": "integer"
},
"name": {
"type": "string"
},
"alias": {
"type": "string"
},
"email": {
"type": "string"
},
"phone": {
"type": "string"
},
"locale": {
"type": "string"
},
"availability": {
"type": "string"
},
"role": {
"type": "string"
},
"timeZone": {
"type": "string"
},
"avatarUrl": {
"type": "string"
},
"verified": {
"type": "boolean"
},
"bot": {
"type": "boolean"
},
"createdAt": {
"type": "integer"
},
"updatedAt": {
"type": "integer"
}
}
}
}
]
}

View File

@@ -1,176 +0,0 @@
{
"streams": [
{
"stream": {
"name": "accounts",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"ownerId": {
"type": "integer"
},
"name": {
"type": "string"
},
"domain": {
"type": "string"
},
"accountId": {
"type": "string"
},
"customProperties": {
"type": "array",
"items": {
"type": "object",
"properties": {
"label": {
"type": "string"
},
"name": {
"type": "string"
},
"value": {},
"type": {
"type": "string"
}
}
}
},
"deleted": {
"type": "boolean"
},
"createDateTime": {
"type": "integer"
},
"updateDateTime": {
"type": "integer"
},
"targeted": {
"type": "boolean"
}
}
},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "conversations",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": "integer"
},
"participants": {
"type": "array",
"items": {
"type": "integer"
}
},
"status": {
"type": "string",
"enum": ["open", "closed", "pending", "bulk_sent"]
},
"contactId": {
"type": "integer"
},
"inboxId": {
"type": "integer"
},
"createdAt": {
"type": "integer"
},
"updatedAt": {
"type": "integer"
},
"relatedPlaybookId": {
"type": ["null", "string"]
},
"conversationTags": {
"type": "array",
"items": {
"type": "object",
"properties": {
"color": {
"type": "string",
"description": "HEX value"
},
"name": {
"type": "string"
}
}
}
}
}
},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "users",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": "integer"
},
"orgId": {
"type": "integer"
},
"name": {
"type": "string"
},
"alias": {
"type": "string"
},
"email": {
"type": "string"
},
"phone": {
"type": "string"
},
"locale": {
"type": "string"
},
"availability": {
"type": "string"
},
"role": {
"type": "string"
},
"timeZone": {
"type": "string"
},
"avatarUrl": {
"type": "string"
},
"verified": {
"type": "boolean"
},
"bot": {
"type": "boolean"
},
"createdAt": {
"type": "integer"
},
"updatedAt": {
"type": "integer"
}
}
},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}

View File

@@ -1,5 +0,0 @@
{
"credentials": {
"access_token": "123412341234sfsdfs"
}
}

View File

@@ -5,12 +5,14 @@
from setuptools import find_packages, setup
MAIN_REQUIREMENTS = ["airbyte-cdk~=0.1", "requests~=2.22"]
MAIN_REQUIREMENTS = [
"airbyte-cdk~=0.1",
]
TEST_REQUIREMENTS = [
"requests-mock~=1.9.3",
"pytest~=6.1",
"pytest~=6.2",
"pytest-mock~=3.6.1",
"connector-acceptance-test",
]
setup(
@@ -20,7 +22,7 @@ setup(
author_email="contact@airbyte.io",
packages=find_packages(),
install_requires=MAIN_REQUIREMENTS,
package_data={"": ["*.json", "schemas/*.json", "schemas/shared/*.json"]},
package_data={"": ["*.json", "*.yaml", "schemas/*.json", "schemas/shared/*.json"]},
extras_require={
"tests": TEST_REQUIREMENTS,
},

View File

@@ -1,4 +1,8 @@
from .client import Client
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from .source import SourceDrift
__all__ = ["SourceDrift", "Client"]
__all__ = ["SourceDrift"]

View File

@@ -1,4 +0,0 @@
from .client import Client
from .common import APIError, AuthError, NotFoundError, ServerError, ValidationError
__all__ = ["Client", "APIError", "AuthError", "ServerError", "ValidationError", "NotFoundError"]

View File

@@ -1,154 +0,0 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import json
from enum import IntEnum
from functools import partial
from typing import Iterator, List
import requests
from .common import _parsed_response, cursor_paginator, next_url_paginator
class APIClient:
USER_AGENT = "Airbyte contact@airbyte.io"
BASE_URL = "https://driftapi.com"
def __init__(self, access_token: str):
self._token = access_token
self._headers = {
"Authorization": f"Bearer {self._token}",
"User-Agent": self.USER_AGENT,
"Content-Type": "application/json",
"Accept": "application/json",
}
self.accounts = Account(client=self)
self.contacts = Contact(client=self)
self.conversations = Conversation(client=self)
self.messages = Message(client=self)
self.users = User(client=self)
@_parsed_response
def post(self, url, data, **kwargs):
return requests.post(self.full_url(url), data=json.dumps(data), headers=self._headers, **kwargs)
@_parsed_response
def patch(self, url, data, **kwargs):
"""Used in fixtures.py only"""
return requests.patch(self.full_url(url), data=json.dumps(data), headers=self._headers, **kwargs)
@_parsed_response
def get(self, url, **kwargs):
return requests.get(self.full_url(url), headers=self._headers, **kwargs)
def full_url(self, url):
return f"{self.BASE_URL}/{url}"
def check_token(self, token: str):
return self.post("app/token_info", data={"accessToken": token})
class User:
def __init__(self, client: APIClient):
self.client = client
def get(self, pk) -> dict:
return self.client.get(f"users/{pk}")["data"]
def list(self) -> Iterator[dict]:
"""Doesn't support pagination and return all users at once"""
yield from self.client.get("users/list")["data"]
def update(self, pk, **attributes) -> dict:
params = {"userId": pk}
return self.client.patch("users/update", data=attributes, params=params)
class Conversation:
pagination = partial(cursor_paginator, per_page=50)
class Status(IntEnum):
OPEN = 1
CLOSED = 2
PENDING = 3
def __init__(self, client: APIClient):
self.client = client
def get(self, pk: int) -> dict:
return self.client.get(f"conversations/{pk}")
def list(self, statuses: List[Status] = None) -> Iterator[dict]:
"""Conversations returned will be ordered by their updatedAt time with the most recently updated at the top of the list."""
statuses = list(map(int, statuses or []))
params = {"statusId": statuses}
request = partial(self.client.get, url="conversations/list")
return self.pagination(request, params=params)
def create(self, **attributes) -> dict:
return self.client.post("conversations/new", data=attributes)
def update(self, pk: int, **attributes) -> dict:
params = {"userId": pk}
return self.client.patch("conversations/update", data=attributes, params=params)
class Message:
pagination = partial(cursor_paginator, per_page=50)
def __init__(self, client: APIClient):
self.client = client
def list(self, conversation_id: int) -> Iterator[dict]:
"""You have to provide conversation ID to get list of messages"""
request = partial(self.client.get, url=f"conversations/{conversation_id}/messages")
for data in self.pagination(request):
yield from data.get("messages", [])
def create(self, conversation_id: int, **attributes) -> dict:
return self.client.post(f"conversations/{conversation_id}/messages", data=attributes).get("data")
class Account:
pagination = partial(next_url_paginator, per_page=100)
def __init__(self, client: APIClient):
self.client = client
def get(self, pk: int) -> dict:
return self.client.get(f"accounts/{pk}")
def list(self) -> Iterator[dict]:
request = partial(self.client.get, url="accounts")
for data in self.pagination(request):
yield from data.get("accounts", [])
def create(self, **attributes) -> dict:
return self.client.post("accounts/create", data=attributes).get("data")
def update(self, pk: int, **attributes) -> dict:
params = {"userId": pk}
return self.client.patch("accounts/update", data=attributes, params=params)
class Contact:
def __init__(self, client: APIClient):
self.client = client
def get(self, pk: int) -> dict:
return self.client.get(f"contacts/{pk}")["data"]
def list(self, email: str) -> Iterator[dict]:
"""List contacts only possible with exact email filter"""
yield from self.client.get("contacts", params={"email": email})["data"]
def create(self, **attributes) -> dict:
return self.client.post("contacts", data=attributes).get("data")
def update(self, pk: int, **attributes) -> dict:
params = {"userId": pk}
return self.client.patch("contacts/update", data=attributes, params=params)

View File

@@ -1,53 +0,0 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from typing import Dict, Iterator, Tuple
from airbyte_cdk.sources.deprecated.client import BaseClient
from .api import APIClient
from .common import AuthError, ValidationError
class DriftAuthenticator:
def __init__(self, config: Dict):
self.config = config
def get_token(self) -> str:
access_token = self.config.get("access_token")
if access_token:
return access_token
else:
return self.config.get("credentials").get("access_token")
class Client(BaseClient):
def __init__(self, **config: Dict):
super().__init__()
self._client = APIClient(access_token=DriftAuthenticator(config).get_token())
def stream__accounts(self, **kwargs) -> Iterator[dict]:
yield from self._client.accounts.list()
def stream__users(self, **kwargs) -> Iterator[dict]:
yield from self._client.users.list()
def stream__conversations(self, **kwargs) -> Iterator[dict]:
yield from self._client.conversations.list()
def health_check(self) -> Tuple[bool, str]:
alive = True
error_msg = None
try:
# we don't care about response, just checking authorisation
self._client.check_token("definitely_not_a_token")
except ValidationError: # this is ok because `definitely_not_a_token`
pass
except AuthError as error:
alive = False
error_msg = str(error)
return alive, error_msg

View File

@@ -1,114 +0,0 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import collections
import functools
from typing import Iterator
from urllib.parse import parse_qs, urlparse
import requests
class ServerError(Exception):
"""Server respond with error"""
class APIError(Exception):
"""Base class for API errors""" ""
class ValidationError(APIError):
"""Provided data has failed validation"""
class AuthError(APIError):
"""Token is wrong or expired"""
class NotFoundError(APIError):
"""Object not found"""
class RateLimitError(APIError):
"""API calls reached allowed limit"""
def cursor_paginator(request, start_index: int = None, per_page: int = 100, params: dict = None) -> Iterator[dict]:
"""Paginator that use cursor offset to navigate"""
params = params or {}
index = start_index
while True:
result = request(params={**params, "next": index, "limit": per_page})
if isinstance(result["data"], collections.abc.Sequence):
yield from result["data"]
else:
yield result["data"]
index = result.get("pagination", {}).get("next")
if not index:
break
def next_url_paginator(request, start_index: int = None, per_page: int = 100, params: dict = None) -> Iterator[dict]:
"""Paginator that use next url to navigate"""
params = params or {}
size = per_page
index = start_index
while True:
result = request(params={**params, "index": index, "size": size})
if isinstance(result["data"], collections.abc.Sequence):
yield from result["data"]
else:
yield result["data"]
next_url = result["data"].get("next")
if not next_url:
break
# parse url to unify request command
next_url = urlparse(next_url)
next_params = parse_qs(next_url.query)
index = next_params.get("index", [None])[0]
size = next_params.get("size", [None])[0]
def exception_from_code(code: int, message: str) -> Exception:
"""Map response code to exception class"""
mapping = {
400: ValidationError,
401: AuthError,
403: AuthError,
429: RateLimitError,
404: NotFoundError,
500: ServerError,
502: ServerError,
503: ServerError,
504: ServerError,
}
return mapping.get(code, APIError)(code, message)
def _parsed_response(func):
"""Decorator to check response status and parse its body"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
response = func(*args, **kwargs)
result = response.json() if response.text else {}
if not response.ok:
msg = result # fallback to the whole response
if "error" in result:
msg = result["error"].get("message", result)
# multiple errors? grab all of them
elif "errors" in result:
msg = result["errors"]
raise exception_from_code(response.status_code, msg)
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as err:
raise ServerError(err.request.status_code, "Connection Error") from err
return result
return wrapper

View File

@@ -1,54 +0,0 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import os
from .api import APIClient
class FakeDataFactory:
@staticmethod
def account(seed):
return {
"ownerId": 2000 + seed,
"name": f"Company Name {seed}",
"domain": f"www.domain.n{seed}.com",
"customProperties": [{"label": "My Number", "name": " my number", "value": 1, "type": "NUMBER"}],
"targeted": True,
}
@staticmethod
def contact(seed):
return {"attributes": {"email": f"airbyte-test-email-{seed}@airbyte.io"}}
@staticmethod
def conversation(seed, email=None):
return {
"email": email or f"airbyte-test-email-{seed}@airbyte.io",
"message": {"body": f"Test conversation message #{seed}", "attributes": {"integrationSource": "Message from airbyte tests"}},
}
@staticmethod
def message(seed):
return {
"type": "chat",
"body": f"Test message #{seed}",
}
def main():
client = APIClient(access_token=os.getenv("DRIFT_TOKEN", "YOUR_TOKEN_HERE"))
# create 120 accounts and 120 conversation with 120 new contacts
for i in range(120):
client.accounts.create(**FakeDataFactory.account(i + 1))
conversation = client.conversations.create(**FakeDataFactory.conversation(i))
# in each conversation create +3 additional messages
for k in range(3):
client.messages.create(conversation_id=conversation["id"], **FakeDataFactory.message(k))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,130 @@
version: "0.29.0"
definitions:
selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_path: ["data"]
requester:
type: HttpRequester
url_base: "https://driftapi.com"
http_method: "GET"
authenticator:
type: BearerAuthenticator
api_token: "{{ config['credentials']['access_token'] }}"
retriever:
type: SimpleRetriever
record_selector:
$ref: "#/definitions/selector"
paginator:
type: "NoPagination"
requester:
$ref: "#/definitions/requester"
base_stream:
type: DeclarativeStream
retriever:
$ref: "#/definitions/retriever"
base_paginator:
type: "DefaultPaginator"
pagination_strategy:
type: "CursorPagination"
cursor_value: "{{ last_records['next'] }}"
page_token_option:
type: "RequestPath"
field_name: "page_token"
inject_into: "request_parameter"
accounts_stream:
$ref: "#/definitions/base_stream"
retriever:
$ref: "#/definitions/retriever"
paginator:
$ref: "#/definitions/base_paginator"
record_selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_path: ["data", "accounts"]
name: "accounts"
primary_key: "ownerId"
$parameters:
path: "/accounts"
conversations_stream:
$ref: "#/definitions/base_stream"
retriever:
$ref: "#/definitions/retriever"
paginator:
$ref: "#/definitions/base_paginator"
name: "conversations"
primary_key: "id"
$parameters:
path: "/conversations"
users_stream:
$ref: "#/definitions/base_stream"
name: "users"
primary_key: "id"
$parameters:
path: "/users"
contacts_stream:
$ref: "#/definitions/base_stream"
$parameters:
name: "contacts"
primary_key: "id"
path: "/contacts"
retriever:
$ref: "#/definitions/retriever"
requester:
$ref: "#/definitions/requester"
request_parameters:
email: "{{ config['email'] }}"
messages_partition_router:
type: SubstreamPartitionRouter
parent_stream_configs:
- stream: "#/definitions/conversations_stream"
parent_key: "id"
partition_field: "parent_id"
messages_stream:
$ref: "#/definitions/base_stream"
$parameters:
name: "messages"
primary_key: "id"
path: "/conversations/{{ stream_partition.parent_id }}/messages"
retriever:
$ref: "#/definitions/retriever"
paginator:
$ref: "#/definitions/base_paginator"
record_selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_path: ["data", "messages"]
partition_router:
$ref: "#/definitions/messages_partition_router"
streams:
- "#/definitions/accounts_stream"
- "#/definitions/conversations_stream"
- "#/definitions/users_stream"
- "#/definitions/contacts_stream"
- "#/definitions/messages_stream"
check:
type: CheckStream
stream_names:
- "accounts"
- "conversations"
- "users"
- "contacts"
- "messages"

View File

@@ -2,11 +2,17 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.sources.deprecated.base_source import BaseSource
"""
This file provides the necessary constructs to interpret a provided declarative YAML configuration file into
source connector.
from .client import Client
WARNING: Do not modify this file.
"""
class SourceDrift(BaseSource):
client_class = Client
# Declarative Source
class SourceDrift(YamlDeclarativeSource):
def __init__(self):
super().__init__(**{"path_to_yaml": "manifest.yaml"})

View File

@@ -1,137 +0,0 @@
{
"documentationUrl": "https://docs.airbyte.com/integrations/sources/drift",
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Drift Spec",
"type": "object",
"required": [],
"additionalProperties": true,
"properties": {
"credentials": {
"title": "Authorization Method",
"type": "object",
"oneOf": [
{
"type": "object",
"title": "OAuth2.0",
"required": [
"client_id",
"client_secret",
"access_token",
"refresh_token"
],
"properties": {
"credentials": {
"type": "string",
"const": "oauth2.0",
"order": 0
},
"client_id": {
"type": "string",
"title": "Client ID",
"description": "The Client ID of your Drift developer application.",
"airbyte_secret": true
},
"client_secret": {
"type": "string",
"title": "Client Secret",
"description": "The Client Secret of your Drift developer application.",
"airbyte_secret": true
},
"access_token": {
"type": "string",
"title": "Access Token",
"description": "Access Token for making authenticated requests.",
"airbyte_secret": true
},
"refresh_token": {
"type": "string",
"title": "Refresh Token",
"description": "Refresh Token to renew the expired Access Token.",
"default": "",
"airbyte_secret": true
}
}
},
{
"title": "Access Token",
"type": "object",
"required": ["access_token"],
"properties": {
"credentials": {
"type": "string",
"const": "access_token",
"order": 0
},
"access_token": {
"type": "string",
"title": "Access Token",
"description": "Drift Access Token. See the <a href=\"https://docs.airbyte.com/integrations/sources/drift\">docs</a> for more information on how to generate this key.",
"airbyte_secret": true
}
}
}
]
}
}
},
"advanced_auth": {
"auth_flow_type" : "oauth2.0",
"predicate_key" : [
"credentials",
"credentials"
],
"predicate_value" : "oauth2.0",
"oauth_config_specification" : {
"complete_oauth_output_specification" : {
"type" : "object",
"properties" : {
"access_token" : {
"type" : "string",
"path_in_connector_config" : [
"credentials",
"access_token"
]
},
"refresh_token" : {
"type" : "string",
"path_in_connector_config" : [
"credentials",
"refresh_token"
]
}
}
},
"complete_oauth_server_input_specification" : {
"type" : "object",
"properties" : {
"client_id" : {
"type" : "string"
},
"client_secret" : {
"type" : "string"
}
}
},
"complete_oauth_server_output_specification" : {
"type" : "object",
"properties" : {
"client_id" : {
"type" : "string",
"path_in_connector_config" : [
"credentials",
"client_id"
]
},
"client_secret" : {
"type" : "string",
"path_in_connector_config" : [
"credentials",
"client_secret"
]
}
}
}
}
}
}

View File

@@ -0,0 +1,105 @@
documentationUrl: https://docs.airbyte.com/integrations/sources/drift
connectionSpecification:
$schema: http://json-schema.org/draft-07/schema#
title: Drift Spec
type: object
required: []
additionalProperties: true
properties:
email:
type: string
description: Email used as parameter for contacts stream
title: Email parameter for contacts stream
default: "test@test.com"
credentials:
title: Authorization Method
type: object
oneOf:
- type: object
title: OAuth2.0
required:
- client_id
- client_secret
- access_token
- refresh_token
properties:
credentials:
type: string
const: oauth2.0
order: 0
client_id:
type: string
title: Client ID
description: The Client ID of your Drift developer application.
airbyte_secret: true
client_secret:
type: string
title: Client Secret
description: The Client Secret of your Drift developer application.
airbyte_secret: true
access_token:
type: string
title: Access Token
description: Access Token for making authenticated requests.
airbyte_secret: true
refresh_token:
type: string
title: Refresh Token
description: Refresh Token to renew the expired Access Token.
default: ''
airbyte_secret: true
- title: Access Token
type: object
required:
- access_token
properties:
credentials:
type: string
const: access_token
order: 0
access_token:
type: string
title: Access Token
description: Drift Access Token. See the <a href="https://docs.airbyte.com/integrations/sources/drift">docs</a>
for more information on how to generate this key.
airbyte_secret: true
advanced_auth:
auth_flow_type: oauth2.0
predicate_key:
- credentials
- credentials
predicate_value: oauth2.0
oauth_config_specification:
complete_oauth_output_specification:
type: object
properties:
access_token:
type: string
path_in_connector_config:
- credentials
- access_token
refresh_token:
type: string
path_in_connector_config:
- credentials
- refresh_token
complete_oauth_server_input_specification:
type: object
properties:
client_id:
type: string
client_secret:
type: string
complete_oauth_server_output_specification:
type: object
properties:
client_id:
type: string
path_in_connector_config:
- credentials
- client_id
client_secret:
type: string
path_in_connector_config:
- credentials
- client_secret

View File

@@ -1,23 +0,0 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import pytest
from source_drift.client import AuthError, Client
config = {"credentials": {"access_token": "wrong_key"}}
def test__heal_check_with_wrong_token():
client = Client(**config)
alive, error = client.health_check()
assert not alive
assert error == "(401, 'The access token is invalid or has expired')"
def test__users_with_wrong_token():
client = Client(**config)
with pytest.raises(AuthError, match="(401, 'The access token is invalid or has expired')"):
next(client.stream__users())

View File

@@ -49,10 +49,11 @@ The Drift connector should not run into Drift API limitations under normal usage
## CHANGELOG
| Version | Date | Pull Request | Subject |
|:--------|:-----------| :--- |:--------------------------------------------------------------------|
| Version | Date | Pull Request | Subject |
|:--------|:-----------| :------------------------------------------------------- |:--------------------------------------------------------------------|
| 0.3.0 | 2023-08-05 | [29121](https://github.com/airbytehq/airbyte/pull/29121) | Migrate Python CDK to Low Code CDK |
| 0.2.7 | 2023-06-09 | [27202](https://github.com/airbytehq/airbyte/pull/27202) | Remove authSpecification in favour of advancedAuth in specification |
| 0.2.6 | 2023-03-07 | [23810](https://github.com/airbytehq/airbyte/pull/23810) | Prepare for cloud |
| 0.2.5 | 2021-12-14 | [8429](https://github.com/airbytehq/airbyte/pull/8429) | Updated titles and descriptions |
| 0.2.3 | 2021-10-25 | [7337](https://github.com/airbytehq/airbyte/pull/7337) | Added support of `OAuth 2.0` authorisation option |
| `0.2.3` | 2021-10-27 | [7247](https://github.com/airbytehq/airbyte/pull/7247) | Migrate to the CDK |
| 0.2.5 | 2021-12-14 | [8429](https://github.com/airbytehq/airbyte/pull/8429) | Updated titles and descriptions |
| 0.2.3 | 2021-10-25 | [7337](https://github.com/airbytehq/airbyte/pull/7337) | Added support of `OAuth 2.0` authorisation option |
| 0.2.3 | 2021-10-27 | [7247](https://github.com/airbytehq/airbyte/pull/7247) | Migrate to the CDK |