From 9abc4f5f1e8ef3e644f907eb4bde1dffefc05fc4 Mon Sep 17 00:00:00 2001
From: Jesse
Date: Tue, 12 Jul 2022 12:27:20 -0500
Subject: [PATCH] Clickhouse: Multi-statements support (#5792)

ClickHouse query runner splits query into several and executes each query
in turn. The result of the last execution is returned. Implementation
uses ClickHouse sessions in the HTTP protocol. `session_id` is generated
for the first query and then it is used with the subsequent queries
(together with the `session_check` parameter).

If query runner gets a success response with empty body from ClickHouse
(for example, in case of temporary table creation request) query runner
returns empty response.

authored-by: Liubov Ulitina
---
 redash/query_runner/clickhouse.py     |  85 +++++++++---
 tests/query_runner/test_clickhouse.py | 192 ++++++++++++++++++++++++++
 2 files changed, 257 insertions(+), 20 deletions(-)
 create mode 100644 tests/query_runner/test_clickhouse.py

diff --git a/redash/query_runner/clickhouse.py b/redash/query_runner/clickhouse.py
index c2a1c6ebb..c06cb5eb9 100644
--- a/redash/query_runner/clickhouse.py
+++ b/redash/query_runner/clickhouse.py
@@ -1,15 +1,21 @@
 import logging
 import re
 from urllib.parse import urlparse
+from uuid import uuid4
 
 import requests
 
 from redash.query_runner import *
+from redash.query_runner import split_sql_statements
 from redash.utils import json_dumps, json_loads
 
 logger = logging.getLogger(__name__)
 
 
+def split_multi_query(query):
+    return [st for st in split_sql_statements(query) if st != ""]
+
+
 class ClickHouse(BaseSQLQueryRunner):
     noop_query = "SELECT 1"
 
@@ -87,25 +93,41 @@ class ClickHouse(BaseSQLQueryRunner):
 
         return list(schema.values())
 
-    def _send_query(self, data, stream=False):
+    def _send_query(self, data, session_id=None, session_check=None):
         url = self.configuration.get("url", "http://127.0.0.1:8123")
+        timeout = self.configuration.get("timeout", 30)
+
+        params = {
+            "user": self.configuration.get("user", "default"),
+            "password": self.configuration.get("password", ""),
+            "database": self.configuration["dbname"],
+            "default_format": "JSON",
+        }
+
+        if session_id:
+            params["session_id"] = session_id
+            params["session_check"] = "1" if session_check else "0"
+            params["session_timeout"] = timeout
+
         try:
             verify = self.configuration.get("verify", True)
             r = requests.post(
                 url,
-                data=data.encode("utf-8","ignore"),
-                stream=stream,
-                timeout=self.configuration.get("timeout", 30),
-                params={
-                    "user": self.configuration.get("user", "default"),
-                    "password": self.configuration.get("password", ""),
-                    "database": self.configuration["dbname"],
-                },
+                data=data.encode("utf-8", "ignore"),
+                stream=False,
+                timeout=timeout,
+                params=params,
                 verify=verify,
             )
+
             if r.status_code != 200:
                 raise Exception(r.text)
-            # logging.warning(r.json())
+
+            # In certain situations the response body can be empty even if the query was successful, for example
+            # when creating temporary tables.
+            if not r.text:
+                return {}
+
             return r.json()
         except requests.RequestException as e:
             if e.response:
@@ -133,14 +155,19 @@ class ClickHouse(BaseSQLQueryRunner):
         else:
             return TYPE_STRING
 
-    def _clickhouse_query(self, query):
+    def _clickhouse_query(self, query, session_id=None, session_check=None):
+        logger.debug("Clickhouse is about to execute query: %s", query)
+
         query += "\nFORMAT JSON"
-        result = self._send_query(query)
+
+        response = self._send_query(query, session_id, session_check)
+
         columns = []
         columns_int64 = []  # db converts value to string if its type equals UInt64
         columns_totals = {}
 
-        for r in result["meta"]:
+        meta = response.get("meta", [])
+        for r in meta:
             column_name = r["name"]
             column_type = self._define_column_type(r["type"])
 
@@ -155,7 +182,7 @@ class ClickHouse(BaseSQLQueryRunner):
                 {"name": column_name, "friendly_name": column_name, "type": column_type}
             )
 
-        rows = result["data"]
+        rows = response.get("data", [])
         for row in rows:
             for column in columns_int64:
                 try:
@@ -163,8 +190,8 @@ class ClickHouse(BaseSQLQueryRunner):
                 except TypeError:
                     row[column] = None
 
-        if "totals" in result:
-            totals = result["totals"]
+        if "totals" in response:
+            totals = response["totals"]
             for column, value in columns_totals.items():
                 totals[column] = value
             rows.append(totals)
@@ -172,14 +199,32 @@ class ClickHouse(BaseSQLQueryRunner):
         return {"columns": columns, "rows": rows}
 
     def run_query(self, query, user):
-        logger.debug("Clickhouse is about to execute query: %s", query)
-        if query == "":
+        queries = split_multi_query(query)
+
+        if not queries:
             json_data = None
             error = "Query is empty"
             return json_data, error
+
         try:
-            q = self._clickhouse_query(query)
-            data = json_dumps(q)
+            # If just one query was given no session is needed
+            if len(queries) == 1:
+                results = self._clickhouse_query(queries[0])
+            else:
+                # If more than one query was given, a session is needed. Parameter session_check must be false
+                # for the first query
+                session_id = "redash_{}".format(uuid4().hex)
+
+                results = self._clickhouse_query(
+                    queries[0], session_id, session_check=False
+                )
+
+                for statement in queries[1:]:
+                    results = self._clickhouse_query(
+                        statement, session_id, session_check=True
+                    )
+
+            data = json_dumps(results)
             error = None
         except Exception as e:
             data = None
