mirror of
https://github.com/getredash/redash.git
synced 2025-12-25 01:03:20 -05:00
Measure query time with statsd
This commit is contained in:
@@ -1,99 +1,37 @@
|
||||
from functools import wraps
|
||||
import time
|
||||
import logging
|
||||
from playhouse.gfk import Model
|
||||
from playhouse.postgres_ext import PostgresqlExtDatabase
|
||||
|
||||
from sqlalchemy.engine import Engine
|
||||
|
||||
from redash import statsd_client
|
||||
|
||||
metrics_logger = logging.getLogger("metrics")
|
||||
|
||||
|
||||
class MeteredPostgresqlExtDatabase(PostgresqlExtDatabase):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.query_count = 0
|
||||
self.query_duration = 0
|
||||
return super(MeteredPostgresqlExtDatabase, self).__init__(*args, **kwargs)
|
||||
|
||||
def execute_sql(self, *args, **kwargs):
|
||||
start_time = time.time()
|
||||
|
||||
try:
|
||||
result = super(MeteredPostgresqlExtDatabase, self).execute_sql(*args, **kwargs)
|
||||
return result
|
||||
finally:
|
||||
self.query_count += 1
|
||||
# TODO: there is a noticeable few ms discrepancy between the duration here and the one calculated in
|
||||
# metered_execute. Need to think what to do about it.
|
||||
duration = (time.time() - start_time) * 1000
|
||||
self.query_duration += duration
|
||||
|
||||
def reset_metrics(self):
|
||||
# TODO: instead of manually managing reset of metrics, we should store them in a LocalProxy based object, that
|
||||
# is guaranteed to be "replaced" when the current request is done.
|
||||
self.query_count = 0
|
||||
self.query_duration = 0
|
||||
from sqlalchemy.orm.util import _ORMJoin
|
||||
from sqlalchemy.event import listens_for
|
||||
|
||||
|
||||
def patch_query_execute():
|
||||
real_execute = peewee.Query._execute
|
||||
real_clone = peewee.Query.clone
|
||||
|
||||
@wraps(real_execute)
|
||||
def metered_execute(self, *args, **kwargs):
|
||||
name = self.model_class.__name__
|
||||
|
||||
action = getattr(self, 'model_action', 'unknown')
|
||||
|
||||
start_time = time.time()
|
||||
try:
|
||||
result = real_execute(self, *args, **kwargs)
|
||||
return result
|
||||
finally:
|
||||
duration = (time.time() - start_time) * 1000
|
||||
statsd_client.timing('db.{}.{}'.format(name, action), duration)
|
||||
metrics_logger.debug("model=%s query=%s duration=%.2f", name, action, duration)
|
||||
|
||||
@wraps(real_clone)
|
||||
def extended_clone(self):
|
||||
cloned = real_clone(self)
|
||||
setattr(cloned, 'model_action', getattr(self, 'model_action', 'unknown'))
|
||||
return cloned
|
||||
|
||||
peewee.Query._execute = metered_execute
|
||||
peewee.Query.clone = extended_clone
|
||||
@listens_for(Engine, "before_execute")
|
||||
def before_execute(conn, elt, multiparams, params):
|
||||
conn.info.setdefault('query_start_time', []).append(time.time())
|
||||
|
||||
|
||||
class MeteredModel(Model):
|
||||
@classmethod
|
||||
def select(cls, *args, **kwargs):
|
||||
return cls._execute_and_measure('select', args, kwargs)
|
||||
@listens_for(Engine, "after_execute")
|
||||
def after_execute(conn, elt, multiparams, params, result):
|
||||
duration = time.time() - conn.info['query_start_time'].pop(-1)
|
||||
action = elt.__class__.__name__
|
||||
|
||||
@classmethod
|
||||
def update(cls, *args, **kwargs):
|
||||
return cls._execute_and_measure('update', args, kwargs)
|
||||
|
||||
@classmethod
|
||||
def insert(cls, *args, **kwargs):
|
||||
return cls._execute_and_measure('insert', args, kwargs)
|
||||
|
||||
@classmethod
|
||||
def insert_many(cls, *args, **kwargs):
|
||||
return cls._execute_and_measure('insert_many', args, kwargs)
|
||||
|
||||
@classmethod
|
||||
def insert_from(cls, *args, **kwargs):
|
||||
return cls._execute_and_measure('insert_from', args, kwargs)
|
||||
|
||||
@classmethod
|
||||
def delete(cls, *args, **kwargs):
|
||||
return cls._execute_and_measure('delete', args, kwargs)
|
||||
|
||||
@classmethod
|
||||
def raw(cls, *args, **kwargs):
|
||||
return cls._execute_and_measure('raw', args, kwargs)
|
||||
|
||||
@classmethod
|
||||
def _execute_and_measure(cls, action, args, kwargs):
|
||||
result = getattr(super(MeteredModel, cls), action)(*args, **kwargs)
|
||||
setattr(result, 'model_action', action)
|
||||
return result
|
||||
if action == 'Select':
|
||||
t = elt.froms[0]
|
||||
while isinstance(t, _ORMJoin):
|
||||
t = t.left
|
||||
name = t.name
|
||||
elif action in ['Update', 'Insert', 'Delete']:
|
||||
name = elt.table.name
|
||||
else:
|
||||
# create/drop tables, sqlalchemy internal schema queries, etc
|
||||
return
|
||||
statsd_client.timing('db.{}.{}'.format(name, action), duration)
|
||||
metrics_logger.debug("model=%s query=%s duration=%.2f", name, action,
|
||||
duration)
|
||||
return result
|
||||
|
||||
@@ -25,6 +25,7 @@ from redash.permissions import has_access, view_only
|
||||
from redash.query_runner import get_query_runner, get_configuration_schema_for_query_runner_type
|
||||
from redash.utils import generate_token, json_dumps
|
||||
from redash.utils.configuration import ConfigurationContainer
|
||||
from redash.metrics import database
|
||||
|
||||
db = SQLAlchemy()
|
||||
Column = functools.partial(db.Column, nullable=False)
|
||||
|
||||
Reference in New Issue
Block a user