Adds influxdb v2 query runner. (#6646)

* Adds influxdb v2 query runner.

- Adds test cases
- Adds influxdb v2 icon
- Updates python dependencies

* Fixes import order.

* Fixes code formatting for black tool.

* Adds influxdb version 2 support in readme.

---------

Co-authored-by: Fabian Reiber <reiber@dfn-cert.de>
Co-authored-by: Masayuki Takahashi <masayuki038@gmail.com>
This commit is contained in:
fabrei
2023-12-12 15:01:50 +01:00
committed by GitHub
parent 9bbdb4b765
commit 66ef942572
6 changed files with 1038 additions and 387 deletions

View File

@@ -61,6 +61,7 @@ Redash supports more than 35 SQL and NoSQL [data sources](https://redash.io/help
- Apache Hive
- Apache Impala
- InfluxDB
- InfluxDBv2
- IBM Netezza Performance Server
- JIRA (JQL)
- JSON

Binary file not shown.

After

Width:  |  Height:  |  Size: 18 KiB

857
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -104,6 +104,7 @@ google-api-python-client = "1.7.11"
gspread = "5.11.2"
impyla = "0.16.0"
influxdb = "5.2.3"
influxdb-client = "1.38.0"
memsql = "3.2.0"
mysqlclient = "2.1.1"
nzalchemy = "^11.0.2"

View File

@@ -0,0 +1,218 @@
import logging
import os
from base64 import b64decode
from tempfile import NamedTemporaryFile
from typing import Dict, Optional, Tuple, Type, TypeVar
from redash.query_runner import (
TYPE_BOOLEAN,
TYPE_DATETIME,
TYPE_FLOAT,
TYPE_INTEGER,
TYPE_STRING,
BaseQueryRunner,
register,
)
from redash.utils import json_dumps
try:
from influxdb_client import InfluxDBClient
from influxdb_client.client.flux_table import TableList
enabled = True
except ImportError:
enabled = False
logger = logging.getLogger(__name__)
T = TypeVar("T")
TYPES_MAP = {
"integer": TYPE_INTEGER,
"long": TYPE_INTEGER,
"float": TYPE_FLOAT,
"double": TYPE_FLOAT,
"boolean": TYPE_BOOLEAN,
"string": TYPE_STRING,
"datetime:RFC3339": TYPE_DATETIME,
}
class InfluxDBv2(BaseQueryRunner):
"""
Query runner for influxdb version 2.
"""
should_annotate_query = False
def _get_influx_kwargs(self) -> Dict:
"""
Determines additional arguments for influxdb client connection.
:return: An object with additional arguments for influxdb client.
"""
return {
"verify_ssl": self.configuration.get("verify_ssl", None),
"cert_file": self._create_cert_file("cert_File"),
"cert_key_file": self._create_cert_file("cert_key_File"),
"cert_key_password": self.configuration.get("cert_key_password", None),
"ssl_ca_cert": self._create_cert_file("ssl_ca_cert_File"),
}
def _create_cert_file(self, key: str) -> str:
"""
Creates a temporary file from base64 encoded content from stored
configuration in filesystem.
:param key: The key to get the content from configuration object.
:return: The name of temporary file.
"""
cert_file_name = None
if self.configuration.get(key, None) is not None:
with NamedTemporaryFile(mode="w", delete=False) as cert_file:
cert_bytes = b64decode(self.configuration[key])
cert_file.write(cert_bytes.decode("utf-8"))
cert_file_name = cert_file.name
return cert_file_name
def _cleanup_cert_files(self, influx_kwargs: Dict) -> None:
"""
Deletes temporary stored files in filesystem.
"""
for key in ["cert_file", "cert_key_file", "ssl_ca_cert"]:
cert_path = influx_kwargs.get(key, None)
if cert_path is not None and os.path.exists(cert_path):
os.remove(cert_path)
@classmethod
def configuration_schema(cls: Type[T]) -> Dict:
"""
Defines a configuration schema for this query runner.
:param cls: Object of this class.
:return: The defined configuration schema.
"""
# files has to end with "File" in name
return {
"type": "object",
"properties": {
"url": {"type": "string", "title": "URL"},
"org": {"type": "string", "title": "Organization"},
"token": {"type": "string", "title": "Token"},
"verify_ssl": {"type": "boolean", "title": "Verify SSL", "default": False},
"cert_File": {"type": "string", "title": "SSL Client Certificate", "default": None},
"cert_key_File": {"type": "string", "title": "SSL Client Key", "default": None},
"cert_key_password": {"type": "string", "title": "Password for SSL Client Key", "default": None},
"ssl_ca_cert_File": {"type": "string", "title": "SSL Root Certificate", "default": None},
},
"order": ["url", "org", "token", "cert_File", "cert_key_File", "cert_key_password", "ssl_ca_cert_File"],
"required": ["url", "org", "token"],
"secret": ["token", "cert_File", "cert_key_File", "cert_key_password", "ssl_ca_cert_File"],
"extra_options": ["verify_ssl", "cert_File", "cert_key_File", "cert_key_password", "ssl_ca_cert_File"],
}
@classmethod
def enabled(cls: Type[T]) -> bool:
"""
Determines, if this query runner is enabled or not.
:param cls: Object of this class.
:return: True, if this query runner is enabled; otherwise False.
"""
return enabled
def test_connection(self) -> None:
"""
Tests the healthiness of the influxdb instance. If it is not healthy,
it logs an error message and raises an exception with an appropriate
message.
:raises Exception: If the remote influxdb instance is not healthy.
"""
try:
influx_kwargs = self._get_influx_kwargs()
with InfluxDBClient(
url=self.configuration["url"],
token=self.configuration["token"],
org=self.configuration["org"],
**influx_kwargs,
) as client:
healthy = client.health()
if healthy.status == "fail":
logger.error("Connection test failed, due to: " f"{healthy.message!r}.")
raise Exception("InfluxDB is not healthy. Check logs for more " "information.")
except Exception:
raise
finally:
self._cleanup_cert_files(influx_kwargs)
def _get_type(self, type_: str) -> str:
"""
Determines the internal type of a passed data type which the database
uses.
:param type_: The type from the database to map to internal datatype.
:return: The name of the internal datatype.
"""
return TYPES_MAP.get(type_, "string")
def _get_data_from_tables(self, tables: TableList) -> Dict:
"""
Determines the data of the given tables in an appropriate schema for
redash ui to render it. It retrieves all available columns and records
from the tables.
:param tables: A list of FluxTable instances.
:return: An object with columns and rows list.
"""
columns = []
rows = []
for table in tables:
for column in table.columns:
column_entry = {
"name": column.label,
"type": self._get_type(column.data_type),
"friendly_name": column.label.title(),
}
if column_entry not in columns:
columns.append(column_entry)
rows.extend([row.values for row in [record for record in table.records]])
return {"columns": columns, "rows": rows}
def run_query(self, query: str, user: str) -> Tuple[Optional[str], Optional[str]]:
"""
Runs a given query against the influxdb instance and returns its
result.
:param query: The query, this runner is executed.
:param user: The user who runs the query.
:return: A 2-tuple:
1. element: The queried result in an appropriate format for redash
ui. If an error occurred, it returns None.
2. element: An error message, if an error occured. None, if no
error occurred.
"""
json_data = None
error = None
try:
influx_kwargs = self._get_influx_kwargs()
with InfluxDBClient(
url=self.configuration["url"],
token=self.configuration["token"],
org=self.configuration["org"],
**influx_kwargs,
) as client:
logger.debug(f"InfluxDB got query: {query!r}")
tables = client.query_api().query(query)
data = self._get_data_from_tables(tables)
json_data = json_dumps(data)
except Exception as ex:
error = str(ex)
finally:
self._cleanup_cert_files(influx_kwargs)
return json_data, error
register(InfluxDBv2)

View File

@@ -0,0 +1,348 @@
import json
import mock
import pytest
from influxdb_client.client.flux_table import (
FluxColumn,
FluxRecord,
FluxTable,
TableList,
)
from redash.query_runner.influx_db_v2 import InfluxDBv2
@pytest.fixture()
def influx_table_list():
tables = TableList()
table_1 = FluxTable()
table_2 = FluxTable()
column_1 = FluxColumn(index=0, label="col_1", data_type="string", group=False, default_value="default_value_2")
column_2 = FluxColumn(index=0, label="col_2", data_type="integer", group=False, default_value="default_value_2")
column_3 = FluxColumn(index=1, label="col_3", data_type="float", group=False, default_value=3.0)
record_1 = FluxRecord(table_1, values={"col_1": "col_value_1", "col_2": 1})
record_1.table = column_1.index
record_1.row = ["col_value_1", 1, "field_1", "value_1"]
record_2 = FluxRecord(table_1, values={"col_1": "col_value_2", "col_2": 2})
record_2.table = column_1.index
record_2.row = ["col_value_2", 2, "field_2", "value_2"]
record_3 = FluxRecord(table_2, values={"col_3": 3.0})
record_3.table = column_1.index
record_3.row = ["col_value_1", 1, "field_1", "value_1"]
table_1.columns = [column_1, column_2]
table_1.records = [record_1, record_2]
table_2.columns = [column_3]
table_2.records = [record_3]
tables.append(table_1)
tables.append(table_2)
return tables
class TestInfluxDBv2:
@mock.patch("redash.query_runner.influx_db_v2.InfluxDBv2." "_create_cert_file")
def test_get_influx_kwargs(self, create_cert_file_mock: mock.MagicMock):
# 1. case: without ssl attributes
influx_db_v2 = InfluxDBv2({"url": "url", "token": "token", "org": "org"})
create_cert_file_mock.return_value = None
influx_kwargs = influx_db_v2._get_influx_kwargs()
assert influx_kwargs == {
"verify_ssl": None,
"cert_file": None,
"cert_key_file": None,
"cert_key_password": None,
"ssl_ca_cert": None,
}
create_cert_file_mock.assert_has_calls(
[mock.call("cert_File"), mock.call("cert_key_File"), mock.call("ssl_ca_cert_File")]
)
create_cert_file_mock.reset_mock()
# 2. case: with ssl attributes
create_cert_file_return_dict = {
"cert_File": "cert_file.crt",
"cert_key_File": "cert_key_file.key",
"ssl_ca_cert_File": "ssl_ca_cert_file.crt",
}
create_cert_file_mock.side_effect = lambda key: create_cert_file_return_dict[key]
influx_db_v2 = InfluxDBv2(
{
"url": "url",
"token": "token",
"org": "org",
"verify_ssl": True,
"cert_File": "cert_file",
"cert_key_File": "cert_key_file",
"cert_key_password": "cert_key_password",
"ssl_ca_cert_File": "ssl_ca_cert_file",
}
)
influx_kwargs = influx_db_v2._get_influx_kwargs()
assert influx_kwargs == {
"verify_ssl": True,
"cert_file": "cert_file.crt",
"cert_key_file": "cert_key_file.key",
"cert_key_password": "cert_key_password",
"ssl_ca_cert": "ssl_ca_cert_file.crt",
}
create_cert_file_mock.assert_has_calls(
[mock.call("cert_File"), mock.call("cert_key_File"), mock.call("ssl_ca_cert_File")]
)
@mock.patch("redash.query_runner.influx_db_v2.NamedTemporaryFile")
def test_create_cert_file(self, named_temporary_file_mock: mock.MagicMock):
# 1. case: with none value
influx_db_v2 = InfluxDBv2({"url": "url", "token": "token", "org": "org"})
context_manager_mock = named_temporary_file_mock().__enter__()
cert_file_name = influx_db_v2._create_cert_file("key")
assert cert_file_name is None
context_manager_mock.write().assert_not_called()
named_temporary_file_mock.reset_mock()
# 2. case: with a valid key
influx_db_v2 = InfluxDBv2({"url": "url", "token": "token", "org": "org", "key": "dmFsdWU="})
context_manager_mock = named_temporary_file_mock().__enter__()
context_manager_mock.name = "cert_file_name"
cert_file_name = influx_db_v2._create_cert_file("key")
assert cert_file_name == "cert_file_name"
context_manager_mock.write.assert_called_once_with("value")
@mock.patch("redash.query_runner.influx_db_v2.os")
def test_cleanup_cert_files(self, os_mock: mock.MagicMock):
# 1. case: no file found
influx_db_v2 = InfluxDBv2(
{
"url": "url",
"token": "token",
"org": "org",
"verify_ssl": True,
"cert_File": "cert_file",
"cert_key_File": "cert_key_file",
"cert_key_password": "cert_key_password",
"ssl_ca_cert_File": "ssl_ca_cert_file",
}
)
influx_db_v2._cleanup_cert_files({"any_file": "any_file"})
os_mock.path.exists.assert_not_called()
os_mock.remove.assert_not_called()
# 2. case: file found and deleted
os_mock.path.exists.return_value = True
influx_db_v2._cleanup_cert_files({"cert_file": "cert_file"})
os_mock.path.exists.assert_called_once_with("cert_file")
os_mock.remove.assert_called_once_with("cert_file")
def test_configuration_schema(self):
configuration_schema = InfluxDBv2.configuration_schema()
assert configuration_schema == {
"type": "object",
"properties": {
"url": {"type": "string", "title": "URL"},
"org": {"type": "string", "title": "Organization"},
"token": {"type": "string", "title": "Token"},
"verify_ssl": {"type": "boolean", "title": "Verify SSL", "default": False},
"cert_File": {"type": "string", "title": "SSL Client Certificate", "default": None},
"cert_key_File": {"type": "string", "title": "SSL Client Key", "default": None},
"cert_key_password": {"type": "string", "title": "Password for SSL Client Key", "default": None},
"ssl_ca_cert_File": {"type": "string", "title": "SSL Root Certificate", "default": None},
},
"order": ["url", "org", "token", "cert_File", "cert_key_File", "cert_key_password", "ssl_ca_cert_File"],
"required": ["url", "org", "token"],
"secret": ["token", "cert_File", "cert_key_File", "cert_key_password", "ssl_ca_cert_File"],
"extra_options": ["verify_ssl", "cert_File", "cert_key_File", "cert_key_password", "ssl_ca_cert_File"],
}
def test_enabled(self):
assert InfluxDBv2.enabled() is True
@mock.patch("redash.query_runner.influx_db_v2.InfluxDBClient")
@mock.patch("redash.query_runner.influx_db_v2.InfluxDBv2." "_cleanup_cert_files")
@mock.patch("redash.query_runner.influx_db_v2.logger")
def test_test_connection(
self,
logger_mock: mock.MagicMock,
cleanup_cert_files_mock: mock.MagicMock,
influx_db_client_mock: mock.MagicMock,
):
# 1. case: successful test connection
influx_db_v2 = InfluxDBv2({"url": "url", "token": "token", "org": "org"})
influx_kwargs = {
"verify_ssl": None,
"cert_file": None,
"cert_key_file": None,
"cert_key_password": None,
"ssl_ca_cert": None,
}
health_mock = influx_db_client_mock.return_value.__enter__().health
health_mock.return_value = mock.MagicMock(status="pass")
influx_db_v2.test_connection()
influx_db_client_mock.assert_called_once_with(url="url", token="token", org="org", **influx_kwargs)
health_mock.assert_called_once()
cleanup_cert_files_mock.assert_called_once_with(influx_kwargs)
logger_mock.error.assert_not_called()
cleanup_cert_files_mock.reset_mock()
influx_db_client_mock.reset_mock()
# 2. case: unsuccessful test connection
influx_db_v2 = InfluxDBv2({"url": "url", "token": "token", "org": "org"})
influx_kwargs = {
"verify_ssl": None,
"cert_file": None,
"cert_key_file": None,
"cert_key_password": None,
"ssl_ca_cert": None,
}
health_mock = influx_db_client_mock.return_value.__enter__().health
health_mock.return_value = mock.MagicMock(status="fail", message="Connection failed.")
with pytest.raises(Exception) as exp:
influx_db_v2.test_connection()
assert str(exp.value) == "InfluxDB is not healthy. Check logs for more information."
influx_db_client_mock.assert_called_once_with(url="url", token="token", org="org", **influx_kwargs)
health_mock.assert_called_once()
cleanup_cert_files_mock.assert_called_once_with(influx_kwargs)
logger_mock.error.assert_called_once_with("Connection test failed, due to: 'Connection failed.'.")
def test_get_type(self):
influx_db_v2 = InfluxDBv2(
{
"url": "url",
"token": "token",
"org": "org",
}
)
assert influx_db_v2._get_type("integer") == "integer"
assert influx_db_v2._get_type("long") == "integer"
assert influx_db_v2._get_type("float") == "float"
assert influx_db_v2._get_type("double") == "float"
assert influx_db_v2._get_type("boolean") == "boolean"
assert influx_db_v2._get_type("string") == "string"
assert influx_db_v2._get_type("datetime:RFC3339") == "datetime"
def test_get_data_from_tables(self, influx_table_list: TableList):
# 1. case: get object with coulmns and rows
influx_db_v2 = InfluxDBv2(
{
"url": "url",
"token": "token",
"org": "org",
}
)
data = influx_db_v2._get_data_from_tables(influx_table_list)
assert data == {
"columns": [
{"friendly_name": "Col_1", "name": "col_1", "type": "string"},
{"friendly_name": "Col_2", "name": "col_2", "type": "integer"},
{"friendly_name": "Col_3", "name": "col_3", "type": "float"},
],
"rows": [{"col_1": "col_value_1", "col_2": 1}, {"col_1": "col_value_2", "col_2": 2}, {"col_3": 3.0}],
}
# 2. case: get empty object without coulmns and rows
data = influx_db_v2._get_data_from_tables(TableList())
assert data == {"columns": [], "rows": []}
@mock.patch("redash.query_runner.influx_db_v2.InfluxDBClient")
@mock.patch("redash.query_runner.influx_db_v2.InfluxDBv2." "_cleanup_cert_files")
@mock.patch("redash.query_runner.influx_db_v2.logger")
@mock.patch("redash.query_runner.influx_db_v2.json_dumps")
def test_run_query(
self,
json_dumps_mock: mock.MagicMock,
logger_mock: mock.MagicMock,
cleanup_cert_files_mock: mock.MagicMock,
influx_db_client_mock: mock.MagicMock,
influx_table_list: TableList,
):
influx_db_v2 = InfluxDBv2(
{
"url": "url",
"token": "token",
"org": "org",
}
)
influx_kwargs = {
"verify_ssl": None,
"cert_file": None,
"cert_key_file": None,
"cert_key_password": None,
"ssl_ca_cert": None,
}
query = 'from(bucket: "test")' "|> range(start: 2023-12-04T09:00:00.000Z, " "stop: 2023-12-04T15:00:00.000Z)"
result_data = {
"columns": [
{"friendly_name": "Col_1", "name": "col_1", "type": "string"},
{"friendly_name": "Col_2", "name": "col_2", "type": "integer"},
{"friendly_name": "Col_3", "name": "col_3", "type": "float"},
],
"rows": [{"col_1": "col_value_1", "col_2": 1}, {"col_1": "col_value_2", "col_2": 2}, {"col_3": 3.0}],
}
json_dumps_data = json.dumps(result_data)
query_mock = influx_db_client_mock.return_value.__enter__().query_api().query
query_mock.return_value = influx_table_list
json_dumps_mock.return_value = json_dumps_data
# 1. case: successful query data
data, error = influx_db_v2.run_query(query, "user")
assert data == json_dumps_data
assert error is None
influx_db_client_mock.assert_called_once_with(url="url", token="token", org="org", **influx_kwargs)
logger_mock.debug.assert_called_once_with(f"InfluxDB got query: {query!r}")
query_mock.assert_called_once_with(query)
json_dumps_mock.assert_called_once_with(result_data)
cleanup_cert_files_mock.assert_called_once_with(influx_kwargs)
influx_db_client_mock.reset_mock()
logger_mock.reset_mock()
query_mock.reset_mock()
json_dumps_mock.reset_mock()
cleanup_cert_files_mock.reset_mock()
# 2. case: unsuccessful query data
query_mock.side_effect = Exception("test error")
data, error = influx_db_v2.run_query(query, "user")
assert data is None
assert error == "test error"
influx_db_client_mock.assert_called_once_with(url="url", token="token", org="org", **influx_kwargs)
logger_mock.debug.assert_called_once_with(f"InfluxDB got query: {query!r}")
query_mock.assert_called_once_with(query)
json_dumps_mock.assert_not_called()
cleanup_cert_files_mock.assert_called_once_with(influx_kwargs)