Switch to multiprocessing instead of threading.

Arik Fraimovich
2014-04-22 13:37:06 +03:00
parent e8aba6b682
commit 08d6a90469
3 changed files with 61 additions and 27 deletions
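
The gist of the change: the query workers become multiprocessing.Process subclasses instead of threading.Thread subclasses, so each worker runs in its own interpreter process. A minimal, self-contained sketch of that pattern (ExampleWorker and its attributes are illustrative, not the actual redash Worker):

# Minimal sketch of the thread-to-process switch; ExampleWorker is a
# hypothetical stand-in, not the redash Worker class.
import multiprocessing
import os
import time


class ExampleWorker(multiprocessing.Process):
    def __init__(self, sleep_time=0.1):
        super(ExampleWorker, self).__init__(name="ExampleWorker")
        self.sleep_time = sleep_time

    def run(self):
        # Executes in the child process, so os.getpid() differs from the parent.
        print("worker pid: %d" % os.getpid())
        while True:
            time.sleep(self.sleep_time)  # loops until the parent terminates us


if __name__ == '__main__':
    workers = [ExampleWorker() for _ in range(2)]
    for w in workers:
        w.start()
    time.sleep(1)
    for w in workers:
        w.terminate()  # sends SIGTERM on Unix
    for w in workers:
        w.join()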

View File

@@ -3,6 +3,8 @@
 CLI to manage redash.
 """
 import atfork
+import signal
 atfork.monkeypatch_os_fork_functions()
 import atfork.stdlib_fixer
 atfork.stdlib_fixer.fix_logging_module()
@@ -28,26 +30,31 @@ def version():
 def runworkers():
     """Starts the re:dash query executors/workers."""
-    try:
-        old_workers = data_manager.redis_connection.smembers('workers')
-        data_manager.redis_connection.delete('workers')
-        logging.info("Cleaning old workers: %s", old_workers)
-
-        data_manager.start_workers(settings.WORKERS_COUNT)
-        logging.info("Workers started.")
-
-        while True:
-            try:
-                data_manager.refresh_queries()
-                data_manager.report_status()
-            except Exception as e:
-                logging.error("Something went wrong with refreshing queries...")
-                logging.exception(e)
-
-            time.sleep(60)
-    except KeyboardInterrupt:
-        logging.warning("Exiting; waiting for threads")
+    def stop_handler(signum, frame):
+        logging.warning("Exiting; waiting for workers")
         data_manager.stop_workers()
         exit()
+
+    signal.signal(signal.SIGTERM, stop_handler)
+    signal.signal(signal.SIGINT, stop_handler)
+
+    old_workers = data_manager.redis_connection.smembers('workers')
+    data_manager.redis_connection.delete('workers')
+    logging.info("Cleaning old workers: %s", old_workers)
+
+    data_manager.start_workers(settings.WORKERS_COUNT)
+    logging.info("Workers started.")
+
+    while True:
+        try:
+            data_manager.refresh_queries()
+            data_manager.report_status()
+        except Exception as e:
+            logging.error("Something went wrong with refreshing queries...")
+            logging.exception(e)
+
+        time.sleep(60)
 
 @manager.shell
 def make_shell_context():
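
KeyboardInterrupt only fires for Ctrl-C in the parent's main thread; SIGTERM, which supervisors and plain kill send, never raises it. Registering explicit SIGTERM/SIGINT handlers lets runworkers shut the worker processes down in both cases. A standalone sketch of that shutdown pattern, with stop_all and WORKER_PROCS as illustrative placeholders rather than redash names:

# Standalone sketch of the shutdown pattern above; stop_all and WORKER_PROCS
# are illustrative placeholders, not redash names.
import signal
import sys
import time

WORKER_PROCS = []  # would hold the started multiprocessing.Process workers


def stop_all(signum, frame):
    for p in WORKER_PROCS:
        p.terminate()
    for p in WORKER_PROCS:
        p.join()
    sys.exit(0)


signal.signal(signal.SIGTERM, stop_all)  # sent by supervisors / plain kill
signal.signal(signal.SIGINT, stop_all)   # Ctrl-C

while True:
    time.sleep(60)  # periodic refresh/report work would go here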

View File

@@ -142,6 +142,7 @@ class Manager(object):
         redis_connection_params = self.redis_connection.connection_pool.connection_kwargs
         self.workers = [worker.Worker(worker_id, self, redis_connection_params)
                         for worker_id in xrange(workers_count)]
+
         for w in self.workers:
             w.start()
@@ -149,7 +150,9 @@ class Manager(object):
     def stop_workers(self):
         for w in self.workers:
-            w.continue_working = False
+            w.terminate()
+
+        for w in self.workers:
             w.join()
 
     def _save_status(self):
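
With threads, setting w.continue_working was enough because the flag lived in shared memory; a child process has its own copy of that attribute, so the parent can no longer flip it. terminate() sends the child a SIGTERM instead, and the join() calls move to a second loop so every worker is signalled before the potentially slow waits begin. A small sketch of that pattern, assuming procs is a list of multiprocessing.Process objects (stop_processes is a hypothetical helper):

# Hypothetical helper, not part of redash: stop a list of worker processes.
def stop_processes(procs, timeout=None):
    for p in procs:
        p.terminate()      # SIGTERM; a handler inside the child may clean up
    for p in procs:
        p.join(timeout)    # reap every child so no zombies are left behind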

View File

@@ -3,8 +3,8 @@ Worker implementation to execute incoming queries.
 """
 import json
 import logging
+import multiprocessing
 import os
-import threading
 import uuid
 import datetime
 import time
@@ -209,31 +209,30 @@ class Job(RedisObject):
         return "<Job:%s,priority:%d,status:%d>" % (self.id, self.priority, self.status)
 
 
-class Worker(threading.Thread):
+class Worker(multiprocessing.Process):
     def __init__(self, worker_id, manager, redis_connection_params, sleep_time=0.1):
         self.manager = manager
         self.statsd_client = StatsClient(host=settings.STATSD_HOST, port=settings.STATSD_PORT,
                                          prefix=settings.STATSD_PREFIX)
         self.redis_connection_params = {k: v for k, v in redis_connection_params.iteritems()
                                         if k in ('host', 'db', 'password', 'port')}
+        self.worker_id = None
         self.continue_working = True
         self.sleep_time = sleep_time
         self.child_pid = None
-        self.worker_id = worker_id
+        self.current_job_id = None
         self.status = {
-            'id': self.worker_id,
             'jobs_count': 0,
             'cancelled_jobs_count': 0,
             'done_jobs_count': 0,
             'updated_at': time.time(),
             'started_at': time.time()
         }
-        self._save_status()
-        self.manager.redis_connection.sadd('workers', self._key)
 
-        super(Worker, self).__init__(name="Worker-%s" % self.worker_id)
+        super(Worker, self).__init__(name="Worker")
 
     def set_title(self, title=None):
         base_title = "redash worker:%s" % self.worker_id
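
Note that __init__ still executes in the parent process; only run() executes in the child. Anything keyed to the worker's own PID, such as worker_id, the Redis 'workers' registration and the initial status save, would capture the parent's PID if done in the constructor, so the commit defers it to run(). A tiny sketch of that init-in-run pattern (PidAwareWorker is illustrative only):

# Sketch of the init-in-run pattern; PidAwareWorker is illustrative only.
import multiprocessing
import os


class PidAwareWorker(multiprocessing.Process):
    def __init__(self):
        super(PidAwareWorker, self).__init__()
        self.worker_id = None           # unknown until the child process exists

    def run(self):
        self.worker_id = os.getpid()    # now running inside the child
        # registration / status reporting keyed on worker_id would go here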
@@ -245,7 +244,28 @@ class Worker(threading.Thread):
         setproctitle.setproctitle(full_title)
 
     def run(self):
+        self.worker_id = os.getpid()
+        self.status['id'] = self.worker_id
+        self.name = "Worker:%d" % self.worker_id
+        self.manager.redis_connection.sadd('workers', self._key)
+        self._save_status()
+        self.set_title()
+
         logging.info("[%s] started.", self.name)
+
+        signal.signal(signal.SIGINT, self._stop)
+        signal.signal(signal.SIGTERM, self._stop)
+
+        self._wait_for_jobs()
+
+    def _stop(self, signum, frame):
+        self.continue_working = False
+        if self.current_job_id:
+            job = Job.load(self.manager.redis_connection, self.current_job_id)
+            if job:
+                job.cancel()
+
+    def _wait_for_jobs(self):
         while self.continue_working:
             job_id = self.manager.queue.pop()
             if job_id:
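
Because run() installs its own SIGINT/SIGTERM handlers inside the child, the parent's terminate() call (SIGTERM on Unix) becomes a graceful stop: the handler flips continue_working and cancels the in-flight job rather than letting the default signal action kill the process outright. A compact sketch of a process that exits its loop cleanly on SIGTERM (GracefulWorker is illustrative, not redash code):

# Illustrative only: a Process whose child installs its own SIGTERM handler.
import multiprocessing
import signal
import time


class GracefulWorker(multiprocessing.Process):
    def run(self):
        self.keep_going = True

        def handle_stop(signum, frame):
            self.keep_going = False  # leave the loop instead of dying mid-job

        signal.signal(signal.SIGTERM, handle_stop)  # what terminate() sends
        signal.signal(signal.SIGINT, handle_stop)
        while self.keep_going:
            time.sleep(0.1)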
@@ -270,6 +290,7 @@ class Worker(threading.Thread):
         self.manager.redis_connection.hmset(self._key, self.status)
 
     def _fork_and_process(self, job_id):
+        self.current_job_id = job_id
         self.child_pid = os.fork()
         if self.child_pid == 0:
             self.set_title("processing %s" % job_id)
@@ -291,6 +312,9 @@ class Worker(threading.Thread):
             logging.info("[%s] Finished Processing %s (pid: %d status: %d)",
                          self.name, job_id, self.child_pid, status)
+
+            self.child_pid = None
+            self.current_job_id = None
 
     def _process(self, job_id):
         redis_connection = redis.StrictRedis(**self.redis_connection_params)
         job = Job.load(redis_connection, job_id)
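
_process runs in the freshly forked child and builds its own redis.StrictRedis from the saved connection kwargs rather than reusing an inherited client; presumably this avoids two processes reading replies off the same socket after the fork. A sketch of that reconnect-after-fork idea, with example connection values:

# Sketch only: rebuild a Redis client in the child instead of sharing one.
import redis

connection_params = {'host': 'localhost', 'port': 6379, 'db': 0}  # example values


def make_child_connection():
    # A brand-new client (and socket) owned solely by this process.
    return redis.StrictRedis(**connection_params)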