Files
redash/tests/tasks/test_worker.py
2023-07-11 19:13:54 +10:00

100 lines
3.0 KiB
Python

from mock import call, patch
from rq import Connection
from rq.job import JobStatus
from redash import rq_redis_connection
from redash.tasks import Queue, Worker
from redash.tasks.queries.execution import enqueue_query
from redash.worker import default_queues, job
from tests import BaseTestCase
@patch("statsd.StatsClient.incr")
class TestWorkerMetrics(BaseTestCase):
def tearDown(self):
with Connection(rq_redis_connection):
for queue_name in default_queues:
Queue(queue_name).empty()
def test_worker_records_success_metrics(self, incr):
query = self.factory.create_query()
with Connection(rq_redis_connection):
enqueue_query(
query.query_text,
query.data_source,
query.user_id,
False,
None,
{"Username": "Patrick", "query_id": query.id},
)
Worker(["queries"]).work(max_jobs=1)
calls = [
call("rq.jobs.running.queries"),
call("rq.jobs.started.queries"),
call("rq.jobs.running.queries", -1, 1),
call("rq.jobs.finished.queries"),
]
incr.assert_has_calls(calls)
@patch("rq.Worker.execute_job")
def test_worker_records_failure_metrics(self, _, incr):
"""
Force superclass execute_job to do nothing and set status to JobStatus.Failed to simulate query failure
"""
query = self.factory.create_query()
with Connection(rq_redis_connection):
job = enqueue_query(
query.query_text,
query.data_source,
query.user_id,
False,
None,
{"Username": "Patrick", "query_id": query.id},
)
job.set_status(JobStatus.FAILED)
Worker(["queries"]).work(max_jobs=1)
calls = [
call("rq.jobs.running.queries"),
call("rq.jobs.started.queries"),
call("rq.jobs.running.queries", -1, 1),
call("rq.jobs.failed.queries"),
]
incr.assert_has_calls(calls)
@patch("statsd.StatsClient.incr")
class TestQueueMetrics(BaseTestCase):
def tearDown(self):
with Connection(rq_redis_connection):
for queue_name in default_queues:
Queue(queue_name).empty()
def test_enqueue_query_records_created_metric(self, incr):
query = self.factory.create_query()
with Connection(rq_redis_connection):
enqueue_query(
query.query_text,
query.data_source,
query.user_id,
False,
None,
{"Username": "Patrick", "query_id": query.id},
)
incr.assert_called_with("rq.jobs.created.queries")
def test_job_delay_records_created_metric(self, incr):
@job("default", timeout=300)
def foo():
pass
foo.delay()
incr.assert_called_with("rq.jobs.created.default")