Enable graceful shutdown of rq workers (#5214)

* Enable graceful shutdown of rq workers

* Use `exec` in the `worker` command of the entrypoint to propagate
  the `TERM` signal
* Allow rq processes managed by supervisor to exit without restart on
  expected status codes
* Allow supervisorctl to contact the running supervisor
* Add a `shutdown_worker` command that will send `TERM` to all running
  worker processes and then sleep. This allows orchestration systems
  to initiate a graceful shutdown before sending `SIGTERM` to
  supervisord

* Use Heroku worker as the BaseWorker

This implements a graceful shutdown on SIGTERM, which simplifies
external shutdown procedures.

* Fix imports based upon review

* Remove supervisorctl config
This commit is contained in:
Josh Bohde
2020-11-05 03:49:45 -06:00
committed by GitHub
parent c6bf8a1c55
commit e2e8714155
2 changed files with 6 additions and 5 deletions

View File

@@ -18,8 +18,8 @@ worker() {
export WORKERS_COUNT=${WORKERS_COUNT:-2} export WORKERS_COUNT=${WORKERS_COUNT:-2}
export QUEUES=${QUEUES:-} export QUEUES=${QUEUES:-}
supervisord -c worker.conf exec supervisord -c worker.conf
} }
dev_worker() { dev_worker() {

View File

@@ -3,7 +3,8 @@ import os
import signal import signal
import time import time
from redash import statsd_client from redash import statsd_client
from rq import Worker as BaseWorker, Queue as BaseQueue, get_current_job from rq import Queue as BaseQueue, get_current_job
from rq.worker import HerokuWorker # HerokuWorker implements graceful shutdown on SIGTERM
from rq.utils import utcnow from rq.utils import utcnow
from rq.timeouts import UnixSignalDeathPenalty, HorseMonitorTimeoutException from rq.timeouts import UnixSignalDeathPenalty, HorseMonitorTimeoutException
from rq.job import Job as BaseJob, JobStatus from rq.job import Job as BaseJob, JobStatus
@@ -40,7 +41,7 @@ class RedashQueue(StatsdRecordingQueue, CancellableQueue):
pass pass
class StatsdRecordingWorker(BaseWorker): class StatsdRecordingWorker(HerokuWorker):
""" """
RQ Worker Mixin that overrides `execute_job` to increment/modify metrics via Statsd RQ Worker Mixin that overrides `execute_job` to increment/modify metrics via Statsd
""" """
@@ -58,7 +59,7 @@ class StatsdRecordingWorker(BaseWorker):
statsd_client.incr("rq.jobs.failed.{}".format(queue.name)) statsd_client.incr("rq.jobs.failed.{}".format(queue.name))
class HardLimitingWorker(BaseWorker): class HardLimitingWorker(HerokuWorker):
""" """
RQ's work horses enforce time limits by setting a timed alarm and stopping jobs RQ's work horses enforce time limits by setting a timed alarm and stopping jobs
when they reach their time limits. However, the work horse may be entirely blocked when they reach their time limits. However, the work horse may be entirely blocked