Clickhouse: Multi-statements support (#5792)

The ClickHouse query runner splits the query into separate statements and executes each one in turn. The result of the last statement is returned. The implementation uses ClickHouse sessions in the HTTP protocol: a `session_id` is generated for the first query and is then reused for the subsequent queries (together with the `session_check` parameter).

If the query runner gets a successful response with an empty body from ClickHouse (for example, for a temporary table creation request), it returns an empty response.

authored-by: Liubov Ulitina <ulitinalm@vl.ru>
This commit is contained in:
Jesse
2022-07-12 12:27:20 -05:00
committed by GitHub
parent f0a390b11a
commit 9abc4f5f1e
2 changed files with 257 additions and 20 deletions

View File

@@ -1,15 +1,21 @@
import logging
import re
from urllib.parse import urlparse
from uuid import uuid4
import requests
from redash.query_runner import *
from redash.query_runner import split_sql_statements
from redash.utils import json_dumps, json_loads
logger = logging.getLogger(__name__)
def split_multi_query(query):
    """Return the statements found in ``query``, leaving out empty ones.

    The splitting itself is delegated to ``split_sql_statements``; this
    wrapper only discards the pieces that come back as an empty string.
    """
    non_empty = filter(
        lambda statement: statement != "", split_sql_statements(query)
    )
    return list(non_empty)
class ClickHouse(BaseSQLQueryRunner):
noop_query = "SELECT 1"
@@ -87,25 +93,41 @@ class ClickHouse(BaseSQLQueryRunner):
return list(schema.values())
def _send_query(self, data, stream=False):
def _send_query(self, data, session_id=None, session_check=None):
url = self.configuration.get("url", "http://127.0.0.1:8123")
timeout = self.configuration.get("timeout", 30)
params = {
"user": self.configuration.get("user", "default"),
"password": self.configuration.get("password", ""),
"database": self.configuration["dbname"],
"default_format": "JSON",
}
if session_id:
params["session_id"] = session_id
params["session_check"] = "1" if session_check else "0"
params["session_timeout"] = timeout
try:
verify = self.configuration.get("verify", True)
r = requests.post(
url,
data=data.encode("utf-8","ignore"),
stream=stream,
timeout=self.configuration.get("timeout", 30),
params={
"user": self.configuration.get("user", "default"),
"password": self.configuration.get("password", ""),
"database": self.configuration["dbname"],
},
data=data.encode("utf-8", "ignore"),
stream=False,
timeout=timeout,
params=params,
verify=verify,
)
if r.status_code != 200:
raise Exception(r.text)
# logging.warning(r.json())
# In certain situations the response body can be empty even if the query was successful, for example
# when creating temporary tables.
if not r.text:
return {}
return r.json()
except requests.RequestException as e:
if e.response:
@@ -133,14 +155,19 @@ class ClickHouse(BaseSQLQueryRunner):
else:
return TYPE_STRING
def _clickhouse_query(self, query):
def _clickhouse_query(self, query, session_id=None, session_check=None):
logger.debug("Clickhouse is about to execute query: %s", query)
query += "\nFORMAT JSON"
result = self._send_query(query)
response = self._send_query(query, session_id, session_check)
columns = []
columns_int64 = [] # db converts value to string if its type equals UInt64
columns_totals = {}
for r in result["meta"]:
meta = response.get("meta", [])
for r in meta:
column_name = r["name"]
column_type = self._define_column_type(r["type"])
@@ -155,7 +182,7 @@ class ClickHouse(BaseSQLQueryRunner):
{"name": column_name, "friendly_name": column_name, "type": column_type}
)
rows = result["data"]
rows = response.get("data", [])
for row in rows:
for column in columns_int64:
try:
@@ -163,8 +190,8 @@ class ClickHouse(BaseSQLQueryRunner):
except TypeError:
row[column] = None
if "totals" in result:
totals = result["totals"]
if "totals" in response:
totals = response["totals"]
for column, value in columns_totals.items():
totals[column] = value
rows.append(totals)
@@ -172,14 +199,32 @@ class ClickHouse(BaseSQLQueryRunner):
return {"columns": columns, "rows": rows}
def run_query(self, query, user):
logger.debug("Clickhouse is about to execute query: %s", query)
if query == "":
queries = split_multi_query(query)
if not queries:
json_data = None
error = "Query is empty"
return json_data, error
try:
q = self._clickhouse_query(query)
data = json_dumps(q)
# If just one query was given no session is needed
if len(queries) == 1:
results = self._clickhouse_query(queries[0])
else:
# If more than one query was given, a session is needed. Parameter session_check must be false
# for the first query
session_id = "redash_{}".format(uuid4().hex)
results = self._clickhouse_query(
queries[0], session_id, session_check=False
)
for query in queries[1:]:
results = self._clickhouse_query(
query, session_id, session_check=True
)
data = json_dumps(results)
error = None
except Exception as e:
data = None

View File

