Merge pull request #856 from getredash/feature/task_monitoring

Feature: running queries monitor
Arik Fraimovich
2016-04-18 13:49:55 +03:00
19 changed files with 948 additions and 449 deletions

View File

@@ -35,6 +35,14 @@ angular.module('redash', [
$locationProvider.html5Mode(true);
growlProvider.globalTimeToLive(2000);
$routeProvider.when('/admin/queries/outdated', {
templateUrl: '/views/admin/outdated_queries.html',
controller: 'AdminOutdatedQueriesCtrl'
});
$routeProvider.when('/admin/queries/tasks', {
templateUrl: '/views/admin/tasks.html',
controller: 'AdminTasksCtrl'
});
$routeProvider.when('/dashboard/:dashboardSlug', {
templateUrl: '/views/dashboard.html',
controller: 'DashboardCtrl',

View File

@@ -12,12 +12,234 @@
$scope.status = data;
});
$timeout(refresh, 59 * 1000);
var timer = $timeout(refresh, 59 * 1000);
$scope.$on("$destroy", function () {
if (timer) {
$timeout.cancel(timer);
}
});
};
refresh();
};
var dateFormatter = function (value) {
if (!value) {
return "-";
}
return moment(value).format(clientConfig.dateTimeFormat);
};
var timestampFormatter = function(value) {
if (value) {
return dateFormatter(value * 1000.0);
}
return "-";
};
var AdminTasksCtrl = function ($scope, $location, Events, $http, $timeout, $filter) {
Events.record(currentUser, "view", "page", "admin/tasks");
$scope.$parent.pageTitle = "Running Queries";
$scope.gridConfig = {
isPaginationEnabled: true,
itemsByPage: 50,
maxSize: 8,
};
$scope.selectedTab = 'in_progress';
$scope.tasks = {
'waiting': [],
'in_progress': [],
'done': []
};
$scope.allGridColumns = [
{
label: 'Data Source ID',
map: 'data_source_id'
},
{
label: 'Username',
map: 'username'
},
{
'label': 'State',
'map': 'state',
"cellTemplate": '{{dataRow.state}} <span ng-if="dataRow.state == \'failed\'" popover="{{dataRow.error}}" popover-trigger="mouseenter" class="zmdi zmdi-help"></span>'
},
{
"label": "Query ID",
"map": "query_id"
},
{
label: 'Query Hash',
map: 'query_hash'
},
{
'label': 'Runtime',
'map': 'run_time',
'formatFunction': function (value) {
return $filter('durationHumanize')(value);
}
},
{
'label': 'Created At',
'map': 'created_at',
'formatFunction': timestampFormatter
},
{
'label': 'Started At',
'map': 'started_at',
'formatFunction': timestampFormatter
},
{
'label': 'Updated At',
'map': 'updated_at',
'formatFunction': timestampFormatter
}
];
$scope.inProgressGridColumns = angular.copy($scope.allGridColumns);
$scope.inProgressGridColumns.push({
'label': '',
"cellTemplate": '<cancel-query-button query-id="dataRow.query_id" task-id="dataRow.task_id"></cancel-query-button>'
});
$scope.setTab = function(tab) {
$scope.selectedTab = tab;
$scope.showingTasks = $scope.tasks[tab];
if (tab == 'in_progress') {
$scope.gridColumns = $scope.inProgressGridColumns;
} else {
$scope.gridColumns = $scope.allGridColumns;
}
};
$scope.setTab($location.hash() || 'in_progress');
var refresh = function () {
$scope.refresh_time = moment().add('minutes', 1);
$http.get('/api/admin/queries/tasks').success(function (data) {
$scope.tasks = data;
$scope.showingTasks = $scope.tasks[$scope.selectedTab];
});
var timer = $timeout(refresh, 5 * 1000);
$scope.$on("$destroy", function () {
if (timer) {
$timeout.cancel(timer);
}
});
};
refresh();
};
var AdminOutdatedQueriesCtrl = function ($scope, Events, $http, $timeout, $filter) {
Events.record(currentUser, "view", "page", "admin/outdated_queries");
$scope.$parent.pageTitle = "Outdated Queries";
$scope.gridConfig = {
isPaginationEnabled: true,
itemsByPage: 50,
maxSize: 8,
};
$scope.gridColumns = [
{
label: 'Data Source ID',
map: 'data_source_id'
},
{
"label": "Name",
"map": "name",
"cellTemplateUrl": "/views/queries_query_name_cell.html"
},
{
'label': 'Created By',
'map': 'user.name'
},
{
'label': 'Runtime',
'map': 'runtime',
'formatFunction': function (value) {
return $filter('durationHumanize')(value);
}
},
{
'label': 'Last Executed At',
'map': 'retrieved_at',
'formatFunction': dateFormatter
},
{
'label': 'Created At',
'map': 'created_at',
'formatFunction': dateFormatter
},
{
'label': 'Update Schedule',
'map': 'schedule',
'formatFunction': function (value) {
return $filter('scheduleHumanize')(value);
}
}
];
var refresh = function () {
$scope.refresh_time = moment().add('minutes', 1);
$http.get('/api/admin/queries/outdated').success(function (data) {
$scope.queries = data.queries;
$scope.updatedAt = data.updated_at * 1000.0;
});
var timer = $timeout(refresh, 59 * 1000);
$scope.$on("$destroy", function () {
if (timer) {
$timeout.cancel(timer);
}
});
};
refresh();
};
var cancelQueryButton = function () {
return {
restrict: 'E',
scope: {
'queryId': '=',
'taskId': '='
},
transclude: true,
template: '<button class="btn btn-default" ng-disabled="inProgress" ng-click="cancelExecution()"><i class="zmdi zmdi-spinner zmdi-hc-spin" ng-if="inProgress"></i> Cancel</button>',
replace: true,
controller: ['$scope', '$http', 'Events', function ($scope, $http, Events) {
$scope.inProgress = false;
$scope.cancelExecution = function() {
$http.delete('api/jobs/' + $scope.taskId).success(function() {
});
var queryId = $scope.queryId;
if ($scope.queryId == 'adhoc') {
queryId = null;
}
Events.record(currentUser, 'cancel_execute', 'query', queryId, {'admin': true});
$scope.inProgress = true;
}
}]
}
};
angular.module('redash.admin_controllers', [])
.controller('AdminStatusCtrl', ['$scope', 'Events', '$http', '$timeout', AdminStatusCtrl])
.controller('AdminTasksCtrl', ['$scope', '$location', 'Events', '$http', '$timeout', '$filter', AdminTasksCtrl])
.controller('AdminOutdatedQueriesCtrl', ['$scope', 'Events', '$http', '$timeout', '$filter', AdminOutdatedQueriesCtrl])
.directive('cancelQueryButton', cancelQueryButton)
})();
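For reference, each row the tasks grid renders comes from the /api/admin/queries/tasks payload added later in this commit. A sketch of one "in_progress" entry, written as a Python literal; all values are made up and only the keys the column definitions above map to are shown.
# Hypothetical tracker entry as consumed by AdminTasksCtrl; every value is a placeholder.
example_task = {
    "task_id": "b1946ac9-0000",   # used by the cancel-query-button directive
    "data_source_id": 1,
    "username": "Scheduled",
    "state": "executing_query",
    "query_id": 42,
    "query_hash": "a1b2c3d4",
    "run_time": 1.7,              # seconds, rendered with durationHumanize
    "created_at": 1460973000.0,   # unix timestamps, rendered by timestampFormatter
    "started_at": 1460973001.0,
    "updated_at": 1460973002.0,
}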

View File

@@ -116,7 +116,7 @@
},
{
'label': 'Runtime',
'map': 'runtime',
'map': 'run_time',
'formatFunction': function (value) {
return $filter('durationHumanize')(value);
}

View File

@@ -0,0 +1,19 @@
<page-header title="Admin">
</page-header>
<div class="container">
<div class="container bg-white p-5">
<ul class="tab-nav">
<li><a href="admin/status">System Status</a></li>
<li><a href="admin/queries/tasks">Queries Queue</a></li>
<li class="active"><a href="admin/queries/outdated">Outdated Queries</a></li>
</ul>
<smart-table rows="queries" columns="gridColumns"
config="gridConfig"
class="table table-condensed table-hover"></smart-table>
<div class="badge">
Last update: <span am-time-ago="updatedAt"></span>
</div>
</div>
</div>

View File

@@ -0,0 +1,23 @@
<page-header title="Admin">
</page-header>
<div class="container">
<div class="container bg-white p-5">
<ul class="tab-nav">
<li><a href="admin/status">System Status</a></li>
<li class="active"><a href="admin/queries/tasks">Queries Queue</a></li>
<li><a href="admin/queries/outdated">Outdated Queries</a></li>
</ul>
<ul class="tab-nav">
<rd-tab tab-id="in_progress" name="In Progress ({{tasks.in_progress.length}})" ng-click="setTab('in_progress')"></rd-tab>
<rd-tab tab-id="waiting" name="Waiting ({{tasks.waiting.length}})" ng-click="setTab('waiting')"></rd-tab>
<rd-tab tab-id="done" name="Done ({{tasks.done.length}})" ng-click="setTab('done')"></rd-tab>
</ul>
<smart-table rows="showingTasks" columns="gridColumns"
config="gridConfig"
class="table table-condensed table-hover"></smart-table>
</div>
</div>

View File

@@ -2,12 +2,14 @@
</page-header>
<div class="container">
<div class="container bg-white">
<div class="container bg-white p-5">
<ul class="tab-nav">
<li class="active"><a>System Status</a></li>
<li class="active"><a href="admin/status">System Status</a></li>
<li><a href="admin/queries/tasks">Queries Queue</a></li>
<li><a href="admin/queries/outdated">Outdated Queries</a></li>
</ul>
<div class="p-5">
<div>
<ul class="list-group col-lg-4">
<li class="list-group-item active">General</li>

View File

@@ -28,7 +28,6 @@ def ping():
@require_super_admin
def status_api():
status = get_status()
return jsonify(status)
@@ -40,6 +39,6 @@ def inject_variables():
def init_app(app):
from redash.handlers import embed, queries, static, authentication
from redash.handlers import embed, queries, static, authentication, admin
app.register_blueprint(routes)
api.init_app(app)

redash/handlers/admin.py (new file, +48 lines)
View File

@@ -0,0 +1,48 @@
import json
from flask import current_app
from flask_login import login_required
from redash import models, redis_connection
from redash.utils import json_dumps
from redash.handlers import routes
from redash.permissions import require_super_admin
from redash.tasks.queries import QueryTaskTracker
def json_response(response):
return current_app.response_class(json_dumps(response), mimetype='application/json')
@routes.route('/api/admin/queries/outdated', methods=['GET'])
@require_super_admin
@login_required
def outdated_queries():
manager_status = redis_connection.hgetall('redash:status')
query_ids = json.loads(manager_status.get('query_ids', '[]'))
if query_ids:
outdated_queries = models.Query.select(models.Query, models.QueryResult.retrieved_at, models.QueryResult.runtime) \
.join(models.QueryResult, join_type=models.peewee.JOIN_LEFT_OUTER) \
.where(models.Query.id << query_ids) \
.order_by(models.Query.created_at.desc())
else:
outdated_queries = []
return json_response(dict(queries=[q.to_dict(with_stats=True, with_last_modified_by=False) for q in outdated_queries], updated_at=manager_status['last_refresh_at']))
@routes.route('/api/admin/queries/tasks', methods=['GET'])
@require_super_admin
@login_required
def queries_tasks():
waiting = QueryTaskTracker.all(QueryTaskTracker.WAITING_LIST)
in_progress = QueryTaskTracker.all(QueryTaskTracker.IN_PROGRESS_LIST)
done = QueryTaskTracker.all(QueryTaskTracker.DONE_LIST, limit=50)
response = {
'waiting': [t.data for t in waiting],
'in_progress': [t.data for t in in_progress],
'done': [t.data for t in done]
}
return json_response(response)
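A minimal sketch of exercising both new endpoints from a script, assuming a requests session that already carries super-admin credentials; the host is a placeholder and this snippet is not part of the commit.
import requests

session = requests.Session()          # assumed to be authenticated as a super admin
base = "https://redash.example.com"   # placeholder host

tasks = session.get(base + "/api/admin/queries/tasks").json()
for state in ("waiting", "in_progress", "done"):
    print(state, len(tasks.get(state, [])))

outdated = session.get(base + "/api/admin/queries/outdated").json()
print("outdated queries:", len(outdated["queries"]), "updated at", outdated["updated_at"])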

View File

@@ -13,6 +13,7 @@ from redash.tasks import QueryTask, record_event
from redash.permissions import require_permission, not_view_only, has_access, require_access, view_only
from redash.handlers.base import BaseResource, get_object_or_404
from redash.utils import collect_query_parameters, collect_parameters_from_request
from redash.tasks.queries import enqueue_query
def run_query(data_source, parameter_values, query_text, query_id, max_age=0):
@@ -33,8 +34,7 @@ def run_query(data_source, parameter_values, query_text, query_id, max_age=0):
if query_result:
return {'query_result': query_result.to_dict()}
else:
job = QueryTask.add_task(query_text, data_source,
metadata={"Username": current_user.name, "Query ID": query_id})
job = enqueue_query(query_text, data_source, metadata={"Username": current_user.name, "Query ID": query_id})
return {'job': job.to_dict()}
@@ -190,7 +190,6 @@ class QueryResultResource(BaseResource):
class JobResource(BaseResource):
def get(self, job_id):
# TODO: if finished, include the query result
job = QueryTask(job_id=job_id)
return {'job': job.to_dict()}
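The admin UI's cancel-query-button issues a DELETE against this jobs API; a rough script-side equivalent looks like the sketch below (placeholder host and job id, authenticated session assumed).
import requests

session = requests.Session()                  # assumed authenticated
base = "https://redash.example.com"           # placeholder host
job_id = "hypothetical-task-id"               # normally taken from the 'job' object returned when a query is enqueued

# QueryTask.to_dict maps celery states to the numeric statuses below.
job = session.get("{}/api/jobs/{}".format(base, job_id)).json()["job"]
print(job["status"])                          # 1=PENDING, 2=STARTED, 3=SUCCESS, 4=FAILURE/REVOKED

# Cancelling mirrors what the cancel-query-button directive does in the admin UI.
session.delete("{}/api/jobs/{}".format(base, job_id))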

View File

@@ -1,431 +0,0 @@
import datetime
import time
import logging
import signal
from flask_mail import Message
import redis
import hipchat
import requests
from redash.utils import json_dumps, base_url
from requests.auth import HTTPBasicAuth
from celery import Task
from celery.result import AsyncResult
from celery.utils.log import get_task_logger
from redash import redis_connection, models, statsd_client, settings, utils, mail
from redash.utils import gen_query_hash
from redash.worker import celery
from redash.query_runner import InterruptException
from version_check import run_version_check
logger = get_task_logger(__name__)
class BaseTask(Task):
abstract = True
def after_return(self, *args, **kwargs):
models.db.close_db(None)
def __call__(self, *args, **kwargs):
models.db.connect_db()
return super(BaseTask, self).__call__(*args, **kwargs)
class QueryTask(object):
MAX_RETRIES = 5
# TODO: this is mapping to the old Job class statuses. Need to update the client side and remove this
STATUSES = {
'PENDING': 1,
'STARTED': 2,
'SUCCESS': 3,
'FAILURE': 4,
'REVOKED': 4
}
def __init__(self, job_id=None, async_result=None):
if async_result:
self._async_result = async_result
else:
self._async_result = AsyncResult(job_id, app=celery)
@property
def id(self):
return self._async_result.id
@classmethod
def add_task(cls, query, data_source, scheduled=False, metadata={}):
query_hash = gen_query_hash(query)
logging.info("[Manager][%s] Inserting job", query_hash)
logging.info("[Manager] Metadata: [%s]", metadata)
try_count = 0
job = None
while try_count < cls.MAX_RETRIES:
try_count += 1
pipe = redis_connection.pipeline()
try:
pipe.watch(cls._job_lock_id(query_hash, data_source.id))
job_id = pipe.get(cls._job_lock_id(query_hash, data_source.id))
if job_id:
logging.info("[Manager][%s] Found existing job: %s", query_hash, job_id)
job = cls(job_id=job_id)
if job.ready():
logging.info("[%s] job found is ready (%s), removing lock", query_hash, job.celery_status)
redis_connection.delete(QueryTask._job_lock_id(query_hash, data_source.id))
job = None
if not job:
pipe.multi()
if scheduled:
queue_name = data_source.scheduled_queue_name
else:
queue_name = data_source.queue_name
result = execute_query.apply_async(args=(query, data_source.id, metadata), queue=queue_name)
job = cls(async_result=result)
logging.info("[Manager][%s] Created new job: %s", query_hash, job.id)
pipe.set(cls._job_lock_id(query_hash, data_source.id), job.id, settings.JOB_EXPIRY_TIME)
pipe.execute()
break
except redis.WatchError:
continue
if not job:
logging.error("[Manager][%s] Failed adding job for query.", query_hash)
return job
def to_dict(self):
if self._async_result.status == 'STARTED':
updated_at = self._async_result.result.get('start_time', 0)
else:
updated_at = 0
if self._async_result.failed() and isinstance(self._async_result.result, Exception):
error = self._async_result.result.message
elif self._async_result.status == 'REVOKED':
error = 'Query execution cancelled.'
else:
error = ''
if self._async_result.successful():
query_result_id = self._async_result.result
else:
query_result_id = None
return {
'id': self._async_result.id,
'updated_at': updated_at,
'status': self.STATUSES[self._async_result.status],
'error': error,
'query_result_id': query_result_id,
}
@property
def is_cancelled(self):
return self._async_result.status == 'REVOKED'
@property
def celery_status(self):
return self._async_result.status
def ready(self):
return self._async_result.ready()
def cancel(self):
return self._async_result.revoke(terminate=True, signal='SIGINT')
@staticmethod
def _job_lock_id(query_hash, data_source_id):
return "query_hash_job:%s:%s" % (data_source_id, query_hash)
@celery.task(base=BaseTask)
def refresh_queries():
# self.status['last_refresh_at'] = time.time()
# self._save_status()
logger.info("Refreshing queries...")
outdated_queries_count = 0
for query in models.Query.outdated_queries():
QueryTask.add_task(query.query, query.data_source, scheduled=True,
metadata={'Query ID': query.id, 'Username': 'Scheduled'})
outdated_queries_count += 1
statsd_client.gauge('manager.outdated_queries', outdated_queries_count)
logger.info("Done refreshing queries. Found %d outdated queries." % outdated_queries_count)
status = redis_connection.hgetall('redash:status')
now = time.time()
redis_connection.hmset('redash:status', {
'outdated_queries_count': outdated_queries_count,
'last_refresh_at': now
})
statsd_client.gauge('manager.seconds_since_refresh', now - float(status.get('last_refresh_at', now)))
@celery.task(base=BaseTask)
def cleanup_tasks():
# in case of cold restart of the workers, there might be jobs that still have their "lock" object, but aren't really
# going to run. this job removes them.
lock_keys = redis_connection.keys("query_hash_job:*") # TODO: use set instead of keys command
if not lock_keys:
return
query_tasks = [QueryTask(job_id=j) for j in redis_connection.mget(lock_keys)]
logger.info("Found %d locks", len(query_tasks))
inspect = celery.control.inspect()
active_tasks = inspect.active()
if active_tasks is None:
active_tasks = []
else:
active_tasks = active_tasks.values()
all_tasks = set()
for task_list in active_tasks:
for task in task_list:
all_tasks.add(task['id'])
logger.info("Active jobs count: %d", len(all_tasks))
for i, t in enumerate(query_tasks):
if t.ready():
# if locked task is ready already (failed, finished, revoked), we don't need the lock anymore
logger.warning("%s is ready (%s), removing lock.", lock_keys[i], t.celery_status)
redis_connection.delete(lock_keys[i])
# if t.celery_status == 'STARTED' and t.id not in all_tasks:
# logger.warning("Couldn't find active job for: %s, removing lock.", lock_keys[i])
# redis_connection.delete(lock_keys[i])
@celery.task(base=BaseTask)
def cleanup_query_results():
"""
Job to cleanup unused query results -- such that no query links to them anymore, and older than a week (so it's less
likely to be open in someone's browser and be used).
Each time the job deletes only 100 query results so it won't choke the database in case of many such results.
"""
logging.info("Running query results clean up (removing maximum of %d unused results, that are %d days old or more)",
settings.QUERY_RESULTS_CLEANUP_COUNT, settings.QUERY_RESULTS_CLEANUP_MAX_AGE)
unused_query_results = models.QueryResult.unused(settings.QUERY_RESULTS_CLEANUP_MAX_AGE).limit(settings.QUERY_RESULTS_CLEANUP_COUNT)
total_unused_query_results = models.QueryResult.unused().count()
deleted_count = models.QueryResult.delete().where(models.QueryResult.id << unused_query_results).execute()
logger.info("Deleted %d unused query results out of total of %d." % (deleted_count, total_unused_query_results))
@celery.task(base=BaseTask)
def refresh_schemas():
"""
Refreshes the data sources' schemas.
"""
for ds in models.DataSource.select():
logger.info("Refreshing schema for: {}".format(ds.name))
try:
ds.get_schema(refresh=True)
except Exception:
logger.exception("Failed refreshing the data source: %s", ds.name)
def signal_handler(*args):
raise InterruptException
class QueryExecutionError(Exception):
pass
# TODO: convert this into a class, to simplify and avoid code duplication for logging
# class ExecuteQueryTask(BaseTask):
# def run(self, ...):
# # logic
@celery.task(bind=True, base=BaseTask, track_started=True, throws=(QueryExecutionError,))
def execute_query(self, query, data_source_id, metadata):
signal.signal(signal.SIGINT, signal_handler)
start_time = time.time()
logger.info("task=execute_query state=load_ds ds_id=%d", data_source_id)
data_source = models.DataSource.get_by_id(data_source_id)
self.update_state(state='STARTED', meta={'start_time': start_time, 'custom_message': ''})
logger.debug("Executing query:\n%s", query)
query_hash = gen_query_hash(query)
query_runner = data_source.query_runner
logger.info("task=execute_query state=before query_hash=%s type=%s ds_id=%d task_id=%s queue=%s query_id=%s username=%s",
query_hash, data_source.type, data_source.id, self.request.id, self.request.delivery_info['routing_key'],
metadata.get('Query ID', 'unknown'), metadata.get('Username', 'unknown'))
if query_runner.annotate_query():
metadata['Task ID'] = self.request.id
metadata['Query Hash'] = query_hash
metadata['Queue'] = self.request.delivery_info['routing_key']
annotation = u", ".join([u"{}: {}".format(k, v) for k, v in metadata.iteritems()])
logging.debug(u"Annotation: %s", annotation)
annotated_query = u"/* {} */ {}".format(annotation, query)
else:
annotated_query = query
with statsd_client.timer('query_runner.{}.{}.run_time'.format(data_source.type, data_source.name)):
data, error = query_runner.run_query(annotated_query)
logger.info("task=execute_query state=after query_hash=%s type=%s ds_id=%d task_id=%s queue=%s query_id=%s username=%s",
query_hash, data_source.type, data_source.id, self.request.id, self.request.delivery_info['routing_key'],
metadata.get('Query ID', 'unknown'), metadata.get('Username', 'unknown'))
run_time = time.time() - start_time
logger.info("Query finished... data length=%s, error=%s", data and len(data), error)
self.update_state(state='STARTED', meta={'start_time': start_time, 'error': error, 'custom_message': ''})
# Delete query_hash
redis_connection.delete(QueryTask._job_lock_id(query_hash, data_source.id))
if not error:
query_result, updated_query_ids = models.QueryResult.store_result(data_source.org_id, data_source.id, query_hash, query, data, run_time, utils.utcnow())
logger.info("task=execute_query state=after_store query_hash=%s type=%s ds_id=%d task_id=%s queue=%s query_id=%s username=%s",
query_hash, data_source.type, data_source.id, self.request.id, self.request.delivery_info['routing_key'],
metadata.get('Query ID', 'unknown'), metadata.get('Username', 'unknown'))
for query_id in updated_query_ids:
check_alerts_for_query.delay(query_id)
logger.info("task=execute_query state=after_alerts query_hash=%s type=%s ds_id=%d task_id=%s queue=%s query_id=%s username=%s",
query_hash, data_source.type, data_source.id, self.request.id, self.request.delivery_info['routing_key'],
metadata.get('Query ID', 'unknown'), metadata.get('Username', 'unknown'))
else:
raise QueryExecutionError(error)
return query_result.id
@celery.task(base=BaseTask)
def record_event(event):
original_event = event.copy()
models.Event.record(event)
for hook in settings.EVENT_REPORTING_WEBHOOKS:
logging.debug("Forwarding event to: %s", hook)
try:
response = requests.post(hook, original_event)
if response.status_code != 200:
logging.error("Failed posting to %s: %s", hook, response.content)
except Exception:
logging.exception("Failed posting to %s", hook)
@celery.task(base=BaseTask)
def version_check():
run_version_check()
@celery.task(bind=True, base=BaseTask)
def check_alerts_for_query(self, query_id):
from redash.wsgi import app
logger.debug("Checking query %d for alerts", query_id)
query = models.Query.get_by_id(query_id)
for alert in query.alerts:
alert.query = query
new_state = alert.evaluate()
passed_rearm_threshold = False
if alert.rearm and alert.last_triggered_at:
passed_rearm_threshold = alert.last_triggered_at + datetime.timedelta(seconds=alert.rearm) < utils.utcnow()
if new_state != alert.state or (alert.state == models.Alert.TRIGGERED_STATE and passed_rearm_threshold ):
logger.info("Alert %d new state: %s", alert.id, new_state)
old_state = alert.state
alert.update_instance(state=new_state, last_triggered_at=utils.utcnow())
if old_state == models.Alert.UNKNOWN_STATE and new_state == models.Alert.OK_STATE:
logger.debug("Skipping notification (previous state was unknown and now it's ok).")
continue
# message = Message
html = """
Check <a href="{host}/alerts/{alert_id}">alert</a> / check <a href="{host}/queries/{query_id}">query</a>.
""".format(host=base_url(alert.query.org), alert_id=alert.id, query_id=query.id)
notify_mail(alert, html, new_state, app)
if settings.HIPCHAT_API_TOKEN:
notify_hipchat(alert, html, new_state)
if settings.WEBHOOK_ENDPOINT:
notify_webhook(alert, query, html, new_state)
def notify_hipchat(alert, html, new_state):
try:
if settings.HIPCHAT_API_URL:
hipchat_client = hipchat.HipChat(token=settings.HIPCHAT_API_TOKEN, url=settings.HIPCHAT_API_URL)
else:
hipchat_client = hipchat.HipChat(token=settings.HIPCHAT_API_TOKEN)
message = '[' + new_state.upper() + '] ' + alert.name + '<br />' + html
hipchat_client.message_room(settings.HIPCHAT_ROOM_ID, settings.NAME, message.encode('utf-8', 'ignore'), message_format='html')
except Exception:
logger.exception("hipchat send ERROR.")
def notify_mail(alert, html, new_state, app):
recipients = [s.email for s in alert.subscribers()]
logger.debug("Notifying: %s", recipients)
try:
with app.app_context():
message = Message(recipients=recipients,
subject="[{1}] {0}".format(alert.name.encode('utf-8', 'ignore'), new_state.upper()),
html=html)
mail.send(message)
except Exception:
logger.exception("mail send ERROR.")
def notify_webhook(alert, query, html, new_state):
try:
data = {
'event': 'alert_state_change',
'alert': alert.to_dict(full=False),
'url_base': base_url(query.org)
}
headers = {'Content-Type': 'application/json'}
auth = HTTPBasicAuth(settings.WEBHOOK_USERNAME, settings.WEBHOOK_PASSWORD) if settings.WEBHOOK_USERNAME else None
resp = requests.post(settings.WEBHOOK_ENDPOINT, data=json_dumps(data), auth=auth, headers=headers)
if resp.status_code != 200:
logger.error("webhook send ERROR. status_code => {status}".format(status=resp.status_code))
except Exception:
logger.exception("webhook send ERROR.")
@celery.task(base=BaseTask)
def send_mail(to, subject, html, text):
from redash.wsgi import app
try:
with app.app_context():
message = Message(recipients=to,
subject=subject,
html=html,
body=text)
mail.send(message)
except Exception:
logger.exception('Failed sending message: %s', message.subject)

redash/tasks/__init__.py (new file, +3 lines)
View File

@@ -0,0 +1,3 @@
from .general import record_event, version_check, send_mail
from .queries import QueryTask, refresh_queries, refresh_schemas, cleanup_tasks, cleanup_query_results, execute_query
from .alerts import check_alerts_for_query

redash/tasks/alerts.py (new file, +97 lines)
View File

@@ -0,0 +1,97 @@
from celery.utils.log import get_task_logger
import datetime
from flask.ext.mail import Message
import hipchat
import requests
from redash.utils import json_dumps
from requests.auth import HTTPBasicAuth
from redash.worker import celery
from redash import utils, mail
from redash import models, settings
from .base import BaseTask
logger = get_task_logger(__name__)
def base_url(org):
if settings.MULTI_ORG:
return "https://{}/{}".format(settings.HOST, org.slug)
return settings.HOST
@celery.task(name="redash.tasks.check_alerts_for_query", bind=True, base=BaseTask)
def check_alerts_for_query(self, query_id):
from redash.wsgi import app
logger.debug("Checking query %d for alerts", query_id)
query = models.Query.get_by_id(query_id)
for alert in query.alerts:
alert.query = query
new_state = alert.evaluate()
passed_rearm_threshold = False
if alert.rearm and alert.last_triggered_at:
passed_rearm_threshold = alert.last_triggered_at + datetime.timedelta(seconds=alert.rearm) < utils.utcnow()
if new_state != alert.state or (alert.state == models.Alert.TRIGGERED_STATE and passed_rearm_threshold ):
logger.info("Alert %d new state: %s", alert.id, new_state)
old_state = alert.state
alert.update_instance(state=new_state, last_triggered_at=utils.utcnow())
if old_state == models.Alert.UNKNOWN_STATE and new_state == models.Alert.OK_STATE:
logger.debug("Skipping notification (previous state was unknown and now it's ok).")
continue
# message = Message
html = """
Check <a href="{host}/alerts/{alert_id}">alert</a> / check <a href="{host}/queries/{query_id}">query</a>.
""".format(host=base_url(alert.query.org), alert_id=alert.id, query_id=query.id)
notify_mail(alert, html, new_state, app)
if settings.HIPCHAT_API_TOKEN:
notify_hipchat(alert, html, new_state)
if settings.WEBHOOK_ENDPOINT:
notify_webhook(alert, query, html, new_state)
def notify_hipchat(alert, html, new_state):
try:
if settings.HIPCHAT_API_URL:
hipchat_client = hipchat.HipChat(token=settings.HIPCHAT_API_TOKEN, url=settings.HIPCHAT_API_URL)
else:
hipchat_client = hipchat.HipChat(token=settings.HIPCHAT_API_TOKEN)
message = '[' + new_state.upper() + '] ' + alert.name + '<br />' + html
hipchat_client.message_room(settings.HIPCHAT_ROOM_ID, settings.NAME, message.encode('utf-8', 'ignore'), message_format='html')
except Exception:
logger.exception("hipchat send ERROR.")
def notify_mail(alert, html, new_state, app):
recipients = [s.email for s in alert.subscribers()]
logger.debug("Notifying: %s", recipients)
try:
with app.app_context():
message = Message(recipients=recipients,
subject="[{1}] {0}".format(alert.name.encode('utf-8', 'ignore'), new_state.upper()),
html=html)
mail.send(message)
except Exception:
logger.exception("mail send ERROR.")
def notify_webhook(alert, query, html, new_state):
try:
data = {
'event': 'alert_state_change',
'alert': alert.to_dict(full=False),
'url_base': base_url(query.org)
}
headers = {'Content-Type': 'application/json'}
auth = HTTPBasicAuth(settings.WEBHOOK_USERNAME, settings.WEBHOOK_PASSWORD) if settings.WEBHOOK_USERNAME else None
resp = requests.post(settings.WEBHOOK_ENDPOINT, data=json_dumps(data), auth=auth, headers=headers)
if resp.status_code != 200:
logger.error("webhook send ERROR. status_code => {status}".format(status=resp.status_code))
except Exception:
logger.exception("webhook send ERROR.")

redash/tasks/base.py (new file, +13 lines)
View File

@@ -0,0 +1,13 @@
from celery import Task
from redash import models
class BaseTask(Task):
abstract = True
def after_return(self, *args, **kwargs):
models.db.close_db(None)
def __call__(self, *args, **kwargs):
models.db.connect_db()
return super(BaseTask, self).__call__(*args, **kwargs)
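BaseTask only brackets task execution with a database connect/close; a minimal sketch of a task built on it follows, with a made-up task name that is not part of this commit.
from redash.worker import celery
from redash.tasks.base import BaseTask

@celery.task(name="redash.tasks.example_noop", base=BaseTask)  # hypothetical task
def example_noop():
    # BaseTask.__call__ opens the peewee connection before this body runs,
    # and after_return() closes it once the task returns.
    return "ok"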

redash/tasks/general.py (new file, +34 lines)
View File

@@ -0,0 +1,34 @@
from celery.utils.log import get_task_logger
from flask.ext.mail import Message
from redash.worker import celery
from redash.version_check import run_version_check
from redash import models, mail
from .base import BaseTask
logger = get_task_logger(__name__)
@celery.task(name="redash.tasks.record_event", base=BaseTask)
def record_event(event):
models.Event.record(event)
@celery.task(name="redash.tasks.version_check", base=BaseTask)
def version_check():
run_version_check()
@celery.task(name="redash.tasks.send_mail", base=BaseTask)
def send_mail(to, subject, html, text):
from redash.wsgi import app
try:
with app.app_context():
message = Message(recipients=to,
subject=subject,
html=html,
body=text)
mail.send(message)
except Exception:
logger.exception('Failed sending message: %s', message.subject)
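For illustration, these tasks are meant to be enqueued with .delay() from application code; the recipient below is a placeholder, and a configured Celery broker and mail server are assumed.
from redash.tasks.general import send_mail, version_check

send_mail.delay(["admin@example.com"], "re:dash test mail", "<p>hello</p>", "hello")
version_check.delay()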

redash/tasks/queries.py (new file, +438 lines)
View File

@@ -0,0 +1,438 @@
import json
import time
import logging
import signal
import redis
from celery.result import AsyncResult
from celery.utils.log import get_task_logger
from redash import redis_connection, models, statsd_client, settings, utils
from redash.utils import gen_query_hash
from redash.worker import celery
from redash.query_runner import InterruptException
from .base import BaseTask
from .alerts import check_alerts_for_query
logger = get_task_logger(__name__)
def _job_lock_id(query_hash, data_source_id):
return "query_hash_job:%s:%s" % (data_source_id, query_hash)
def _unlock(query_hash, data_source_id):
redis_connection.delete(_job_lock_id(query_hash, data_source_id))
# TODO:
# There is some duplication between this class and QueryTask, but I wanted to implement the monitoring features without
# many changes to the existing code, so I ended up creating another object. In the future we can merge them.
class QueryTaskTracker(object):
DONE_LIST = 'query_task_trackers:done'
WAITING_LIST = 'query_task_trackers:waiting'
IN_PROGRESS_LIST = 'query_task_trackers:in_progress'
ALL_LISTS = (DONE_LIST, WAITING_LIST, IN_PROGRESS_LIST)
def __init__(self, data):
self.data = data
@classmethod
def create(cls, task_id, state, query_hash, data_source_id, scheduled, metadata):
data = dict(task_id=task_id, state=state,
query_hash=query_hash, data_source_id=data_source_id,
scheduled=scheduled,
username=metadata.get('Username', 'unknown'),
query_id=metadata.get('Query ID', 'unknown'),
retries=0,
scheduled_retries=0,
created_at=time.time(),
started_at=None,
run_time=None)
return cls(data)
def save(self, connection=None):
if connection is None:
connection = redis_connection
self.data['updated_at'] = time.time()
key_name = self._key_name(self.data['task_id'])
connection.set(key_name, utils.json_dumps(self.data))
connection.zadd('query_task_trackers', time.time(), key_name)
connection.zadd(self._get_list(), time.time(), key_name)
for l in self.ALL_LISTS:
if l != self._get_list():
connection.zrem(l, key_name)
def update(self, **kwargs):
self.data.update(kwargs)
self.save()
@staticmethod
def _key_name(task_id):
return 'query_task_tracker:{}'.format(task_id)
def _get_list(self):
if self.state in ('finished', 'failed', 'cancelled'):
return self.DONE_LIST
if self.state in ('created',):
return self.WAITING_LIST
return self.IN_PROGRESS_LIST
@classmethod
def get_by_task_id(cls, task_id, connection=None):
if connection is None:
connection = redis_connection
key_name = cls._key_name(task_id)
data = connection.get(key_name)
return cls.create_from_data(data)
@classmethod
def create_from_data(cls, data):
if data:
data = json.loads(data)
return cls(data)
return None
@classmethod
def all(cls, list_name, offset=0, limit=-1):
if limit != -1:
limit -= 1
if offset != 0:
offset -= 1
ids = redis_connection.zrevrange(list_name, offset, limit)
pipe = redis_connection.pipeline()
for id in ids:
pipe.get(id)
tasks = [cls.create_from_data(data) for data in pipe.execute()]
return tasks
@classmethod
def prune(cls, list_name, keep_count):
count = redis_connection.zcard(list_name)
if count <= keep_count:
return 0
remove_count = count - keep_count
keys = redis_connection.zrange(list_name, 0, remove_count - 1)
redis_connection.delete(keys)
redis_connection.zremrangebyrank(list_name, 0, remove_count - 1)
return remove_count
def __getattr__(self, item):
return self.data[item]
def __contains__(self, item):
return item in self.data
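A rough sketch of the tracker life cycle, with placeholder ids, showing how save() and update() move an entry between the Redis lists defined above.
from redash.tasks.queries import QueryTaskTracker

tracker = QueryTaskTracker.create(
    task_id="hypothetical-task-id", state="created", query_hash="abc123",
    data_source_id=1, scheduled=False,
    metadata={"Username": "editor", "Query ID": 42})
tracker.save()                    # stored under query_task_tracker:<task_id> and indexed in the waiting list
tracker.update(state="started")   # re-saving moves the key to the in-progress list
same = QueryTaskTracker.get_by_task_id("hypothetical-task-id")
print(same.state, same.query_id)  # attribute access proxies to the underlying data dict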
class QueryTask(object):
# TODO: this is mapping to the old Job class statuses. Need to update the client side and remove this
STATUSES = {
'PENDING': 1,
'STARTED': 2,
'SUCCESS': 3,
'FAILURE': 4,
'REVOKED': 4
}
def __init__(self, job_id=None, async_result=None):
if async_result:
self._async_result = async_result
else:
self._async_result = AsyncResult(job_id, app=celery)
@property
def id(self):
return self._async_result.id
def to_dict(self):
if self._async_result.status == 'STARTED':
updated_at = self._async_result.result.get('start_time', 0)
else:
updated_at = 0
status = self.STATUSES[self._async_result.status]
if isinstance(self._async_result.result, Exception):
error = self._async_result.result.message
status = 4
elif self._async_result.status == 'REVOKED':
error = 'Query execution cancelled.'
else:
error = ''
if self._async_result.successful() and not error:
query_result_id = self._async_result.result
else:
query_result_id = None
return {
'id': self._async_result.id,
'updated_at': updated_at,
'status': status,
'error': error,
'query_result_id': query_result_id,
}
@property
def is_cancelled(self):
return self._async_result.status == 'REVOKED'
@property
def celery_status(self):
return self._async_result.status
def ready(self):
return self._async_result.ready()
def cancel(self):
return self._async_result.revoke(terminate=True, signal='SIGINT')
def enqueue_query(query, data_source, scheduled=False, metadata={}):
query_hash = gen_query_hash(query)
logging.info("Inserting job for %s with metadata=%s", query_hash, metadata)
try_count = 0
job = None
while try_count < 5:
try_count += 1
pipe = redis_connection.pipeline()
try:
pipe.watch(_job_lock_id(query_hash, data_source.id))
job_id = pipe.get(_job_lock_id(query_hash, data_source.id))
if job_id:
logging.info("[%s] Found existing job: %s", query_hash, job_id)
job = QueryTask(job_id=job_id)
tracker = QueryTaskTracker.get_by_task_id(job_id, connection=pipe)
# tracker might not exist, if it's an old job
if scheduled and tracker:
tracker.update(retries=tracker.retries+1)
elif tracker:
tracker.update(scheduled_retries=tracker.scheduled_retries+1)
if job.ready():
logging.info("[%s] job found is ready (%s), removing lock", query_hash, job.celery_status)
redis_connection.delete(_job_lock_id(query_hash, data_source.id))
job = None
if not job:
pipe.multi()
if scheduled:
queue_name = data_source.scheduled_queue_name
else:
queue_name = data_source.queue_name
result = execute_query.apply_async(args=(query, data_source.id, metadata), queue=queue_name)
job = QueryTask(async_result=result)
tracker = QueryTaskTracker.create(result.id, 'created', query_hash, data_source.id, scheduled, metadata)
tracker.save(connection=pipe)
logging.info("[%s] Created new job: %s", query_hash, job.id)
pipe.set(_job_lock_id(query_hash, data_source.id), job.id, settings.JOB_EXPIRY_TIME)
pipe.execute()
break
except redis.WatchError:
continue
if not job:
logging.error("[Manager][%s] Failed adding job for query.", query_hash)
return job
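enqueue_query is now the single entry point used both by the query handlers and by refresh_queries below; a minimal ad-hoc sketch, where the data source id is a placeholder:
from redash import models
from redash.tasks.queries import enqueue_query

data_source = models.DataSource.get_by_id(1)   # placeholder id
job = enqueue_query("SELECT 1", data_source,
                    metadata={"Username": "editor", "Query ID": "adhoc"})
print(job.id, job.to_dict()["status"])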
@celery.task(name="redash.tasks.refresh_queries", base=BaseTask)
def refresh_queries():
logger.info("Refreshing queries...")
outdated_queries_count = 0
query_ids = []
with statsd_client.timer('manager.outdated_queries_lookup'):
for query in models.Query.outdated_queries():
enqueue_query(query.query, query.data_source,
scheduled=True,
metadata={'Query ID': query.id, 'Username': 'Scheduled'})
query_ids.append(query.id)
outdated_queries_count += 1
statsd_client.gauge('manager.outdated_queries', outdated_queries_count)
logger.info("Done refreshing queries. Found %d outdated queries: %s" % (outdated_queries_count, query_ids))
status = redis_connection.hgetall('redash:status')
now = time.time()
redis_connection.hmset('redash:status', {
'outdated_queries_count': outdated_queries_count,
'last_refresh_at': now,
'query_ids': json.dumps(query_ids)
})
statsd_client.gauge('manager.seconds_since_refresh', now - float(status.get('last_refresh_at', now)))
@celery.task(name="redash.tasks.cleanup_tasks", base=BaseTask)
def cleanup_tasks():
in_progress = QueryTaskTracker.all(QueryTaskTracker.IN_PROGRESS_LIST)
for tracker in in_progress:
result = AsyncResult(tracker.task_id)
# If the AsyncResult status is PENDING it means there is no celery task object for this tracker, and we can
# mark it as "dead":
if result.status == 'PENDING':
logging.info("In progress tracker for %s is no longer enqueued, cancelling (task: %s).",
tracker.query_hash, tracker.task_id)
_unlock(tracker.query_hash, tracker.data_source_id)
tracker.update(state='cancelled')
if result.ready():
logging.info("in progress tracker %s finished", tracker.query_hash)
_unlock(tracker.query_hash, tracker.data_source_id)
tracker.update(state='finished')
waiting = QueryTaskTracker.all(QueryTaskTracker.WAITING_LIST)
for tracker in waiting:
result = AsyncResult(tracker.task_id)
if result.ready():
logging.info("waiting tracker %s finished", tracker.query_hash)
_unlock(tracker.query_hash, tracker.data_source_id)
tracker.update(state='finished')
# Maintain constant size of the finished tasks list:
QueryTaskTracker.prune(QueryTaskTracker.DONE_LIST, 1000)
@celery.task(name="redash.tasks.cleanup_query_results", base=BaseTask)
def cleanup_query_results():
"""
Job to clean up unused query results -- such that no query links to them anymore, and older than
settings.QUERY_RESULTS_CLEANUP_MAX_AGE (a week by default, so it's less likely to be open in someone's browser and be used).
Each time the job deletes only settings.QUERY_RESULTS_CLEANUP_COUNT (100 by default) query results so it won't choke
the database in case of many such results.
"""
logging.info("Running query results clean up (removing maximum of %d unused results, that are %d days old or more)",
settings.QUERY_RESULTS_CLEANUP_COUNT, settings.QUERY_RESULTS_CLEANUP_MAX_AGE)
unused_query_results = models.QueryResult.unused(settings.QUERY_RESULTS_CLEANUP_MAX_AGE).limit(settings.QUERY_RESULTS_CLEANUP_COUNT)
total_unused_query_results = models.QueryResult.unused().count()
deleted_count = models.QueryResult.delete().where(models.QueryResult.id << unused_query_results).execute()
logger.info("Deleted %d unused query results out of total of %d." % (deleted_count, total_unused_query_results))
@celery.task(name="redash.tasks.refresh_schemas", base=BaseTask)
def refresh_schemas():
"""
Refreshes the data sources schemas.
"""
for ds in models.DataSource.select():
logger.info("Refreshing schema for: {}".format(ds.name))
try:
ds.get_schema(refresh=True)
except Exception:
logger.exception("Failed refreshing schema for the data source: %s", ds.name)
def signal_handler(*args):
raise InterruptException
class QueryExecutionError(Exception):
pass
# We could have created this as a celery.Task derived class and let it act as the task itself, but that might result in weird
# issues, as the task class is created only once per process, so we decided to use a plain object instead.
class QueryExecutor(object):
def __init__(self, task, query, data_source_id, metadata):
self.task = task
self.query = query
self.data_source_id = data_source_id
self.metadata = metadata
self.data_source = self._load_data_source()
self.query_hash = gen_query_hash(self.query)
# Load existing tracker or create a new one if the job was created before code update:
self.tracker = QueryTaskTracker.get_by_task_id(task.request.id) or QueryTaskTracker.create(task.request.id,
'created',
self.query_hash,
self.data_source_id,
False, metadata)
def run(self):
signal.signal(signal.SIGINT, signal_handler)
self.tracker.update(started_at=time.time(), state='started')
logger.debug("Executing query:\n%s", self.query)
self._log_progress('executing_query')
query_runner = self.data_source.query_runner
annotated_query = self._annotate_query(query_runner)
data, error = query_runner.run_query(annotated_query)
run_time = time.time() - self.tracker.started_at
self.tracker.update(error=error, run_time=run_time, state='saving_results')
logger.info(u"task=execute_query query_hash=%s data_length=%s error=[%s]", self.query_hash, data and len(data), error)
_unlock(self.query_hash, self.data_source.id)
if error:
self.tracker.update(state='failed')
result = QueryExecutionError(error)
else:
query_result, updated_query_ids = models.QueryResult.store_result(self.data_source.org_id, self.data_source.id,
self.query_hash, self.query, data,
run_time, utils.utcnow())
self._log_progress('checking_alerts')
for query_id in updated_query_ids:
check_alerts_for_query.delay(query_id)
self._log_progress('finished')
result = query_result.id
return result
def _annotate_query(self, query_runner):
if query_runner.annotate_query():
self.metadata['Task ID'] = self.task.request.id
self.metadata['Query Hash'] = self.query_hash
self.metadata['Queue'] = self.task.request.delivery_info['routing_key']
annotation = u", ".join([u"{}: {}".format(k, v) for k, v in self.metadata.iteritems()])
annotated_query = u"/* {} */ {}".format(annotation, self.query)
else:
annotated_query = self.query
return annotated_query
def _log_progress(self, state):
logger.info(u"task=execute_query state=%s query_hash=%s type=%s ds_id=%d task_id=%s queue=%s query_id=%s username=%s",
state,
self.query_hash, self.data_source.type, self.data_source.id, self.task.request.id, self.task.request.delivery_info['routing_key'],
self.metadata.get('Query ID', 'unknown'), self.metadata.get('Username', 'unknown'))
self.tracker.update(state=state)
def _load_data_source(self):
logger.info("task=execute_query state=load_ds ds_id=%d", self.data_source_id)
return models.DataSource.get_by_id(self.data_source_id)
@celery.task(name="redash.tasks.execute_query", bind=True, base=BaseTask, track_started=True)
def execute_query(self, query, data_source_id, metadata):
return QueryExecutor(self, query, data_source_id, metadata).run()

tests/tasks/__init__.py (new file, +1 line)
View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,24 @@
from redash import redis_connection
from redash.tasks.queries import QueryTaskTracker
from unittest import TestCase
class TestPrune(TestCase):
def setUp(self):
self.list = "test_list"
redis_connection.delete(self.list)
for score in range(0, 100):
redis_connection.zadd(self.list, score, 'k:{}'.format(score))
def test_does_nothing_when_below_threshold(self):
remove_count = QueryTaskTracker.prune(self.list, 100)
self.assertEqual(remove_count, 0)
self.assertEqual(redis_connection.zcard(self.list), 100)
def test_removes_oldest_items_first(self):
remove_count = QueryTaskTracker.prune(self.list, 50)
self.assertEqual(remove_count, 50)
self.assertEqual(redis_connection.zcard(self.list), 50)
self.assertEqual(redis_connection.zscore(self.list, 'k:99'), 99.0)
self.assertIsNone(redis_connection.zscore(self.list, 'k:1'))

View File

@@ -73,7 +73,7 @@ class TestHMACAuthentication(BaseTestCase):
super(TestHMACAuthentication, self).setUp()
self.api_key = 10
self.query = self.factory.create_query(api_key=self.api_key)
self.path = '/api/queries/{0}'.format(self.query.id)
self.path = '/{}/api/queries/{}'.format(self.query.org.slug, self.query.id)
self.expires = time.time() + 1800
def signature(self, expires):
@@ -91,7 +91,7 @@ class TestHMACAuthentication(BaseTestCase):
def test_correct_signature(self):
with app.test_client() as c:
rv = c.get('/api/queries/{0}'.format(self.query.id), query_string={'signature': self.signature(self.expires), 'expires': self.expires})
rv = c.get(self.path, query_string={'signature': self.signature(self.expires), 'expires': self.expires})
self.assertIsNotNone(hmac_load_user_from_request(request))
def test_no_query_id(self):

View File

@@ -17,7 +17,7 @@ class TestRefreshQueries(BaseTestCase):
query.latest_query_data = query_result
query.save()
with patch('redash.tasks.QueryTask.add_task') as add_job_mock:
with patch('redash.tasks.queries.enqueue_query') as add_job_mock:
refresh_queries()
add_job_mock.assert_called_with(query.query, query.data_source, scheduled=True, metadata=ANY)
@@ -27,7 +27,7 @@ class TestRefreshQueries(BaseTestCase):
query_result = self.factory.create_query_result(retrieved_at=retrieved_at, query=query.query,
query_hash=query.query_hash)
with patch('redash.tasks.QueryTask.add_task') as add_job_mock:
with patch('redash.tasks.queries.enqueue_query') as add_job_mock:
refresh_queries()
self.assertFalse(add_job_mock.called)
@@ -37,7 +37,7 @@ class TestRefreshQueries(BaseTestCase):
query_result = self.factory.create_query_result(retrieved_at=retrieved_at, query=query.query,
query_hash=query.query_hash)
with patch('redash.tasks.QueryTask.add_task') as add_job_mock:
with patch('redash.tasks.queries.enqueue_query') as add_job_mock:
refresh_queries()
self.assertFalse(add_job_mock.called)
@@ -52,7 +52,7 @@ class TestRefreshQueries(BaseTestCase):
query.save()
query2.save()
with patch('redash.tasks.QueryTask.add_task') as add_job_mock:
with patch('redash.tasks.queries.enqueue_query') as add_job_mock:
refresh_queries()
add_job_mock.assert_called_once_with(query.query, query.data_source, scheduled=True, metadata=ANY)#{'Query ID': query.id, 'Username': 'Scheduled'})
@@ -67,7 +67,7 @@ class TestRefreshQueries(BaseTestCase):
query.save()
query2.save()
with patch('redash.tasks.QueryTask.add_task') as add_job_mock:
with patch('redash.tasks.queries.enqueue_query') as add_job_mock:
refresh_queries()
add_job_mock.assert_has_calls([call(query2.query, query2.data_source, scheduled=True, metadata=ANY),
call(query.query, query.data_source, scheduled=True, metadata=ANY)],
@@ -86,6 +86,6 @@ class TestRefreshQueries(BaseTestCase):
query.save()
query2.save()
with patch('redash.tasks.QueryTask.add_task') as add_job_mock:
with patch('redash.tasks.queries.enqueue_query') as add_job_mock:
refresh_queries()
add_job_mock.assert_called_once_with(query.query, query.data_source, scheduled=True, metadata=ANY)