Yandex.Disk Query runner (#6598)

* Snapshot: 23.11.0-dev

* moved dataframe_to_result function out of the Python query runner

* added yandex disk query runner

* moved file_extension check

* skip unsupported extensions in schema

* removed unused variable

* added support for xlsx with multiple sheets

* moved pandas-converters to utils file

* added tests

* fixed backend tests

* fixed pandas to redash type conversion

* added more tests

* added tests for pandas

* added tests for pandas converter and yandex disk

* added tests for read_file and multiple sheets

* pandas: do not load if lib is not installed

* added test for yaml read

* fixed test for yaml read

---------

Co-authored-by: github-actions <github-actions@github.com>
Co-authored-by: Guido Petri <18634426+guidopetri@users.noreply.github.com>
Vladislav Denisov authored 2023-11-22 05:16:29 +03:00, committed by GitHub
parent 8bfc57430d, commit a07b8a6bd3
7 changed files with 536 additions and 25 deletions
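
The new runner takes its query as a small YAML (or JSON) document rather than SQL. A minimal sketch of a query, matching what run_query in the diff below accepts (the path here is invented for illustration; sheet_name applies only to xls/xlsx files, and setting it to null loads every sheet and tags each row with a sheet_name column):

path: /reports/sales.xlsx
sheet_name: null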

(binary image file added, 8.5 KiB — contents not shown)

redash/query_runner/python.py

@@ -9,6 +9,7 @@ from RestrictedPython.Guards import (
     guarded_unpack_sequence,
     safe_builtins,
 )
+from RestrictedPython.transformer import IOPERATOR_TO_STR

 from redash import models
 from redash.query_runner import (
@@ -23,16 +24,17 @@ from redash.query_runner import (
     register,
 )
 from redash.utils import json_dumps, json_loads
+from redash.utils.pandas import pandas_installed

-try:
-    import numpy as np
+if pandas_installed:
     import pandas as pd

-    pandas_installed = True
-except ImportError:
-    pandas_installed = False
+    from redash.utils.pandas import pandas_to_result

-from RestrictedPython.transformer import IOPERATOR_TO_STR
+    enabled = True
+else:
+    enabled = False

 logger = logging.getLogger(__name__)

@@ -271,26 +273,11 @@ class Python(BaseQueryRunner):
         return query.latest_query_data.data

     def dataframe_to_result(self, result, df):
-        result["rows"] = df.to_dict("records")
+        converted_result = pandas_to_result(df)

-        for column_name, column_type in df.dtypes.items():
-            if column_type == np.bool_:
-                redash_type = TYPE_BOOLEAN
-            elif column_type == np.inexact:
-                redash_type = TYPE_FLOAT
-            elif column_type == np.integer:
-                redash_type = TYPE_INTEGER
-            elif column_type in (np.datetime64, np.dtype("<M8[ns]")):
-                if df.empty:
-                    redash_type = TYPE_DATETIME
-                elif len(df[column_name].head(1).astype(str).loc[0]) > 10:
-                    redash_type = TYPE_DATETIME
-                else:
-                    redash_type = TYPE_DATE
-            else:
-                redash_type = TYPE_STRING
-            self.add_result_column(result, column_name, column_name, redash_type)
+        result["rows"] = converted_result["rows"]
+        for column in converted_result["columns"]:
+            self.add_result_column(result, column["name"], column["friendly_name"], column["type"])

     def get_current_user(self):
         return self._current_user.to_dict()
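
With the conversion centralized, the method's behavior is unchanged from the caller's point of view. A minimal sketch of the new code path, invoking the method directly on a runner instance (the DataFrame is invented and `runner` stands in for a configured Python data source):

import pandas as pd

df = pd.DataFrame({"id": [1, 2], "ok": [True, False]})
result = {}
runner.dataframe_to_result(result, df)
# result["rows"] is df.to_dict("records");
# result["columns"] carries Redash types ("integer", "boolean")
# mapped by redash.utils.pandas instead of inline numpy checks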

redash/query_runner/yandex_disk.py  (new file, 166 lines)

@@ -0,0 +1,166 @@
import logging
from importlib.util import find_spec

import requests
import yaml

from redash.query_runner import BaseSQLQueryRunner, register
from redash.utils import json_dumps
from redash.utils.pandas import pandas_installed

openpyxl_installed = find_spec("openpyxl")

if pandas_installed and openpyxl_installed:
    import openpyxl  # noqa: F401
    import pandas as pd

    from redash.utils.pandas import pandas_to_result

    enabled = True

    EXTENSIONS_READERS = {
        "csv": pd.read_csv,
        "tsv": pd.read_table,
        "xls": pd.read_excel,
        "xlsx": pd.read_excel,
    }
else:
    enabled = False

logger = logging.getLogger(__name__)


class YandexDisk(BaseSQLQueryRunner):
    should_annotate_query = False

    @classmethod
    def type(cls):
        return "yandex_disk"

    @classmethod
    def name(cls):
        return "Yandex Disk"

    @classmethod
    def configuration_schema(cls):
        return {
            "type": "object",
            "properties": {
                "token": {"type": "string", "title": "OAuth Token"},
            },
            "secret": ["token"],
            "required": ["token"],
        }

    def __init__(self, configuration):
        super(YandexDisk, self).__init__(configuration)
        self.syntax = "yaml"
        self.base_url = "https://cloud-api.yandex.net/v1/disk"
        self.list_path = "counters"

    def _get_tables(self, schema):
        offset = 0
        limit = 100

        while True:
            tmp_response = self._send_query(
                "resources/public", media_type="spreadsheet,text", limit=limit, offset=offset
            )

            tmp_items = tmp_response["items"]

            for file_info in tmp_items:
                file_name = file_info["name"]
                file_path = file_info["path"].replace("disk:", "")

                file_extension = file_name.split(".")[-1].lower()

                if file_extension not in EXTENSIONS_READERS:
                    continue

                schema[file_name] = {"name": file_name, "columns": [file_path]}

            if len(tmp_items) < limit:
                break

            offset += limit

        return list(schema.values())

    def test_connection(self):
        self._send_query()

    def _send_query(self, url_path="", **kwargs):
        token = kwargs.pop("oauth_token", self.configuration["token"])
        r = requests.get(
            f"{self.base_url}/{url_path}",
            headers={"Authorization": f"OAuth {token}"},
            params=kwargs,
        )

        response_data = r.json()

        if not r.ok:
            error_message = f"Code: {r.status_code}, message: {r.text}"
            raise Exception(error_message)
        return response_data

    def run_query(self, query, user):
        logger.debug("Yandex Disk is about to execute query: %s", query)
        data = None

        if not query:
            error = "Query is empty"
            return data, error

        try:
            params = yaml.safe_load(query)
        except (ValueError, AttributeError) as e:
            logger.exception(e)
            error = f"YAML read error: {str(e)}"
            return data, error

        if not isinstance(params, dict):
            error = "The query format must be JSON or YAML"
            return data, error

        if "path" not in params:
            error = "The query must contain path"
            return data, error

        file_extension = params["path"].split(".")[-1].lower()

        read_params = {}
        is_multiple_sheets = False

        if file_extension not in EXTENSIONS_READERS:
            error = f"Unsupported file extension: {file_extension}"
            return data, error
        elif file_extension in ("xls", "xlsx"):
            read_params["sheet_name"] = params.get("sheet_name", 0)
            if read_params["sheet_name"] is None:
                is_multiple_sheets = True

        file_url = self._send_query("resources/download", path=params["path"])["href"]

        try:
            df = EXTENSIONS_READERS[file_extension](file_url, **read_params)
        except Exception as e:
            logger.exception(e)
            error = f"Read file error: {str(e)}"
            return data, error

        if is_multiple_sheets:
            new_df = []
            for sheet_name, sheet_df in df.items():
                sheet_df["sheet_name"] = sheet_name
                new_df.append(sheet_df)
            new_df = pd.concat(new_df, ignore_index=True)
            df = new_df.copy()

        data = json_dumps(pandas_to_result(df))
        error = None

        return data, error


register(YandexDisk)
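
On the multi-sheet branch above: pandas returns a dict of DataFrames when read_excel is called with sheet_name=None, and the loop flattens that dict into a single frame. A self-contained illustration with invented sheet names:

import pandas as pd

sheets = {
    "2023": pd.DataFrame({"revenue": [1, 2]}),
    "2024": pd.DataFrame({"revenue": [3]}),
}
frames = []
for sheet_name, sheet_df in sheets.items():
    sheet_df["sheet_name"] = sheet_name  # tag each row with its source sheet
    frames.append(sheet_df)
flat = pd.concat(frames, ignore_index=True)
# flat has 3 rows and columns ["revenue", "sheet_name"]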

redash/settings/__init__.py

@@ -298,6 +298,7 @@ default_query_runners = [
     "redash.query_runner.clickhouse",
     "redash.query_runner.tinybird",
     "redash.query_runner.yandex_metrica",
+    "redash.query_runner.yandex_disk",
     "redash.query_runner.rockset",
     "redash.query_runner.treasuredata",
     "redash.query_runner.sqlite",

redash/utils/pandas.py  (new file, 47 lines)

@@ -0,0 +1,47 @@
import logging
from importlib.util import find_spec

from redash.query_runner import (
    TYPE_BOOLEAN,
    TYPE_DATE,
    TYPE_DATETIME,
    TYPE_FLOAT,
    TYPE_INTEGER,
    TYPE_STRING,
)

logger = logging.getLogger(__name__)

pandas_installed = find_spec("pandas") and find_spec("numpy")

if pandas_installed:
    import numpy as np
    import pandas as pd

    def get_column_types_from_dataframe(df: pd.DataFrame) -> list:
        columns = []
        for column_name, column_type in df.dtypes.items():
            if column_type in (np.bool_,):
                redash_type = TYPE_BOOLEAN
            elif column_type in (np.int64, np.int32):
                redash_type = TYPE_INTEGER
            elif column_type in (np.float64,):
                redash_type = TYPE_FLOAT
            elif column_type in (np.datetime64, np.dtype("<M8[ns]")):
                if df.empty:
                    redash_type = TYPE_DATETIME
                elif len(df[column_name].head(1).astype(str).loc[0]) > 10:
                    redash_type = TYPE_DATETIME
                else:
                    redash_type = TYPE_DATE
            else:
                redash_type = TYPE_STRING

            columns.append({"name": column_name, "friendly_name": column_name, "type": redash_type})

        return columns

    def pandas_to_result(df: pd.DataFrame) -> dict:
        columns = get_column_types_from_dataframe(df)
        rows = df.to_dict("records")
        return {"columns": columns, "rows": rows}
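
The date-vs-datetime branch leans on string length: a stringified datetime64 value longer than 10 characters ("YYYY-MM-DD" is exactly 10) is treated as a datetime, otherwise a date. A quick sketch of the resulting contract (values invented; the output shape matches the tests further down):

import pandas as pd

from redash.utils.pandas import pandas_to_result

df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
print(pandas_to_result(df))
# {'columns': [{'name': 'id', 'friendly_name': 'id', 'type': 'integer'},
#              {'name': 'name', 'friendly_name': 'name', 'type': 'string'}],
#  'rows': [{'id': 1, 'name': 'a'}, {'id': 2, 'name': 'b'}]}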

tests/query_runner/test_yandex_disk.py  (new file, 250 lines)

@@ -0,0 +1,250 @@
from io import BytesIO
from unittest import mock

import yaml

from redash.query_runner.yandex_disk import enabled
from redash.utils import json_dumps

if enabled:
    import pandas as pd

    from redash.query_runner.yandex_disk import EXTENSIONS_READERS, YandexDisk

    test_df = pd.DataFrame(
        [
            {"id": 1, "name": "Alice", "age": 20},
            {"id": 2, "name": "Bob", "age": 21},
            {"id": 3, "name": "Charlie", "age": 22},
            {"id": 4, "name": "Dave", "age": 23},
            {"id": 5, "name": "Eve", "age": 24},
        ]
    )

import pytest

test_token = "AAAAQAA"

skip_condition = pytest.mark.skipif(not enabled, reason="pandas and/or openpyxl are not installed")


@pytest.fixture
def mock_yandex_disk():
    return YandexDisk(configuration={"token": test_token})


@skip_condition
def test_yandex_disk_type():
    assert YandexDisk.type() == "yandex_disk"


@skip_condition
def test_yandex_disk_name():
    assert YandexDisk.name() == "Yandex Disk"


@skip_condition
@mock.patch("requests.get")
def test__send_query(mock_requests_get):
    mock_requests_get.return_value.ok = True
    mock_requests_get.return_value.json.return_value = {"foo": "bar"}

    configuration = {"token": test_token}
    disk = YandexDisk(configuration)

    response = disk._send_query("test_url")

    assert response == {"foo": "bar"}
    mock_requests_get.assert_called_once()


@skip_condition
@pytest.mark.parametrize(
    "configuration, error_message",
    [({"token": test_token}, None), ({"token": ""}, "Code: 400, message: Unauthorized")],
)
@mock.patch("requests.get")
def test_test_connection(mock_requests_get, configuration, error_message):
    if error_message:
        mock_requests_get.return_value.ok = False
        mock_requests_get.return_value.status_code = 400
        mock_requests_get.return_value.text = "Unauthorized"
    else:
        mock_requests_get.return_value.ok = True

    disk = YandexDisk(configuration)

    if error_message:
        with pytest.raises(Exception, match=error_message):
            disk.test_connection()
    else:
        assert disk.test_connection() is None


@skip_condition
def test_get_tables(mock_yandex_disk):
    mock_files = {
        "items": [
            {"name": "test_file.csv", "path": "disk:/test_path/test_file.csv"},
            {"name": "invalid_file.txt", "path": "disk:/test_path/invalid_file.txt"},
        ]
    }

    mock_yandex_disk._send_query = mock.MagicMock(return_value=mock_files)

    tables = mock_yandex_disk._get_tables({})

    assert len(tables) == 1
    assert tables[0]["name"] == "test_file.csv"
    assert tables[0]["columns"] == ["/test_path/test_file.csv"]


def mock_ext_readers_return(url, **params):
    return test_df


def mock_ext_readers_return_multiple_sheets(url, **params):
    return {"sheet1": test_df}


@skip_condition
@mock.patch("requests.get")
def test_run_query(mocked_requests, mock_yandex_disk):
    mocked_response = mock.MagicMock()
    mocked_response.ok = True
    mocked_response.json.return_value = {"href": "test_file.csv"}
    mocked_requests.return_value = mocked_response

    mock_readers = EXTENSIONS_READERS.copy()
    mock_readers["csv"] = mock_ext_readers_return

    expected_data = json_dumps(
        {
            "columns": [
                {"name": "id", "friendly_name": "id", "type": "integer"},
                {"name": "name", "friendly_name": "name", "type": "string"},
                {"name": "age", "friendly_name": "age", "type": "integer"},
            ],
            "rows": [
                {"id": 1, "name": "Alice", "age": 20},
                {"id": 2, "name": "Bob", "age": 21},
                {"id": 3, "name": "Charlie", "age": 22},
                {"id": 4, "name": "Dave", "age": 23},
                {"id": 5, "name": "Eve", "age": 24},
            ],
        }
    )

    with mock.patch.dict("redash.query_runner.yandex_disk.EXTENSIONS_READERS", mock_readers, clear=True):
        data, error = mock_yandex_disk.run_query(yaml.dump({"path": "/tmp/file.csv"}), "user")

    assert error is None
    assert data == expected_data


@skip_condition
def test_run_query_with_empty_query(mock_yandex_disk):
    result = mock_yandex_disk.run_query("", "user")
    assert result == (None, "Query is empty")


@skip_condition
def test_run_query_nonstring_yaml(mock_yandex_disk):
    bad_yaml_query = [0, 1]
    data, error = mock_yandex_disk.run_query(bad_yaml_query, "user")
    assert data is None
    assert error.startswith("YAML read error: ")


@skip_condition
def test_run_query_bad_yaml(mock_yandex_disk):
    bad_yaml_query = "unparseable = yaml"
    result = mock_yandex_disk.run_query(bad_yaml_query, "user")
    assert result == (None, "The query format must be JSON or YAML")


@skip_condition
def test_run_query_without_path(mock_yandex_disk):
    bad_yaml_query = "without: path"
    result = mock_yandex_disk.run_query(bad_yaml_query, "user")
    assert result == (None, "The query must contain path")


@skip_condition
def test_run_query_unsupported_extension(mock_yandex_disk):
    bad_yaml_query = "path: /tmp/file.txt"
    result = mock_yandex_disk.run_query(bad_yaml_query, "user")
    assert result == (None, "Unsupported file extension: txt")


@skip_condition
def test_run_query_read_file_error(mock_yandex_disk):
    mock_yandex_disk._send_query = mock.MagicMock(return_value={"href": "test_file.csv"})
    mock_yandex_disk._get_tables = mock.MagicMock(return_value=[{"name": "test_file.csv", "columns": []}])
    mock_yandex_disk._read_file = mock.MagicMock(side_effect=Exception("Read file error"))

    data, error = mock_yandex_disk.run_query(yaml.dump({"path": "/tmp/file.csv"}), "user")

    assert data is None
    assert error is not None and error.startswith("Read file error")


@skip_condition
@mock.patch("requests.get")
def test_run_query_multiple_sheets(mocked_requests, mock_yandex_disk):
    mocked_response = mock.MagicMock()
    mocked_response.ok = True
    mocked_response.json.return_value = {"href": "test_file.xlsx"}
    mocked_requests.return_value = mocked_response

    query = """
    path: /tmp/file.xlsx
    sheet_name: null
    """

    mock_readers = EXTENSIONS_READERS.copy()
    mock_readers["xlsx"] = mock_ext_readers_return_multiple_sheets

    with mock.patch.dict("redash.query_runner.yandex_disk.EXTENSIONS_READERS", mock_readers, clear=True):
        data, error = mock_yandex_disk.run_query(query, "user")

    assert error is None
    assert data == json_dumps(
        {
            "columns": [
                {"name": "id", "friendly_name": "id", "type": "integer"},
                {"name": "name", "friendly_name": "name", "type": "string"},
                {"name": "age", "friendly_name": "age", "type": "integer"},
                {"name": "sheet_name", "friendly_name": "sheet_name", "type": "string"},
            ],
            "rows": [
                {"id": 1, "name": "Alice", "age": 20, "sheet_name": "sheet1"},
                {"id": 2, "name": "Bob", "age": 21, "sheet_name": "sheet1"},
                {"id": 3, "name": "Charlie", "age": 22, "sheet_name": "sheet1"},
                {"id": 4, "name": "Dave", "age": 23, "sheet_name": "sheet1"},
                {"id": 5, "name": "Eve", "age": 24, "sheet_name": "sheet1"},
            ],
        }
    )


@skip_condition
def test_read_xlsx():
    output = BytesIO()
    writer = pd.ExcelWriter(output)
    test_df.to_excel(writer, index=False)
    writer.save()
    assert test_df.equals(EXTENSIONS_READERS["xlsx"](output))


@skip_condition
def test_read_csv():
    output = BytesIO()
    test_df.to_csv(output, index=False)
    output.seek(0)
    assert test_df.equals(EXTENSIONS_READERS["csv"](output))


@skip_condition
def test_tsv():
    output = BytesIO()
    test_df.to_csv(output, index=False, sep="\t")
    output.seek(0)
    assert test_df.equals(EXTENSIONS_READERS["tsv"](output))

tests/test_utils.py

@@ -1,7 +1,17 @@
 from collections import namedtuple
 from unittest import TestCase

+import pytest
+
 from redash import create_app
+from redash.query_runner import (
+    TYPE_BOOLEAN,
+    TYPE_DATE,
+    TYPE_DATETIME,
+    TYPE_FLOAT,
+    TYPE_INTEGER,
+    TYPE_STRING,
+)
 from redash.utils import (
     build_url,
     collect_parameters_from_request,
@@ -10,9 +20,18 @@ from redash.utils import (
     json_dumps,
     render_template,
 )
+from redash.utils.pandas import pandas_installed

 DummyRequest = namedtuple("DummyRequest", ["host", "scheme"])

+skip_condition = pytest.mark.skipif(not pandas_installed, reason="pandas is not installed")
+
+if pandas_installed:
+    import numpy as np
+    import pandas as pd
+
+    from redash.utils.pandas import get_column_types_from_dataframe, pandas_to_result
+

 class TestBuildUrl(TestCase):
     def test_simple_case(self):
@@ -100,3 +119,44 @@ class TestRenderTemplate(TestCase):
         html, text = [render_template("emails/failures.{}".format(f), d) for f in ["html", "txt"]]
         self.assertIn("Failure Unit Test", html)
         self.assertIn("Failure Unit Test", text)
+
+
+@pytest.fixture
+@skip_condition
+def mock_dataframe():
+    df = pd.DataFrame(
+        {
+            "boolean_col": [True, False],
+            "integer_col": [1, 2],
+            "float_col": [1.1, 2.2],
+            "date_col": [np.datetime64("2020-01-01"), np.datetime64("2020-05-05")],
+            "datetime_col": [np.datetime64("2020-01-01 12:00:00"), np.datetime64("2020-05-05 14:30:00")],
+            "string_col": ["A", "B"],
+        }
+    )
+    return df
+
+
+@skip_condition
+def test_get_column_types_from_dataframe(mock_dataframe):
+    result = get_column_types_from_dataframe(mock_dataframe)
+
+    expected_output = [
+        {"name": "boolean_col", "friendly_name": "boolean_col", "type": TYPE_BOOLEAN},
+        {"name": "integer_col", "friendly_name": "integer_col", "type": TYPE_INTEGER},
+        {"name": "float_col", "friendly_name": "float_col", "type": TYPE_FLOAT},
+        {"name": "date_col", "friendly_name": "date_col", "type": TYPE_DATE},
+        {"name": "datetime_col", "friendly_name": "datetime_col", "type": TYPE_DATETIME},
+        {"name": "string_col", "friendly_name": "string_col", "type": TYPE_STRING},
+    ]
+
+    assert result == expected_output
+
+
+@skip_condition
+def test_pandas_to_result(mock_dataframe):
+    result = pandas_to_result(mock_dataframe)
+
+    assert "columns" in result
+    assert "rows" in result
+    assert mock_dataframe.equals(pd.DataFrame(result["rows"]))