Add duckdb support (#7548)

This commit is contained in:
Zafer Balkan
2025-10-01 17:27:13 +03:00
committed by GitHub
parent e0410e2ffe
commit 1cc200843c
6 changed files with 330 additions and 1 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.0 KiB

48
poetry.lock generated
View File

@@ -1195,6 +1195,52 @@ idna = ["idna (>=3.6)"]
trio = ["trio (>=0.23)"]
wmi = ["wmi (>=1.5.1)"]
[[package]]
name = "duckdb"
version = "1.3.2"
description = "DuckDB in-process database"
optional = false
python-versions = ">=3.7.0"
groups = ["all_ds"]
files = [
{file = "duckdb-1.3.2-cp310-cp310-macosx_12_0_arm64.whl", hash = "sha256:14676651b86f827ea10bf965eec698b18e3519fdc6266d4ca849f5af7a8c315e"},
{file = "duckdb-1.3.2-cp310-cp310-macosx_12_0_universal2.whl", hash = "sha256:e584f25892450757919639b148c2410402b17105bd404017a57fa9eec9c98919"},
{file = "duckdb-1.3.2-cp310-cp310-macosx_12_0_x86_64.whl", hash = "sha256:84a19f185ee0c5bc66d95908c6be19103e184b743e594e005dee6f84118dc22c"},
{file = "duckdb-1.3.2-cp310-cp310-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:186fc3f98943e97f88a1e501d5720b11214695571f2c74745d6e300b18bef80e"},
{file = "duckdb-1.3.2-cp310-cp310-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:6b7e6bb613b73745f03bff4bb412f362d4a1e158bdcb3946f61fd18e9e1a8ddf"},
{file = "duckdb-1.3.2-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:1c90646b52a0eccda1f76b10ac98b502deb9017569e84073da00a2ab97763578"},
{file = "duckdb-1.3.2-cp310-cp310-win_amd64.whl", hash = "sha256:4cdffb1e60defbfa75407b7f2ccc322f535fd462976940731dfd1644146f90c6"},
{file = "duckdb-1.3.2-cp311-cp311-macosx_12_0_arm64.whl", hash = "sha256:e1872cf63aae28c3f1dc2e19b5e23940339fc39fb3425a06196c5d00a8d01040"},
{file = "duckdb-1.3.2-cp311-cp311-macosx_12_0_universal2.whl", hash = "sha256:db256c206056468ae6a9e931776bdf7debaffc58e19a0ff4fa9e7e1e82d38b3b"},
{file = "duckdb-1.3.2-cp311-cp311-macosx_12_0_x86_64.whl", hash = "sha256:1d57df2149d6e4e0bd5198689316c5e2ceec7f6ac0a9ec11bc2b216502a57b34"},
{file = "duckdb-1.3.2-cp311-cp311-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:54f76c8b1e2a19dfe194027894209ce9ddb073fd9db69af729a524d2860e4680"},
{file = "duckdb-1.3.2-cp311-cp311-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:45bea70b3e93c6bf766ce2f80fc3876efa94c4ee4de72036417a7bd1e32142fe"},
{file = "duckdb-1.3.2-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:003f7d36f0d8a430cb0e00521f18b7d5ee49ec98aaa541914c6d0e008c306f1a"},
{file = "duckdb-1.3.2-cp311-cp311-win_amd64.whl", hash = "sha256:0eb210cedf08b067fa90c666339688f1c874844a54708562282bc54b0189aac6"},
{file = "duckdb-1.3.2-cp312-cp312-macosx_12_0_arm64.whl", hash = "sha256:2455b1ffef4e3d3c7ef8b806977c0e3973c10ec85aa28f08c993ab7f2598e8dd"},
{file = "duckdb-1.3.2-cp312-cp312-macosx_12_0_universal2.whl", hash = "sha256:9d0ae509713da3461c000af27496d5413f839d26111d2a609242d9d17b37d464"},
{file = "duckdb-1.3.2-cp312-cp312-macosx_12_0_x86_64.whl", hash = "sha256:72ca6143d23c0bf6426396400f01fcbe4785ad9ceec771bd9a4acc5b5ef9a075"},
{file = "duckdb-1.3.2-cp312-cp312-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b49a11afba36b98436db83770df10faa03ebded06514cb9b180b513d8be7f392"},
{file = "duckdb-1.3.2-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:36abdfe0d1704fe09b08d233165f312dad7d7d0ecaaca5fb3bb869f4838a2d0b"},
{file = "duckdb-1.3.2-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:3380aae1c4f2af3f37b0bf223fabd62077dd0493c84ef441e69b45167188e7b6"},
{file = "duckdb-1.3.2-cp312-cp312-win_amd64.whl", hash = "sha256:11af73963ae174aafd90ea45fb0317f1b2e28a7f1d9902819d47c67cc957d49c"},
{file = "duckdb-1.3.2-cp313-cp313-macosx_12_0_arm64.whl", hash = "sha256:a3418c973b06ac4e97f178f803e032c30c9a9f56a3e3b43a866f33223dfbf60b"},
{file = "duckdb-1.3.2-cp313-cp313-macosx_12_0_universal2.whl", hash = "sha256:2a741eae2cf110fd2223eeebe4151e22c0c02803e1cfac6880dbe8a39fecab6a"},
{file = "duckdb-1.3.2-cp313-cp313-macosx_12_0_x86_64.whl", hash = "sha256:51e62541341ea1a9e31f0f1ade2496a39b742caf513bebd52396f42ddd6525a0"},
{file = "duckdb-1.3.2-cp313-cp313-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b3e519de5640e5671f1731b3ae6b496e0ed7e4de4a1c25c7a2f34c991ab64d71"},
{file = "duckdb-1.3.2-cp313-cp313-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:4732fb8cc60566b60e7e53b8c19972cb5ed12d285147a3063b16cc64a79f6d9f"},
{file = "duckdb-1.3.2-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:97f7a22dcaa1cca889d12c3dc43a999468375cdb6f6fe56edf840e062d4a8293"},
{file = "duckdb-1.3.2-cp313-cp313-win_amd64.whl", hash = "sha256:cd3d717bf9c49ef4b1016c2216517572258fa645c2923e91c5234053defa3fb5"},
{file = "duckdb-1.3.2-cp39-cp39-macosx_12_0_arm64.whl", hash = "sha256:18862e3b8a805f2204543d42d5f103b629cb7f7f2e69f5188eceb0b8a023f0af"},
{file = "duckdb-1.3.2-cp39-cp39-macosx_12_0_universal2.whl", hash = "sha256:75ed129761b6159f0b8eca4854e496a3c4c416e888537ec47ff8eb35fda2b667"},
{file = "duckdb-1.3.2-cp39-cp39-macosx_12_0_x86_64.whl", hash = "sha256:875193ae9f718bc80ab5635435de5b313e3de3ec99420a9b25275ddc5c45ff58"},
{file = "duckdb-1.3.2-cp39-cp39-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:09b5fd8a112301096668903781ad5944c3aec2af27622bd80eae54149de42b42"},
{file = "duckdb-1.3.2-cp39-cp39-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:10cb87ad964b989175e7757d7ada0b1a7264b401a79be2f828cf8f7c366f7f95"},
{file = "duckdb-1.3.2-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:4389fc3812e26977034fe3ff08d1f7dbfe6d2d8337487b4686f2b50e254d7ee3"},
{file = "duckdb-1.3.2-cp39-cp39-win_amd64.whl", hash = "sha256:07952ec6f45dd3c7db0f825d231232dc889f1f2490b97a4e9b7abb6830145a19"},
{file = "duckdb-1.3.2.tar.gz", hash = "sha256:c658df8a1bc78704f702ad0d954d82a1edd4518d7a04f00027ec53e40f591ff5"},
]
[[package]]
name = "e6data-python-connector"
version = "1.1.9"
@@ -5966,4 +6012,4 @@ testing = ["coverage[toml]", "zope.event", "zope.testing"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.8,<3.11"
content-hash = "ee33f4494c213575cfae29a43d3a9e76deabae423c84779286a768f1a2b15278"
content-hash = "562c89639f1cfbe1214b8cf6852c9dfdc5a3e988fd92ca50e387d4ae3e95d164"

View File

@@ -104,6 +104,7 @@ certifi = ">=2019.9.11"
cmem-cmempy = "21.2.3"
databend-py = "0.4.6"
databend-sqlalchemy = "0.2.4"
duckdb = "1.3.2"
google-api-python-client = "1.7.11"
gspread = "5.11.2"
impyla = "0.16.0"

View File

@@ -0,0 +1,174 @@
import logging
from redash.query_runner import (
TYPE_BOOLEAN,
TYPE_DATE,
TYPE_DATETIME,
TYPE_FLOAT,
TYPE_INTEGER,
TYPE_STRING,
BaseSQLQueryRunner,
InterruptException,
register,
)
logger = logging.getLogger(__name__)
try:
import duckdb
enabled = True
except ImportError:
enabled = False
# Map DuckDB types to Redash column types
TYPES_MAP = {
"BOOLEAN": TYPE_BOOLEAN,
"TINYINT": TYPE_INTEGER,
"SMALLINT": TYPE_INTEGER,
"INTEGER": TYPE_INTEGER,
"BIGINT": TYPE_INTEGER,
"HUGEINT": TYPE_INTEGER,
"REAL": TYPE_FLOAT,
"DOUBLE": TYPE_FLOAT,
"DECIMAL": TYPE_FLOAT,
"VARCHAR": TYPE_STRING,
"BLOB": TYPE_STRING,
"DATE": TYPE_DATE,
"TIMESTAMP": TYPE_DATETIME,
"TIMESTAMP WITH TIME ZONE": TYPE_DATETIME,
"TIME": TYPE_DATETIME,
"INTERVAL": TYPE_STRING,
"UUID": TYPE_STRING,
"JSON": TYPE_STRING,
"STRUCT": TYPE_STRING,
"MAP": TYPE_STRING,
"UNION": TYPE_STRING,
}
class DuckDB(BaseSQLQueryRunner):
noop_query = "SELECT 1"
def __init__(self, configuration):
super().__init__(configuration)
self.dbpath = configuration.get("dbpath", ":memory:")
exts = configuration.get("extensions", "")
self.extensions = [e.strip() for e in exts.split(",") if e.strip()]
self._connect()
@classmethod
def configuration_schema(cls):
return {
"type": "object",
"properties": {
"dbpath": {
"type": "string",
"title": "Database Path",
"default": ":memory:",
},
"extensions": {"type": "string", "title": "Extensions (comma separated)"},
},
"order": ["dbpath", "extensions"],
"required": ["dbpath"],
}
@classmethod
def enabled(cls) -> bool:
return enabled
def _connect(self) -> None:
self.con = duckdb.connect(self.dbpath)
for ext in self.extensions:
try:
if "." in ext:
prefix, name = ext.split(".", 1)
if prefix == "community":
self.con.execute(f"INSTALL {name} FROM community")
self.con.execute(f"LOAD {name}")
else:
raise Exception("Unknown extension prefix.")
else:
self.con.execute(f"INSTALL {ext}")
self.con.execute(f"LOAD {ext}")
except Exception as e:
logger.warning("Failed to load extension %s: %s", ext, e)
def run_query(self, query, user) -> tuple:
try:
cursor = self.con.cursor()
cursor.execute(query)
columns = self.fetch_columns(
[(d[0], TYPES_MAP.get(d[1].upper(), TYPE_STRING)) for d in cursor.description]
)
rows = [dict(zip((col["name"] for col in columns), row)) for row in cursor.fetchall()]
data = {"columns": columns, "rows": rows}
return data, None
except duckdb.InterruptException:
raise InterruptException("Query cancelled by user.")
except Exception as e:
logger.exception("Error running query: %s", e)
return None, str(e)
def get_schema(self, get_stats=False) -> list:
tables_query = """
SELECT table_schema, table_name FROM information_schema.tables
WHERE table_schema NOT IN ('information_schema', 'pg_catalog');
"""
tables_results, error = self.run_query(tables_query, None)
if error:
raise Exception(f"Failed to get tables: {error}")
schema = {}
for table_row in tables_results["rows"]:
full_table_name = f"{table_row['table_schema']}.{table_row['table_name']}"
schema[full_table_name] = {"name": full_table_name, "columns": []}
describe_query = f'DESCRIBE "{table_row["table_schema"]}"."{table_row["table_name"]}";'
columns_results, error = self.run_query(describe_query, None)
if error:
logger.warning("Failed to describe table %s: %s", full_table_name, error)
continue
for col_row in columns_results["rows"]:
col = {"name": col_row["column_name"], "type": col_row["column_type"]}
schema[full_table_name]["columns"].append(col)
if col_row["column_type"].startswith("STRUCT("):
schema[full_table_name]["columns"].extend(
self._expand_struct_fields(col["name"], col_row["column_type"])
)
return list(schema.values())
def _expand_struct_fields(self, base_name: str, struct_type: str) -> list:
"""Recursively expand STRUCT(...) definitions into pseudo-columns."""
fields = []
# strip STRUCT( ... )
inner = struct_type[len("STRUCT(") : -1].strip()
# careful: nested structs, so parse comma-separated parts properly
depth, current, parts = 0, [], []
for c in inner:
if c == "(":
depth += 1
elif c == ")":
depth -= 1
if c == "," and depth == 0:
parts.append("".join(current).strip())
current = []
else:
current.append(c)
if current:
parts.append("".join(current).strip())
for part in parts:
# each part looks like: "fieldname TYPE"
fname, ftype = part.split(" ", 1)
colname = f"{base_name}.{fname}"
fields.append({"name": colname, "type": ftype})
if ftype.startswith("STRUCT("):
fields.extend(self._expand_struct_fields(colname, ftype))
return fields
register(DuckDB)

View File

@@ -348,6 +348,7 @@ default_query_runners = [
"redash.query_runner.oracle",
"redash.query_runner.e6data",
"redash.query_runner.risingwave",
"redash.query_runner.duckdb",
]
enabled_query_runners = array_from_string(

View File

@@ -0,0 +1,107 @@
from unittest import TestCase
from unittest.mock import patch
from redash.query_runner.duckdb import DuckDB
class TestDuckDBSchema(TestCase):
def setUp(self) -> None:
self.runner = DuckDB({"dbpath": ":memory:"})
@patch.object(DuckDB, "run_query")
def test_simple_schema_build(self, mock_run_query) -> None:
# Simulate queries: first for tables, then for DESCRIBE
mock_run_query.side_effect = [
(
{"rows": [{"table_schema": "main", "table_name": "users"}]},
None,
),
(
{
"rows": [
{"column_name": "id", "column_type": "INTEGER"},
{"column_name": "name", "column_type": "VARCHAR"},
]
},
None,
),
]
schema = self.runner.get_schema()
self.assertEqual(len(schema), 1)
self.assertEqual(schema[0]["name"], "main.users")
self.assertListEqual(
schema[0]["columns"],
[{"name": "id", "type": "INTEGER"}, {"name": "name", "type": "VARCHAR"}],
)
@patch.object(DuckDB, "run_query")
def test_struct_column_expansion(self, mock_run_query) -> None:
# First call to run_query -> tables list
mock_run_query.side_effect = [
(
{"rows": [{"table_schema": "main", "table_name": "events"}]},
None,
),
# Second call -> DESCRIBE output
(
{
"rows": [
{
"column_name": "payload",
"column_type": "STRUCT(a INTEGER, b VARCHAR)",
}
]
},
None,
),
]
schema_list = self.runner.get_schema()
self.assertEqual(len(schema_list), 1)
schema = schema_list[0]
# Ensure both raw and expanded struct fields are present
self.assertIn("main.events", schema["name"])
self.assertListEqual(
schema["columns"],
[
{"name": "payload", "type": "STRUCT(a INTEGER, b VARCHAR)"},
{"name": "payload.a", "type": "INTEGER"},
{"name": "payload.b", "type": "VARCHAR"},
],
)
def test_nested_struct_expansion(self) -> None:
runner = DuckDB({"dbpath": ":memory:"})
runner.con.execute(
"""
CREATE TABLE sample_struct_table (
id INTEGER,
info STRUCT(
name VARCHAR,
metrics STRUCT(score DOUBLE, rank INTEGER),
tags STRUCT(primary_tag VARCHAR, secondary_tag VARCHAR)
)
);
"""
)
schema = runner.get_schema()
table = next(t for t in schema if t["name"] == "main.sample_struct_table")
colnames = [c["name"] for c in table["columns"]]
assert "info" in colnames
assert 'info."name"' in colnames
assert "info.metrics" in colnames
assert "info.metrics.score" in colnames
assert "info.metrics.rank" in colnames
assert "info.tags.primary_tag" in colnames
assert "info.tags.secondary_tag" in colnames
@patch.object(DuckDB, "run_query")
def test_error_propagation(self, mock_run_query) -> None:
mock_run_query.return_value = (None, "boom")
with self.assertRaises(Exception) as ctx:
self.runner.get_schema()
self.assertIn("boom", str(ctx.exception))