mirror of
https://github.com/getredash/redash.git
synced 2025-12-19 17:37:19 -05:00
Merge pull request #1108 from getredash/fix-1097
Remove potnetially concurrency not safe code form enqueue_query
This commit is contained in:
@@ -65,6 +65,7 @@ class QueryTaskTracker(object):
|
|||||||
if l != self._get_list():
|
if l != self._get_list():
|
||||||
connection.zrem(l, key_name)
|
connection.zrem(l, key_name)
|
||||||
|
|
||||||
|
# TOOD: this is not thread/concurrency safe. In current code this is not an issue, but better to fix this.
|
||||||
def update(self, **kwargs):
|
def update(self, **kwargs):
|
||||||
self.data.update(kwargs)
|
self.data.update(kwargs)
|
||||||
self.save()
|
self.save()
|
||||||
@@ -216,12 +217,6 @@ def enqueue_query(query, data_source, scheduled=False, metadata={}):
|
|||||||
logging.info("[%s] Found existing job: %s", query_hash, job_id)
|
logging.info("[%s] Found existing job: %s", query_hash, job_id)
|
||||||
|
|
||||||
job = QueryTask(job_id=job_id)
|
job = QueryTask(job_id=job_id)
|
||||||
tracker = QueryTaskTracker.get_by_task_id(job_id, connection=pipe)
|
|
||||||
# tracker might not exist, if it's an old job
|
|
||||||
if scheduled and tracker:
|
|
||||||
tracker.update(retries=tracker.retries+1)
|
|
||||||
elif tracker:
|
|
||||||
tracker.update(scheduled_retries=tracker.scheduled_retries+1)
|
|
||||||
|
|
||||||
if job.ready():
|
if job.ready():
|
||||||
logging.info("[%s] job found is ready (%s), removing lock", query_hash, job.celery_status)
|
logging.info("[%s] job found is ready (%s), removing lock", query_hash, job.celery_status)
|
||||||
|
|||||||
@@ -1,6 +1,10 @@
|
|||||||
|
from tests import BaseTestCase
|
||||||
from redash import redis_connection
|
from redash import redis_connection
|
||||||
from redash.tasks.queries import QueryTaskTracker
|
from redash.tasks.queries import QueryTaskTracker, enqueue_query, execute_query
|
||||||
from unittest import TestCase
|
from unittest import TestCase
|
||||||
|
from mock import MagicMock
|
||||||
|
from collections import namedtuple
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
|
||||||
class TestPrune(TestCase):
|
class TestPrune(TestCase):
|
||||||
@@ -29,3 +33,38 @@ class TestPrune(TestCase):
|
|||||||
|
|
||||||
for k in self.keys[0:50]:
|
for k in self.keys[0:50]:
|
||||||
self.assertFalse(redis_connection.exists(k))
|
self.assertFalse(redis_connection.exists(k))
|
||||||
|
|
||||||
|
|
||||||
|
FakeResult = namedtuple('FakeResult', 'id')
|
||||||
|
|
||||||
|
|
||||||
|
def gen_hash(*args, **kwargs):
|
||||||
|
return FakeResult(uuid.uuid4().hex)
|
||||||
|
|
||||||
|
|
||||||
|
class TestEnqueueTask(BaseTestCase):
|
||||||
|
def test_multiple_enqueue_of_same_query(self):
|
||||||
|
query = self.factory.create_query()
|
||||||
|
execute_query.apply_async = MagicMock(side_effect=gen_hash)
|
||||||
|
|
||||||
|
enqueue_query(query.query, query.data_source, True, {'Username': 'Arik', 'Query ID': query.id})
|
||||||
|
enqueue_query(query.query, query.data_source, True, {'Username': 'Arik', 'Query ID': query.id})
|
||||||
|
enqueue_query(query.query, query.data_source, True, {'Username': 'Arik', 'Query ID': query.id})
|
||||||
|
|
||||||
|
self.assertEqual(1, execute_query.apply_async.call_count)
|
||||||
|
self.assertEqual(1, redis_connection.zcard(QueryTaskTracker.WAITING_LIST))
|
||||||
|
self.assertEqual(0, redis_connection.zcard(QueryTaskTracker.IN_PROGRESS_LIST))
|
||||||
|
self.assertEqual(0, redis_connection.zcard(QueryTaskTracker.DONE_LIST))
|
||||||
|
|
||||||
|
def test_multiple_enqueue_of_different_query(self):
|
||||||
|
query = self.factory.create_query()
|
||||||
|
execute_query.apply_async = MagicMock(side_effect=gen_hash)
|
||||||
|
|
||||||
|
enqueue_query(query.query, query.data_source, True, {'Username': 'Arik', 'Query ID': query.id})
|
||||||
|
enqueue_query(query.query + '2', query.data_source, True, {'Username': 'Arik', 'Query ID': query.id})
|
||||||
|
enqueue_query(query.query + '3', query.data_source, True, {'Username': 'Arik', 'Query ID': query.id})
|
||||||
|
|
||||||
|
self.assertEqual(3, execute_query.apply_async.call_count)
|
||||||
|
self.assertEqual(3, redis_connection.zcard(QueryTaskTracker.WAITING_LIST))
|
||||||
|
self.assertEqual(0, redis_connection.zcard(QueryTaskTracker.IN_PROGRESS_LIST))
|
||||||
|
self.assertEqual(0, redis_connection.zcard(QueryTaskTracker.DONE_LIST))
|
||||||
|
|||||||
Reference in New Issue
Block a user