mirror of
https://github.com/getredash/redash.git
synced 2025-12-19 17:37:19 -05:00
100 lines
3.0 KiB
Python
100 lines
3.0 KiB
Python
from mock import call, patch
|
|
from rq import Connection
|
|
from rq.job import JobStatus
|
|
|
|
from redash import rq_redis_connection
|
|
from redash.tasks import Queue, Worker
|
|
from redash.tasks.queries.execution import enqueue_query
|
|
from redash.worker import default_queues, job
|
|
from tests import BaseTestCase
|
|
|
|
|
|
@patch("statsd.StatsClient.incr")
|
|
class TestWorkerMetrics(BaseTestCase):
|
|
def tearDown(self):
|
|
with Connection(rq_redis_connection):
|
|
for queue_name in default_queues:
|
|
Queue(queue_name).empty()
|
|
|
|
def test_worker_records_success_metrics(self, incr):
|
|
query = self.factory.create_query()
|
|
|
|
with Connection(rq_redis_connection):
|
|
enqueue_query(
|
|
query.query_text,
|
|
query.data_source,
|
|
query.user_id,
|
|
False,
|
|
None,
|
|
{"Username": "Patrick", "query_id": query.id},
|
|
)
|
|
|
|
Worker(["queries"]).work(max_jobs=1)
|
|
|
|
calls = [
|
|
call("rq.jobs.running.queries"),
|
|
call("rq.jobs.started.queries"),
|
|
call("rq.jobs.running.queries", -1, 1),
|
|
call("rq.jobs.finished.queries"),
|
|
]
|
|
incr.assert_has_calls(calls)
|
|
|
|
@patch("rq.Worker.execute_job")
|
|
def test_worker_records_failure_metrics(self, _, incr):
|
|
"""
|
|
Force superclass execute_job to do nothing and set status to JobStatus.Failed to simulate query failure
|
|
"""
|
|
query = self.factory.create_query()
|
|
|
|
with Connection(rq_redis_connection):
|
|
job = enqueue_query(
|
|
query.query_text,
|
|
query.data_source,
|
|
query.user_id,
|
|
False,
|
|
None,
|
|
{"Username": "Patrick", "query_id": query.id},
|
|
)
|
|
job.set_status(JobStatus.FAILED)
|
|
|
|
Worker(["queries"]).work(max_jobs=1)
|
|
|
|
calls = [
|
|
call("rq.jobs.running.queries"),
|
|
call("rq.jobs.started.queries"),
|
|
call("rq.jobs.running.queries", -1, 1),
|
|
call("rq.jobs.failed.queries"),
|
|
]
|
|
incr.assert_has_calls(calls)
|
|
|
|
|
|
@patch("statsd.StatsClient.incr")
|
|
class TestQueueMetrics(BaseTestCase):
|
|
def tearDown(self):
|
|
with Connection(rq_redis_connection):
|
|
for queue_name in default_queues:
|
|
Queue(queue_name).empty()
|
|
|
|
def test_enqueue_query_records_created_metric(self, incr):
|
|
query = self.factory.create_query()
|
|
|
|
with Connection(rq_redis_connection):
|
|
enqueue_query(
|
|
query.query_text,
|
|
query.data_source,
|
|
query.user_id,
|
|
False,
|
|
None,
|
|
{"Username": "Patrick", "query_id": query.id},
|
|
)
|
|
|
|
incr.assert_called_with("rq.jobs.created.queries")
|
|
|
|
def test_job_delay_records_created_metric(self, incr):
|
|
@job("default", timeout=300)
|
|
def foo():
|
|
pass
|
|
|
|
foo.delay()
|
|
incr.assert_called_with("rq.jobs.created.default")
|