@@ -0,0 +1,192 @@
import json
from unittest import TestCase
from unittest.mock import Mock, patch
from redash.query_runner import TYPE_INTEGER
from redash.query_runner.clickhouse import ClickHouse, split_multi_query
# Table-driven cases for split_multi_query. Each entry is a pair of
# (query text, list of statements the split is expected to return).
split_multi_query_samples = [
# Regular query: a single statement comes back unchanged
("SELECT 1", ["SELECT 1"]),
# Multiple data queries inlined: the ";" terminators are not part of the result
("SELECT 1; SELECT 2;", ["SELECT 1", "SELECT 2"]),
# Multiline data queries: one statement per line
(
"""
SELECT 1;
SELECT 2;
""",
["SELECT 1", "SELECT 2"],
),
# Commented data queries: a comment preceding a statement stays attached to
# it, while trailing comments that are followed by no statement are dropped
(
"""
-- First query single-line commentary
SELECT 1;
/**
* Second query multi-line commentary
*/
SELECT 2;
-- Tail single-line commentary
/**
* Tail multi-line commentary
*/
""",
[
"-- First query single-line commentary\nSELECT 1",
"/**\n * Second query multi-line commentary\n */\nSELECT 2",
],
),
# Should skip empty statements: bare ";" separators yield nothing
(
"""
;;;
;
SELECT 1;
""",
["SELECT 1"],
),
]
class TestClickHouseQueriesSplit(TestCase):
    """Runs ``split_multi_query`` over the ``split_multi_query_samples`` table."""

    def test_split(self):
        """Each sample query must split into exactly its expected statements."""
        for query, expected in split_multi_query_samples:
            actual = split_multi_query(query)
            self.assertEqual(actual, expected)
# Canned JSON body used as the mocked HTTP response for a ``SELECT 1`` query:
# one UInt8 column named "1" holding a single row.
simple_query_response = {
    "meta": [{"name": "1", "type": "UInt8"}],
    "data": [{"1": 1}],
    "rows": 1,
    "statistics": {"elapsed": 0.0001278, "rows_read": 1, "bytes_read": 1},
}
class TestClickHouse(TestCase):
    """Tests for ``ClickHouse.run_query`` with ``requests.post`` mocked out."""

    @patch("requests.post")
    def test_send_single_query(self, post_request):
        """A single statement is sent in one request without session parameters."""
        query_runner = ClickHouse(
            {"url": "http://clickhouse:8123", "dbname": "system", "timeout": 60}
        )

        response = Mock()
        response.status_code = 200
        response.text = json.dumps(simple_query_response)
        response.json.return_value = simple_query_response
        post_request.return_value = response

        data, error = query_runner.run_query("SELECT 1", None)

        self.assertIsNone(error)
        self.assertEqual(
            json.loads(data),
            {
                "columns": [
                    {"name": "1", "friendly_name": "1", "type": TYPE_INTEGER},
                ],
                "rows": [
                    {"1": 1},
                ],
            },
        )

        (url,), kwargs = post_request.call_args
        self.assertEqual(url, "http://clickhouse:8123")
        self.assertEqual(kwargs["data"], b"SELECT 1\nFORMAT JSON")
        # Exact dict comparison: no session_* keys may be present here.
        self.assertEqual(
            kwargs["params"],
            {
                "user": "default",
                "password": "",
                "database": "system",
                "default_format": "JSON",
            },
        )
        self.assertEqual(kwargs["timeout"], 60)

    @patch("requests.post")
    def test_send_multi_query(self, post_request):
        """Two statements are sent as two requests that share one session id."""
        query_runner = ClickHouse(
            {"url": "http://clickhouse:8123", "dbname": "system", "timeout": 60}
        )

        # First request: successful response with an empty body.
        create_table_response = Mock()
        create_table_response.status_code = 200
        create_table_response.text = ""

        # Second request: regular JSON result.
        select_response = Mock()
        select_response.status_code = 200
        select_response.text = json.dumps(simple_query_response)
        select_response.json.return_value = simple_query_response

        post_request.side_effect = [create_table_response, select_response]

        # The continuation lines of the literals below are kept flush-left on
        # purpose: their whitespace is part of the compared values.
        data, error = query_runner.run_query(
            """
CREATE
TEMPORARY TABLE test AS
SELECT 1;
SELECT * FROM test;
""",
            None,
        )

        self.assertIsNone(error)
        # Only the result of the last statement is returned.
        self.assertEqual(
            json.loads(data),
            {
                "columns": [
                    {"name": "1", "friendly_name": "1", "type": TYPE_INTEGER},
                ],
                "rows": [
                    {"1": 1},
                ],
            },
        )

        (url,), kwargs = post_request.call_args_list[0]
        self.assertEqual(url, "http://clickhouse:8123")
        self.assertEqual(
            kwargs["data"],
            b"""CREATE
TEMPORARY TABLE test AS
SELECT 1
FORMAT JSON""",
        )
        # The first request opens the session, so session_check is "0".
        self.assert_session_params(kwargs, expected_check="0", expected_timeout=60)

        session_id = kwargs["params"]["session_id"]

        (url,), kwargs = post_request.call_args_list[1]
        self.assertEqual(url, "http://clickhouse:8123")
        self.assertEqual(
            kwargs["data"],
            b"""SELECT * FROM test
FORMAT JSON""",
        )
        # The follow-up request must reuse the very same session id.
        self.assert_session_params(
            kwargs, expected_check="1", expected_timeout=60, expected_id=session_id
        )

    def assert_session_params(
        self, kwargs, expected_check, expected_timeout, expected_id=None
    ):
        """Assert the session-related query parameters of one ``requests.post`` call.

        :param kwargs: keyword arguments captured from the mocked call; its
            ``"params"`` entry is inspected.
        :param expected_check: expected ``session_check`` value.
        :param expected_timeout: expected ``session_timeout`` value.
        :param expected_id: when given, the ``session_id`` the call must carry.
        """
        params = kwargs["params"]
        self.assertEqual(params["session_check"], expected_check)
        self.assertEqual(params["session_timeout"], expected_timeout)

        session_id = params["session_id"]
        # assertRegex performs an unanchored search, so anchor the pattern to
        # validate the whole id rather than any substring of it.
        self.assertRegex(session_id, r"^redash_[a-f0-9]+$")

        # Compare against the caller-supplied id; comparing the value with
        # itself (as before) could never fail.
        if expected_id is not None:
            self.assertEqual(session_id, expected_id)