diff --git a/manage.py b/manage.py index 6781748f0..e2b60e03d 100755 --- a/manage.py +++ b/manage.py @@ -22,16 +22,6 @@ def runworkers(): """Prints deprecation warning.""" print "** This command is deprecated. Please use Celery's CLI to control the workers. **" - # TODO: Move to celery beat? - # while True: - # try: - # data_manager.refresh_queries() - # data_manager.report_status() - # except Exception as e: - # logging.error("Something went wrong with refreshing queries...") - # logging.exception(e) - # time.sleep(60) - @manager.shell def make_shell_context(): diff --git a/redash/__init__.py b/redash/__init__.py index e36125bd0..0891c549f 100644 --- a/redash/__init__.py +++ b/redash/__init__.py @@ -1,6 +1,7 @@ import json import urlparse import logging +from datetime import timedelta from flask import Flask, make_response from flask.ext.restful import Api from flask_peewee.db import Database @@ -33,7 +34,14 @@ app = Flask(__name__, celery = Celery('redash', broker=settings.CELERY_BROKER, include='redash.tasks') -celery.conf.update(CELERY_RESULT_BACKEND=settings.CELERY_BACKEND) +celery.conf.update(CELERY_RESULT_BACKEND=settings.CELERY_BACKEND, + CELERYBEAT_SCHEDULE={ + 'refresh_queries': { + 'task': 'redash.tasks.refresh_queries', + 'schedule': timedelta(seconds=30) + }, + }, + CELERY_TIMEZONE='UTC') api = Api(app) diff --git a/redash/controllers.py b/redash/controllers.py index 677e87405..691fab7a6 100644 --- a/redash/controllers.py +++ b/redash/controllers.py @@ -106,12 +106,11 @@ def status_api(): status['dashboards_count'] = models.Dashboard.select().count() status['widgets_count'] = models.Widget.select().count() - status['workers'] = [redis_connection.hgetall(w) - for w in redis_connection.smembers('workers')] + status['workers'] = [] - manager_status = redis_connection.hgetall('manager:status') + manager_status = redis_connection.hgetall('redash:status') status['manager'] = manager_status - status['manager']['queue_size'] = redis_connection.zcard('jobs') + status['manager']['queue_size'] = 'Unknown'#redis_connection.zcard('jobs') return jsonify(status) diff --git a/redash/tasks.py b/redash/tasks.py index 111b33e19..c057a7d39 100644 --- a/redash/tasks.py +++ b/redash/tasks.py @@ -97,6 +97,7 @@ class QueryTask(object): return self._async_result.revoke(terminate=True) +@celery.task def refresh_queries(): # self.status['last_refresh_at'] = time.time() # self._save_status() @@ -110,18 +111,20 @@ def refresh_queries(): outdated_queries_count += 1 statsd_client.gauge('manager.outdated_queries', outdated_queries_count) + # TODO: decide if we still need this # statsd_client.gauge('manager.queue_size', self.redis_connection.zcard('jobs')) - logger.info("Done refreshing queries... %d" % outdated_queries_count) + logger.info("Done refreshing queries. Found %d outdated queries." % outdated_queries_count) - # def report_status(self): - # manager_status = self.redis_connection.hgetall('manager:status') - # self.statsd_client.gauge('manager.seconds_since_refresh', - # time.time() - float(manager_status['last_refresh_at'])) - # - # def _save_status(self): - # self.redis_connection.hmset('manager:status', self.status) + status = redis_connection.hgetall('redash:status') + now = time.time() + redis_connection.hmset('redash:status', { + 'outdated_queries_count': outdated_queries_count, + 'last_refresh_at': now + }) + + statsd_client.gauge('manager.seconds_since_refresh', now - float(status['last_refresh_at'])) @celery.task(bind=True, track_started=True) def execute_query(self, query, data_source_id):