
New destination: databend (community PR #19815) (#20909)

* feat: Add databend destination

Co-authored-by: hantmac <hantmac@outlook.com>
Co-authored-by: josephkmh <joseph@airbyte.io>
Co-authored-by: Sajarin <sajarindider@gmail.com>
Authored by Greg Solovyev on 2023-01-09 10:19:07 -08:00, committed by GitHub
parent 45006a750b
commit 56c686440e
22 changed files with 1044 additions and 2 deletions

@@ -418,3 +418,10 @@
documentationUrl: https://docs.airbyte.com/integrations/destinations/yugabytedb
icon: yugabytedb.svg
releaseStage: alpha
- name: Databend
destinationDefinitionId: 302e4d8e-08d3-4098-acd4-ac67ca365b88
dockerRepository: airbyte/destination-databend
dockerImageTag: 0.1.0
icon: databend.svg
documentationUrl: https://docs.airbyte.com/integrations/destinations/databend
releaseStage: alpha


@@ -7140,3 +7140,69 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-databend:0.1.0"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/databend"
connectionSpecification:
$schema: "http://json-schema.org/draft-07/schema#"
title: "Destination Databend"
type: "object"
required:
- "host"
- "username"
- "database"
additionalProperties: true
properties:
host:
title: "Host"
description: "Hostname of the database."
type: "string"
order: 0
protocol:
title: "Protocol"
description: "Protocol of the host."
type: "string"
examples:
- "https"
default: "https"
order: 1
port:
title: "Port"
description: "Port of the database."
type: "integer"
minimum: 0
maximum: 65536
default: 443
examples:
- "443"
order: 2
database:
title: "DB Name"
description: "Name of the database."
type: "string"
order: 3
table:
title: "Default Table"
description: "The default table was written to."
type: "string"
examples:
- "default"
default: "default"
order: 4
username:
title: "User"
description: "Username to use to access the database."
type: "string"
order: 5
password:
title: "Password"
description: "Password associated with the username."
type: "string"
airbyte_secret: true
order: 6
supportsIncremental: true
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes:
- "overwrite"
- "append"


@@ -1,5 +1,3 @@
import java.nio.file.Paths
plugins {
id 'airbyte-docker'
id 'airbyte-python'


@@ -0,0 +1,5 @@
*
!Dockerfile
!main.py
!destination_databend
!setup.py


@@ -0,0 +1,38 @@
FROM python:3.9.11-alpine3.15 as base
# build and load all requirements
FROM base as builder
WORKDIR /airbyte/integration_code
# upgrade pip to the latest version
RUN apk --no-cache upgrade \
&& pip install --upgrade pip \
&& apk --no-cache add tzdata build-base
COPY setup.py ./
# install necessary packages to a temporary folder
RUN pip install --prefix=/install .
# build a clean environment
FROM base
WORKDIR /airbyte/integration_code
# copy all loaded and built libraries to a pure basic image
COPY --from=builder /install /usr/local
# add default timezone settings
COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime
RUN echo "Etc/UTC" > /etc/timezone
# bash is installed for more convenient debugging.
RUN apk --no-cache add bash
# copy payload code only
COPY main.py ./
COPY destination_databend ./destination_databend
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/destination-databend


@@ -0,0 +1,123 @@
# Databend Destination
This is the repository for the Databend destination connector, written in Python.
For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/destinations/databend).
## Local development
### Prerequisites
**To iterate on this connector, make sure to complete this prerequisites section.**
#### Minimum Python version required `= 3.7.0`
#### Build & Activate Virtual Environment and install dependencies
From this connector directory, create a virtual environment:
```
python -m venv .venv
```
This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your
development environment of choice. To activate it from the terminal, run:
```
source .venv/bin/activate
pip install -r requirements.txt
```
If you are in an IDE, follow your IDE's instructions to activate the virtualenv.
Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is
used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`.
If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything
should work as you expect.
#### Building via Gradle
From the Airbyte repository root, run:
```
./gradlew :airbyte-integrations:connectors:destination-databend:build
```
#### Create credentials
**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/destinations/databend)
to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `destination_databend/spec.json` file.
Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information.
See `integration_tests/sample_config.json` for a sample config file.
**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `destination databend test creds`
and place them into `secrets/config.json`.
### Locally running the connector
```
python main.py spec
python main.py check --config secrets/config.json
python main.py discover --config secrets/config.json
python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json
```
### Locally running the connector docker image
#### Build
First, make sure you build the latest Docker image:
```
docker build . -t airbyte/destination-databend:dev
```
You can also build the connector image via Gradle:
```
./gradlew :airbyte-integrations:connectors:destination-databend:airbyteDocker
```
When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in
the Dockerfile.
#### Run
Then run any of the connector commands as follows:
```
docker run --rm airbyte/destination-databend:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-databend:dev check --config /secrets/config.json
# messages.jsonl is a file containing line-separated JSON representing AirbyteMessages
cat messages.jsonl | docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-databend:dev write --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```
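For reference, each line of `messages.jsonl` is one serialized `AirbyteMessage`; a minimal, illustrative `RECORD` line (the stream and field names are placeholders, not part of this connector) might look like:
```
{"type": "RECORD", "record": {"stream": "my_stream", "data": {"str_col": "hello", "int_col": 1}, "emitted_at": 1672531200000}}
```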
## Testing
Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named.
First install test dependencies into your virtual environment:
```
pip install .[tests]
```
### Unit Tests
To run unit tests locally, from the connector directory run:
```
python -m pytest unit_tests
```
### Integration Tests
There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all destination connectors) and custom integration tests (which are specific to this connector).
#### Custom Integration tests
Place custom tests inside `integration_tests/` folder, then, from the connector root, run
```
python -m pytest integration_tests
```
#### Acceptance Tests
Coming soon:
### Using gradle to run tests
All commands should be run from airbyte project root.
To run unit tests:
```
./gradlew :airbyte-integrations:connectors:destination-databend:unitTest
```
To run acceptance and custom integration tests:
```
./gradlew :airbyte-integrations:connectors:destination-databend:integrationTest
```
## Dependency Management
All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development.
We split dependencies between two groups:
* dependencies required for your connector to work go in the `MAIN_REQUIREMENTS` list.
* dependencies required for testing go in the `TEST_REQUIREMENTS` list.
### Publishing a new version of the connector
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing unit and integration tests.
1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)).
1. Create a Pull Request.
1. Pat yourself on the back for being an awesome contributor.
1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.


@@ -0,0 +1,8 @@
plugins {
id 'airbyte-python'
id 'airbyte-docker'
}
airbytePython {
moduleDirectory 'destination_databend'
}


@@ -0,0 +1,8 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from .destination import DestinationDatabend
__all__ = ["DestinationDatabend"]


@@ -0,0 +1,21 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from databend_sqlalchemy import connector
class DatabendClient:
def __init__(self, protocol: str, host: str, port: int, database: str, table: str, username: str, password: str = None):
self.protocol = protocol
self.host = host
self.port = port
self.database = database
self.table = table
self.username = username
self.password = password
def open(self):
handle = connector.connect(f"{self.protocol}://{self.username}:{self.password}@{self.host}:{self.port}").cursor()
return handle
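A minimal usage sketch for this client (not part of the connector source; the host and credentials below are placeholders), mirroring what `check()` in `destination.py` does:
```
from destination_databend.client import DatabendClient

# Placeholder config shaped like destination_databend/spec.json
config = {
    "protocol": "https",
    "host": "your-databend-host.example.com",
    "port": 443,
    "database": "default",
    "table": "default",
    "username": "airbyte_user",
    "password": "***",
}

client = DatabendClient(**config)
cursor = client.open()        # DB-API style cursor from databend_sqlalchemy
cursor.execute("SELECT 1")    # trivial connectivity check
print(cursor.fetchall())
cursor.close()
```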


@@ -0,0 +1,89 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
import json
from datetime import datetime
from logging import getLogger
from typing import Any, Iterable, Mapping
from uuid import uuid4
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.destinations import Destination
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, DestinationSyncMode, Status, Type
from destination_databend.client import DatabendClient
from .writer import create_databend_writer
logger = getLogger("airbyte")
class DestinationDatabend(Destination):
def write(
self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
) -> Iterable[AirbyteMessage]:
"""
Reads the input stream of messages, config, and catalog to write data to the destination.
This method returns an iterable (typically a generator of AirbyteMessages via yield) containing state messages received
in the input message stream. Outputting a state message means that every AirbyteRecordMessage which came before it has been
successfully persisted to the destination. This is used to ensure fault tolerance in the case that a sync fails before fully completing,
then the source is given the last state message output from this method as the starting point of the next sync.
:param config: dict of JSON configuration matching the configuration declared in spec.json
:param configured_catalog: The Configured Catalog describing the schema of the data being received and how it should be persisted in the
destination
:param input_messages: The stream of input messages received from the source
:return: Iterable of AirbyteStateMessages wrapped in AirbyteMessage structs
"""
streams = {s.stream.name for s in configured_catalog.streams}
client = DatabendClient(**config)
writer = create_databend_writer(client, logger)
for configured_stream in configured_catalog.streams:
if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite:
writer.delete_table(configured_stream.stream.name)
logger.info(f"Stream {configured_stream.stream.name} is wiped.")
writer.create_raw_table(configured_stream.stream.name)
for message in input_messages:
if message.type == Type.STATE:
yield message
elif message.type == Type.RECORD:
data = message.record.data
stream = message.record.stream
# Skip unselected streams
if stream not in streams:
logger.debug(f"Stream {stream} was not present in configured streams, skipping")
continue
writer.queue_write_data(stream, str(uuid4()), datetime.now(), json.dumps(data))
# Flush any leftover messages
writer.flush()
def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
"""
Tests if the input configuration can be used to successfully connect to the destination with the needed permissions
e.g: if a provided API token or password can be used to connect and write to the destination.
:param logger: Logging object to display debug/info/error to the logs
(logs will not be accessible via airbyte UI if they are not passed to this logger)
:param config: Json object containing the configuration of this destination, content of this json is as specified in
the properties of the spec.json file
:return: AirbyteConnectionStatus indicating a Success or Failure
"""
try:
client = DatabendClient(**config)
cursor = client.open()
cursor.execute("DROP TABLE IF EXISTS test")
cursor.execute("CREATE TABLE if not exists test (x Int32,y VARCHAR)")
cursor.execute("INSERT INTO test (x,y) VALUES (%,%)", [1, "yy", 2, "xx"])
cursor.execute("DROP TABLE IF EXISTS test")
return AirbyteConnectionStatus(status=Status.SUCCEEDED)
except Exception as e:
return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {repr(e)}")


@@ -0,0 +1,80 @@
{
"documentationUrl" : "https://docs.airbyte.com/integrations/destinations/databend",
"supported_destination_sync_modes" : [
"overwrite",
"append"
],
"supportsIncremental" : true,
"supportsDBT" : false,
"supportsNormalization" : false,
"connectionSpecification" : {
"$schema" : "http://json-schema.org/draft-07/schema#",
"title" : "Destination Databend",
"type" : "object",
"required" : [
"host",
"username",
"database"
],
"additionalProperties" : true,
"properties" : {
"host" : {
"title" : "Host",
"description" : "Hostname of the database.",
"type" : "string",
"order" : 0
},
"protocol" : {
"title" : "Protocol",
"description" : "Protocol of the host.",
"type" : "string",
"examples" : [
"https"
],
"default" : "https",
"order" : 1
},
"port" : {
"title" : "Port",
"description" : "Port of the database.",
"type" : "integer",
"minimum" : 0,
"maximum" : 65536,
"default" : 443,
"examples" : [
"443"
],
"order" : 2
},
"database" : {
"title" : "DB Name",
"description" : "Name of the database.",
"type" : "string",
"order" : 3
},
"table" : {
"title" : "Default Table",
"description" : "The default table was written to.",
"type" : "string",
"examples" : [
"default"
],
"default" : "default",
"order" : 4
},
"username" : {
"title" : "User",
"description" : "Username to use to access the database.",
"type" : "string",
"order" : 5
},
"password" : {
"title" : "Password",
"description" : "Password associated with the username.",
"type" : "string",
"airbyte_secret" : true,
"order" : 6
}
}
}
}


@@ -0,0 +1,134 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from collections import defaultdict
from datetime import datetime
from itertools import chain
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import AirbyteConnectionStatus, Status
from destination_databend.client import DatabendClient
class DatabendWriter:
"""
Base class for shared writer logic.
"""
flush_interval = 1000
def __init__(self, client: DatabendClient) -> None:
"""
:param client: Databend SDK connection class with established connection
to the databse.
"""
try:
# open a cursor and do some work with it
self.client = client
self.cursor = client.open()
self._buffer = defaultdict(list)
self._values = 0
except Exception as e:
# re-raise so the caller can report the failed connection;
# AirbyteConnectionStatus is not an exception type and cannot be raised directly,
# and the cursor must stay open for subsequent writes
raise Exception(f"An exception occurred: {repr(e)}") from e
def delete_table(self, name: str) -> None:
"""
Delete the resulting table.
Primarily used in Overwrite strategy to clean up previous data.
:param name: table name to delete.
"""
self.cursor.execute(f"DROP TABLE IF EXISTS _airbyte_raw_{name}")
def create_raw_table(self, name: str):
"""
Create the resulting _airbyte_raw table.
:param name: table name to create.
"""
query = f"""
CREATE TABLE IF NOT EXISTS _airbyte_raw_{name} (
_airbyte_ab_id TEXT,
_airbyte_emitted_at TIMESTAMP,
_airbyte_data TEXT
)
"""
cursor = self.cursor
cursor.execute(query)
def queue_write_data(self, stream_name: str, id: str, time: datetime, record: str) -> None:
"""
Queue up data in a buffer in memory before writing to the database.
When flush_interval is reached data is persisted.
:param stream_name: name of the stream for which the data corresponds.
:param id: unique identifier of this data row.
:param time: time of writing.
:param record: string representation of the json data payload.
"""
self._buffer[stream_name].append((id, time, record))
self._values += 1
if self._values == self.flush_interval:
self._flush()
def _flush(self):
"""
Stub for the intermediate data flush that's triggered during the
buffering operation.
"""
raise NotImplementedError()
def flush(self):
"""
Stub for the data flush at the end of writing operation.
"""
raise NotImplementedError()
class DatabendSQLWriter(DatabendWriter):
"""
Data writer using the SQL writing strategy. Data is buffered in memory
and flushed using INSERT INTO SQL statement.
"""
flush_interval = 1000
def __init__(self, client: DatabendClient) -> None:
"""
:param client: Databend SDK connection class with established connection
to the database.
"""
super().__init__(client)
def _flush(self) -> None:
"""
Intermediate data flush that's triggered during the
buffering operation. Writes data stored in memory via SQL commands.
The Databend connector performs the INSERT into the table using a stage.
"""
cursor = self.cursor
# id, written_at, data
for table, data in self._buffer.items():
cursor.execute(
f"INSERT INTO _airbyte_raw_{table} (_airbyte_ab_id,_airbyte_emitted_at,_airbyte_data) VALUES (%, %, %)",
list(chain.from_iterable(data)),
)
self._buffer.clear()
self._values = 0
def flush(self) -> None:
"""
Final data flush after all data has been written to memory.
"""
self._flush()
def create_databend_writer(client: DatabendClient, logger: AirbyteLogger) -> DatabendWriter:
logger.info("Using the SQL writing strategy")
writer = DatabendSQLWriter(client)
return writer


@@ -0,0 +1,159 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
import json
import logging
from typing import Any, Dict, List, Mapping
import pytest
from airbyte_cdk.models import (
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStateMessage,
AirbyteStream,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
DestinationSyncMode,
Status,
SyncMode,
Type,
)
from destination_databend import DestinationDatabend
from destination_databend.client import DatabendClient
@pytest.fixture(name="config")
def config_fixture() -> Mapping[str, Any]:
with open("secrets/config.json", "r") as f:
return json.loads(f.read())
@pytest.fixture(name="configured_catalog")
def configured_catalog_fixture() -> ConfiguredAirbyteCatalog:
stream_schema = {"type": "object", "properties": {"string_col": {"type": "str"}, "int_col": {"type": "integer"}}}
append_stream = ConfiguredAirbyteStream(
stream=AirbyteStream(name="append_stream", json_schema=stream_schema, supported_sync_modes=[SyncMode.incremental]),
sync_mode=SyncMode.incremental,
destination_sync_mode=DestinationSyncMode.append,
)
overwrite_stream = ConfiguredAirbyteStream(
stream=AirbyteStream(name="overwrite_stream", json_schema=stream_schema, supported_sync_modes=[SyncMode.incremental]),
sync_mode=SyncMode.incremental,
destination_sync_mode=DestinationSyncMode.overwrite,
)
return ConfiguredAirbyteCatalog(streams=[append_stream, overwrite_stream])
@pytest.fixture(autouse=True)
def teardown(config: Mapping):
yield
client = DatabendClient(**config)
cursor = client.open()
cursor.close()
@pytest.fixture(name="client")
def client_fixture(config) -> DatabendClient:
return DatabendClient(**config)
def test_check_valid_config(config: Mapping):
outcome = DestinationDatabend().check(logging.getLogger('airbyte'), config)
assert outcome.status == Status.SUCCEEDED
def test_check_invalid_config():
outcome = DestinationDatabend().check(logging.getLogger('airbyte'), {"bucket_id": "not_a_real_id"})
assert outcome.status == Status.FAILED
def _state(data: Dict[str, Any]) -> AirbyteMessage:
return AirbyteMessage(type=Type.STATE, state=AirbyteStateMessage(data=data))
def _record(stream: str, str_value: str, int_value: int) -> AirbyteMessage:
return AirbyteMessage(
type=Type.RECORD, record=AirbyteRecordMessage(stream=stream, data={"str_col": str_value, "int_col": int_value}, emitted_at=0)
)
def retrieve_records(stream_name: str, client: DatabendClient) -> List[AirbyteRecordMessage]:
cursor = client.open()
cursor.execute(f"select * from _airbyte_raw_{stream_name}")
all_records = cursor.fetchall()
out = []
for record in all_records:
# key = record[0]
# stream = key.split("__ab__")[0]
value = json.loads(record[2])
out.append(_record(stream_name, value["str_col"], value["int_col"]))
return out
def retrieve_all_records(client: DatabendClient) -> List[AirbyteRecordMessage]:
"""retrieves and formats all records in databend as Airbyte messages"""
overwrite_stream = "overwrite_stream"
append_stream = "append_stream"
overwrite_out = retrieve_records(overwrite_stream, client)
append_out = retrieve_records(append_stream, client)
return overwrite_out + append_out
def test_write(config: Mapping, configured_catalog: ConfiguredAirbyteCatalog, client: DatabendClient):
"""
This test verifies that:
1. writing a stream in "overwrite" mode overwrites any existing data for that stream
2. writing a stream in "append" mode appends new records without deleting the old ones
3. The correct state message is output by the connector at the end of the sync
"""
append_stream, overwrite_stream = configured_catalog.streams[0].stream.name, configured_catalog.streams[1].stream.name
first_state_message = _state({"state": "1"})
first_record_chunk = [_record(append_stream, str(i), i) for i in range(5)] + [_record(overwrite_stream, str(i), i) for i in range(5)]
second_state_message = _state({"state": "2"})
second_record_chunk = [_record(append_stream, str(i), i) for i in range(5, 10)] + [
_record(overwrite_stream, str(i), i) for i in range(5, 10)
]
destination = DestinationDatabend()
expected_states = [first_state_message, second_state_message]
output_states = list(
destination.write(
config, configured_catalog, [*first_record_chunk, first_state_message, *second_record_chunk, second_state_message]
)
)
assert expected_states == output_states, "Checkpoint state messages were expected from the destination"
expected_records = [_record(append_stream, str(i), i) for i in range(10)] + [_record(overwrite_stream, str(i), i) for i in range(10)]
records_in_destination = retrieve_all_records(client)
assert len(expected_records) == len(records_in_destination), "Records in destination should match records expected"
# After this sync we expect the append stream to have 15 messages and the overwrite stream to have 5
third_state_message = _state({"state": "3"})
third_record_chunk = [_record(append_stream, str(i), i) for i in range(10, 15)] + [
_record(overwrite_stream, str(i), i) for i in range(10, 15)
]
output_states = list(destination.write(config, configured_catalog, [*third_record_chunk, third_state_message]))
assert [third_state_message] == output_states
records_in_destination = retrieve_all_records(client)
expected_records = [_record(append_stream, str(i), i) for i in range(15)] + [
_record(overwrite_stream, str(i), i) for i in range(10, 15)
]
assert len(expected_records) == len(records_in_destination)
tear_down(client)
def tear_down(client: DatabendClient):
overwrite_stream = "overwrite_stream"
append_stream = "append_stream"
cursor = client.open()
cursor.execute(f"DROP table _airbyte_raw_{overwrite_stream}")
cursor.execute(f"DROP table _airbyte_raw_{append_stream}")


@@ -0,0 +1,9 @@
{
"protocol" : "https",
"host" : "tnc7yee14--xxxx.ch.datafusecloud.com",
"port" : 443,
"username" : "username",
"password" : "password",
"database" : "default",
"table" : "default"
}


@@ -0,0 +1,11 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
import sys
from destination_databend import DestinationDatabend
if __name__ == "__main__":
DestinationDatabend().run(sys.argv[1:])


@@ -0,0 +1 @@
-e .


@@ -0,0 +1,22 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from setuptools import find_packages, setup
MAIN_REQUIREMENTS = ["airbyte-cdk", "requests", "databend-sqlalchemy"]
TEST_REQUIREMENTS = ["pytest~=6.1"]
setup(
name="destination_databend",
description="Destination implementation for Databend.",
author="Airbyte",
author_email="contact@airbyte.io",
packages=find_packages(),
install_requires=MAIN_REQUIREMENTS,
package_data={"": ["*.json"]},
extras_require={
"tests": TEST_REQUIREMENTS,
},
)


@@ -0,0 +1,162 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from datetime import datetime
from typing import Dict
from unittest.mock import AsyncMock, MagicMock, call, patch
from airbyte_cdk.models import (
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStream,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
DestinationSyncMode,
SyncMode,
Type,
)
from destination_databend.destination import DatabendClient, DestinationDatabend
from pytest import fixture
@fixture
def logger() -> MagicMock:
return MagicMock()
@fixture
def config() -> Dict[str, str]:
args = {
"database": "default",
"username": "root",
"password": "root",
"host": "localhost",
"protocol": "http",
"port": 8081,
"table": "default",
}
return args
@fixture(name="mock_connection")
def async_connection_cursor_mock():
connection = MagicMock()
cursor = AsyncMock()
connection.cursor.return_value = cursor
return connection, cursor
@fixture
def configured_stream1() -> ConfiguredAirbyteStream:
return ConfiguredAirbyteStream(
stream=AirbyteStream(
name="table1",
json_schema={
"type": "object",
"properties": {"col1": {"type": "string"}, "col2": {"type": "integer"}},
},
supported_sync_modes=[SyncMode.incremental],
),
sync_mode=SyncMode.incremental,
destination_sync_mode=DestinationSyncMode.append,
)
@fixture
def configured_stream2() -> ConfiguredAirbyteStream:
return ConfiguredAirbyteStream(
stream=AirbyteStream(
name="table2",
json_schema={
"type": "object",
"properties": {"col1": {"type": "string"}, "col2": {"type": "integer"}},
},
supported_sync_modes=[SyncMode.incremental],
),
sync_mode=SyncMode.incremental,
destination_sync_mode=DestinationSyncMode.append,
)
@fixture
def airbyte_message1() -> AirbyteMessage:
return AirbyteMessage(
type=Type.RECORD,
record=AirbyteRecordMessage(
stream="table1",
data={"key1": "value1", "key2": 2},
emitted_at=int(datetime.now().timestamp()) * 1000,
),
)
@fixture
def airbyte_message2() -> AirbyteMessage:
return AirbyteMessage(
type=Type.RECORD,
record=AirbyteRecordMessage(
stream="table2",
data={"key1": "value2", "key2": 3},
emitted_at=int(datetime.now().timestamp()) * 1000,
),
)
@fixture
def airbyte_state_message() -> AirbyteMessage:
return AirbyteMessage(type=Type.STATE)
@patch("destination_databend.client.DatabendClient", MagicMock())
def test_connection(config: Dict[str, str], logger: MagicMock) -> None:
# Check no log object
DatabendClient(**config)
@patch("destination_databend.writer.DatabendSQLWriter")
@patch("destination_databend.client.DatabendClient")
def test_sql_write_append(
mock_connection: MagicMock,
mock_writer: MagicMock,
config: Dict[str, str],
configured_stream1: ConfiguredAirbyteStream,
configured_stream2: ConfiguredAirbyteStream,
airbyte_message1: AirbyteMessage,
airbyte_message2: AirbyteMessage,
airbyte_state_message: AirbyteMessage,
) -> None:
catalog = ConfiguredAirbyteCatalog(streams=[configured_stream1, configured_stream2])
destination = DestinationDatabend()
result = destination.write(config, catalog, [airbyte_message1, airbyte_state_message, airbyte_message2])
assert list(result) == [airbyte_state_message]
mock_writer.return_value.delete_table.assert_not_called()
mock_writer.return_value.create_raw_table.assert_has_calls([call("table1"), call("table2")])
assert len(mock_writer.return_value.queue_write_data.mock_calls) == 2
mock_writer.return_value.flush.assert_called_once()
@patch("destination_databend.writer.DatabendSQLWriter")
@patch("destination_databend.client.DatabendClient")
def test_sql_write_overwrite(
mock_connection: MagicMock,
mock_writer: MagicMock,
config: Dict[str, str],
configured_stream1: ConfiguredAirbyteStream,
configured_stream2: ConfiguredAirbyteStream,
airbyte_message1: AirbyteMessage,
airbyte_message2: AirbyteMessage,
airbyte_state_message: AirbyteMessage,
):
# Overwrite triggers a delete
configured_stream1.destination_sync_mode = DestinationSyncMode.overwrite
catalog = ConfiguredAirbyteCatalog(streams=[configured_stream1, configured_stream2])
destination = DestinationDatabend()
result = destination.write(config, catalog, [airbyte_message1, airbyte_state_message, airbyte_message2])
assert list(result) == [airbyte_state_message]
mock_writer.return_value.delete_table.assert_called_once_with("table1")
mock_writer.return_value.create_raw_table.assert_has_calls([call("table1"), call("table2")])


@@ -0,0 +1,46 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from typing import Any, Union
from unittest.mock import MagicMock
from destination_databend.writer import DatabendSQLWriter
from pytest import fixture, mark
@fixture
def client() -> MagicMock:
return MagicMock()
@fixture
def sql_writer(client: MagicMock) -> DatabendSQLWriter:
return DatabendSQLWriter(client)
def test_sql_default(sql_writer: DatabendSQLWriter) -> None:
assert len(sql_writer._buffer) == 0
assert sql_writer.flush_interval == 1000
@mark.parametrize("writer", ["sql_writer"])
def test_sql_create(client: MagicMock, writer: Union[DatabendSQLWriter], request: Any) -> None:
writer = request.getfixturevalue(writer)
writer.create_raw_table("dummy")
def test_data_buffering(sql_writer: DatabendSQLWriter) -> None:
sql_writer.queue_write_data("dummy", "id1", 20200101, '{"key": "value"}')
sql_writer._buffer["dummy"][0] == ("id1", 20200101, '{"key": "value"}')
assert len(sql_writer._buffer["dummy"]) == 1
assert len(sql_writer._buffer.keys()) == 1
sql_writer.queue_write_data("dummy", "id2", 20200102, '{"key2": "value2"}')
sql_writer._buffer["dummy"][0] == ("id2", 20200102, '{"key2": "value2"}')
assert len(sql_writer._buffer["dummy"]) == 2
assert len(sql_writer._buffer.keys()) == 1
sql_writer.queue_write_data("dummy2", "id3", 20200103, '{"key3": "value3"}')
sql_writer._buffer["dummy"][0] == ("id3", 20200103, '{"key3": "value3"}')
assert len(sql_writer._buffer["dummy"]) == 2
assert len(sql_writer._buffer["dummy2"]) == 1
assert len(sql_writer._buffer.keys()) == 2


@@ -0,0 +1,54 @@
# Databend
This page guides you through the process of setting up the [Databend](https://databend.rs/) destination connector.
## Features
| Feature | Supported?\(Yes/No\) | Notes |
| :--- | :--- | :--- |
| Full Refresh Sync | Yes | |
| Incremental - Append Sync | Yes | |
#### Output Schema
Each stream will be output into its own table in Databend. Each table will contain 3 columns:
* `_airbyte_ab_id`: a uuid assigned by Airbyte to each event that is processed. The column type in Databend is `VARCHAR`.
* `_airbyte_emitted_at`: a timestamp representing when the event was pulled from the data source. The column type in Databend is `TIMESTAMP`.
* `_airbyte_data`: a json blob representing the event data. The column type in Databend is `VARCHAR`. The resulting raw table is sketched below.
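This sketch is taken from `writer.py` in this commit; `<stream_name>` stands for the synced stream's name (`TEXT` is stored as a string type in Databend):
```
CREATE TABLE IF NOT EXISTS _airbyte_raw_<stream_name> (
    _airbyte_ab_id TEXT,
    _airbyte_emitted_at TIMESTAMP,
    _airbyte_data TEXT
)
```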
## Getting Started
You can follow the [Connecting to a Warehouse docs](https://docs.databend.com/using-databend-cloud/warehouses/connecting-a-warehouse) to get the user, password, host, and other connection details.
Make sure the Databend user has the following permissions:
* can create tables and write rows.
* can create databases.
For example, you can grant the required privilege by running:
```
GRANT CREATE ON * TO airbyte_user;
```
You can also use a pre-existing user, but we highly recommend creating a dedicated user for Airbyte; a sketch of creating one is shown below.
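The following is a minimal, hypothetical example, assuming Databend's `CREATE USER ... IDENTIFIED BY` syntax; exact privilege names and scopes may vary by Databend version:
```
-- Hypothetical example: create a dedicated Airbyte user and grant it the
-- privilege mentioned above. Adjust the password and scope to your setup.
CREATE USER airbyte_user IDENTIFIED BY 'your_password_here';
GRANT CREATE ON * TO airbyte_user;
```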
#### Target Database
You will need to choose an existing database or create a new database that will be used to store synced data from Airbyte.
### Setup the Databend Destination in Airbyte
You should now have all the requirements needed to configure Databend as a destination in the UI. You'll need the following information to configure the Databend destination:
* **Host**
* **Port**
* **Username**
* **Password**
* **Database**
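For reference, a filled-in configuration using these fields might look like the following; all values are placeholders (compare `integration_tests/sample_config.json` in this commit):
```
{
  "protocol": "https",
  "host": "your-warehouse-host.example.com",
  "port": 443,
  "username": "airbyte_user",
  "password": "your_password_here",
  "database": "default",
  "table": "default"
}
```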
## Changelog
###### TODO: more info