Files
redash/tests/tasks/test_queries.py
Jannis Leidel 44dff83046 Add "Active at" column to user list. (#3026)
* add last_active_at to users page

* Use our JSON encoder as the SQLAlchemy JSON serializer.

* Fixed some inconsistencies in the user query class methods.

* Minor cosmetic fixes.

* Add some make tasks for easier development.

* Add user detail sync system based on Redis backend.

There is a periodic Celery task that updates a new “details” JSONB column in the “user” table with the data from Redis.

Currently this is only used for tracking the date of last activity of a user but can be extended with other user information later.

Updates a few dependencies.

* Normalize a few Flask extension API names.

* Reduce implementation complexity of JSONEncoder.

* Use request_started signal to make sure we have a request context.

Otherwise loading the user based on the request won’t work.

* Fix test that checks if disabled users can log in.

This correctly uses a URL path that includes the current organization and checks for the error message.

The previous test seems to have been a red herring.

* Minor cosmetic fixes.

* Remove needs_sync in favor of just deleting things.

* Misc review fixes.

* Ignore line length.

* Split redash.models into several modules.

* Move walrus UTC DateTimeField into redash.models.types.

* Restore distinctly loading dashboards.

* Simplify default values for user details.

* Define __repr__ methods generically.

* Consistently have underscore methods at the top of model methods.

* Fix tests.

* Split redash.models into several modules.

* Update to latest walrus and redis-py.

* Update kombu to 4.2.2 for redis-py 3.x compatibility.

* Remove redis-cli container after running Make task.

* Move buffer condition after datetime/time conditions.

* Update walrus to 0.7.1.

* Refactor some query APIs.

This uses the flask-sqlalchemy helpers consistently and makes more use of mixins.

* Post rebase fixes.

* Use correct kombu version

* Fix migration down revision
2019-01-07 10:30:42 +02:00

152 lines
6.7 KiB
Python

from unittest import TestCase
from collections import namedtuple
import uuid
import mock
from tests import BaseTestCase
from redash import redis_connection, models
from redash.query_runner.pg import PostgreSQL
from redash.tasks.queries import (QueryExecutionError, QueryTaskTracker,
enqueue_query, execute_query)
class TestPrune(TestCase):
    """Exercise ``QueryTaskTracker.prune`` against a sorted set of 100 entries.

    ``setUp`` fills the sorted set ``test_list`` with members ``k:0`` ..
    ``k:99`` (score equal to the numeric suffix) and also stores each member
    name as a plain Redis key, so the tests can check both the sorted set
    and the per-member keys after pruning.
    """

    def setUp(self):
        self.list = "test_list"
        self.keys = ['k:{}'.format(score) for score in range(100)]
        # This class extends plain ``unittest.TestCase``, so nothing removes
        # the Redis fixture for us. Register the cleanup before populating so
        # it also runs when population fails part-way through.
        self.addCleanup(self._remove_fixture)

        redis_connection.delete(self.list)
        for score, key in enumerate(self.keys):
            redis_connection.zadd(self.list, {key: score})
            redis_connection.set(key, 1)

    def _remove_fixture(self):
        """Delete the sorted set and every per-member key created by ``setUp``."""
        redis_connection.delete(self.list, *self.keys)

    def test_does_nothing_when_below_threshold(self):
        """Pruning to a size of 100 leaves all 100 entries in place."""
        remove_count = QueryTaskTracker.prune(self.list, 100)

        self.assertEqual(remove_count, 0)
        self.assertEqual(redis_connection.zcard(self.list), 100)

    def test_removes_oldest_items_first(self):
        """Pruning to a size of 50 drops the 50 lowest-scored entries and their keys."""
        remove_count = QueryTaskTracker.prune(self.list, 50)

        self.assertEqual(remove_count, 50)
        self.assertEqual(redis_connection.zcard(self.list), 50)
        # The highest-scored member survives; a low-scored one is gone.
        self.assertEqual(redis_connection.zscore(self.list, 'k:99'), 99.0)
        self.assertIsNone(redis_connection.zscore(self.list, 'k:1'))

        # The plain keys of the pruned members must be deleted as well.
        for k in self.keys[0:50]:
            self.assertFalse(redis_connection.exists(k))
# Minimal stand-in for a task result: it only carries an ``id`` field.
FakeResult = namedtuple('FakeResult', ['id'])


def gen_hash(*args, **kwargs):
    """Return a ``FakeResult`` with a fresh random hex id.

    All positional and keyword arguments are accepted and ignored, so the
    function can be used as a mock ``side_effect`` for any call signature.
    """
    fresh_id = uuid.uuid4().hex
    return FakeResult(id=fresh_id)
class TestEnqueueTask(BaseTestCase):
    """Check how many tasks ``enqueue_query`` dispatches for repeated calls.

    ``execute_query.apply_async`` is replaced with a mock (returning a fresh
    ``FakeResult`` per call via ``gen_hash``) only for the duration of each
    test, so the real method is restored afterwards and cannot leak into
    other tests.
    """

    def test_multiple_enqueue_of_same_query(self):
        """Enqueueing the same query text three times dispatches a single task."""
        query = self.factory.create_query()

        with mock.patch.object(execute_query, 'apply_async', side_effect=gen_hash) as apply_async:
            for _ in range(3):
                enqueue_query(query.query_text, query.data_source, query.user_id, query,
                              {'Username': 'Arik', 'Query ID': query.id})

        self.assertEqual(1, apply_async.call_count)
        self.assertEqual(1, redis_connection.zcard(QueryTaskTracker.WAITING_LIST))
        self.assertEqual(0, redis_connection.zcard(QueryTaskTracker.IN_PROGRESS_LIST))
        self.assertEqual(0, redis_connection.zcard(QueryTaskTracker.DONE_LIST))

    def test_multiple_enqueue_of_different_query(self):
        """Enqueueing three distinct query texts dispatches three tasks."""
        query = self.factory.create_query()

        with mock.patch.object(execute_query, 'apply_async', side_effect=gen_hash) as apply_async:
            # The empty suffix reproduces the unmodified query text.
            for suffix in ('', '2', '3'):
                enqueue_query(query.query_text + suffix, query.data_source, query.user_id, None,
                              {'Username': 'Arik', 'Query ID': query.id})

        self.assertEqual(3, apply_async.call_count)
        self.assertEqual(3, redis_connection.zcard(QueryTaskTracker.WAITING_LIST))
        self.assertEqual(0, redis_connection.zcard(QueryTaskTracker.IN_PROGRESS_LIST))
        self.assertEqual(0, redis_connection.zcard(QueryTaskTracker.DONE_LIST))
class QueryExecutorTests(BaseTestCase):
    """Call ``execute_query`` directly with ``PostgreSQL.run_query`` mocked out."""

    def _patch_delivery_info(self):
        """Build the patcher that gives the Celery task context a test routing key."""
        return mock.patch("celery.app.task.Context.delivery_info", {'routing_key': 'test'})

    def test_success(self):
        """
        ``execute_query`` calls the query runner once and stores a query result.
        """
        delivery_info = self._patch_delivery_info()
        runner = mock.patch.object(PostgreSQL, "run_query")
        with delivery_info, runner as run_query:
            run_query.return_value = ([1, 2], None)
            result_id = execute_query("SELECT 1, 2", self.factory.data_source.id, {})
            self.assertEqual(1, run_query.call_count)
            stored = models.QueryResult.query.get(result_id)
            self.assertEqual(stored.data, '{1,2}')

    def test_success_scheduled(self):
        """
        A successful scheduled run records the result as the query's latest data.
        """
        delivery_info = self._patch_delivery_info()
        scheduled = self.factory.create_query(query_text="SELECT 1, 2", schedule={"interval": 300})
        runner = mock.patch.object(PostgreSQL, "run_query")
        with delivery_info, runner as run_query:
            run_query.return_value = ([1, 2], None)
            result_id = execute_query("SELECT 1, 2", self.factory.data_source.id, {},
                                      scheduled_query_id=scheduled.id)
            # Re-read the query to observe what the execution persisted.
            scheduled = models.Query.get_by_id(scheduled.id)
            self.assertEqual(scheduled.schedule_failures, 0)
            stored = models.QueryResult.query.get(result_id)
            self.assertEqual(scheduled.latest_query_data, stored)

    def test_failure_scheduled(self):
        """
        Each failed scheduled run increments the query's failure counter.
        """
        delivery_info = self._patch_delivery_info()
        scheduled = self.factory.create_query(query_text="SELECT 1, 2", schedule={"interval": 300})
        runner = mock.patch.object(PostgreSQL, "run_query")
        with delivery_info, runner as run_query:
            run_query.side_effect = ValueError("broken")

            with self.assertRaises(QueryExecutionError):
                execute_query("SELECT 1, 2", self.factory.data_source.id, {},
                              scheduled_query_id=scheduled.id)
            scheduled = models.Query.get_by_id(scheduled.id)
            self.assertEqual(scheduled.schedule_failures, 1)

            with self.assertRaises(QueryExecutionError):
                execute_query("SELECT 1, 2", self.factory.data_source.id, {},
                              scheduled_query_id=scheduled.id)
            scheduled = models.Query.get_by_id(scheduled.id)
            self.assertEqual(scheduled.schedule_failures, 2)

    def test_success_after_failure(self):
        """
        A successful run after a failure puts the failure counter back to zero.
        """
        delivery_info = self._patch_delivery_info()
        scheduled = self.factory.create_query(query_text="SELECT 1, 2", schedule={"interval": 300})

        # First run: the runner raises, so one failure is recorded.
        with delivery_info, mock.patch.object(PostgreSQL, "run_query") as run_query:
            run_query.side_effect = ValueError("broken")
            with self.assertRaises(QueryExecutionError):
                execute_query("SELECT 1, 2", self.factory.data_source.id, {},
                              scheduled_query_id=scheduled.id)
            scheduled = models.Query.get_by_id(scheduled.id)
            self.assertEqual(scheduled.schedule_failures, 1)

        # Second run: the runner succeeds and the counter is reset.
        with delivery_info, mock.patch.object(PostgreSQL, "run_query") as run_query:
            run_query.return_value = ([1, 2], None)
            execute_query("SELECT 1, 2", self.factory.data_source.id, {},
                          scheduled_query_id=scheduled.id)
            scheduled = models.Query.get_by_id(scheduled.id)
            self.assertEqual(scheduled.schedule_failures, 0)