Source DuckDB: ✨ Add MotherDuck support 🦆🦆 (#29428)
Co-authored-by: Elena Felder <41136058+elefeint@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
5c56ac1d84
commit
68380cbfef
76
.devcontainer/destination-duckdb/devcontainer.json
Normal file
76
.devcontainer/destination-duckdb/devcontainer.json
Normal file
@@ -0,0 +1,76 @@
|
||||
// For format details, see https://aka.ms/devcontainer.json. For config options, see the
|
||||
// README at: https://github.com/devcontainers/templates/tree/main/src/python
|
||||
{
|
||||
"name": "DuckDB Destination Connector DevContainer (Python)",
|
||||
|
||||
// Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile
|
||||
"image": "mcr.microsoft.com/devcontainers/python:0-3.9",
|
||||
|
||||
// Features to add to the dev container. More info: https://containers.dev/features.
|
||||
"features": {
|
||||
"ghcr.io/devcontainers-contrib/features/poetry:2": {},
|
||||
"ghcr.io/devcontainers/features/docker-in-docker": {}
|
||||
},
|
||||
"overrideFeatureInstallOrder": [
|
||||
// Deterministic order maximizes cache reuse
|
||||
"ghcr.io/devcontainers-contrib/features/poetry",
|
||||
"ghcr.io/devcontainers/features/docker-in-docker"
|
||||
],
|
||||
|
||||
"workspaceFolder": "/workspaces/airbyte/airbyte-integrations/connectors/destination-duckdb",
|
||||
|
||||
// Configure tool-specific properties.
|
||||
"customizations": {
|
||||
"vscode": {
|
||||
"extensions": [
|
||||
// Python extensions:
|
||||
"charliermarsh.ruff",
|
||||
"matangover.mypy",
|
||||
"ms-python.black",
|
||||
"ms-python.python",
|
||||
"ms-python.vscode-pylance",
|
||||
|
||||
// Toml support
|
||||
"tamasfe.even-better-toml",
|
||||
|
||||
// Yaml and JSON Schema support:
|
||||
"redhat.vscode-yaml",
|
||||
|
||||
// Contributing:
|
||||
"GitHub.vscode-pull-request-github"
|
||||
],
|
||||
"settings": {
|
||||
"extensions.ignoreRecommendations": true,
|
||||
"git.openRepositoryInParentFolders": "always",
|
||||
"python.defaultInterpreterPath": ".venv/bin/python",
|
||||
"python.interpreter.infoVisibility": "always",
|
||||
"python.terminal.activateEnvironment": true,
|
||||
"python.testing.pytestEnabled": true,
|
||||
"python.testing.cwd": "/workspaces/airbyte/airbyte-integrations/connectors/destination-duckdb",
|
||||
"python.testing.pytestArgs": [
|
||||
"--rootdir=/workspaces/airbyte/airbyte-integrations/connectors/destination-duckdb",
|
||||
"."
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
"containerEnv": {
|
||||
"POETRY_VIRTUALENVS_IN_PROJECT": "true"
|
||||
},
|
||||
|
||||
// Mark the root directory as 'safe' for git.
|
||||
"initializeCommand": "git config --add safe.directory /workspaces/airbyte",
|
||||
|
||||
// Use 'postCreateCommand' to run commands after the container is created.
|
||||
// Post-create tasks:
|
||||
// 1. Create a symlink directory.
|
||||
// 2. Create symlinks for the devcontainer.json and docs markdown file.
|
||||
// 3. Install the Python/Poetry dependencies.
|
||||
"postCreateCommand": "mkdir -p ./.symlinks && echo '*' > ./.symlinks/.gitignore && ln -sf /workspaces/airbyte/.devcontainer/destination-duckdb/devcontainer.json ./.symlinks/devcontainer.json && ln -sf /workspaces/airbyte/docs/integrations/destinations/duckdb.md ./.symlinks/duckdb-docs.md && poetry install"
|
||||
|
||||
// Use 'forwardPorts' to make a list of ports inside the container available locally.
|
||||
// "forwardPorts": [],
|
||||
|
||||
// Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root.
|
||||
// "remoteUser": "root"
|
||||
}
|
||||
@@ -251,7 +251,7 @@ class Connector:
|
||||
def language(self) -> ConnectorLanguage:
|
||||
if Path(self.code_directory / self.technical_name.replace("-", "_") / "manifest.yaml").is_file():
|
||||
return ConnectorLanguage.LOW_CODE
|
||||
if Path(self.code_directory / "setup.py").is_file():
|
||||
if Path(self.code_directory / "setup.py").is_file() or Path(self.code_directory / "pyproject.toml").is_file():
|
||||
return ConnectorLanguage.PYTHON
|
||||
try:
|
||||
with open(self.code_directory / "Dockerfile") as dockerfile:
|
||||
|
||||
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
|
||||
|
||||
[tool.poetry]
|
||||
name = "connector_ops"
|
||||
version = "0.2.2"
|
||||
version = "0.2.3"
|
||||
description = "Packaged maintained by the connector operations team to perform CI for connectors"
|
||||
authors = ["Airbyte <contact@airbyte.io>"]
|
||||
|
||||
|
||||
@@ -407,6 +407,7 @@ This command runs the Python tests for a airbyte-ci poetry package.
|
||||
## Changelog
|
||||
| Version | PR | Description |
|
||||
|---------| --------------------------------------------------------- |-----------------------------------------------------------------------------------------------------------|
|
||||
| 1.4.0 | [#30330](https://github.com/airbytehq/airbyte/pull/30330) | Add support for pyproject.toml as the prefered entry point for a connector package |
|
||||
| 1.3.0 | [#30461](https://github.com/airbytehq/airbyte/pull/30461) | Add `--use-local-cdk` flag to all connectors commands |
|
||||
| 1.2.3 | [#30477](https://github.com/airbytehq/airbyte/pull/30477) | Fix a test regression introduced the previous version. |
|
||||
| 1.2.2 | [#30438](https://github.com/airbytehq/airbyte/pull/30438) | Add workaround to always stream logs properly with --is-local. |
|
||||
|
||||
@@ -305,6 +305,42 @@ async def find_local_dependencies_in_pyproject_toml(
|
||||
return local_dependency_paths
|
||||
|
||||
|
||||
def _install_python_dependencies_from_setup_py(
|
||||
container: Container,
|
||||
additional_dependency_groups: Optional[List] = None,
|
||||
) -> Container:
|
||||
install_connector_package_cmd = ["python", "-m", "pip", "install", "."]
|
||||
container = container.with_exec(install_connector_package_cmd)
|
||||
|
||||
if additional_dependency_groups:
|
||||
# e.g. .[dev,tests]
|
||||
group_string = f".[{','.join(additional_dependency_groups)}]"
|
||||
group_install_cmd = ["python", "-m", "pip", "install", group_string]
|
||||
|
||||
container = container.with_exec(group_install_cmd)
|
||||
|
||||
return container
|
||||
|
||||
|
||||
def _install_python_dependencies_from_requirements_txt(container: Container) -> Container:
|
||||
install_requirements_cmd = ["python", "-m", "pip", "install", "-r", "requirements.txt"]
|
||||
return container.with_exec(install_requirements_cmd)
|
||||
|
||||
|
||||
def _install_python_dependencies_from_poetry(
|
||||
container: Container,
|
||||
additional_dependency_groups: Optional[List] = None,
|
||||
) -> Container:
|
||||
pip_install_poetry_cmd = ["python", "-m", "pip", "install", "poetry"]
|
||||
poetry_disable_virtual_env_cmd = ["poetry", "config", "virtualenvs.create", "false"]
|
||||
poetry_install_no_venv_cmd = ["poetry", "install", "--no-root"]
|
||||
if additional_dependency_groups:
|
||||
for group in additional_dependency_groups:
|
||||
poetry_install_no_venv_cmd += ["--with", group]
|
||||
|
||||
return container.with_exec(pip_install_poetry_cmd).with_exec(poetry_disable_virtual_env_cmd).with_exec(poetry_install_no_venv_cmd)
|
||||
|
||||
|
||||
async def with_installed_python_package(
|
||||
context: PipelineContext,
|
||||
python_environment: Container,
|
||||
@@ -324,9 +360,6 @@ async def with_installed_python_package(
|
||||
Returns:
|
||||
Container: A python environment container with the python package installed.
|
||||
"""
|
||||
install_requirements_cmd = ["python", "-m", "pip", "install", "-r", "requirements.txt"]
|
||||
install_connector_package_cmd = ["python", "-m", "pip", "install", "."]
|
||||
|
||||
container = with_python_package(context, python_environment, package_source_code_path, exclude=exclude)
|
||||
|
||||
local_dependencies = await find_local_python_dependencies(context, package_source_code_path)
|
||||
@@ -334,19 +367,16 @@ async def with_installed_python_package(
|
||||
for dependency_directory in local_dependencies:
|
||||
container = container.with_mounted_directory("/" + dependency_directory, context.get_repo_dir(dependency_directory))
|
||||
|
||||
has_setup_py, has_requirements_txt = await check_path_in_workdir(container, "setup.py"), await check_path_in_workdir(
|
||||
container, "requirements.txt"
|
||||
)
|
||||
has_setup_py = await check_path_in_workdir(container, "setup.py")
|
||||
has_requirements_txt = await check_path_in_workdir(container, "requirements.txt")
|
||||
has_pyproject_toml = await check_path_in_workdir(container, "pyproject.toml")
|
||||
|
||||
if has_setup_py:
|
||||
container = container.with_exec(install_connector_package_cmd)
|
||||
if has_requirements_txt:
|
||||
container = container.with_exec(install_requirements_cmd)
|
||||
|
||||
if additional_dependency_groups:
|
||||
container = container.with_exec(
|
||||
install_connector_package_cmd[:-1] + [install_connector_package_cmd[-1] + f"[{','.join(additional_dependency_groups)}]"]
|
||||
)
|
||||
if has_pyproject_toml:
|
||||
container = _install_python_dependencies_from_poetry(container)
|
||||
elif has_setup_py:
|
||||
container = _install_python_dependencies_from_setup_py(container, additional_dependency_groups)
|
||||
elif has_requirements_txt:
|
||||
container = _install_python_dependencies_from_requirements_txt(container)
|
||||
|
||||
return container
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
|
||||
|
||||
[tool.poetry]
|
||||
name = "pipelines"
|
||||
version = "1.3.0"
|
||||
version = "1.4.0"
|
||||
description = "Packaged maintained by the connector operations team to perform CI for connectors' pipelines"
|
||||
authors = ["Airbyte <contact@airbyte.io>"]
|
||||
|
||||
|
||||
@@ -13,8 +13,11 @@ pytestmark = [
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def python_connector() -> Connector:
|
||||
return Connector("source-openweather")
|
||||
def python_connectors() -> Connector:
|
||||
return [
|
||||
Connector("source-openweather"), # setup.py based
|
||||
Connector("destination-duckdb"), # pyproject.toml based
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -29,11 +32,12 @@ def context(dagger_client):
|
||||
return context
|
||||
|
||||
|
||||
async def test_with_installed_python_package(context, python_connector):
|
||||
python_environment = context.dagger_client.container().from_("python:3.10")
|
||||
installed_connector_package = await environments.with_installed_python_package(
|
||||
context,
|
||||
python_environment,
|
||||
str(python_connector.code_directory),
|
||||
)
|
||||
await installed_connector_package.with_exec(["python", "main.py", "spec"])
|
||||
async def test_with_installed_python_package(context, python_connectors):
|
||||
for python_connector in python_connectors:
|
||||
python_environment = context.dagger_client.container().from_("python:3.10")
|
||||
installed_connector_package = await environments.with_installed_python_package(
|
||||
context,
|
||||
python_environment,
|
||||
str(python_connector.code_directory),
|
||||
)
|
||||
await installed_connector_package.with_exec(["python", "main.py", "spec"])
|
||||
|
||||
@@ -3,3 +3,6 @@
|
||||
!main.py
|
||||
!destination_duckdb
|
||||
!setup.py
|
||||
!pyproject.toml
|
||||
!poetry.lock
|
||||
!README.md
|
||||
|
||||
2
airbyte-integrations/connectors/destination-duckdb/.gitignore
vendored
Normal file
2
airbyte-integrations/connectors/destination-duckdb/.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
||||
# Ignore symlinks created within the dev container
|
||||
.symlinks
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM python:3.9.11 as base
|
||||
FROM python:3.9.11
|
||||
# FROM python:3.9.11-alpine3.15 as base
|
||||
# switched from alpine as there were tons of errors (in case you want to switch back to alpine)
|
||||
# - https://stackoverflow.com/a/57485724/5246670
|
||||
@@ -6,37 +6,29 @@ FROM python:3.9.11 as base
|
||||
# - libstdc++ https://github.com/amancevice/docker-pandas/issues/12#issuecomment-717215043
|
||||
# - musl-dev linux-headers g++ because of: https://stackoverflow.com/a/40407099/5246670
|
||||
|
||||
# build and load all requirements
|
||||
FROM base as builder
|
||||
WORKDIR /airbyte/integration_code
|
||||
|
||||
# upgrade pip to the latest version
|
||||
# Upgrade system packages and install Poetry
|
||||
RUN apt-get update && apt-get -y upgrade \
|
||||
&& pip install --upgrade pip
|
||||
&& pip install --upgrade pip \
|
||||
&& pip install poetry
|
||||
|
||||
COPY setup.py ./
|
||||
# install necessary packages to a temporary folder
|
||||
RUN pip install --prefix=/install .
|
||||
# build a clean environment
|
||||
FROM base
|
||||
# RUN conda install -c conda-forge python-duckdb
|
||||
WORKDIR /airbyte/integration_code
|
||||
# Copy only poetry.lock* in case it doesn't exist
|
||||
COPY pyproject.toml poetry.lock* ./
|
||||
RUN poetry config virtualenvs.create false \
|
||||
&& poetry install --no-root --no-dev
|
||||
|
||||
# copy all loaded and built libraries to a pure basic image
|
||||
COPY --from=builder /install /usr/local
|
||||
# add default timezone settings
|
||||
COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime
|
||||
RUN echo "Etc/UTC" > /etc/timezone
|
||||
|
||||
#adding duckdb manually (outside of setup.py - lots of errors)
|
||||
RUN pip install duckdb
|
||||
|
||||
# copy payload code only
|
||||
# Copy code
|
||||
COPY main.py ./
|
||||
COPY destination_duckdb ./destination_duckdb
|
||||
|
||||
# Timezone setup
|
||||
COPY --from=python:3.9.11 /usr/share/zoneinfo/Etc/UTC /etc/localtime
|
||||
RUN echo "Etc/UTC" > /etc/timezone
|
||||
|
||||
# Entry point
|
||||
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
|
||||
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
|
||||
|
||||
LABEL io.airbyte.version=0.1.0
|
||||
LABEL io.airbyte.version=0.2.0
|
||||
LABEL io.airbyte.name=airbyte/destination-duckdb
|
||||
|
||||
@@ -0,0 +1,10 @@
|
||||
connector_image: airbyte/destination-duckdb:dev
|
||||
acceptance_tests:
|
||||
spec:
|
||||
tests:
|
||||
- spec_path: integration_tests/spec.json
|
||||
config_path: "integration_tests/config.json"
|
||||
connection:
|
||||
tests:
|
||||
- config_path: "integration_tests/config.json"
|
||||
status: "succeed"
|
||||
@@ -1,8 +0,0 @@
|
||||
plugins {
|
||||
id 'airbyte-python'
|
||||
id 'airbyte-docker'
|
||||
}
|
||||
|
||||
airbytePython {
|
||||
moduleDirectory 'destination_duckdb'
|
||||
}
|
||||
@@ -25,6 +25,9 @@ class DestinationDuckdb(Destination):
|
||||
Get a normalized version of the destination path.
|
||||
Automatically append /local/ to the start of the path
|
||||
"""
|
||||
if destination_path.startswith("md:") or destination_path.startswith("motherduck:"):
|
||||
return destination_path
|
||||
|
||||
if not destination_path.startswith("/local"):
|
||||
destination_path = os.path.join("/local", destination_path)
|
||||
|
||||
@@ -37,9 +40,11 @@ class DestinationDuckdb(Destination):
|
||||
return destination_path
|
||||
|
||||
def write(
|
||||
self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
|
||||
self,
|
||||
config: Mapping[str, Any],
|
||||
configured_catalog: ConfiguredAirbyteCatalog,
|
||||
input_messages: Iterable[AirbyteMessage],
|
||||
) -> Iterable[AirbyteMessage]:
|
||||
|
||||
"""
|
||||
Reads the input stream of messages, config, and catalog to write data to the destination.
|
||||
|
||||
@@ -56,17 +61,19 @@ class DestinationDuckdb(Destination):
|
||||
streams = {s.stream.name for s in configured_catalog.streams}
|
||||
logger.info(f"Starting write to DuckDB with {len(streams)} streams")
|
||||
|
||||
path = config.get("destination_path")
|
||||
path = str(config.get("destination_path"))
|
||||
path = self._get_destination_path(path)
|
||||
# check if file exists
|
||||
|
||||
logger.info(f"Opening DuckDB file at {path}")
|
||||
# Get and register auth token if applicable
|
||||
motherduck_api_key = str(config.get("motherduck_api_key"))
|
||||
if motherduck_api_key:
|
||||
os.environ["motherduck_token"] = motherduck_api_key
|
||||
|
||||
con = duckdb.connect(database=path, read_only=False)
|
||||
|
||||
# create the tables if needed
|
||||
# con.execute("BEGIN TRANSACTION")
|
||||
for configured_stream in configured_catalog.streams:
|
||||
|
||||
name = configured_stream.stream.name
|
||||
table_name = f"_airbyte_raw_{name}"
|
||||
if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite:
|
||||
@@ -82,7 +89,7 @@ class DestinationDuckdb(Destination):
|
||||
query = f"""
|
||||
CREATE TABLE IF NOT EXISTS {table_name} (
|
||||
_airbyte_ab_id TEXT PRIMARY KEY,
|
||||
_airbyte_emitted_at JSON,
|
||||
_airbyte_emitted_at DATETIME,
|
||||
_airbyte_data JSON
|
||||
)
|
||||
"""
|
||||
@@ -92,20 +99,16 @@ class DestinationDuckdb(Destination):
|
||||
buffer = defaultdict(list)
|
||||
|
||||
for message in input_messages:
|
||||
|
||||
if message.type == Type.STATE:
|
||||
# flush the buffer
|
||||
for stream_name in buffer.keys():
|
||||
|
||||
logger.info(f"flushing buffer for state: {message}")
|
||||
query = """
|
||||
INSERT INTO {table_name}
|
||||
INSERT INTO {table_name} (_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data)
|
||||
VALUES (?,?,?)
|
||||
""".format(
|
||||
table_name=f"_airbyte_raw_{stream_name}"
|
||||
)
|
||||
logger.info(f"query: {query}")
|
||||
|
||||
con.executemany(query, buffer[stream_name])
|
||||
|
||||
con.commit()
|
||||
@@ -120,13 +123,18 @@ class DestinationDuckdb(Destination):
|
||||
continue
|
||||
|
||||
# add to buffer
|
||||
buffer[stream].append((str(uuid.uuid4()), datetime.datetime.now().isoformat(), json.dumps(data)))
|
||||
buffer[stream].append(
|
||||
(
|
||||
str(uuid.uuid4()),
|
||||
datetime.datetime.now().isoformat(),
|
||||
json.dumps(data),
|
||||
)
|
||||
)
|
||||
else:
|
||||
logger.info(f"Message type {message.type} not supported, skipping")
|
||||
|
||||
# flush any remaining messages
|
||||
for stream_name in buffer.keys():
|
||||
|
||||
query = """
|
||||
INSERT INTO {table_name}
|
||||
VALUES (?,?,?)
|
||||
@@ -150,11 +158,16 @@ class DestinationDuckdb(Destination):
|
||||
:return: AirbyteConnectionStatus indicating a Success or Failure
|
||||
"""
|
||||
try:
|
||||
# parse the destination path
|
||||
param_path = config.get("destination_path")
|
||||
path = self._get_destination_path(param_path)
|
||||
path = config.get("destination_path")
|
||||
path = self._get_destination_path(path)
|
||||
|
||||
if path.startswith("/local"):
|
||||
logger.info(f"Using DuckDB file at {path}")
|
||||
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||||
|
||||
if "motherduck_api_key" in config:
|
||||
os.environ["motherduck_token"] = config["motherduck_api_key"]
|
||||
|
||||
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||||
con = duckdb.connect(database=path, read_only=False)
|
||||
con.execute("SELECT 1;")
|
||||
|
||||
|
||||
@@ -1,9 +1,5 @@
|
||||
{
|
||||
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/duckdb",
|
||||
"supported_destination_sync_modes": ["overwrite", "append"],
|
||||
"supportsIncremental": true,
|
||||
"supportsDBT": true,
|
||||
"supportsNormalization": true,
|
||||
"connectionSpecification": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"title": "Destination Duckdb",
|
||||
@@ -11,16 +7,28 @@
|
||||
"required": ["destination_path"],
|
||||
"additionalProperties": true,
|
||||
"properties": {
|
||||
"destination_path": {
|
||||
"motherduck_api_key": {
|
||||
"title": "MotherDuck API Key",
|
||||
"type": "string",
|
||||
"description": "Path to the .duckdb file. The file will be placed inside that local mount. For more information check out our <a href=\"https://docs.airbyte.io/integrations/destinations/duckdb\">docs</a>",
|
||||
"example": "/local/destination.duckdb"
|
||||
"description": "API key to use for authentication to a MotherDuck database.",
|
||||
"airbyte_secret": true
|
||||
},
|
||||
"destination_path": {
|
||||
"title": "Destination DB",
|
||||
"type": "string",
|
||||
"description": "Path to the .duckdb file, or the text 'md:' to connect to MotherDuck. The file will be placed inside that local mount. For more information check out our <a href=\"https://docs.airbyte.io/integrations/destinations/duckdb\">docs</a>",
|
||||
"examples": ["/local/destination.duckdb", "md:", "motherduck:"]
|
||||
},
|
||||
"schema": {
|
||||
"title": "Destination Schema",
|
||||
"type": "string",
|
||||
"description": "database schema, default for duckdb is main",
|
||||
"description": "Database schema name, default for duckdb is 'main'.",
|
||||
"example": "main"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"supportsIncremental": true,
|
||||
"supportsNormalization": true,
|
||||
"supportsDBT": true,
|
||||
"supported_destination_sync_modes": ["overwrite", "append"]
|
||||
}
|
||||
|
||||
@@ -2,12 +2,15 @@
|
||||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
||||
#
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import random
|
||||
import string
|
||||
import tempfile
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
@@ -27,6 +30,44 @@ from airbyte_cdk.models import (
|
||||
)
|
||||
from destination_duckdb import DestinationDuckdb
|
||||
|
||||
CONFIG_PATH = "integration_tests/config.json"
|
||||
|
||||
|
||||
def pytest_generate_tests(metafunc):
|
||||
if "config" not in metafunc.fixturenames:
|
||||
return
|
||||
|
||||
configs: list[str] = ["local_file_config"]
|
||||
if Path(CONFIG_PATH).is_file():
|
||||
configs.append("motherduck_config")
|
||||
else:
|
||||
print(
|
||||
f"Skipping MotherDuck tests because config file not found at: {CONFIG_PATH}"
|
||||
)
|
||||
|
||||
# for test_name in ["test_check_succeeds", "test_write"]:
|
||||
metafunc.parametrize("config", configs, indirect=True)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def config(request) -> Dict[str, str]:
|
||||
# create a file "myfile" in "mydir" in temp directory
|
||||
if request.param == "local_file_config":
|
||||
tmp_dir = tempfile.TemporaryDirectory()
|
||||
test = os.path.join(str(tmp_dir.name), "test.duckdb")
|
||||
yield {"destination_path": test}
|
||||
|
||||
elif request.param == "motherduck_config":
|
||||
yield json.loads(Path(CONFIG_PATH).read_text())
|
||||
|
||||
else:
|
||||
raise ValueError(f"Unknown config type: {request.param}")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def invalid_config() -> Dict[str, str]:
|
||||
return {"destination_path": "/destination.duckdb"}
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def disable_destination_modification(monkeypatch, request):
|
||||
@@ -36,16 +77,6 @@ def disable_destination_modification(monkeypatch, request):
|
||||
monkeypatch.setattr(DestinationDuckdb, "_get_destination_path", lambda _, x: x)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def local_file_config() -> Dict[str, str]:
|
||||
# create a file "myfile" in "mydir" in temp directory
|
||||
tmp_dir = tempfile.TemporaryDirectory()
|
||||
test = os.path.join(str(tmp_dir), "test.duckdb")
|
||||
|
||||
# f1.write_text("text to myfile")
|
||||
yield {"destination_path": test}
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def test_table_name() -> str:
|
||||
letters = string.ascii_lowercase
|
||||
@@ -60,10 +91,14 @@ def table_schema() -> str:
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def configured_catalogue(test_table_name: str, table_schema: str) -> ConfiguredAirbyteCatalog:
|
||||
def configured_catalogue(
|
||||
test_table_name: str, table_schema: str
|
||||
) -> ConfiguredAirbyteCatalog:
|
||||
append_stream = ConfiguredAirbyteStream(
|
||||
stream=AirbyteStream(
|
||||
name=test_table_name, json_schema=table_schema, supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental]
|
||||
name=test_table_name,
|
||||
json_schema=table_schema,
|
||||
supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
|
||||
),
|
||||
sync_mode=SyncMode.incremental,
|
||||
destination_sync_mode=DestinationSyncMode.append,
|
||||
@@ -71,17 +106,14 @@ def configured_catalogue(test_table_name: str, table_schema: str) -> ConfiguredA
|
||||
return ConfiguredAirbyteCatalog(streams=[append_stream])
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def invalid_config() -> Dict[str, str]:
|
||||
return {"destination_path": "/destination.duckdb"}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def airbyte_message1(test_table_name: str):
|
||||
return AirbyteMessage(
|
||||
type=Type.RECORD,
|
||||
record=AirbyteRecordMessage(
|
||||
stream=test_table_name, data={"key1": "value1", "key2": 3}, emitted_at=int(datetime.now().timestamp()) * 1000
|
||||
stream=test_table_name,
|
||||
data={"key1": "value1", "key2": 3},
|
||||
emitted_at=int(datetime.now().timestamp()) * 1000,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -91,28 +123,31 @@ def airbyte_message2(test_table_name: str):
|
||||
return AirbyteMessage(
|
||||
type=Type.RECORD,
|
||||
record=AirbyteRecordMessage(
|
||||
stream=test_table_name, data={"key1": "value2", "key2": 2}, emitted_at=int(datetime.now().timestamp()) * 1000
|
||||
stream=test_table_name,
|
||||
data={"key1": "value2", "key2": 2},
|
||||
emitted_at=int(datetime.now().timestamp()) * 1000,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def airbyte_message3():
|
||||
return AirbyteMessage(type=Type.STATE, state=AirbyteStateMessage(data={"state": "1"}))
|
||||
return AirbyteMessage(
|
||||
type=Type.STATE, state=AirbyteStateMessage(data={"state": "1"})
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("config", ["invalid_config"])
|
||||
@pytest.mark.disable_autouse
|
||||
def test_check_fails(config, request):
|
||||
config = request.getfixturevalue(config)
|
||||
def test_check_fails(invalid_config, request):
|
||||
destination = DestinationDuckdb()
|
||||
status = destination.check(logger=MagicMock(), config=config)
|
||||
status = destination.check(logger=MagicMock(), config=invalid_config)
|
||||
assert status.status == Status.FAILED
|
||||
|
||||
|
||||
@pytest.mark.parametrize("config", ["local_file_config"])
|
||||
def test_check_succeeds(config, request):
|
||||
config = request.getfixturevalue(config)
|
||||
def test_check_succeeds(
|
||||
config: dict[str, str],
|
||||
request,
|
||||
):
|
||||
destination = DestinationDuckdb()
|
||||
status = destination.check(logger=MagicMock(), config=config)
|
||||
assert status.status == Status.SUCCEEDED
|
||||
@@ -122,7 +157,6 @@ def _state(data: Dict[str, Any]) -> AirbyteMessage:
|
||||
return AirbyteMessage(type=Type.STATE, state=AirbyteStateMessage(data=data))
|
||||
|
||||
|
||||
@pytest.mark.parametrize("config", ["local_file_config"])
|
||||
def test_write(
|
||||
config: Dict[str, str],
|
||||
request,
|
||||
@@ -132,9 +166,12 @@ def test_write(
|
||||
airbyte_message3: AirbyteMessage,
|
||||
test_table_name: str,
|
||||
):
|
||||
config = request.getfixturevalue(config)
|
||||
destination = DestinationDuckdb()
|
||||
generator = destination.write(config, configured_catalogue, [airbyte_message1, airbyte_message2, airbyte_message3])
|
||||
generator = destination.write(
|
||||
config,
|
||||
configured_catalogue,
|
||||
[airbyte_message1, airbyte_message2, airbyte_message3],
|
||||
)
|
||||
|
||||
result = list(generator)
|
||||
assert len(result) == 1
|
||||
|
||||
@@ -0,0 +1,34 @@
|
||||
{
|
||||
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/duckdb",
|
||||
"connectionSpecification": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"title": "Destination Duckdb",
|
||||
"type": "object",
|
||||
"required": ["destination_path"],
|
||||
"additionalProperties": true,
|
||||
"properties": {
|
||||
"motherduck_api_key": {
|
||||
"title": "MotherDuck API Key",
|
||||
"type": "string",
|
||||
"description": "API key to use for authentication to a MotherDuck database.",
|
||||
"airbyte_secret": true
|
||||
},
|
||||
"destination_path": {
|
||||
"title": "Destination DB",
|
||||
"type": "string",
|
||||
"description": "Path to the .duckdb file, or the text 'md:' to connect to MotherDuck. The file will be placed inside that local mount. For more information check out our <a href=\"https://docs.airbyte.io/integrations/destinations/duckdb\">docs</a>",
|
||||
"examples": ["/local/destination.duckdb", "md:", "motherduck:"]
|
||||
},
|
||||
"schema": {
|
||||
"title": "Destination Schema",
|
||||
"type": "string",
|
||||
"description": "Database schema name, default for duckdb is 'main'.",
|
||||
"example": "main"
|
||||
}
|
||||
}
|
||||
},
|
||||
"supportsIncremental": true,
|
||||
"supportsNormalization": true,
|
||||
"supportsDBT": true,
|
||||
"supported_destination_sync_modes": ["overwrite", "append"]
|
||||
}
|
||||
@@ -2,7 +2,7 @@ data:
|
||||
connectorSubtype: database
|
||||
connectorType: destination
|
||||
definitionId: 94bd199c-2ff0-4aa2-b98e-17f0acb72610
|
||||
dockerImageTag: 0.1.0
|
||||
dockerImageTag: 0.2.0
|
||||
dockerRepository: airbyte/destination-duckdb
|
||||
githubIssueLabel: destination-duckdb
|
||||
icon: duckdb.svg
|
||||
@@ -10,7 +10,7 @@ data:
|
||||
name: DuckDB
|
||||
registries:
|
||||
cloud:
|
||||
enabled: false
|
||||
enabled: true
|
||||
oss:
|
||||
enabled: true
|
||||
releaseStage: alpha
|
||||
|
||||
1115
airbyte-integrations/connectors/destination-duckdb/poetry.lock
generated
Normal file
1115
airbyte-integrations/connectors/destination-duckdb/poetry.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,22 @@
|
||||
[tool.poetry]
|
||||
name = "destination-duckdb"
|
||||
version = "0.2.1"
|
||||
description = "Destination implementation for Duckdb."
|
||||
authors = ["Simon Späti, Airbyte"]
|
||||
license = "MIT"
|
||||
readme = "README.md"
|
||||
|
||||
[tool.poetry.dependencies]
|
||||
python = ">=3.8"
|
||||
airbyte-cdk = "^0.51.6"
|
||||
duckdb = "^0.8.1"
|
||||
|
||||
[tool.poetry.group.dev.dependencies]
|
||||
pytest = "^7.4.0"
|
||||
ruff = "^0.0.286"
|
||||
black = "^23.7.0"
|
||||
mypy = "^1.5.1"
|
||||
|
||||
[build-system]
|
||||
requires = ["poetry-core"]
|
||||
build-backend = "poetry.core.masonry.api"
|
||||
@@ -1 +0,0 @@
|
||||
-e .
|
||||
@@ -1,23 +0,0 @@
|
||||
#
|
||||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
|
||||
#
|
||||
|
||||
|
||||
from setuptools import find_packages, setup
|
||||
|
||||
MAIN_REQUIREMENTS = ["airbyte-cdk", "duckdb"] # duckdb added manually to dockerfile due to lots of errors
|
||||
|
||||
TEST_REQUIREMENTS = ["pytest~=6.1"]
|
||||
|
||||
setup(
|
||||
name="destination_duckdb",
|
||||
description="Destination implementation for Duckdb.",
|
||||
author="Simon Späti",
|
||||
author_email="contact@airbyte.io",
|
||||
packages=find_packages(),
|
||||
install_requires=MAIN_REQUIREMENTS,
|
||||
package_data={"": ["*.json"]},
|
||||
extras_require={
|
||||
"tests": TEST_REQUIREMENTS,
|
||||
},
|
||||
)
|
||||
@@ -7,7 +7,6 @@ from destination_duckdb import DestinationDuckdb
|
||||
|
||||
|
||||
def test_read_invalid_path():
|
||||
|
||||
invalid_input = "/test.duckdb"
|
||||
with pytest.raises(ValueError):
|
||||
_ = DestinationDuckdb._get_destination_path(invalid_input)
|
||||
|
||||
@@ -1,29 +1,53 @@
|
||||
# DuckDB
|
||||
|
||||
:::danger
|
||||
<!-- env:cloud -->
|
||||
|
||||
This destination is meant to be used on a local workstation and won't work on Kubernetes
|
||||
:::caution
|
||||
|
||||
Local file-based DBs will not work in Airbyte Cloud or Kubernetes. Please use MotherDuck when running in Airbyte Cloud.
|
||||
|
||||
:::
|
||||
|
||||
<!-- /env:cloud -->
|
||||
|
||||
## Overview
|
||||
|
||||
[DuckDB](https://duckdb.org/) is an in-process SQL OLAP database management system and this destination is meant to use locally if you have multiple smaller sources such as GitHub repos, some social media and local CSVs or files you want to run analytics workloads on.
|
||||
[DuckDB](https://duckdb.org/) is an in-process SQL OLAP database management system and this destination is meant to use locally if you have multiple smaller sources such as GitHub repos, some social media and local CSVs or files you want to run analytics workloads on. This destination writes data to the [MotherDuck](https://motherduck.com) service, or to a file on the _local_ filesystem on the host running Airbyte.
|
||||
|
||||
This destination writes data to a file on the _local_ filesystem on the host running Airbyte. By default, data is written to `/tmp/airbyte_local`. To change this location, modify the `LOCAL_ROOT` environment variable for Airbyte.
|
||||
For file-based DBs, data is written to `/tmp/airbyte_local` by default. To change this location, modify the `LOCAL_ROOT` environment variable for Airbyte.
|
||||
|
||||
## Use with MotherDuck
|
||||
|
||||
This DuckDB destination is compatible with [MotherDuck](https://motherduck.com).
|
||||
|
||||
### Specifying a MotherDuck Database
|
||||
|
||||
To specify a MotherDuck-hosted database as your destination, simply provide your database uri with the normal `md:` database prefix in the `destination_path` configuration option.
|
||||
|
||||
:::caution
|
||||
|
||||
We do not recommend providing your API token in the `md:` connection string, as this may cause your token to be printed to execution logs. Please use the `MotherDuck API Key`` setting instead.
|
||||
|
||||
:::
|
||||
|
||||
### Authenticating to MotherDuck
|
||||
|
||||
For authentication, you can can provide your [MotherDuck Service Credential](https://motherduck.com/docs/authenticating-to-motherduck/#syntax) as the `motherduck_api_key` configuration option.
|
||||
|
||||
### Sync Overview
|
||||
|
||||
#### Output schema
|
||||
|
||||
If you set [Normalization](https://docs.airbyte.com/understanding-airbyte/basic-normalization/), source data will be normalized to a tabular form. Let's say you have a source such as GitHub with nested JSONs; the Normalization ensures you end up with tables and columns. Suppose you have a many-to-many relationship between the users and commits. Normalization will create separate tables for it. The end state is the [third normal form](https://en.wikipedia.org/wiki/Third_normal_form) (3NF).
|
||||
|
||||
Each table will contain 3 columns:
|
||||
|
||||
- `_airbyte_ab_id`: a uuid assigned by Airbyte to each event that is processed.
|
||||
- `_airbyte_emitted_at`: a timestamp representing when the event was pulled from the data source.
|
||||
- `_airbyte_data`: a json blob representing with the event data.
|
||||
|
||||
### Normalization
|
||||
|
||||
If you set [Normalization](https://docs.airbyte.com/understanding-airbyte/basic-normalization/), source data will be normalized to a tabular form. Let's say you have a source such as GitHub with nested JSONs; the Normalization ensures you end up with tables and columns. Suppose you have a many-to-many relationship between the users and commits. Normalization will create separate tables for it. The end state is the [third normal form](https://en.wikipedia.org/wiki/Third_normal_form) (3NF).
|
||||
|
||||
#### Features
|
||||
|
||||
| Feature | Supported | |
|
||||
@@ -37,7 +61,9 @@ Each table will contain 3 columns:
|
||||
|
||||
This integration will be constrained by the speed at which your filesystem accepts writes.
|
||||
|
||||
## Getting Started
|
||||
<!-- env:oss -->
|
||||
|
||||
## Getting Started with Local Database Files
|
||||
|
||||
The `destination_path` will always start with `/local` whether it is specified by the user or not. Any directory nesting within local will be mapped onto the local mount.
|
||||
|
||||
@@ -74,8 +100,11 @@ docker cp airbyte-server:/tmp/airbyte_local/{destination_path} .
|
||||
|
||||
Note: If you are running Airbyte on Windows with Docker backed by WSL2, you have to use similar step as above or refer to this [link](../../operator-guides/locating-files-local-destination.md) for an alternative approach.
|
||||
|
||||
<!-- /env:oss -->
|
||||
|
||||
## Changelog
|
||||
|
||||
| Version | Date | Pull Request | Subject |
|
||||
| :------ | :--------- | :------------------------------------------------------- | :--------------------- |
|
||||
| 0.2.0 | 2022-10-14 | [](https://github.com/airbytehq/airbyte/pull/) | Add support for MotherDuck |
|
||||
| 0.1.0 | 2022-10-14 | [17494](https://github.com/airbytehq/airbyte/pull/17494) | New DuckDB destination |
|
||||
|
||||
Reference in New Issue
Block a user