Multiprocess RQ workers (using supervisor) (#4371)

* launch and monitor multiple workers using supervisor

* run supervisord in non-daemon mode

* redirect all output to stdout/stderr

* no need to log supervisord's output because it is redirected to stdout anyway

* updated and less brittle healthcheck

* add supervisor healthchecks

* remove redundant supervisor installation as it is installed by pip

* add a 5 minute check gate
This commit is contained in:
Omer Lachish
2020-01-01 15:32:29 +02:00
committed by Arik Fraimovich
parent f85490cf50
commit 260bfca767
5 changed files with 89 additions and 12 deletions

View File

@@ -25,7 +25,10 @@ dev_scheduler() {
worker() { worker() {
echo "Starting RQ worker..." echo "Starting RQ worker..."
exec /app/manage.py rq worker $QUEUES export WORKERS_COUNT=${WORKERS_COUNT:-2}
export QUEUES=${QUEUES:-}
supervisord -c worker.conf
} }
dev_worker() { dev_worker() {

View File

@@ -41,7 +41,6 @@ services:
target: /app target: /app
depends_on: depends_on:
- server - server
tty: true
environment: environment:
PYTHONUNBUFFERED: 0 PYTHONUNBUFFERED: 0
REDASH_LOG_LEVEL: "INFO" REDASH_LOG_LEVEL: "INFO"

View File

@@ -6,7 +6,10 @@ import datetime
from click import argument from click import argument
from flask.cli import AppGroup from flask.cli import AppGroup
from rq import Connection from rq import Connection
from rq.worker import WorkerStatus
from sqlalchemy.orm import configure_mappers from sqlalchemy.orm import configure_mappers
from supervisor_checks import check_runner
from supervisor_checks.check_modules import base
from redash import rq_redis_connection from redash import rq_redis_connection
from redash.tasks import Worker from redash.tasks import Worker
@@ -42,15 +45,53 @@ def worker(queues):
w.work() w.work()
class WorkerHealthcheck(base.BaseCheck):
NAME = 'RQ Worker Healthcheck'
INTERVAL = datetime.timedelta(minutes=5)
_last_check_time = {}
def time_to_check(self, pid):
now = datetime.datetime.utcnow()
if pid not in self._last_check_time:
self._last_check_time[pid] = now
if now - self._last_check_time[pid] >= self.INTERVAL:
self._last_check_time[pid] = now
return True
return False
def __call__(self, process_spec):
pid = process_spec['pid']
if not self.time_to_check(pid):
return True
all_workers = Worker.all(connection=rq_redis_connection)
worker = [w for w in all_workers if w.hostname == socket.gethostname().encode() and
w.pid == pid].pop()
is_busy = worker.get_state() == WorkerStatus.BUSY
time_since_seen = datetime.datetime.utcnow() - worker.last_heartbeat
seen_lately = time_since_seen.seconds < 60
total_jobs_in_watched_queues = sum([len(q.jobs) for q in worker.queues])
has_nothing_to_do = total_jobs_in_watched_queues == 0
is_healthy = is_busy or seen_lately or has_nothing_to_do
self._log("Worker %s healthcheck: Is busy? %s. "
"Seen lately? %s (%d seconds ago). "
"Has nothing to do? %s (%d jobs in watched queues). "
"==> Is healthy? %s",
worker.key, is_busy, seen_lately, time_since_seen.seconds,
has_nothing_to_do, total_jobs_in_watched_queues, is_healthy)
return is_healthy
@manager.command() @manager.command()
def healthcheck(): def healthcheck():
hostname = socket.gethostname() return check_runner.CheckRunner(
with Connection(rq_redis_connection): 'worker_healthcheck', 'worker', None, [(WorkerHealthcheck, {})]).run()
all_workers = Worker.all()
local_workers = [w for w in all_workers if w.hostname == hostname]
heartbeats = [w.last_heartbeat for w in local_workers]
time_since_seen = [datetime.datetime.utcnow() - hb for hb in heartbeats]
active = [t.seconds < 60 for t in time_since_seen]
sys.exit(int(not all(active)))

View File

@@ -58,6 +58,8 @@ maxminddb-geolite2==2018.703
pypd==1.1.0 pypd==1.1.0
disposable-email-domains>=0.0.52 disposable-email-domains>=0.0.52
gevent==1.4.0 gevent==1.4.0
supervisor==4.1.0
supervisor_checks==0.8.1
# Install the dependencies of the bin/bundle-extensions script here. # Install the dependencies of the bin/bundle-extensions script here.
# It has its own requirements file to simplify the frontend client build process # It has its own requirements file to simplify the frontend client build process
-r requirements_bundles.txt -r requirements_bundles.txt

32
worker.conf Normal file
View File

@@ -0,0 +1,32 @@
[supervisord]
logfile=/dev/null
pidfile=/tmp/supervisord.pid
nodaemon=true
[unix_http_server]
file = /tmp/supervisor.sock
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[program:worker]
command=./manage.py rq worker %(ENV_QUEUES)s
process_name=%(program_name)s-%(process_num)s
numprocs=%(ENV_WORKERS_COUNT)s
directory=/app
stopsignal=TERM
autostart=true
autorestart=true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
[eventlistener:worker_healthcheck]
serverurl=AUTO
command=./manage.py rq healthcheck
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
events=TICK_60