Basic stats reporting.

This commit is contained in:
Arik Fraimovich
2014-03-03 20:17:25 +02:00
parent c7af5bdce9
commit e43366f422
4 changed files with 32 additions and 5 deletions

View File

@@ -38,6 +38,7 @@ def runworkers():
while True:
try:
data_manager.refresh_queries()
data_manager.report_status()
except Exception as e:
logging.error("Something went wrong with refreshing queries...")
logging.exception(e)

View File

@@ -5,6 +5,7 @@ from flask.ext.restful import Api
from flask_peewee.db import Database
import redis
from statsd import StatsClient
from redash import settings, utils
__version__ = '0.3.3'
@@ -36,9 +37,11 @@ if redis_url.path:
redis_db = redis_url.path[1]
else:
redis_db = 0
redis_connection = redis.StrictRedis(host=redis_url.hostname, port=redis_url.port, db=redis_db, password=redis_url.password)
statsd_client = StatsClient(host=settings.STATSD_HOST, port=settings.STATSD_PORT, prefix=settings.STATSD_PREFIX)
from redash import data
data_manager = data.Manager(redis_connection, db)
data_manager = data.Manager(redis_connection, db, statsd_client)
from redash import controllers

View File

@@ -9,7 +9,6 @@ import logging
import psycopg2
import qr
import redis
from redash import settings
from redash.data import worker
from redash.utils import gen_query_hash
@@ -24,7 +23,8 @@ class QueryResult(collections.namedtuple('QueryData', 'id query data runtime ret
class Manager(object):
def __init__(self, redis_connection, db):
def __init__(self, redis_connection, db, statsd_client):
self.statsd_client = statsd_client
self.redis_connection = redis_connection
self.db = db
self.workers = []
@@ -98,6 +98,20 @@ class Manager(object):
return job
def report_status(self):
        """Push health/throughput metrics for each worker and the manager to statsd.

        Reads the ``workers`` set from Redis, fetches each worker's status
        hash, and emits per-worker gauges plus a manager-freshness gauge.
        Raises KeyError if a status hash is missing an expected field, and
        ValueError if a timestamp field is not parseable as a float; the
        caller is expected to guard against that (the run loop in this
        commit wraps the call in a broad try/except).
        """
        # Each member of the 'workers' set is the Redis key of one worker's
        # status hash; fetch them all up front.
        workers = [self.redis_connection.hgetall(w)
                   for w in self.redis_connection.smembers('workers')]

        for w in workers:
            # Per-worker gauges, namespaced by the worker's own reported id.
            # 'updated_at' is a unix timestamp written by the worker, so the
            # delta is "seconds since this worker last checked in".
            self.statsd_client.gauge('worker_{}.seconds_since_update'.format(w['id']),
                                     time.time() - float(w['updated_at']))
            # Cumulative job counters are exported as gauges (absolute
            # values), not counters/increments.
            self.statsd_client.gauge('worker_{}.jobs_received'.format(w['id']), int(w['jobs_count']))
            self.statsd_client.gauge('worker_{}.jobs_done'.format(w['id']), int(w['done_jobs_count']))

        # 'manager:status' holds 'last_refresh_at', set by the refresh loop;
        # a growing value here means refresh_queries has stalled.
        manager_status = self.redis_connection.hgetall('manager:status')
        self.statsd_client.gauge('manager.seconds_since_refresh',
                                 time.time() - float(manager_status['last_refresh_at']))
def refresh_queries(self):
sql = """SELECT first_value(t1."query") over(partition by t1.query_hash)
FROM "queries" AS t1
@@ -111,9 +125,13 @@ class Manager(object):
logging.info("Refreshing queries...")
queries = self.run_query(sql)
for query in queries:
self.add_job(query[0], worker.Job.LOW_PRIORITY)
self.statsd_client.gauge('manager.outdated_queries', len(queries))
self.statsd_client.gauge('manager.queue_size', self.redis_connection.zcard('jobs'))
logging.info("Done refreshing queries... %d" % len(queries))
def store_query_result(self, query, data, run_time, retrieved_at):

View File

@@ -11,8 +11,9 @@ import time
import signal
import setproctitle
import redis
from statsd import StatsClient
from redash.utils import gen_query_hash
from redash import settings
class Job(object):
HIGH_PRIORITY = 1
@@ -147,6 +148,8 @@ class Worker(threading.Thread):
def __init__(self, worker_id, manager, redis_connection_params, query_runner, sleep_time=0.1):
self.manager = manager
self.statsd_client = StatsClient(host=settings.STATSD_HOST, port=settings.STATSD_PORT,
prefix=settings.STATSD_PREFIX)
self.redis_connection_params = {k: v for k, v in redis_connection_params.iteritems()
if k in ('host', 'db', 'password', 'port')}
self.continue_working = True
@@ -242,7 +245,9 @@ class Worker(threading.Thread):
annotated_query = job.query
# TODO: here's the part that needs to be forked, not all of the worker process...
data, error = self.query_runner(annotated_query)
with self.statsd_client.timer('worker_{}.query_runner.run_time'.format(self.worker_id)):
data, error = self.query_runner(annotated_query)
run_time = time.time() - start_time
logging.info("[%s][%s] query finished... data length=%s, error=%s",
self.name, job.id, data and len(data), error)