New Celery/Queries Execution Status API (#3057)

* Remove QueryTaskTracker

* Remove scheduling of cleanup_tasks

* Add Celery introspection tools

* First iteration of updating the admin API.

* Show more details

* Add option to skip building npm in Dockerfile

* Show started_at

* Update the refresh schedule, as it's too fast

* Update Celery monitor to report on all active tasks.

* Update task parsing for new format

* WIP: improved Celery status screen

* Fix property name.

* Update counters

* Update tab name

* Update counters names

* Move component to its own file and fix lint issues

* Add migration to remove Redis keys

* Improve columns layout

* Remove skip_npm_build arg as it's not used anymore.

* Convert query from SQL to Python

* Simplify column definition.

* Show alert on error.
Authored by Arik Fraimovich on 2019-03-10 11:19:31 +02:00, committed by GitHub.
parent 12d2a04946, commit 26f0ce0749
13 changed files with 404 additions and 341 deletions
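
The net effect of the change: the /api/admin/queries/tasks endpoint no longer reads QueryTaskTracker lists from Redis but reports live Celery state as a single flat list. A minimal consumer sketch, assuming a locally running instance and an admin user's API key (both placeholder values; to my understanding Redash accepts the key via an "Authorization: Key" header):

# Minimal consumer sketch for the reworked endpoint. REDASH_URL and API_KEY
# are hypothetical placeholders for a real instance and an admin API key.
import requests

REDASH_URL = 'http://localhost:5000'
API_KEY = 'your-admin-api-key'

def fetch_celery_tasks():
    resp = requests.get(
        '{}/api/admin/queries/tasks'.format(REDASH_URL),
        headers={'Authorization': 'Key {}'.format(API_KEY)},
    )
    resp.raise_for_status()
    return resp.json()['tasks']

for task in fetch_celery_tasks():
    # Each row carries state ('active', 'reserved', or 'waiting_in_queue'),
    # the queue and worker, and query metadata for execute_query tasks.
    print(task['state'], task['queue'], task.get('query_id'))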

View File

@@ -25,10 +25,15 @@
@import '~antd/lib/dropdown/style/index';
@import '~antd/lib/menu/style/index';
@import '~antd/lib/list/style/index';
@import "~antd/lib/badge/style/index";
@import "~antd/lib/card/style/index";
@import "~antd/lib/spin/style/index";
@import "~antd/lib/tabs/style/index";
@import 'inc/ant-variables';
// Remove bold in labels for Ant checkboxes and radio buttons
.ant-checkbox-wrapper, .ant-radio-wrapper {
.ant-checkbox-wrapper,
.ant-radio-wrapper {
font-weight: normal;
}
@@ -158,7 +163,9 @@
.@{table-prefix-cls} {
color: inherit;
tr, th, td {
tr,
th,
td {
transition: none !important;
}
@@ -213,7 +220,8 @@
// styling for short modals (no lines)
.@{dialog-prefix-cls}.shortModal {
.@{dialog-prefix-cls} {
&-header, &-footer {
&-header,
&-footer {
border: none;
padding: 16px;
}

View File

@@ -0,0 +1,219 @@
import React from 'react';
import PropTypes from 'prop-types';
import { $http } from '@/services/ng';
import Table from 'antd/lib/table';
import Col from 'antd/lib/col';
import Row from 'antd/lib/row';
import Card from 'antd/lib/card';
import Spin from 'antd/lib/spin';
import Badge from 'antd/lib/badge';
import Tabs from 'antd/lib/tabs';
import Alert from 'antd/lib/alert';
import moment from 'moment';
import values from 'lodash/values';
import { Columns } from '@/components/items-list/components/ItemsTable';
function parseTasks(tasks) {
const queues = {};
const queries = [];
const otherTasks = [];
const counters = { active: 0, reserved: 0, waiting: 0 };
tasks.forEach((task) => {
// Normalize the backend's 'waiting_in_queue' state so it lands in the
// 'waiting' bucket used by the counters and the queues table.
if (task.state === 'waiting_in_queue') {
task.state = 'waiting';
}
queues[task.queue] = queues[task.queue] || { name: task.queue, active: 0, reserved: 0, waiting: 0 };
queues[task.queue][task.state] += 1;
if (task.enqueue_time) {
task.enqueue_time = moment(task.enqueue_time * 1000.0);
}
if (task.start_time) {
task.start_time = moment(task.start_time * 1000.0);
}
counters[task.state] += 1;
if (task.task_name === 'redash.tasks.execute_query') {
queries.push(task);
} else {
otherTasks.push(task);
}
});
return { queues: values(queues), queries, otherTasks, counters };
}
function QueuesTable({ loading, queues }) {
const columns = ['Name', 'Active', 'Reserved', 'Waiting'].map(c => ({ title: c, dataIndex: c.toLowerCase() }));
return <Table columns={columns} rowKey="name" dataSource={queues} loading={loading} />;
}
QueuesTable.propTypes = {
loading: PropTypes.bool.isRequired,
queues: PropTypes.arrayOf(PropTypes.any).isRequired,
};
function CounterCard({ title, value, loading }) {
return (
<Spin spinning={loading}>
<Card>
{title}
<div className="f-20">{value}</div>
</Card>
</Spin>
);
}
CounterCard.propTypes = {
title: PropTypes.string.isRequired,
value: PropTypes.oneOfType([PropTypes.number, PropTypes.string]),
loading: PropTypes.bool.isRequired,
};
CounterCard.defaultProps = {
value: '',
};
export default class AdminCeleryStatus extends React.Component {
state = {
loading: true,
error: false,
counters: {},
queries: [],
otherTasks: [],
queues: [],
};
constructor(props) {
super(props);
this.fetch();
}
fetch() {
$http
.get('/api/admin/queries/tasks')
.then(({ data }) => {
const { queues, queries, otherTasks, counters } = parseTasks(data.tasks);
this.setState({ loading: false, queries, otherTasks, queues, counters });
})
.catch(() => {
this.setState({ loading: false, error: true });
});
}
render() {
const commonColumns = [
{
title: 'Worker Name',
dataIndex: 'worker',
},
{
title: 'PID',
dataIndex: 'worker_pid',
},
{
title: 'Queue',
dataIndex: 'queue',
},
{
title: 'State',
dataIndex: 'state',
render: (value) => {
if (value === 'active') {
return (
<span>
<Badge status="processing" /> Active
</span>
);
}
return (
<span>
<Badge status="warning" /> {value}
</span>
);
},
},
Columns.timeAgo({ title: 'Start Time', dataIndex: 'start_time' }),
];
const queryColumns = commonColumns.concat([
Columns.timeAgo({ title: 'Enqueue Time', dataIndex: 'enqueue_time' }),
{
title: 'Query ID',
dataIndex: 'query_id',
},
{
title: 'Org ID',
dataIndex: 'org_id',
},
{
title: 'Data Source ID',
dataIndex: 'data_source_id',
},
{
title: 'User ID',
dataIndex: 'user_id',
},
{
title: 'Scheduled',
dataIndex: 'scheduled',
},
]);
const otherTasksColumns = commonColumns.concat([
{
title: 'Task Name',
dataIndex: 'task_name',
},
]);
if (this.state.error) {
return (
<div className="p-5">
<Alert type="error" message="Failed loading status. Please refresh." />
</div>
);
}
return (
<div className="p-5">
<Row gutter={16}>
<Col span={4}>
<CounterCard title="Active Tasks" value={this.state.counters.active} loading={this.state.loading} />
</Col>
<Col span={4}>
<CounterCard title="Reserved Tasks" value={this.state.counters.reserved} loading={this.state.loading} />
</Col>
<Col span={4}>
<CounterCard title="Waiting Tasks" value={this.state.counters.waiting} loading={this.state.loading} />
</Col>
</Row>
<Row>
<Tabs defaultActiveKey="queues">
<Tabs.TabPane key="queues" tab="Queues">
<QueuesTable loading={this.state.loading} queues={this.state.queues} />
</Tabs.TabPane>
<Tabs.TabPane key="queries" tab="Queries">
<Table
rowKey="task_id"
dataSource={this.state.queries}
loading={this.state.loading}
columns={queryColumns}
/>
</Tabs.TabPane>
<Tabs.TabPane key="other" tab="Other Tasks">
<Table
rowKey="task_id"
dataSource={this.state.otherTasks}
loading={this.state.loading}
columns={otherTasksColumns}
/>
</Tabs.TabPane>
</Tabs>
</Row>
</div>
);
}
}

View File

@@ -4,7 +4,7 @@
<div class="bg-white tiled">
<ul class="tab-nav">
<li><a href="admin/status">System Status</a></li>
<li><a href="admin/queries/tasks">Queries Queue</a></li>
<li><a href="admin/queries/tasks">Celery Status</a></li>
<li class="active"><a href="admin/queries/outdated">Outdated Queries</a></li>
</ul>

View File

@@ -7,27 +7,29 @@
<a href="admin/status">System Status</a>
</li>
<li>
<a href="admin/queries/tasks">Queries Queue</a>
<a href="admin/queries/tasks">Celery Status</a>
</li>
<li>
<a href="admin/queries/outdated">Outdated Queries</a>
</li>
</ul>
<div class="row p-15">
<ul class="list-group p-l-15 col-lg-4">
<li class="list-group-item active">General</li>
<li class="list-group-item" ng-repeat="(name, value) in status">
<span class="badge">{{value}}</span>
{{name | toHuman}}
<span class="badge">{{ value }}</span>
{{ name | toHuman }}
</li>
</ul>
<ul class="list-group col-lg-4">
<li class="list-group-item active">Manager</li>
<li class="list-group-item">
<span class="badge" am-time-ago="manager.last_refresh_at*1000.0"></span>
<span
class="badge"
am-time-ago="manager.last_refresh_at*1000.0"
></span>
Last Refresh
</li>
<li class="list-group-item">
@@ -35,7 +37,7 @@
Started
</li>
<li class="list-group-item">
<span class="badge">{{manager.outdated_queries_count}}</span>
<span class="badge">{{ manager.outdated_queries_count }}</span>
Outdated Queries Count
</li>
</ul>
@@ -43,9 +45,12 @@
<ul class="list-group col-lg-4">
<li class="list-group-item active">Queues</li>
<li class="list-group-item" ng-repeat="(name, value) in manager.queues">
<span class="badge">{{value.size}}</span>
{{name}}
<span uib-popover="{{value.data_sources}}" popover-trigger="'mouseenter'">
<span class="badge">{{ value.size }}</span>
{{ name }}
<span
uib-popover="{{ value.data_sources }}"
popover-trigger="'mouseenter'"
>
<i class="fa fa-question-circle"></i>
</span>
</li>
@@ -54,9 +59,12 @@
<div class="row p-15">
<ul class="list-group col-lg-4">
<li class="list-group-item active">Redash Database</li>
<li class="list-group-item" ng-repeat="(name, size) in database_metrics.metrics">
<span class="badge">{{size[1] | prettySize}}</span>
<span> {{size[0]}} </span>
<li
class="list-group-item"
ng-repeat="(name, size) in database_metrics.metrics"
>
<span class="badge">{{ size[1] | prettySize }}</span>
<span> {{ size[0] }} </span>
</li>
</ul>
</div>

View File

@@ -1,51 +1,13 @@
import moment from 'moment';
import { Paginator } from '@/lib/pagination';
import { react2angular } from 'react2angular';
import CeleryStatus from '@/components/admin/CeleryStatus';
import template from './tasks.html';
function TasksCtrl($scope, $location, $http, $timeout, Events) {
function TasksCtrl(Events) {
Events.record('view', 'page', 'admin/tasks');
$scope.autoUpdate = true;
$scope.selectedTab = 'in_progress';
$scope.tasks = {
waiting: [],
in_progress: [],
done: [],
};
this.tasksPaginator = new Paginator([], { itemsPerPage: 50 });
$scope.setTab = (tab) => {
$scope.selectedTab = tab;
this.tasksPaginator.updateRows($scope.tasks[tab]);
};
$scope.setTab($location.hash() || 'in_progress');
const refresh = () => {
if ($scope.autoUpdate) {
$scope.refresh_time = moment().add(1, 'minutes');
$http.get('/api/admin/queries/tasks').success((data) => {
$scope.tasks = data;
this.tasksPaginator.updateRows($scope.tasks[$scope.selectedTab]);
});
}
const timer = $timeout(refresh, 5 * 1000);
$scope.$on('$destroy', () => {
if (timer) {
$timeout.cancel(timer);
}
});
};
refresh();
}
export default function init(ngModule) {
ngModule.component('adminCeleryStatus', react2angular(CeleryStatus));
ngModule.component('tasksPage', {
template,
controller: TasksCtrl,
@@ -54,7 +16,7 @@ export default function init(ngModule) {
return {
'/admin/queries/tasks': {
template: '<tasks-page></tasks-page>',
title: 'Running Queries',
title: 'Celery Status',
},
};
}

View File

@@ -4,52 +4,10 @@
<div class="bg-white tiled">
<ul class="tab-nav">
<li><a href="admin/status">System Status</a></li>
<li class="active"><a href="admin/queries/tasks">Queries Queue</a></li>
<li class="active"><a href="admin/queries/tasks">Celery Status</a></li>
<li><a href="admin/queries/outdated">Outdated Queries</a></li>
</ul>
<ul class="tab-nav">
<rd-tab tab-id="in_progress" name="In Progress ({{tasks.in_progress.length}})" ng-click="setTab('in_progress')"></rd-tab>
<rd-tab tab-id="waiting" name="Waiting ({{tasks.waiting.length}})" ng-click="setTab('waiting')"></rd-tab>
<rd-tab tab-id="done" name="Done" ng-click="setTab('done')"></rd-tab>
</ul>
<table class="table table-condensed table-hover">
<thead>
<tr>
<th>Data Source ID</th>
<th>Username</th>
<th>State</th>
<th>Query ID</th>
<th>Query Hash</th>
<th>Runtime</th>
<th>Created At</th>
<th>Started At</th>
<th>Updated At</th>
<th ng-if="selectedTab === 'in_progress'"></th>
</tr>
</thead>
<tbody>
<tr ng-repeat="row in $ctrl.tasksPaginator.getPageRows()">
<td>{{row.data_source_id}}</td>
<td>{{row.username}}</td>
<td>{{row.state}} <span ng-if="row.state === 'failed'" uib-popover="{{row.error}}" popover-trigger="mouseenter" class="zmdi zmdi-help"></span></td>
<td><a href="queries/{{row.query_id}}">{{row.query_id}}</a></td>
<td>{{row.query_hash}}</td>
<td>{{row.run_time | durationHumanize}}</td>
<td>{{row.created_at | toMilliseconds | dateTime }}</td>
<td>{{row.started_at | toMilliseconds | dateTime }}</td>
<td>{{row.updated_at | toMilliseconds | dateTime }}</td>
<td ng-if="selectedTab === 'in_progress'">
<cancel-query-button query-id="row.query_id" task-id="row.task_id"></cancel-query-button>
</td>
</tr>
</tbody>
</table>
<paginator paginator="$ctrl.tasksPaginator"></paginator>
<div class="p-15">
<label><input type="checkbox" ng-model="autoUpdate"> Auto Update</label>
</div>
<admin-celery-status></admin-celery-status>
</div>
</div>

View File

@@ -0,0 +1,51 @@
"""remove_query_tracker_keys
Revision ID: e5c7a4e2df4d
Revises: 98af61feea92
Create Date: 2019-02-27 11:30:15.375318
"""
from alembic import op
import sqlalchemy as sa
from redash import redis_connection
# revision identifiers, used by Alembic.
revision = 'e5c7a4e2df4d'
down_revision = '98af61feea92'
branch_labels = None
depends_on = None
DONE_LIST = 'query_task_trackers:done'
WAITING_LIST = 'query_task_trackers:waiting'
IN_PROGRESS_LIST = 'query_task_trackers:in_progress'
def prune(list_name, keep_count, max_keys=100):
count = redis_connection.zcard(list_name)
if count <= keep_count:
return 0
remove_count = min(max_keys, count - keep_count)
keys = redis_connection.zrange(list_name, 0, remove_count - 1)
redis_connection.delete(*keys)
redis_connection.zremrangebyrank(list_name, 0, remove_count - 1)
return remove_count
def prune_all(list_name):
removed = 1000
while removed > 0:
removed = prune(list_name, 0)
def upgrade():
prune_all(DONE_LIST)
prune_all(WAITING_LIST)
prune_all(IN_PROGRESS_LIST)
def downgrade():
pass
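
The migration reuses the old tracker's pruning logic to drain each list in bounded batches rather than issuing one huge delete. A self-contained illustration of the same pattern, assuming a local Redis and hypothetical demo key names:

import redis

r = redis.StrictRedis()
LIST = 'demo:tracker_list'  # hypothetical stand-in for the tracker lists

# Seed a sorted set plus the per-task keys it ranks, mirroring how the
# old tracker stored one key per task, scored by timestamp.
for i in range(250):
    key = 'demo:task:{}'.format(i)
    r.set(key, '{}')
    r.zadd(LIST, {key: i})

def prune(list_name, keep_count, max_keys=100):
    # Same shape as the migration: delete the oldest batch of task keys,
    # then drop their entries from the ranking sorted set.
    count = r.zcard(list_name)
    if count <= keep_count:
        return 0
    remove_count = min(max_keys, count - keep_count)
    keys = r.zrange(list_name, 0, remove_count - 1)
    r.delete(*keys)
    r.zremrangebyrank(list_name, 0, remove_count - 1)
    return remove_count

removed = 1000
while removed > 0:  # drain in batches of at most max_keys entries
    removed = prune(LIST, 0)
assert r.zcard(LIST) == 0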

View File

@@ -7,8 +7,8 @@ from redash.handlers import routes
from redash.handlers.base import json_response, record_event
from redash.permissions import require_super_admin
from redash.serializers import QuerySerializer
from redash.tasks.queries import QueryTaskTracker
from redash.utils import json_loads
from redash.monitor import celery_tasks
@routes.route('/api/admin/queries/outdated', methods=['GET'])
@@ -44,23 +44,11 @@ def outdated_queries():
def queries_tasks():
record_event(current_org, current_user._get_current_object(), {
'action': 'list',
'object_id': 'admin/tasks',
'object_type': 'celery_tasks'
})
global_limit = int(request.args.get('limit', 50))
waiting_limit = int(request.args.get('waiting_limit', global_limit))
progress_limit = int(request.args.get('progress_limit', global_limit))
done_limit = int(request.args.get('done_limit', global_limit))
waiting = QueryTaskTracker.all(QueryTaskTracker.WAITING_LIST, limit=waiting_limit)
in_progress = QueryTaskTracker.all(QueryTaskTracker.IN_PROGRESS_LIST, limit=progress_limit)
done = QueryTaskTracker.all(QueryTaskTracker.DONE_LIST, limit=done_limit)
response = {
'waiting': [t.data for t in waiting if t is not None],
'in_progress': [t.data for t in in_progress if t is not None],
'done': [t.data for t in done if t is not None]
'tasks': celery_tasks(),
}
return json_response(response)
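
With the tracker lists gone, the handler needs no limit parameters: it records the audit event and returns whatever celery_tasks() reports under a single key. An illustrative response payload (all values hypothetical; worker fields are null for rows still waiting in a broker queue):

{
    "tasks": [
        {
            "state": "active",
            "task_name": "redash.tasks.execute_query",
            "worker": "celery@worker-1",
            "worker_pid": 1234,
            "start_time": 1552212345.0,
            "task_id": "c0ffee12-3456-7890-abcd-ef0123456789",
            "queue": "queries",
            "query_id": 42,
            "data_source_id": 1,
            "user_id": 7,
            "scheduled": false
        },
        {
            "state": "waiting_in_queue",
            "task_name": "redash.tasks.execute_query",
            "worker": null,
            "worker_pid": null,
            "start_time": null,
            "task_id": "b2d1c9e0-5a77-4e94-9c1b-3f2a8d6e4c10",
            "queue": "scheduled_queries"
        }
    ]
}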

View File

@@ -1,4 +1,11 @@
from redash import redis_connection, models, __version__, settings
import ast
import itertools
import json
import base64
from sqlalchemy import union_all
from redash import redis_connection, __version__, settings
from redash.models import db, DataSource, Query, QueryResult, Dashboard, Widget
from redash.worker import celery
def get_redis_status():
@@ -8,39 +15,31 @@ def get_redis_status():
def get_object_counts():
status = {}
status['queries_count'] = models.Query.query.count()
status['queries_count'] = Query.query.count()
if settings.FEATURE_SHOW_QUERY_RESULTS_COUNT:
status['query_results_count'] = models.QueryResult.query.count()
status['unused_query_results_count'] = models.QueryResult.unused().count()
status['dashboards_count'] = models.Dashboard.query.count()
status['widgets_count'] = models.Widget.query.count()
status['query_results_count'] = QueryResult.query.count()
status['unused_query_results_count'] = QueryResult.unused().count()
status['dashboards_count'] = Dashboard.query.count()
status['widgets_count'] = Widget.query.count()
return status
def get_queues():
queues = {}
for ds in models.DataSource.query:
for queue in (ds.queue_name, ds.scheduled_queue_name):
queues.setdefault(queue, set())
queues[queue].add(ds.name)
queue_names = db.session.query(DataSource.queue_name).distinct()
scheduled_queue_names = db.session.query(DataSource.scheduled_queue_name).distinct()
query = db.session.execute(union_all(queue_names, scheduled_queue_names))
return queues
return ['celery'] + [row[0] for row in query]
def get_queues_status():
queues = get_queues()
queues = {}
for queue, sources in queues.iteritems():
for queue in get_queues():
queues[queue] = {
'data_sources': ', '.join(sources),
'size': redis_connection.llen(queue)
}
queues['celery'] = {
'size': redis_connection.llen('celery'),
'data_sources': ''
}
return queues
@@ -51,7 +50,7 @@ def get_db_sizes():
['Redash DB Size', "select pg_database_size('postgres') as size"]
]
for query_name, query in queries:
result = models.db.session.execute(query).first()
result = db.session.execute(query).first()
database_metrics.append([query_name, result[0]])
return database_metrics
@@ -70,3 +69,70 @@ def get_status():
status['database_metrics']['metrics'] = get_db_sizes()
return status
def get_waiting_in_queue(queue_name):
jobs = []
for raw in redis_connection.lrange(queue_name, 0, -1):
job = json.loads(raw)
try:
args = json.loads(job['headers']['argsrepr'])
if args.get('query_id') == 'adhoc':
args['query_id'] = None
except ValueError:
args = {}
job_row = {
'state': 'waiting_in_queue',
'task_name': job['headers']['task'],
'worker': None,
'worker_pid': None,
'start_time': None,
'task_id': job['headers']['id'],
'queue': job['properties']['delivery_info']['routing_key']
}
job_row.update(args)
jobs.append(job_row)
return jobs
def parse_tasks(task_lists, state):
rows = []
for task in itertools.chain(*task_lists.values()):
task_row = {
'state': state,
'task_name': task['name'],
'worker': task['hostname'],
'queue': task['delivery_info']['routing_key'],
'task_id': task['id'],
'worker_pid': task['worker_pid'],
'start_time': task['time_start'],
}
if task['name'] == 'redash.tasks.execute_query':
try:
args = json.loads(task['args'])
except ValueError:
args = {}
if args.get('query_id') == 'adhoc':
args['query_id'] = None
task_row.update(args)
rows.append(task_row)
return rows
def celery_tasks():
tasks = parse_tasks(celery.control.inspect().active(), 'active')
tasks += parse_tasks(celery.control.inspect().reserved(), 'reserved')
for queue_name in get_queues():
tasks += get_waiting_in_queue(queue_name)
return tasks
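
celery_tasks() combines two sources: worker introspection for active and reserved tasks, and the broker queues for tasks not yet picked up. A standalone sketch of the introspection half, assuming the configured Celery app from redash.worker (note that inspect() calls return None when no worker replies, so the sketch guards with 'or {}'):

# Sketch of Celery's introspection API as used by parse_tasks above.
# inspect().active() and inspect().reserved() each return a mapping of
# {worker_hostname: [task_dict, ...]}, which is why parse_tasks chains
# the dict's values together.
from redash.worker import celery

inspector = celery.control.inspect()

active = inspector.active() or {}      # tasks currently executing
reserved = inspector.reserved() or {}  # tasks prefetched by workers

for worker, tasks in active.items():
    for task in tasks:
        # Each task dict carries the fields parse_tasks reads: 'name',
        # 'hostname', 'id', 'worker_pid', 'time_start', and
        # 'delivery_info' with the routing key.
        print(worker, task['name'], task['delivery_info']['routing_key'])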

View File

@@ -1,3 +1,3 @@
from .general import record_event, version_check, send_mail, sync_user_details
from .queries import QueryTask, refresh_queries, refresh_schemas, cleanup_tasks, cleanup_query_results, execute_query
from .queries import QueryTask, refresh_queries, refresh_schemas, cleanup_query_results, execute_query
from .alerts import check_alerts_for_query

View File

@@ -25,116 +25,6 @@ def _unlock(query_hash, data_source_id):
redis_connection.delete(_job_lock_id(query_hash, data_source_id))
# TODO:
# There is some duplication between this class and QueryTask, but I wanted to implement the monitoring features without
# much changes to the existing code, so ended up creating another object. In the future we can merge them.
class QueryTaskTracker(object):
DONE_LIST = 'query_task_trackers:done'
WAITING_LIST = 'query_task_trackers:waiting'
IN_PROGRESS_LIST = 'query_task_trackers:in_progress'
ALL_LISTS = (DONE_LIST, WAITING_LIST, IN_PROGRESS_LIST)
def __init__(self, data):
self.data = data
@classmethod
def create(cls, task_id, state, query_hash, data_source_id, scheduled, metadata):
data = dict(task_id=task_id, state=state,
query_hash=query_hash, data_source_id=data_source_id,
scheduled=scheduled,
username=metadata.get('Username', 'unknown'),
query_id=metadata.get('Query ID', 'unknown'),
retries=0,
scheduled_retries=0,
created_at=time.time(),
started_at=None,
run_time=None)
return cls(data)
def save(self, connection=None):
if connection is None:
connection = redis_connection
self.data['updated_at'] = time.time()
key_name = self._key_name(self.data['task_id'])
connection.set(key_name, json_dumps(self.data))
connection.zadd(self._get_list(), {key_name: time.time()})
for _list in self.ALL_LISTS:
if _list != self._get_list():
connection.zrem(_list, key_name)
# TODO: this is not thread/concurrency safe. In current code this is not an issue, but better to fix this.
def update(self, **kwargs):
self.data.update(kwargs)
self.save()
@staticmethod
def _key_name(task_id):
return 'query_task_tracker:{}'.format(task_id)
def _get_list(self):
if self.state in ('finished', 'failed', 'cancelled'):
return self.DONE_LIST
if self.state in ('created'):
return self.WAITING_LIST
return self.IN_PROGRESS_LIST
@classmethod
def get_by_task_id(cls, task_id, connection=None):
if connection is None:
connection = redis_connection
key_name = cls._key_name(task_id)
data = connection.get(key_name)
return cls.create_from_data(data)
@classmethod
def create_from_data(cls, data):
if data:
data = json_loads(data)
return cls(data)
return None
@classmethod
def all(cls, list_name, offset=0, limit=-1):
if limit != -1:
limit -= 1
if offset != 0:
offset -= 1
ids = redis_connection.zrevrange(list_name, offset, limit)
pipe = redis_connection.pipeline()
for id in ids:
pipe.get(id)
tasks = [cls.create_from_data(data) for data in pipe.execute()]
return tasks
@classmethod
def prune(cls, list_name, keep_count, max_keys=100):
count = redis_connection.zcard(list_name)
if count <= keep_count:
return 0
remove_count = min(max_keys, count - keep_count)
keys = redis_connection.zrange(list_name, 0, remove_count - 1)
redis_connection.delete(*keys)
redis_connection.zremrangebyrank(list_name, 0, remove_count - 1)
return remove_count
def __getattr__(self, item):
return self.data[item]
def __contains__(self, item):
return item in self.data
class QueryTask(object):
# TODO: this is mapping to the old Job class statuses. Need to update the client side and remove this
STATUSES = {
@@ -256,11 +146,6 @@ def enqueue_query(query, data_source, user_id, scheduled_query=None, metadata={}
time_limit=time_limit)
job = QueryTask(async_result=result)
tracker = QueryTaskTracker.create(
result.id, 'created', query_hash, data_source.id,
scheduled_query is not None, metadata)
tracker.save(connection=pipe)
logging.info("[%s] Created new job: %s", query_hash, job.id)
pipe.set(_job_lock_id(query_hash, data_source.id), job.id, settings.JOB_EXPIRY_TIME)
pipe.execute()
@@ -323,35 +208,6 @@ def refresh_queries():
statsd_client.gauge('manager.seconds_since_refresh', now - float(status.get('last_refresh_at', now)))
@celery.task(name="redash.tasks.cleanup_tasks")
def cleanup_tasks():
in_progress = QueryTaskTracker.all(QueryTaskTracker.IN_PROGRESS_LIST)
for tracker in in_progress:
result = AsyncResult(tracker.task_id)
if result.ready():
logging.info("in progress tracker %s finished", tracker.query_hash)
_unlock(tracker.query_hash, tracker.data_source_id)
tracker.update(state='finished')
# Maintain constant size of the finished tasks list:
removed = 1000
while removed > 0:
removed = QueryTaskTracker.prune(QueryTaskTracker.DONE_LIST, 1000)
waiting = QueryTaskTracker.all(QueryTaskTracker.WAITING_LIST)
for tracker in waiting:
if tracker is None:
continue
result = AsyncResult(tracker.task_id)
if result.ready():
logging.info("waiting tracker %s finished", tracker.query_hash)
_unlock(tracker.query_hash, tracker.data_source_id)
tracker.update(state='finished')
@celery.task(name="redash.tasks.cleanup_query_results")
def cleanup_query_results():
"""
@@ -441,23 +297,12 @@ class QueryExecutor(object):
self.query_hash = gen_query_hash(self.query)
self.scheduled_query = scheduled_query
# Load existing tracker or create a new one if the job was created before code update:
self.tracker = (
QueryTaskTracker.get_by_task_id(task.request.id) or
QueryTaskTracker.create(
task.request.id,
'created',
self.query_hash,
self.data_source_id,
False,
metadata
)
)
if self.tracker.scheduled:
models.scheduled_queries_executions.update(self.tracker.query_id)
if scheduled_query:
models.scheduled_queries_executions.update(scheduled_query.id)
def run(self):
signal.signal(signal.SIGINT, signal_handler)
self.tracker.update(started_at=time.time(), state='started')
started_at = time.time()
logger.debug("Executing query:\n%s", self.query)
self._log_progress('executing_query')
@@ -472,15 +317,13 @@ class QueryExecutor(object):
data = None
logging.warning('Unexpected error while running query:', exc_info=1)
run_time = time.time() - self.tracker.started_at
self.tracker.update(error=error, run_time=run_time, state='saving_results')
run_time = time.time() - started_at
logger.info(u"task=execute_query query_hash=%s data_length=%s error=[%s]", self.query_hash, data and len(data), error)
_unlock(self.query_hash, self.data_source.id)
if error:
self.tracker.update(state='failed')
result = QueryExecutionError(error)
if self.scheduled_query is not None:
self.scheduled_query = models.db.session.merge(self.scheduled_query, load=False)
@@ -528,7 +371,6 @@ class QueryExecutor(object):
self.task.request.delivery_info['routing_key'],
self.metadata.get('Query ID', 'unknown'),
self.metadata.get('Username', 'unknown'))
self.tracker.update(state=state)
def _load_data_source(self):
logger.info("task=execute_query state=load_ds ds_id=%d", self.data_source_id)

View File

@@ -20,10 +20,6 @@ celery_schedule = {
'task': 'redash.tasks.refresh_queries',
'schedule': timedelta(seconds=30)
},
'cleanup_tasks': {
'task': 'redash.tasks.cleanup_tasks',
'schedule': timedelta(minutes=5)
},
'refresh_schemas': {
'task': 'redash.tasks.refresh_schemas',
'schedule': timedelta(minutes=settings.SCHEMAS_REFRESH_SCHEDULE)

View File

@@ -7,36 +7,7 @@ import mock
from tests import BaseTestCase
from redash import redis_connection, models
from redash.query_runner.pg import PostgreSQL
from redash.tasks.queries import (QueryExecutionError, QueryTaskTracker,
enqueue_query, execute_query)
class TestPrune(TestCase):
def setUp(self):
self.list = "test_list"
redis_connection.delete(self.list)
self.keys = []
for score in range(0, 100):
key = 'k:{}'.format(score)
self.keys.append(key)
redis_connection.zadd(self.list, {key: score})
redis_connection.set(key, 1)
def test_does_nothing_when_below_threshold(self):
remove_count = QueryTaskTracker.prune(self.list, 100)
self.assertEqual(remove_count, 0)
self.assertEqual(redis_connection.zcard(self.list), 100)
def test_removes_oldest_items_first(self):
remove_count = QueryTaskTracker.prune(self.list, 50)
self.assertEqual(remove_count, 50)
self.assertEqual(redis_connection.zcard(self.list), 50)
self.assertEqual(redis_connection.zscore(self.list, 'k:99'), 99.0)
self.assertIsNone(redis_connection.zscore(self.list, 'k:1'))
for k in self.keys[0:50]:
self.assertFalse(redis_connection.exists(k))
from redash.tasks.queries import QueryExecutionError, enqueue_query, execute_query
FakeResult = namedtuple('FakeResult', 'id')
@@ -56,9 +27,6 @@ class TestEnqueueTask(BaseTestCase):
enqueue_query(query.query_text, query.data_source, query.user_id, query, {'Username': 'Arik', 'Query ID': query.id})
self.assertEqual(1, execute_query.apply_async.call_count)
self.assertEqual(1, redis_connection.zcard(QueryTaskTracker.WAITING_LIST))
self.assertEqual(0, redis_connection.zcard(QueryTaskTracker.IN_PROGRESS_LIST))
self.assertEqual(0, redis_connection.zcard(QueryTaskTracker.DONE_LIST))
def test_multiple_enqueue_of_different_query(self):
query = self.factory.create_query()
@@ -69,9 +37,6 @@ class TestEnqueueTask(BaseTestCase):
enqueue_query(query.query_text + '3', query.data_source, query.user_id, None, {'Username': 'Arik', 'Query ID': query.id})
self.assertEqual(3, execute_query.apply_async.call_count)
self.assertEqual(3, redis_connection.zcard(QueryTaskTracker.WAITING_LIST))
self.assertEqual(0, redis_connection.zcard(QueryTaskTracker.IN_PROGRESS_LIST))
self.assertEqual(0, redis_connection.zcard(QueryTaskTracker.DONE_LIST))
class QueryExecutorTests(BaseTestCase):