From b67f412f581de07eca4deadba30b3bb1e37ebf50 Mon Sep 17 00:00:00 2001 From: Arik Fraimovich Date: Wed, 8 Jun 2016 20:00:59 +0300 Subject: [PATCH 1/2] Add test for enqueue_query --- tests/tasks/test_queries.py | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/tests/tasks/test_queries.py b/tests/tasks/test_queries.py index a8819a0f7..9d58faa6b 100644 --- a/tests/tasks/test_queries.py +++ b/tests/tasks/test_queries.py @@ -1,6 +1,9 @@ from redash import redis_connection -from redash.tasks.queries import QueryTaskTracker +from redash.tasks.queries import QueryTaskTracker, enqueue_query, execute_query from unittest import TestCase +from tests import BaseTestCase +from mock import MagicMock +from collections import namedtuple class TestPrune(TestCase): @@ -29,3 +32,19 @@ class TestPrune(TestCase): for k in self.keys[0:50]: self.assertFalse(redis_connection.exists(k)) + + +result = namedtuple('Result', 'id') + + +class TestEnqueueTask(BaseTestCase): + def test_enqueue(self): + query = self.factory.create_query() + execute_query.apply_async = MagicMock(return_value=result('testing')) + + enqueue_query(query.query, query.data_source, True, {'Username': 'Arik', 'Query ID': query.id}) + enqueue_query(query.query, query.data_source, True, {'Username': 'Arik', 'Query ID': query.id}) + enqueue_query(query.query, query.data_source, True, {'Username': 'Arik', 'Query ID': query.id}) + + self.assertEqual(1, execute_query.apply_async.call_count) + self.assertEqual(1, redis_connection.zcard(QueryTaskTracker.WAITING_LIST)) From 7159f0beb048eb5ba472dcf194fc93cf733c32b7 Mon Sep 17 00:00:00 2001 From: Arik Fraimovich Date: Wed, 8 Jun 2016 20:28:35 +0300 Subject: [PATCH 2/2] Remove potentially concurrency-unsafe code from enqueue_query. This might have been causing the behavior described in #1097. 
--- redash/tasks/queries.py | 7 +------ tests/tasks/test_queries.py | 30 +++++++++++++++++++++++++----- 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/redash/tasks/queries.py b/redash/tasks/queries.py index 4ce6dbd56..a0ac1a74f 100644 --- a/redash/tasks/queries.py +++ b/redash/tasks/queries.py @@ -65,6 +65,7 @@ class QueryTaskTracker(object): if l != self._get_list(): connection.zrem(l, key_name) + # TODO: this is not thread/concurrency safe. In current code this is not an issue, but better to fix this. def update(self, **kwargs): self.data.update(kwargs) self.save() @@ -216,12 +217,6 @@ def enqueue_query(query, data_source, scheduled=False, metadata={}): logging.info("[%s] Found existing job: %s", query_hash, job_id) job = QueryTask(job_id=job_id) - tracker = QueryTaskTracker.get_by_task_id(job_id, connection=pipe) - # tracker might not exist, if it's an old job - if scheduled and tracker: - tracker.update(retries=tracker.retries+1) - elif tracker: - tracker.update(scheduled_retries=tracker.scheduled_retries+1) if job.ready(): logging.info("[%s] job found is ready (%s), removing lock", query_hash, job.celery_status) diff --git a/tests/tasks/test_queries.py b/tests/tasks/test_queries.py index 9d58faa6b..1df8e1c85 100644 --- a/tests/tasks/test_queries.py +++ b/tests/tasks/test_queries.py @@ -1,9 +1,10 @@ +from tests import BaseTestCase from redash import redis_connection from redash.tasks.queries import QueryTaskTracker, enqueue_query, execute_query from unittest import TestCase -from tests import BaseTestCase from mock import MagicMock from collections import namedtuple +import uuid class TestPrune(TestCase): @@ -34,17 +35,36 @@ class TestPrune(TestCase): self.assertFalse(redis_connection.exists(k)) -result = namedtuple('Result', 'id') +FakeResult = namedtuple('FakeResult', 'id') + + +def gen_hash(*args, **kwargs): + return FakeResult(uuid.uuid4().hex) class TestEnqueueTask(BaseTestCase): - def test_enqueue(self): + def 
test_multiple_enqueue_of_same_query(self): query = self.factory.create_query() - execute_query.apply_async = MagicMock(return_value=result('testing')) + execute_query.apply_async = MagicMock(side_effect=gen_hash) enqueue_query(query.query, query.data_source, True, {'Username': 'Arik', 'Query ID': query.id}) enqueue_query(query.query, query.data_source, True, {'Username': 'Arik', 'Query ID': query.id}) enqueue_query(query.query, query.data_source, True, {'Username': 'Arik', 'Query ID': query.id}) - + self.assertEqual(1, execute_query.apply_async.call_count) self.assertEqual(1, redis_connection.zcard(QueryTaskTracker.WAITING_LIST)) + self.assertEqual(0, redis_connection.zcard(QueryTaskTracker.IN_PROGRESS_LIST)) + self.assertEqual(0, redis_connection.zcard(QueryTaskTracker.DONE_LIST)) + + def test_multiple_enqueue_of_different_query(self): + query = self.factory.create_query() + execute_query.apply_async = MagicMock(side_effect=gen_hash) + + enqueue_query(query.query, query.data_source, True, {'Username': 'Arik', 'Query ID': query.id}) + enqueue_query(query.query + '2', query.data_source, True, {'Username': 'Arik', 'Query ID': query.id}) + enqueue_query(query.query + '3', query.data_source, True, {'Username': 'Arik', 'Query ID': query.id}) + + self.assertEqual(3, execute_query.apply_async.call_count) + self.assertEqual(3, redis_connection.zcard(QueryTaskTracker.WAITING_LIST)) + self.assertEqual(0, redis_connection.zcard(QueryTaskTracker.IN_PROGRESS_LIST)) + self.assertEqual(0, redis_connection.zcard(QueryTaskTracker.DONE_LIST))