Schedule refresh_queries using celery_beat.

This commit is contained in:
Arik Fraimovich
2014-05-16 18:36:42 +03:00
parent 8e3adcd283
commit 5777070bec
4 changed files with 23 additions and 23 deletions

View File

@@ -22,16 +22,6 @@ def runworkers():
"""Prints deprecation warning."""
print "** This command is deprecated. Please use Celery's CLI to control the workers. **"
# TODO: Move to celery beat?
# while True:
# try:
# data_manager.refresh_queries()
# data_manager.report_status()
# except Exception as e:
# logging.error("Something went wrong with refreshing queries...")
# logging.exception(e)
# time.sleep(60)
@manager.shell
def make_shell_context():

View File

@@ -1,6 +1,7 @@
import json
import urlparse
import logging
from datetime import timedelta
from flask import Flask, make_response
from flask.ext.restful import Api
from flask_peewee.db import Database
@@ -33,7 +34,14 @@ app = Flask(__name__,
celery = Celery('redash',
broker=settings.CELERY_BROKER,
include='redash.tasks')
celery.conf.update(CELERY_RESULT_BACKEND=settings.CELERY_BACKEND)
celery.conf.update(CELERY_RESULT_BACKEND=settings.CELERY_BACKEND,
CELERYBEAT_SCHEDULE={
'refresh_queries': {
'task': 'redash.tasks.refresh_queries',
'schedule': timedelta(seconds=30)
},
},
CELERY_TIMEZONE='UTC')
api = Api(app)

View File

@@ -106,12 +106,11 @@ def status_api():
status['dashboards_count'] = models.Dashboard.select().count()
status['widgets_count'] = models.Widget.select().count()
status['workers'] = [redis_connection.hgetall(w)
for w in redis_connection.smembers('workers')]
status['workers'] = []
manager_status = redis_connection.hgetall('manager:status')
manager_status = redis_connection.hgetall('redash:status')
status['manager'] = manager_status
status['manager']['queue_size'] = redis_connection.zcard('jobs')
status['manager']['queue_size'] = 'Unknown'#redis_connection.zcard('jobs')
return jsonify(status)

View File

@@ -97,6 +97,7 @@ class QueryTask(object):
return self._async_result.revoke(terminate=True)
@celery.task
def refresh_queries():
# self.status['last_refresh_at'] = time.time()
# self._save_status()
@@ -110,18 +111,20 @@ def refresh_queries():
outdated_queries_count += 1
statsd_client.gauge('manager.outdated_queries', outdated_queries_count)
# TODO: decide if we still need this
# statsd_client.gauge('manager.queue_size', self.redis_connection.zcard('jobs'))
logger.info("Done refreshing queries... %d" % outdated_queries_count)
logger.info("Done refreshing queries. Found %d outdated queries." % outdated_queries_count)
# def report_status(self):
# manager_status = self.redis_connection.hgetall('manager:status')
# self.statsd_client.gauge('manager.seconds_since_refresh',
# time.time() - float(manager_status['last_refresh_at']))
#
# def _save_status(self):
# self.redis_connection.hmset('manager:status', self.status)
status = redis_connection.hgetall('redash:status')
now = time.time()
redis_connection.hmset('redash:status', {
'outdated_queries_count': outdated_queries_count,
'last_refresh_at': now
})
statsd_client.gauge('manager.seconds_since_refresh', now - float(status['last_refresh_at']))
@celery.task(bind=True, track_started=True)
def execute_query(self, query, data_source_id):