diff --git a/tests/query_runner/test_clickhouse.py b/tests/query_runner/test_clickhouse.py
new file mode 100644
index 000000000..fd0221874
--- /dev/null
+++ b/tests/query_runner/test_clickhouse.py
@@ -0,0 +1,192 @@
+import json
+from unittest import TestCase
+from unittest.mock import Mock, patch
+
+from redash.query_runner import TYPE_INTEGER
+from redash.query_runner.clickhouse import ClickHouse, split_multi_query
+
+split_multi_query_samples = [
+    # Regular query
+    ("SELECT 1", ["SELECT 1"]),
+    # Multiple data queries inlined
+    ("SELECT 1; SELECT 2;", ["SELECT 1", "SELECT 2"]),
+    # Multiline data queries
+    (
+        """
+SELECT 1;
+SELECT 2;
+""",
+        ["SELECT 1", "SELECT 2"],
+    ),
+    # Commented data queries
+    (
+        """
+-- First query single-line commentary
+SELECT 1;
+
+/**
+ * Second query multi-line commentary
+ */
+SELECT 2;
+
+-- Tail single-line commentary
+
+/**
+ * Tail multi-line commentary
+ */
+""",
+        [
+            "-- First query single-line commentary\nSELECT 1",
+            "/**\n * Second query multi-line commentary\n */\nSELECT 2",
+        ],
+    ),
+    # Should skip empty statements
+    (
+        """
+;;;
+;
+SELECT 1;
+""",
+        ["SELECT 1"],
+    ),
+]
+
+
+class TestClickHouseQueriesSplit(TestCase):
+    def test_split(self):
+        for sample in split_multi_query_samples:
+            query, expected = sample
+
+            self.assertEqual(split_multi_query(query), expected)
+
+
+simple_query_response = {
+    "meta": [
+        {"name": "1", "type": "UInt8"},
+    ],
+    "data": [
+        {"1": 1},
+    ],
+    "rows": 1,
+    "statistics": {"elapsed": 0.0001278, "rows_read": 1, "bytes_read": 1},
+}
+
+
+class TestClickHouse(TestCase):
+    @patch("requests.post")
+    def test_send_single_query(self, post_request):
+        query_runner = ClickHouse(
+            {"url": "http://clickhouse:8123", "dbname": "system", "timeout": 60}
+        )
+
+        response = Mock()
+        response.status_code = 200
+        response.text = json.dumps(simple_query_response)
+        response.json.return_value = simple_query_response
+        post_request.return_value = response
+
+        data, error = query_runner.run_query("SELECT 1", None)
+
+        self.assertIsNone(error)
+        self.assertEqual(
+            json.loads(data),
+            {
+                "columns": [
+                    {"name": "1", "friendly_name": "1", "type": TYPE_INTEGER},
+                ],
+                "rows": [
+                    {"1": 1},
+                ],
+            },
+        )
+
+        (url,), kwargs = post_request.call_args
+        self.assertEqual(url, "http://clickhouse:8123")
+        self.assertEqual(kwargs["data"], b"SELECT 1\nFORMAT JSON")
+        self.assertEqual(
+            kwargs["params"],
+            {
+                "user": "default",
+                "password": "",
+                "database": "system",
+                "default_format": "JSON",
+            },
+        )
+        self.assertEqual(kwargs["timeout"], 60)
+
+    @patch("requests.post")
+    def test_send_multi_query(self, post_request):
+        query_runner = ClickHouse(
+            {"url": "http://clickhouse:8123", "dbname": "system", "timeout": 60}
+        )
+
+        create_table_response = Mock()
+        create_table_response.status_code = 200
+        create_table_response.text = ""
+
+        select_response = Mock()
+        select_response.status_code = 200
+        select_response.text = json.dumps(simple_query_response)
+        select_response.json.return_value = simple_query_response
+
+        post_request.side_effect = [create_table_response, select_response]
+
+        data, error = query_runner.run_query(
+            """
+CREATE
+TEMPORARY TABLE test AS
+SELECT 1;
+SELECT * FROM test;
+        """,
+            None,
+        )
+
+        self.assertIsNone(error)
+        self.assertEqual(
+            json.loads(data),
+            {
+                "columns": [
+                    {"name": "1", "friendly_name": "1", "type": TYPE_INTEGER},
+                ],
+                "rows": [
+                    {"1": 1},
+                ],
+            },
+        )
+
+        (url,), kwargs = post_request.call_args_list[0]
+        self.assertEqual(url, "http://clickhouse:8123")
+        self.assertEqual(
+            kwargs["data"],
+            b"""CREATE
+TEMPORARY TABLE test AS
+SELECT 1
+FORMAT JSON""",
+        )
+        self.assert_session_params(kwargs, expected_check="0", expected_timeout=60)
+
+        session_id = kwargs["params"]["session_id"]
+
+        (url,), kwargs = post_request.call_args_list[1]
+        self.assertEqual(url, "http://clickhouse:8123")
+        self.assertEqual(
+            kwargs["data"],
+            b"""SELECT * FROM test
+FORMAT JSON""",
+        )
+
+        self.assert_session_params(
+            kwargs, expected_check="1", expected_timeout=60, expected_id=session_id
+        )
+
+    def assert_session_params(
+        self, kwargs, expected_check, expected_timeout, expected_id=None
+    ):
+        self.assertEqual(kwargs["params"]["session_check"], expected_check)
+        self.assertEqual(kwargs["params"]["session_timeout"], expected_timeout)
+
+        session_id = kwargs["params"]["session_id"]
+        self.assertRegex(session_id, r"redash_[a-f0-9]+")
+
+        if expected_id is not None:
+            self.assertEqual(session_id, expected_id)