Add additional statsd metrics for worker/scheduler (#4884)

* Add additional statsd metrics for worker/scheduler
This commit is contained in:
Patrick Yang
2020-05-20 14:35:55 -07:00
committed by GitHub
parent b117485571
commit dfc873fb8b
9 changed files with 203 additions and 6 deletions

View File

@@ -18,6 +18,7 @@ from redash.tasks import (
schedule_periodic_jobs,
periodic_job_definitions,
)
from redash.worker import default_queues
# Flask CLI command group for RQ management commands.
manager = AppGroup(help="RQ management commands.")
@@ -38,7 +39,7 @@ def worker(queues):
configure_mappers()
if not queues:
queues = ["scheduled_queries", "queries", "periodic", "emails", "default", "schemas"]
queues = default_queues
with Connection(rq_redis_connection):
w = Worker(queues, log_job_description=False, job_monitoring_interval=5)

View File

@@ -7,7 +7,7 @@ from datetime import datetime, timedelta
from rq.job import Job
from rq_scheduler import Scheduler
from redash import extensions, settings, rq_redis_connection
from redash import extensions, settings, rq_redis_connection, statsd_client
from redash.tasks import (
sync_user_details,
refresh_queries,
@@ -17,11 +17,20 @@ from redash.tasks import (
purge_failed_jobs,
version_check,
send_aggregated_errors,
Queue
)
logger = logging.getLogger(__name__)
rq_scheduler = Scheduler(
class StatsdRecordingScheduler(Scheduler):
    """
    RQ Scheduler subclass that uses Redash's custom RQ Queue class so that
    jobs it enqueues increment/modify metrics via Statsd.
    """

    # Redash's Queue (imported from redash.tasks) bumps the
    # `rq.jobs.created.<queue>` counter on enqueue.
    queue_class = Queue


# Scheduler instance that polls the "periodic" queue every 5 seconds.
rq_scheduler = StatsdRecordingScheduler(
    connection=rq_redis_connection, queue_name="periodic", interval=5
)

View File

@@ -2,6 +2,7 @@ import errno
import os
import signal
import time
from redash import statsd_client
from rq import Worker as BaseWorker, Queue as BaseQueue, get_current_job
from rq.utils import utcnow
from rq.timeouts import UnixSignalDeathPenalty, HorseMonitorTimeoutException
@@ -22,10 +23,43 @@ class CancellableJob(BaseJob):
return self.meta.get("cancelled", False)
class StatsdRecordingQueue(BaseQueue):
    """
    RQ Queue mixin that overrides `enqueue_job` to increment the
    `rq.jobs.created.<queue>` metric via Statsd.
    """

    def enqueue_job(self, *args, **kwargs):
        # Enqueue first, then count, so a failed enqueue is not recorded.
        job = super().enqueue_job(*args, **kwargs)
        statsd_client.incr("rq.jobs.created.{}".format(self.name))
        return job
class CancellableQueue(BaseQueue):
    # Use the job class that supports cancellation via job meta.
    job_class = CancellableJob


class RedashQueue(StatsdRecordingQueue, CancellableQueue):
    """Queue used by Redash: Statsd creation metrics + cancellable jobs."""
    pass
class StatsdRecordingWorker(BaseWorker):
    """
    RQ Worker mixin that overrides `execute_job` to increment/modify metrics via Statsd
    """

    def execute_job(self, job, queue):
        # Hoist the queue name once; it is used in every metric call below.
        queue_name = queue.name
        statsd_client.incr("rq.jobs.running.{}".format(queue_name))
        statsd_client.incr("rq.jobs.started.{}".format(queue_name))
        try:
            super().execute_job(job, queue)
        finally:
            # Always decrement the running gauge, even if execution raised,
            # then record the terminal outcome based on the job's status.
            statsd_client.decr("rq.jobs.running.{}".format(queue_name))
            if job.get_status() == JobStatus.FINISHED:
                statsd_client.incr("rq.jobs.finished.{}".format(queue_name))
            else:
                statsd_client.incr("rq.jobs.failed.{}".format(queue_name))
class HardLimitingWorker(BaseWorker):
"""
RQ's work horses enforce time limits by setting a timed alarm and stopping jobs
@@ -130,6 +164,10 @@ class HardLimitingWorker(BaseWorker):
)
class RedashWorker(StatsdRecordingWorker, HardLimitingWorker):
    """Worker used by Redash: Statsd execution metrics + hard time limits."""
    queue_class = RedashQueue
# Public aliases used by the rest of the code base. The flattened diff left
# dead duplicate assignments (Queue/Worker each bound twice, with the stale
# pre-change value immediately overwritten); keep only the effective ones.
Job = CancellableJob
Queue = RedashQueue
Worker = RedashWorker

View File

@@ -14,8 +14,20 @@ from redash import (
redis_connection,
rq_redis_connection,
)
from redash.tasks.worker import Queue as RedashQueue
job = partial(rq_job, connection=rq_redis_connection)
# Queues an RQ worker listens on when none are specified explicitly.
default_queues = ["scheduled_queries", "queries", "periodic", "emails", "default", "schemas"]
class StatsdRecordingJobDecorator(rq_job):  # noqa
    """
    RQ Job Decorator mixin that uses our Queue class to ensure metrics are
    accurately incremented in Statsd when decorated jobs are enqueued.
    """

    queue_class = RedashQueue


# Drop-in replacement for rq's @job decorator, bound to Redash's Redis connection.
job = partial(StatsdRecordingJobDecorator, connection=rq_redis_connection)
class CurrentJobFilter(logging.Filter):

View File

View File

@@ -0,0 +1,9 @@
from mock import patch, ANY
from tests import BaseTestCase
@patch("statsd.StatsClient.timing")
class TestDatabaseMetrics(BaseTestCase):
    def test_db_request_records_statsd_metrics(self, timing):
        """A model insert should emit a `db.changes.insert` timing metric."""
        self.factory.create_query()
        timing.assert_called_with("db.changes.insert", ANY)

View File

@@ -0,0 +1,9 @@
from mock import patch, ANY
from tests import BaseTestCase
@patch("statsd.StatsClient.timing")
class TestRequestMetrics(BaseTestCase):
    def test_flask_request_records_statsd_metrics(self, timing):
        """A Flask request should emit a `requests.<endpoint>.<method>` timing metric."""
        self.client.get("/ping")
        timing.assert_called_once_with("requests.redash_ping.get", ANY)

View File

@@ -62,3 +62,20 @@ class TestSchedule(TestCase):
self.assertTrue(jobs[0].func_name.endswith("foo"))
self.assertEqual(jobs[0].meta["interval"], 60)
class TestSchedulerMetrics(TestCase):
    def setUp(self):
        # Start from a clean scheduler: cancel and delete any leftover jobs
        # so only the job scheduled by this test gets enqueued.
        for job in rq_scheduler.get_jobs():
            rq_scheduler.cancel(job)
            job.delete()

    def test_scheduler_enqueue_job_metric(self):
        """Enqueuing a scheduled job should bump rq.jobs.created.periodic."""
        def foo():
            pass

        schedule_periodic_jobs([{"func": foo, "interval": 60}])
        with patch("statsd.StatsClient.incr") as incr:
            rq_scheduler.enqueue_jobs()
        incr.assert_called_once_with("rq.jobs.created.periodic")

102
tests/tasks/test_worker.py Normal file
View File

@@ -0,0 +1,102 @@
from mock import patch, call
from rq import Connection
from rq.job import JobStatus
from redash.tasks import Worker
from tests import BaseTestCase
from redash import rq_redis_connection
from redash.tasks.worker import Queue
from redash.tasks.queries.execution import (
enqueue_query,
)
from redash.worker import job, default_queues
@patch("statsd.StatsClient.incr")
class TestWorkerMetrics(BaseTestCase):
    def tearDown(self):
        # Drain every queue so jobs from one test cannot leak into the next.
        with Connection(rq_redis_connection):
            for queue_name in default_queues:
                Queue(queue_name).empty()

    def test_worker_records_success_metrics(self, incr):
        """A successfully executed job emits running/started/finished counters."""
        query = self.factory.create_query()

        with Connection(rq_redis_connection):
            enqueue_query(
                query.query_text,
                query.data_source,
                query.user_id,
                False,
                None,
                {"Username": "Patrick", "Query ID": query.id},
            )

            Worker(["queries"]).work(max_jobs=1)

        # incr("name", -1, 1) is how statsd decrements the running gauge.
        calls = [
            call("rq.jobs.running.queries"),
            call("rq.jobs.started.queries"),
            call("rq.jobs.running.queries", -1, 1),
            call("rq.jobs.finished.queries")
        ]
        incr.assert_has_calls(calls)

    @patch("rq.Worker.execute_job")
    def test_worker_records_failure_metrics(self, _, incr):
        """
        Force superclass execute_job to do nothing and set status to JobStatus.Failed to simulate query failure
        """
        query = self.factory.create_query()

        with Connection(rq_redis_connection):
            job = enqueue_query(
                query.query_text,
                query.data_source,
                query.user_id,
                False,
                None,
                {"Username": "Patrick", "Query ID": query.id},
            )
            job.set_status(JobStatus.FAILED)

            Worker(["queries"]).work(max_jobs=1)

        calls = [
            call("rq.jobs.running.queries"),
            call("rq.jobs.started.queries"),
            call("rq.jobs.running.queries", -1, 1),
            call("rq.jobs.failed.queries")
        ]
        incr.assert_has_calls(calls)
@patch("statsd.StatsClient.incr")
class TestQueueMetrics(BaseTestCase):
    def tearDown(self):
        # Drain every queue so jobs from one test cannot leak into the next.
        with Connection(rq_redis_connection):
            for queue_name in default_queues:
                Queue(queue_name).empty()

    def test_enqueue_query_records_created_metric(self, incr):
        """enqueue_query should bump rq.jobs.created.queries."""
        query = self.factory.create_query()

        with Connection(rq_redis_connection):
            enqueue_query(
                query.query_text,
                query.data_source,
                query.user_id,
                False,
                None,
                {"Username": "Patrick", "Query ID": query.id},
            )

        incr.assert_called_with("rq.jobs.created.queries")

    def test_job_delay_records_created_metric(self, incr):
        """delay() on a @job-decorated function should bump rq.jobs.created.default."""
        @job("default", timeout=300)
        def foo():
            pass

        foo.delay()
        incr.assert_called_with("rq.jobs.created.default")