diff --git a/redash/tasks/queries.py b/redash/tasks/queries.py index 4ce6dbd56..a0ac1a74f 100644 --- a/redash/tasks/queries.py +++ b/redash/tasks/queries.py @@ -65,6 +65,7 @@ class QueryTaskTracker(object): if l != self._get_list(): connection.zrem(l, key_name) + # TOOD: this is not thread/concurrency safe. In current code this is not an issue, but better to fix this. def update(self, **kwargs): self.data.update(kwargs) self.save() @@ -216,12 +217,6 @@ def enqueue_query(query, data_source, scheduled=False, metadata={}): logging.info("[%s] Found existing job: %s", query_hash, job_id) job = QueryTask(job_id=job_id) - tracker = QueryTaskTracker.get_by_task_id(job_id, connection=pipe) - # tracker might not exist, if it's an old job - if scheduled and tracker: - tracker.update(retries=tracker.retries+1) - elif tracker: - tracker.update(scheduled_retries=tracker.scheduled_retries+1) if job.ready(): logging.info("[%s] job found is ready (%s), removing lock", query_hash, job.celery_status) diff --git a/tests/tasks/test_queries.py b/tests/tasks/test_queries.py index a8819a0f7..1df8e1c85 100644 --- a/tests/tasks/test_queries.py +++ b/tests/tasks/test_queries.py @@ -1,6 +1,10 @@ +from tests import BaseTestCase from redash import redis_connection -from redash.tasks.queries import QueryTaskTracker +from redash.tasks.queries import QueryTaskTracker, enqueue_query, execute_query from unittest import TestCase +from mock import MagicMock +from collections import namedtuple +import uuid class TestPrune(TestCase): @@ -29,3 +33,38 @@ class TestPrune(TestCase): for k in self.keys[0:50]: self.assertFalse(redis_connection.exists(k)) + + +FakeResult = namedtuple('FakeResult', 'id') + + +def gen_hash(*args, **kwargs): + return FakeResult(uuid.uuid4().hex) + + +class TestEnqueueTask(BaseTestCase): + def test_multiple_enqueue_of_same_query(self): + query = self.factory.create_query() + execute_query.apply_async = MagicMock(side_effect=gen_hash) + + enqueue_query(query.query, query.data_source, True, {'Username': 'Arik', 'Query ID': query.id}) + enqueue_query(query.query, query.data_source, True, {'Username': 'Arik', 'Query ID': query.id}) + enqueue_query(query.query, query.data_source, True, {'Username': 'Arik', 'Query ID': query.id}) + + self.assertEqual(1, execute_query.apply_async.call_count) + self.assertEqual(1, redis_connection.zcard(QueryTaskTracker.WAITING_LIST)) + self.assertEqual(0, redis_connection.zcard(QueryTaskTracker.IN_PROGRESS_LIST)) + self.assertEqual(0, redis_connection.zcard(QueryTaskTracker.DONE_LIST)) + + def test_multiple_enqueue_of_different_query(self): + query = self.factory.create_query() + execute_query.apply_async = MagicMock(side_effect=gen_hash) + + enqueue_query(query.query, query.data_source, True, {'Username': 'Arik', 'Query ID': query.id}) + enqueue_query(query.query + '2', query.data_source, True, {'Username': 'Arik', 'Query ID': query.id}) + enqueue_query(query.query + '3', query.data_source, True, {'Username': 'Arik', 'Query ID': query.id}) + + self.assertEqual(3, execute_query.apply_async.call_count) + self.assertEqual(3, redis_connection.zcard(QueryTaskTracker.WAITING_LIST)) + self.assertEqual(0, redis_connection.zcard(QueryTaskTracker.IN_PROGRESS_LIST)) + self.assertEqual(0, redis_connection.zcard(QueryTaskTracker.DONE_LIST))