Merge pull request #149 from EverythingMe/feature_data_source

Feature: Support multiple data sources (databases) for querying (#12)
Arik Fraimovich committed on 2014-03-23 17:02:55 +02:00
29 changed files with 835 additions and 651 deletions


@@ -0,0 +1,63 @@
"""
Script to test concurrency (multithreading/multiprocess) issues with the workers. Use with caution.
"""
import json
import atfork
atfork.monkeypatch_os_fork_functions()
import atfork.stdlib_fixer
atfork.stdlib_fixer.fix_logging_module()
import time
from redash.data import worker
from redash import models, data_manager, redis_connection
if __name__ == '__main__':
models.create_db(True, False)
print "Creating data source..."
data_source = models.DataSource.create(name="Concurrency", type="pg", options="dbname=postgres")
print "Clear jobs/hashes:"
redis_connection.delete("jobs")
query_hashes = redis_connection.keys("query_hash_*")
if query_hashes:
redis_connection.delete(*query_hashes)
starting_query_results_count = models.QueryResult.select().count()
jobs_count = 5000
workers_count = 10
print "Creating jobs..."
for i in xrange(jobs_count):
query = "SELECT {}".format(i)
print "Inserting: {}".format(query)
data_manager.add_job(query=query, priority=worker.Job.LOW_PRIORITY,
data_source=data_source)
print "Starting workers..."
workers = data_manager.start_workers(workers_count)
print "Waiting for jobs to be done..."
keep_waiting = True
while keep_waiting:
results_count = models.QueryResult.select().count() - starting_query_results_count
print "QueryResults: {}".format(results_count)
time.sleep(5)
if results_count == jobs_count:
print "Yay done..."
keep_waiting = False
data_manager.stop_workers()
qr_count = 0
for qr in models.QueryResult.select():
number = int(qr.query.split()[1])
data_number = json.loads(qr.data)['rows'][0].values()[0]
if number != data_number:
print "Oops? {} != {} ({})".format(number, data_number, qr.id)
qr_count += 1
print "Verified {} query results.".format(qr_count)
print "Done."


@@ -33,7 +33,7 @@ def runworkers():
logging.info("Cleaning old workers: %s", old_workers)
data_manager.start_workers(settings.WORKERS_COUNT, settings.CONNECTION_ADAPTER, settings.CONNECTION_STRING)
data_manager.start_workers(settings.WORKERS_COUNT)
logging.info("Workers started.")
while True:
@@ -101,8 +101,4 @@ manager.add_command("users", users_manager)
manager.add_command("import", import_manager)
if __name__ == '__main__':
channel = logging.StreamHandler()
logging.getLogger().addHandler(channel)
logging.getLogger().setLevel(settings.LOG_LEVEL)
manager.run()


@@ -0,0 +1,48 @@
import logging
import peewee
from playhouse.migrate import Migrator
from redash import db
from redash import models
from redash import settings
if __name__ == '__main__':
db.connect_db()
if not models.DataSource.table_exists():
print "Creating data_sources table..."
models.DataSource.create_table()
default_data_source = models.DataSource.create(name="Default",
type=settings.CONNECTION_ADAPTER,
options=settings.CONNECTION_STRING)
else:
default_data_source = models.DataSource.select().first()
migrator = Migrator(db.database)
models.Query.data_source.null = True
models.QueryResult.data_source.null = True
try:
with db.database.transaction():
migrator.add_column(models.Query, models.Query.data_source, "data_source_id")
except peewee.ProgrammingError:
print "Failed to create data_source_id column -- assuming it already exists"
try:
with db.database.transaction():
migrator.add_column(models.QueryResult, models.QueryResult.data_source, "data_source_id")
except peewee.ProgrammingError:
print "Failed to create data_source_id column -- assuming it already exists"
print "Updating data source to existing one..."
models.Query.update(data_source=default_data_source.id).execute()
models.QueryResult.update(data_source=default_data_source.id).execute()
with db.database.transaction():
print "Setting data source to non nullable..."
migrator.set_nullable(models.Query, models.Query.data_source, False)
with db.database.transaction():
print "Setting data source to non nullable..."
migrator.set_nullable(models.QueryResult, models.QueryResult.data_source, False)
db.close_db(None)


@@ -62,11 +62,6 @@ angular.module('redash', [
'query': ['Query', '$q', '$route', getQuery]
}
});
$routeProvider.when('/queries/:queryId/fiddle', {
templateUrl: '/views/queryfiddle.html',
controller: 'QueryFiddleCtrl',
reloadOnSearch: false
});
$routeProvider.when('/queries/:queryId/source', {
templateUrl: '/views/queryview.html',
controller: 'QueryViewCtrl',
@@ -90,9 +85,6 @@ angular.module('redash', [
redirectTo: '/'
});
Highcharts.setOptions({
colors: ["#4572A7", "#AA4643", "#89A54E", "#80699B", "#3D96AE",
"#DB843D", "#92A8CD", "#A47D7C", "#B5CA92"]
});
}
]);


@@ -72,221 +72,6 @@
$scope.updateTime = '';
}
var QueryFiddleCtrl = function ($scope, $window, $location, $routeParams, $http, $location, growl, notifications, Query, Visualization) {
var DEFAULT_TAB = 'table';
var pristineHash = null;
var leavingPageText = "You will lose your changes if you leave";
$scope.dirty = undefined;
$scope.newVisualization = undefined;
$window.onbeforeunload = function(){
if (currentUser.canEdit($scope.query) && $scope.dirty) {
return leavingPageText;
}
}
Mousetrap.bindGlobal("meta+s", function(e) {
e.preventDefault();
if (currentUser.canEdit($scope.query)) {
$scope.saveQuery();
}
});
$scope.$on('$locationChangeStart', function(event, next, current) {
if (next.split("#")[0] == current.split("#")[0]) {
return;
}
if (!currentUser.canEdit($scope.query)) {
return;
}
if($scope.dirty &&
!confirm(leavingPageText + "\n\nAre you sure you want to leave this page?")) {
event.preventDefault();
} else {
Mousetrap.unbind("meta+s");
}
});
$scope.$parent.pageTitle = "Query Fiddle";
$scope.$watch(function() {return $location.hash()}, function(hash) {
$scope.selectedTab = hash || DEFAULT_TAB;
});
$scope.lockButton = function (lock) {
$scope.queryExecuting = lock;
};
$scope.formatQuery = function() {
$scope.editorOptions.readOnly = 'nocursor';
$http.post('/api/queries/format', {'query': $scope.query.query}).success(function(response) {
$scope.query.query = response;
$scope.editorOptions.readOnly = false;
})
}
$scope.saveQuery = function (duplicate, oldId) {
if (!oldId) {
oldId = $scope.query.id;
}
delete $scope.query.latest_query_data;
$scope.query.$save(function (q) {
pristineHash = q.getHash();
$scope.dirty = false;
if (duplicate) {
growl.addInfoMessage("Query duplicated.", {ttl: 2000});
} else{
growl.addSuccessMessage("Query saved.", {ttl: 2000});
}
if (oldId != q.id) {
if (oldId == undefined) {
$location.path($location.path().replace('new', q.id)).replace();
} else {
// TODO: replace this with a safer method
$location.path($location.path().replace(oldId, q.id)).replace();
// Reset visualizations tab to table after duplicating a query:
$location.hash('table');
}
}
}, function(httpResponse) {
growl.addErrorMessage("Query could not be saved");
});
};
$scope.duplicateQuery = function () {
var oldId = $scope.query.id;
$scope.query.id = null;
$scope.query.ttl = -1;
$scope.saveQuery(true, oldId);
};
// Query Editor:
$scope.editorOptions = {
mode: 'text/x-sql',
lineWrapping: true,
lineNumbers: true,
readOnly: false,
matchBrackets: true,
autoCloseBrackets: true
};
$scope.refreshOptions = [
{value: -1, name: 'No Refresh'},
{value: 60, name: 'Every minute'},
]
_.each(_.range(1, 13), function(i) {
$scope.refreshOptions.push({value: i*3600, name: 'Every ' + i + 'h'});
})
$scope.refreshOptions.push({value: 24*3600, name: 'Every 24h'});
$scope.refreshOptions.push({value: 7*24*3600, name: 'Once a week'});
$scope.$watch('queryResult && queryResult.getError()', function (newError, oldError) {
if (newError == undefined) {
return;
}
if (oldError == undefined && newError != undefined) {
$scope.lockButton(false);
}
});
$scope.$watch('queryResult && queryResult.getData()', function (data, oldData) {
if (!data) {
return;
}
if ($scope.queryResult.getId() == null) {
$scope.dataUri = "";
} else {
$scope.dataUri = '/api/queries/' + $scope.query.id + '/results/' + $scope.queryResult.getId() + '.csv';
$scope.dataFilename = $scope.query.name.replace(" ", "_") + moment($scope.queryResult.getUpdatedAt()).format("_YYYY_MM_DD") + ".csv";
}
});
$scope.$watch("queryResult && queryResult.getStatus()", function (status) {
if (!status) {
return;
}
if (status == "done") {
if ($scope.query.id && $scope.query.latest_query_data_id != $scope.queryResult.getId() &&
$scope.query.query_hash == $scope.queryResult.query_result.query_hash) {
Query.save({'id': $scope.query.id, 'latest_query_data_id': $scope.queryResult.getId()})
}
$scope.query.latest_query_data_id = $scope.queryResult.getId();
notifications.showNotification("re:dash", $scope.query.name + " updated.");
$scope.lockButton(false);
}
});
if ($routeParams.queryId != undefined) {
$scope.query = Query.get({id: $routeParams.queryId}, function(q) {
pristineHash = q.getHash();
$scope.dirty = false;
$scope.queryResult = $scope.query.getQueryResult();
});
} else {
$scope.query = new Query({query: "", name: "New Query", ttl: -1, user: currentUser});
$scope.lockButton(false);
}
$scope.$watch('query.name', function() {
$scope.$parent.pageTitle = $scope.query.name;
});
$scope.$watch(function() {
return $scope.query.getHash();
}, function(newHash) {
$scope.dirty = (newHash !== pristineHash);
});
$scope.executeQuery = function() {
$scope.queryResult = $scope.query.getQueryResult(0);
$scope.lockButton(true);
$scope.cancelling = false;
};
$scope.cancelExecution = function() {
$scope.cancelling = true;
$scope.queryResult.cancelExecution();
};
$scope.deleteVisualization = function($e, vis) {
$e.preventDefault();
if (confirm('Are you sure you want to delete ' + vis.name + ' ?')) {
Visualization.delete(vis);
if ($scope.selectedTab == vis.id) {
$scope.selectedTab = DEFAULT_TAB;
}
$scope.query.visualizations =
$scope.query.visualizations.filter(function(v) {
return vis.id !== v.id;
});
}
};
unbind = $scope.$watch('selectedTab == "add"', function(newPanel) {
if (newPanel && $routeParams.queryId == undefined) {
unbind();
$scope.saveQuery();
}
});
}
var QueriesCtrl = function($scope, $http, $location, $filter, Query) {
$scope.$parent.pageTitle = "All Queries";
$scope.gridConfig = {
@@ -440,7 +225,6 @@
.controller('DashboardCtrl', ['$scope', '$routeParams', '$http', '$timeout', 'Dashboard', DashboardCtrl])
.controller('WidgetCtrl', ['$scope', '$http', '$location', 'Query', WidgetCtrl])
.controller('QueriesCtrl', ['$scope', '$http', '$location', '$filter', 'Query', QueriesCtrl])
.controller('QueryFiddleCtrl', ['$scope', '$window', '$location', '$routeParams', '$http', '$location', 'growl', 'notifications', 'Query', 'Visualization', QueryFiddleCtrl])
.controller('IndexCtrl', ['$scope', 'Dashboard', IndexCtrl])
.controller('MainCtrl', ['$scope', 'Dashboard', 'notifications', MainCtrl]);
})();


@@ -1,12 +1,16 @@
(function () {
'use strict';
var QueryViewCtrl = function ($scope, $window, $route, $http, $location, growl, notifications, Query, Visualization) {
var QueryViewCtrl = function ($scope, $window, $route, $http, $location, growl, notifications, Query, Visualization, DataSource) {
var DEFAULT_TAB = 'table';
var pristineHash = "";
var leavingPageText = "You will lose your changes if you leave";
var route = $route.current;
$scope.dataSources = DataSource.get(function(dataSources) {
$scope.query.data_source_id = $scope.query.data_source_id || dataSources[0].id;
});
$scope.dirty = undefined;
$scope.isNewQuery = false;
$scope.isOwner = false;
@@ -90,6 +94,7 @@
}
delete $scope.query.latest_query_data;
$scope.query.$save(function (q) {
pristineHash = q.getHash();
$scope.dirty = false;
@@ -233,6 +238,19 @@
$scope.dirty = (newHash !== pristineHash);
});
$scope.updateDataSource = function() {
$scope.query.latest_query_data = null;
$scope.query.latest_query_data_id = null;
Query.save({
'id': $scope.query.id,
'data_source_id': $scope.query.data_source_id,
'latest_query_data_id': null
});
$scope.executeQuery();
};
$scope.executeQuery = function () {
$scope.queryResult = $scope.query.getQueryResult(0);
$scope.lockButton(true);
@@ -278,6 +296,7 @@
'notifications',
'Query',
'Visualization',
'DataSource',
QueryViewCtrl
]);


@@ -1,6 +1,11 @@
(function () {
'use strict';
Highcharts.setOptions({
colors: ["#4572A7", "#AA4643", "#89A54E", "#80699B", "#3D96AE",
"#DB843D", "#92A8CD", "#A47D7C", "#B5CA92"]
});
var defaultOptions = {
title: {
"text": null


@@ -284,10 +284,10 @@
return queryResult;
}
QueryResult.get = function (query, ttl) {
QueryResult.get = function (data_source_id, query, ttl) {
var queryResult = new QueryResult();
QueryResultResource.post({'query': query, 'ttl': ttl}, function (response) {
QueryResultResource.post({'data_source_id': data_source_id, 'query': query, 'ttl': ttl}, function (response) {
queryResult.update(response);
if ('job' in response) {
@@ -301,7 +301,7 @@
return QueryResult;
};
var Query = function ($resource, QueryResult) {
var Query = function ($resource, QueryResult, DataSource) {
var Query = $resource('/api/queries/:id', {id: '@id'});
Query.prototype.getQueryResult = function(ttl) {
@@ -309,14 +309,13 @@
ttl = this.ttl;
}
var queryResult = null;
if (this.latest_query_data && ttl != 0) {
queryResult = new QueryResult({'query_result': this.latest_query_data});
} else if (this.latest_query_data_id && ttl != 0) {
queryResult = QueryResult.getById(this.latest_query_data_id);
} else {
queryResult = QueryResult.get(this.query, ttl);
queryResult = QueryResult.get(this.data_source_id, this.query, ttl);
}
return queryResult;
@@ -329,7 +328,14 @@
return Query;
};
var DataSource = function($resource) {
var DataSourceResource = $resource('/api/data_sources/:id', {id: '@id'}, {'get': {'method': 'GET', 'cache': true, 'isArray': true}});
return DataSourceResource;
}
angular.module('redash.services', [])
.factory('QueryResult', ['$resource', '$timeout', QueryResult])
.factory('Query', ['$resource', 'QueryResult', Query])
.factory('Query', ['$resource', 'QueryResult', 'DataSource', Query])
.factory('DataSource', ['$resource', DataSource]);
})();


@@ -67,7 +67,11 @@
template: '<filters></filters>\n' + Visualization.renderVisualizationsTemplate,
replace: false,
link: function(scope) {
scope.filters = scope.queryResult.getFilters();
scope.$watch('queryResult && queryResult.getFilters()', function(filters) {
if (filters) {
scope.filters = filters;
}
});
}
}
};


@@ -1,102 +0,0 @@
<div class="container">
<div class="row">
<div class="panel panel-default">
<div class="panel-heading">
<h3 class="panel-title">
<p>
<edit-in-place editable="currentUser.canEdit(query)" ignore-blanks='true' value="query.name"></edit-in-place>
</p>
</h3>
<p>
<edit-in-place editable="currentUser.canEdit(query)" editor="textarea" placeholder="No description" ignore-blanks='false' value="query.description" class="text-muted"></edit-in-place>
</p>
</div>
<div class="panel-body">
<textarea ui-codemirror="editorOptions" ng-model="query.query"></textarea>
<div>
<a class="btn btn-default" ng-disabled="queryExecuting || !queryResult.getData()" ng-href="{{dataUri}}" download="{{dataFilename}}" target="_self">
<span class="glyphicon glyphicon-floppy-disk"></span> Download Data Set
</a>
<button type="button" class="btn btn-default center-x" ng-click="formatQuery()"><span class="glyphicon glyphicon-ok"></span> Format SQL</button>
<div class="btn-group pull-right">
<button type="button" class="btn btn-default" ng-click="duplicateQuery()">Duplicate</button>
<button type="button" class="btn btn-default" ng-disabled="!currentUser.canEdit(query)" ng-click="saveQuery()">Save
<span ng-show="dirty">&#42;</span>
</button>
<button type="button" class="btn btn-primary" ng-disabled="queryExecuting" ng-click="executeQuery()">Execute</button>
</div>
</div>
</div>
<div class="panel-footer">
<span ng-show="queryResult.getRuntime()>=0">Query runtime: {{queryResult.getRuntime() | durationHumanize}} | </span>
<span ng-show="queryResult.query_result.retrieved_at">Last update time: <span am-time-ago="queryResult.query_result.retrieved_at"></span> | </span>
<span ng-show="queryResult.getStatus() == 'done'">Rows: {{queryResult.getData().length}} | </span>
Created by: {{query.user.name}}
<div class="pull-right">Refresh query: <select ng-model="query.ttl" ng-options="c.value as c.name for c in refreshOptions"></select><br></div>
</div>
</div>
<div class="alert alert-info" ng-show="queryResult.getStatus() == 'processing'">
Executing query... <rd-timer timestamp="queryResult.getUpdatedAt()"></rd-timer>
<button type="button" class="btn btn-warning btn-xs pull-right" ng-disabled="cancelling" ng-click="cancelExecution()">Cancel</button>
</div>
<div class="alert alert-info" ng-show="queryResult.getStatus() == 'waiting'">
Query in queue... <rd-timer timestamp="queryResult.getUpdatedAt()"></rd-timer>
<button type="button" class="btn btn-warning btn-xs pull-right" ng-disabled="cancelling" ng-click="cancelExecution()">Cancel</button>
</div>
<div class="alert alert-danger" ng-show="queryResult.getError()">Error running query: <strong>{{queryResult.getError()}}</strong></div>
</div>
<div class="row" ng-show="queryResult.getStatus() == 'done'">
<ul class="nav nav-tabs">
<rd-tab tab-id="table" name="Table"></rd-tab>
<rd-tab tab-id="pivot" name="Pivot Table"></rd-tab>
<!-- hide the table visualization -->
<rd-tab tab-id="{{vis.id}}" name="{{vis.name}}" ng-hide="vis.type=='TABLE'" ng-repeat="vis in query.visualizations">
<span class="remove" ng-click="deleteVisualization($event, vis)" ng-show="currentUser.canEdit(query)"> &times;</span>
</rd-tab>
<rd-tab tab-id="add" name="&plus; New" removeable="true" ng-show="currentUser.canEdit(query)"></rd-tab>
</ul>
<div class="col-lg-12" ng-show="selectedTab == 'table'">
<grid-renderer query-result="queryResult" items-per-page="50"></grid-renderer>
</div>
<div class="col-lg-12" ng-show="selectedTab == 'pivot'">
<pivot-table-renderer query-result="queryResult"></pivot-table-renderer>
</div>
<div class="col-lg-12" ng-show="selectedTab == vis.id" ng-repeat="vis in query.visualizations">
<div class="row" ng-show="currentUser.canEdit(query)">
<p>
<div class="col-lg-12">
<edit-visulatization-form visualization="vis" query="query" query-result="queryResult"></edit-visulatization-form>
</div>
</p>
</div>
<div class="row">
<p>
<div class="col-lg-12">
<visualization-renderer visualization="vis" query-result="queryResult"></visualization-renderer>
</div>
</p>
</div>
</div>
<div class="col-lg-12" ng-show="selectedTab == 'add'">
<div class="row">
<p>
<div class="col-lg-6">
<edit-visulatization-form visualization="newVisualization" query="query"></edit-visulatization-form>
</div>
<div class="col-lg-6">
<visualization-renderer visualization="newVisualization" query-result="queryResult"></visualization-renderer>
</div>
</p>
</div>
</div>
</div>
</div>


@@ -112,6 +112,12 @@
<select ng-disabled="!isOwner" ng-model="query.ttl" ng-change="saveQuery()" ng-options="c.value as c.name for c in refreshOptions"></select>
</p>
<p>
<span class="glyphicon glyphicon-hdd"></span>
<span class="text-muted">Data Source</span>
<select ng-disabled="!isOwner" ng-model="query.data_source_id" ng-change="updateDataSource()" ng-options="ds.id as ds.name for ds in dataSources"></select>
</p>
<hr>
<p>


@@ -13,7 +13,7 @@
"angular-ui-codemirror": "0.0.5",
"highcharts": "3.0.1",
"underscore": "1.5.1",
"angular-resource": "1.0.7",
"angular-resource": "1.2.7",
"angular-growl": "0.3.1",
"angular-route": "1.2.7",
"pivottable": "https://github.com/arikfr/pivottable.git",


@@ -1,5 +1,6 @@
import json
import urlparse
import logging
from flask import Flask, make_response
from flask.ext.restful import Api
from flask_peewee.db import Database
@@ -10,6 +11,9 @@ from redash import settings, utils
__version__ = '0.3.5'
logging.getLogger().addHandler(logging.StreamHandler())
logging.getLogger().setLevel(settings.LOG_LEVEL)
app = Flask(__name__,
template_folder=settings.STATIC_ASSETS_PATH,
static_folder=settings.STATIC_ASSETS_PATH,
@@ -42,6 +46,6 @@ redis_connection = redis.StrictRedis(host=redis_url.hostname, port=redis_url.por
statsd_client = StatsClient(host=settings.STATSD_HOST, port=settings.STATSD_PORT, prefix=settings.STATSD_PREFIX)
from redash import data
data_manager = data.Manager(redis_connection, db, statsd_client)
data_manager = data.Manager(redis_connection, statsd_client)
from redash import controllers


@@ -129,6 +129,14 @@ class BaseResource(Resource):
return current_user._get_current_object()
class DataSourceListAPI(BaseResource):
def get(self):
data_sources = [ds.to_dict() for ds in models.DataSource.select()]
return data_sources
api.add_resource(DataSourceListAPI, '/api/data_sources', endpoint='data_sources')
class DashboardListAPI(BaseResource):
def get(self):
dashboards = [d.to_dict() for d in
@@ -229,11 +237,11 @@ class QueryListAPI(BaseResource):
@require_permission('create_query')
def post(self):
query_def = request.get_json(force=True)
# id, created_at, api_key
for field in ['id', 'created_at', 'api_key', 'visualizations', 'latest_query_data']:
query_def.pop(field, None)
query_def['user'] = self.current_user
query_def['data_source'] = query_def.pop('data_source_id')
query = models.Query(**query_def)
query.save()
@@ -255,6 +263,9 @@ class QueryAPI(BaseResource):
if 'latest_query_data_id' in query_def:
query_def['latest_query_data'] = query_def.pop('latest_query_data_id')
if 'data_source_id' in query_def:
query_def['data_source'] = query_def.pop('data_source_id')
models.Query.update_instance(query_id, **query_def)
query = models.Query.get_by_id(query_id)
@@ -323,20 +334,21 @@ class QueryResultListAPI(BaseResource):
if params['ttl'] == 0:
query_result = None
else:
query_result = data_manager.get_query_result(params['query'], int(params['ttl']))
query_result = models.QueryResult.get_latest(params['data_source_id'], params['query'], int(params['ttl']))
if query_result:
return {'query_result': query_result.to_dict(parse_data=True)}
return {'query_result': query_result.to_dict()}
else:
job = data_manager.add_job(params['query'], data.Job.HIGH_PRIORITY)
data_source = models.DataSource.get_by_id(params['data_source_id'])
job = data_manager.add_job(params['query'], data.Job.HIGH_PRIORITY, data_source)
return {'job': job.to_dict()}
class QueryResultAPI(BaseResource):
def get(self, query_result_id):
query_result = data_manager.get_query_result_by_id(query_result_id)
query_result = models.QueryResult.get_by_id(query_result_id)
if query_result:
return {'query_result': query_result.to_dict(parse_data=True)}
return {'query_result': query_result.to_dict()}
else:
abort(404)
@@ -348,7 +360,7 @@ class CsvQueryResultsAPI(BaseResource):
if query:
query_result_id = query._data['latest_query_data']
query_result = query_result_id and data_manager.get_query_result_by_id(query_result_id)
query_result = query_result_id and models.QueryResult.get_by_id(query_result_id)
if query_result:
s = cStringIO.StringIO()
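Taken together, the QueryResultListAPI changes earlier in this file mean a client now has to name a data source when asking for query results. A rough client-side sketch follows, ignoring authentication; the host and the /api/query_results route are assumptions (the resource registration is not shown in this diff), and only the payload keys and the query_result/job response keys come from the code above.

# Hypothetical client sketch: request a result for a query on a specific data source.
# Host, route and the bare `requests` call are assumptions; payload/response shapes
# follow QueryResultListAPI.post in this commit.
import json
import requests

payload = {
    "data_source_id": 1,     # id as returned by /api/data_sources
    "query": "SELECT 1",
    "ttl": 3600,             # accept a cached result up to an hour old
}

response = requests.post(
    "http://localhost:5000/api/query_results",
    data=json.dumps(payload),
    headers={"Content-Type": "application/json"},
)
result = response.json()

if "query_result" in result:
    print "Served from cache, result id:", result["query_result"]["id"]
else:
    print "Queued as job:", result["job"]["id"]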


@@ -1,32 +1,20 @@
"""
Data manager. Used to manage and coordinate execution of queries.
"""
from contextlib import contextmanager
import collections
import json
import time
import logging
import psycopg2
import peewee
import qr
import redis
from redash import models
from redash.data import worker
from redash.utils import gen_query_hash
class QueryResult(collections.namedtuple('QueryData', 'id query data runtime retrieved_at query_hash')):
def to_dict(self, parse_data=False):
d = self._asdict()
if parse_data and d['data']:
d['data'] = json.loads(d['data'])
return d
class Manager(object):
def __init__(self, redis_connection, db, statsd_client):
def __init__(self, redis_connection, statsd_client):
self.statsd_client = statsd_client
self.redis_connection = redis_connection
self.db = db
self.workers = []
self.queue = qr.PriorityQueue("jobs", **self.redis_connection.connection_pool.connection_kwargs)
self.max_retries = 5
@@ -37,36 +25,7 @@ class Manager(object):
self._save_status()
# TODO: Use our Django Models
def get_query_result_by_id(self, query_result_id):
with self.db_transaction() as cursor:
sql = "SELECT id, query, data, runtime, retrieved_at, query_hash FROM query_results " \
"WHERE id=%s LIMIT 1"
cursor.execute(sql, (query_result_id,))
query_result = cursor.fetchone()
if query_result:
query_result = QueryResult(*query_result)
return query_result
def get_query_result(self, query, ttl=0):
query_hash = gen_query_hash(query)
with self.db_transaction() as cursor:
sql = "SELECT id, query, data, runtime, retrieved_at, query_hash FROM query_results " \
"WHERE query_hash=%s " \
"AND retrieved_at < now() at time zone 'utc' - interval '%s second'" \
"ORDER BY retrieved_at DESC LIMIT 1"
cursor.execute(sql, (query_hash, psycopg2.extensions.AsIs(ttl)))
query_result = cursor.fetchone()
if query_result:
query_result = QueryResult(*query_result)
return query_result
def add_job(self, query, priority):
def add_job(self, query, priority, data_source):
query_hash = gen_query_hash(query)
logging.info("[Manager][%s] Inserting job with priority=%s", query_hash, priority)
try_count = 0
@@ -83,7 +42,11 @@ class Manager(object):
logging.info("[Manager][%s] Found existing job: %s", query_hash, job_id)
job = worker.Job.load(self.redis_connection, job_id)
else:
job = worker.Job(self.redis_connection, query, priority)
job = worker.Job(self.redis_connection, query=query, priority=priority,
data_source_id=data_source.id,
data_source_name=data_source.name,
data_source_type=data_source.type,
data_source_options=data_source.options)
pipe.multi()
job.save(pipe)
logging.info("[Manager][%s] Created new job: %s", query_hash, job.id)
@@ -113,91 +76,61 @@ class Manager(object):
time.time() - float(manager_status['last_refresh_at']))
def refresh_queries(self):
sql = """SELECT first_value(t1."query") over(partition by t1.query_hash)
FROM "queries" AS t1
INNER JOIN "query_results" AS t2 ON (t1."latest_query_data_id" = t2."id")
WHERE ((t1."ttl" > 0) AND ((t2."retrieved_at" + t1.ttl * interval '1 second') <
now() at time zone 'utc'));
"""
# TODO: this will only execute scheduled queries that were executed before. I think this is
# a reasonable assumption, but worth revisiting.
outdated_queries = models.Query.select(peewee.Func('first_value', models.Query.id)\
.over(partition_by=[models.Query.query_hash, models.Query.data_source]))\
.join(models.QueryResult)\
.where(models.Query.ttl > 0,
(models.QueryResult.retrieved_at +
(models.Query.ttl * peewee.SQL("interval '1 second'"))) <
peewee.SQL("(now() at time zone 'utc')"))
queries = models.Query.select(models.Query, models.DataSource).join(models.DataSource)\
.where(models.Query.id << outdated_queries)
self.status['last_refresh_at'] = time.time()
self._save_status()
logging.info("Refreshing queries...")
queries = self.run_query(sql)
outdated_queries_count = 0
for query in queries:
self.add_job(query[0], worker.Job.LOW_PRIORITY)
self.add_job(query.query, worker.Job.LOW_PRIORITY, query.data_source)
outdated_queries_count += 1
self.statsd_client.gauge('manager.outdated_queries', len(queries))
self.statsd_client.gauge('manager.outdated_queries', outdated_queries_count)
self.statsd_client.gauge('manager.queue_size', self.redis_connection.zcard('jobs'))
logging.info("Done refreshing queries... %d" % len(queries))
logging.info("Done refreshing queries... %d" % outdated_queries_count)
def store_query_result(self, query, data, run_time, retrieved_at):
query_result_id = None
def store_query_result(self, data_source_id, query, data, run_time, retrieved_at):
query_hash = gen_query_hash(query)
sql = "INSERT INTO query_results (query_hash, query, data, runtime, retrieved_at) " \
"VALUES (%s, %s, %s, %s, %s) RETURNING id"
with self.db_transaction() as cursor:
cursor.execute(sql, (query_hash, query, data, run_time, retrieved_at))
if cursor.rowcount == 1:
query_result_id = cursor.fetchone()[0]
logging.info("[Manager][%s] Inserted query data; id=%s", query_hash, query_result_id)
sql = "UPDATE queries SET latest_query_data_id=%s WHERE query_hash=%s"
cursor.execute(sql, (query_result_id, query_hash))
query_result = models.QueryResult.create(query_hash=query_hash,
query=query,
runtime=run_time,
data_source=data_source_id,
retrieved_at=retrieved_at,
data=data)
logging.info("[Manager][%s] Updated %s queries.", query_hash, cursor.rowcount)
else:
logging.error("[Manager][%s] Failed inserting query data.", query_hash)
return query_result_id
logging.info("[Manager][%s] Inserted query data; id=%s", query_hash, query_result.id)
def run_query(self, *args):
sql = args[0]
logging.debug("running query: %s %s", sql, args[1:])
updated_count = models.Query.update(latest_query_data=query_result).\
where(models.Query.query_hash==query_hash, models.Query.data_source==data_source_id).\
execute()
with self.db_transaction() as cursor:
cursor.execute(sql, args[1:])
if cursor.description:
data = list(cursor)
else:
data = cursor.rowcount
logging.info("[Manager][%s] Updated %s queries.", query_hash, updated_count)
return data
return query_result.id
def start_workers(self, workers_count, connection_type, connection_string):
def start_workers(self, workers_count):
if self.workers:
return self.workers
if connection_type == 'mysql':
from redash.data import query_runner_mysql
runner = query_runner_mysql.mysql(connection_string)
elif connection_type == 'graphite':
from redash.data import query_runner_graphite
connection_params = json.loads(connection_string)
if connection_params['auth']:
connection_params['auth'] = tuple(connection_params['auth'])
else:
connection_params['auth'] = None
runner = query_runner_graphite.graphite(connection_params)
elif connection_type == 'bigquery':
from redash.data import query_runner_bigquery
connection_params = json.loads(connection_string)
runner = query_runner_bigquery.bigquery(connection_params)
elif connection_type == 'script':
from redash.data import query_runner_script
runner = query_runner_script.script(connection_string)
elif connection_type == 'url':
from redash.data import query_runner_url
runner = query_runner_url.url(connection_string)
else:
from redash.data import query_runner
runner = query_runner.redshift(connection_string)
redis_connection_params = self.redis_connection.connection_pool.connection_kwargs
self.workers = [worker.Worker(worker_id, self, redis_connection_params, runner)
for worker_id in range(workers_count)]
self.workers = [worker.Worker(worker_id, self, redis_connection_params)
for worker_id in xrange(workers_count)]
for w in self.workers:
w.start()
@@ -208,20 +141,5 @@ class Manager(object):
w.continue_working = False
w.join()
@contextmanager
def db_transaction(self):
self.db.connect_db()
cursor = self.db.database.get_cursor()
try:
yield cursor
except:
self.db.database.rollback()
raise
else:
self.db.database.commit()
finally:
self.db.close_db(None)
def _save_status(self):
self.redis_connection.hmset('manager:status', self.status)


@@ -1,69 +1,30 @@
"""
QueryRunner is the function that the workers use to execute queries. This is the Redshift
(PostgreSQL, in fact) version, but we can easily write another to support additional databases
(MySQL and others).
Because the worker just passes the query along, this can be used with any data store that has some sort of
query language (for example: HiveQL).
"""
import logging
import json
import sys
import select
import psycopg2
from redash.utils import JSONEncoder
def get_query_runner(connection_type, connection_string):
if connection_type == 'mysql':
from redash.data import query_runner_mysql
runner = query_runner_mysql.mysql(connection_string)
elif connection_type == 'graphite':
from redash.data import query_runner_graphite
connection_params = json.loads(connection_string)
if connection_params['auth']:
connection_params['auth'] = tuple(connection_params['auth'])
else:
connection_params['auth'] = None
runner = query_runner_graphite.graphite(connection_params)
elif connection_type == 'bigquery':
from redash.data import query_runner_bigquery
connection_params = json.loads(connection_string)
runner = query_runner_bigquery.bigquery(connection_params)
elif connection_type == 'script':
from redash.data import query_runner_script
runner = query_runner_script.script(connection_string)
elif connection_type == 'url':
from redash.data import query_runner_url
runner = query_runner_url.url(connection_string)
else:
from redash.data import query_runner_pg
runner = query_runner_pg.pg(connection_string)
def redshift(connection_string):
def column_friendly_name(column_name):
return column_name
def wait(conn):
while 1:
state = conn.poll()
if state == psycopg2.extensions.POLL_OK:
break
elif state == psycopg2.extensions.POLL_WRITE:
select.select([], [conn.fileno()], [])
elif state == psycopg2.extensions.POLL_READ:
select.select([conn.fileno()], [], [])
else:
raise psycopg2.OperationalError("poll() returned %s" % state)
def query_runner(query):
connection = psycopg2.connect(connection_string, async=True)
wait(connection)
cursor = connection.cursor()
try:
cursor.execute(query)
wait(connection)
column_names = [col.name for col in cursor.description]
rows = [dict(zip(column_names, row)) for row in cursor]
columns = [{'name': col.name,
'friendly_name': column_friendly_name(col.name),
'type': None} for col in cursor.description]
data = {'columns': columns, 'rows': rows}
json_data = json.dumps(data, cls=JSONEncoder)
error = None
cursor.close()
except psycopg2.DatabaseError as e:
json_data = None
error = e.message
except KeyboardInterrupt:
connection.cancel()
error = "Query cancelled by user."
json_data = None
except Exception as e:
raise sys.exc_info()[1], None, sys.exc_info()[2]
finally:
connection.close()
return json_data, error
return query_runner
return runner
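The new get_query_runner() above dispatches on the data source type, and every runner module follows the same contract: a factory that receives the connection options and returns a callable which takes a query string and returns a (json_data, error) tuple. Below is a sketch of what an additional runner could look like, assuming a hypothetical SQLite backend; the backend and module are made up, only the factory/callable shape and return value mirror the runners in this commit.

# Illustrative sketch of the runner contract; "sqlite" is a hypothetical backend.
import json
import sqlite3

from redash.utils import JSONEncoder


def sqlite(connection_string):
    def query_runner(query):
        connection = sqlite3.connect(connection_string)
        try:
            cursor = connection.execute(query)
            column_names = [col[0] for col in cursor.description]
            rows = [dict(zip(column_names, row)) for row in cursor]
            columns = [{'name': name, 'friendly_name': name, 'type': None}
                       for name in column_names]
            # Same payload shape the pg runner produces: {'columns': [...], 'rows': [...]}
            json_data = json.dumps({'columns': columns, 'rows': rows}, cls=JSONEncoder)
            error = None
        except sqlite3.Error as e:
            json_data = None
            error = str(e)
        finally:
            connection.close()
        return json_data, error

    return query_runner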


@@ -0,0 +1,68 @@
"""
QueryRunner is the function that the workers use to execute queries. This is the PostgreSQL
version, but we can easily write another to support additional databases (MySQL and others).
Because the worker just passes the query along, this can be used with any data store that has some sort of
query language (for example: HiveQL).
"""
import json
import sys
import select
import psycopg2
from redash.utils import JSONEncoder
def pg(connection_string):
def column_friendly_name(column_name):
return column_name
def wait(conn):
while 1:
state = conn.poll()
if state == psycopg2.extensions.POLL_OK:
break
elif state == psycopg2.extensions.POLL_WRITE:
select.select([], [conn.fileno()], [])
elif state == psycopg2.extensions.POLL_READ:
select.select([conn.fileno()], [], [])
else:
raise psycopg2.OperationalError("poll() returned %s" % state)
def query_runner(query):
connection = psycopg2.connect(connection_string, async=True)
wait(connection)
cursor = connection.cursor()
try:
cursor.execute(query)
wait(connection)
column_names = [col.name for col in cursor.description]
rows = [dict(zip(column_names, row)) for row in cursor]
columns = [{'name': col.name,
'friendly_name': column_friendly_name(col.name),
'type': None} for col in cursor.description]
data = {'columns': columns, 'rows': rows}
json_data = json.dumps(data, cls=JSONEncoder)
error = None
cursor.close()
except psycopg2.DatabaseError as e:
json_data = None
error = e.message
except KeyboardInterrupt:
connection.cancel()
error = "Query cancelled by user."
json_data = None
except Exception as e:
raise sys.exc_info()[1], None, sys.exc_info()[2]
finally:
connection.close()
return json_data, error
return query_runner


@@ -13,9 +13,74 @@ import setproctitle
import redis
from statsd import StatsClient
from redash.utils import gen_query_hash
from redash.data.query_runner import get_query_runner
from redash import settings
class Job(object):
class RedisObject(object):
# The following should be overridden in the inheriting class:
fields = {}
conversions = {}
id_field = ''
name = ''
def __init__(self, redis_connection, **kwargs):
self.redis_connection = redis_connection
self.values = {}
if not self.fields:
raise ValueError("You must set the fields dictionary, before using RedisObject.")
if not self.name:
raise ValueError("You must set the name, before using RedisObject")
self.update(**kwargs)
def __getattr__(self, name):
if name in self.values:
return self.values[name]
else:
raise AttributeError
def update(self, **kwargs):
for field, default_value in self.fields.iteritems():
value = kwargs.get(field, self.values.get(field, default_value))
if callable(value):
value = value()
if value == 'None':
value = None
if field in self.conversions and value:
value = self.conversions[field](value)
self.values[field] = value
@classmethod
def _redis_key(cls, object_id):
return '{}:{}'.format(cls.name, object_id)
def save(self, pipe):
if not pipe:
pipe = self.redis_connection.pipeline()
pipe.sadd('{}_set'.format(self.name), self.id)
pipe.hmset(self._redis_key(self.id), self.values)
pipe.publish(self._redis_key(self.id), json.dumps(self.to_dict()))
pipe.execute()
@classmethod
def load(cls, redis_connection, object_id):
object_dict = redis_connection.hgetall(cls._redis_key(object_id))
obj = None
if object_dict:
obj = cls(redis_connection, **object_dict)
return obj
class Job(RedisObject):
HIGH_PRIORITY = 1
LOW_PRIORITY = 2
@@ -24,37 +89,43 @@ class Job(object):
DONE = 3
FAILED = 4
def __init__(self, redis_connection, query, priority,
job_id=None,
wait_time=None, query_time=None,
updated_at=None, status=None, error=None, query_result_id=None,
process_id=0):
self.redis_connection = redis_connection
self.query = query
self.priority = priority
self.query_hash = gen_query_hash(self.query)
self.query_result_id = query_result_id
if process_id == 'None':
self.process_id = None
else:
self.process_id = int(process_id)
fields = {
'id': lambda: str(uuid.uuid1()),
'query': None,
'priority': None,
'query_hash': None,
'wait_time': 0,
'query_time': 0,
'error': None,
'updated_at': time.time,
'status': WAITING,
'process_id': None,
'query_result_id': None,
'data_source_id': None,
'data_source_name': None,
'data_source_type': None,
'data_source_options': None
}
if job_id is None:
self.id = str(uuid.uuid1())
self.new_job = True
self.wait_time = 0
self.query_time = 0
self.error = None
self.updated_at = time.time() # job_dict.get('updated_at', time.time())
self.status = self.WAITING # int(job_dict.get('status', self.WAITING))
else:
self.id = job_id
self.new_job = False
self.error = error
self.wait_time = wait_time
self.query_time = query_time
self.updated_at = updated_at
self.status = status
conversions = {
'query': lambda query: query.decode('utf-8'),
'priority': int,
'updated_at': float,
'status': int,
'wait_time': float,
'query_time': float,
'process_id': int,
'query_result_id': int
}
name = 'job'
def __init__(self, redis_connection, query, priority, **kwargs):
kwargs['query'] = query
kwargs['priority'] = priority
kwargs['query_hash'] = gen_query_hash(kwargs['query'])
self.new_job = 'id' not in kwargs
super(Job, self).__init__(redis_connection, **kwargs)
def to_dict(self):
return {
@@ -67,13 +138,11 @@ class Job(object):
'status': self.status,
'error': self.error,
'query_result_id': self.query_result_id,
'process_id': self.process_id
'process_id': self.process_id,
'data_source_name': self.data_source_name,
'data_source_type': self.data_source_type
}
@staticmethod
def _redis_key(job_id):
return 'job:%s' % job_id
def cancel(self):
# TODO: Race condition:
# it's possible that it will be picked up by a worker while processing the cancel order
@@ -95,16 +164,14 @@ class Job(object):
if self.is_finished():
pipe.delete('query_hash_job:%s' % self.query_hash)
pipe.sadd('jobs_set', self.id)
pipe.hmset(self._redis_key(self.id), self.to_dict())
pipe.publish(self._redis_key(self.id), json.dumps(self.to_dict()))
pipe.execute()
super(Job, self).save(pipe)
def processing(self, process_id):
self.status = self.PROCESSING
self.process_id = process_id
self.wait_time = time.time() - self.updated_at
self.updated_at = time.time()
self.update(status=self.PROCESSING,
process_id=process_id,
wait_time=time.time() - self.updated_at,
updated_at=time.time())
self.save()
def is_finished(self):
@@ -112,48 +179,32 @@ class Job(object):
def done(self, query_result_id, error):
if error:
self.status = self.FAILED
new_status = self.FAILED
else:
self.status = self.DONE
new_status = self.DONE
self.update(status=new_status,
query_result_id=query_result_id,
error=error,
query_time=time.time() - self.updated_at,
updated_at=time.time())
self.query_result_id = query_result_id
self.error = error
self.query_time = time.time() - self.updated_at
self.updated_at = time.time()
self.save()
def __str__(self):
return "<Job:%s,priority:%d,status:%d>" % (self.id, self.priority, self.status)
@classmethod
def _load(cls, redis_connection, job_id):
return redis_connection.hgetall(cls._redis_key(job_id))
@classmethod
def load(cls, redis_connection, job_id):
job_dict = cls._load(redis_connection, job_id)
job = None
if job_dict:
job = Job(redis_connection, job_id=job_dict['id'], query=job_dict['query'].decode('utf-8'),
priority=int(job_dict['priority']), updated_at=float(job_dict['updated_at']),
status=int(job_dict['status']), wait_time=float(job_dict['wait_time']),
query_time=float(job_dict['query_time']), error=job_dict['error'],
query_result_id=job_dict['query_result_id'],
process_id=job_dict['process_id'])
return job
class Worker(threading.Thread):
def __init__(self, worker_id, manager, redis_connection_params, query_runner, sleep_time=0.1):
def __init__(self, worker_id, manager, redis_connection_params, sleep_time=0.1):
self.manager = manager
self.statsd_client = StatsClient(host=settings.STATSD_HOST, port=settings.STATSD_PORT,
prefix=settings.STATSD_PREFIX)
self.redis_connection_params = {k: v for k, v in redis_connection_params.iteritems()
if k in ('host', 'db', 'password', 'port')}
self.continue_working = True
self.query_runner = query_runner
self.sleep_time = sleep_time
self.child_pid = None
self.worker_id = worker_id
@@ -238,15 +289,22 @@ class Worker(threading.Thread):
start_time = time.time()
self.set_title("running query %s" % job_id)
if getattr(self.query_runner, 'annotate_query', True):
logging.info("[%s][%s] Loading query runner (%s, %s)...", self.name, job.id,
job.data_source_name, job.data_source_type)
query_runner = get_query_runner(job.data_source_type, job.data_source_options)
if getattr(query_runner, 'annotate_query', True):
annotated_query = "/* Pid: %s, Job Id: %s, Query hash: %s, Priority: %s */ %s" % \
(pid, job.id, job.query_hash, job.priority, job.query)
else:
annotated_query = job.query
# TODO: here's the part that needs to be forked, not all of the worker process...
with self.statsd_client.timer('worker_{}.query_runner.run_time'.format(self.worker_id)):
data, error = self.query_runner(annotated_query)
with self.statsd_client.timer('worker_{}.query_runner.{}.{}.run_time'.format(self.worker_id,
job.data_source_type,
job.data_source_name)):
data, error = query_runner(annotated_query)
run_time = time.time() - start_time
logging.info("[%s][%s] query finished... data length=%s, error=%s",
@@ -257,7 +315,8 @@ class Worker(threading.Thread):
query_result_id = None
if not error:
self.set_title("storing results %s" % job_id)
query_result_id = self.manager.store_query_result(job.query, data, run_time,
query_result_id = self.manager.store_query_result(job.data_source_id,
job.query, data, run_time,
datetime.datetime.utcnow())
self.set_title("marking job as done %s" % job_id)


@@ -5,13 +5,15 @@ from flask.ext.script import Manager
class Importer(object):
def __init__(self, object_mapping=None):
def __init__(self, object_mapping=None, data_source=None):
if object_mapping is None:
object_mapping = {}
self.object_mapping = object_mapping
self.data_source = data_source
def import_query_result(self, query_result):
query_result = self._get_or_create(models.QueryResult, query_result['id'],
data_source=self.data_source,
data=json.dumps(query_result['data']),
query_hash=query_result['query_hash'],
retrieved_at=query_result['retrieved_at'],
@@ -30,7 +32,8 @@ class Importer(object):
query=query['query'],
query_hash=query['query_hash'],
description=query['description'],
latest_query_data=query_result)
latest_query_data=query_result,
data_source=self.data_source)
return new_query
@@ -115,12 +118,21 @@ def importer_with_mapping_file(mapping_filename):
with open(mapping_filename) as f:
mapping = json.loads(f.read())
importer = Importer(object_mapping=mapping)
importer = Importer(object_mapping=mapping, data_source=get_data_source())
yield importer
with open(mapping_filename, 'w') as f:
f.write(json.dumps(importer.object_mapping, indent=2))
def get_data_source():
try:
data_source = models.DataSource.get(models.DataSource.name=="Import")
except models.DataSource.DoesNotExist:
data_source = models.DataSource.create(name="Import", type="import", options='{}')
return data_source
@import_manager.command
def query(mapping_filename, query_filename, user_id):
user = models.User.get_by_id(user_id)


@@ -59,7 +59,7 @@ class ActivityLog(BaseModel):
id = peewee.PrimaryKeyField()
user = peewee.ForeignKeyField(User)
type = peewee.IntegerField() # 1 for query execution
type = peewee.IntegerField()
activity = peewee.TextField()
created_at = peewee.DateTimeField(default=datetime.datetime.now)
@@ -79,8 +79,27 @@ class ActivityLog(BaseModel):
return unicode(self.id)
class DataSource(BaseModel):
id = peewee.PrimaryKeyField()
name = peewee.CharField()
type = peewee.CharField()
options = peewee.TextField()
created_at = peewee.DateTimeField(default=datetime.datetime.now)
class Meta:
db_table = 'data_sources'
def to_dict(self):
return {
'id': self.id,
'name': self.name,
'type': self.type
}
class QueryResult(BaseModel):
id = peewee.PrimaryKeyField()
data_source = peewee.ForeignKeyField(DataSource)
query_hash = peewee.CharField(max_length=32, index=True)
query = peewee.TextField()
data = peewee.TextField()
@@ -96,16 +115,27 @@ class QueryResult(BaseModel):
'query_hash': self.query_hash,
'query': self.query,
'data': json.loads(self.data),
'data_source_id': self._data.get('data_source', None),
'runtime': self.runtime,
'retrieved_at': self.retrieved_at
}
@classmethod
def get_latest(cls, data_source, query, ttl=0):
query_hash = utils.gen_query_hash(query)
query = cls.select().where(cls.query_hash == query_hash, cls.data_source == data_source,
peewee.SQL("retrieved_at + interval '%s second' >= now() at time zone 'utc'", ttl)).order_by(cls.retrieved_at.desc())
return query.first()
def __unicode__(self):
return u"%d | %s | %s" % (self.id, self.query_hash, self.retrieved_at)
class Query(BaseModel):
id = peewee.PrimaryKeyField()
data_source = peewee.ForeignKeyField(DataSource)
latest_query_data = peewee.ForeignKeyField(QueryResult, null=True)
name = peewee.CharField(max_length=255)
description = peewee.CharField(max_length=4096, null=True)
@@ -137,6 +167,7 @@ class Query(BaseModel):
'ttl': self.ttl,
'api_key': self.api_key,
'created_at': self.created_at,
'data_source_id': self._data.get('data_source', None)
}
if with_user:
@@ -310,7 +341,7 @@ class Widget(BaseModel):
def __unicode__(self):
return u"%s" % self.id
all_models = (User, QueryResult, Query, Dashboard, Visualization, Widget, ActivityLog)
all_models = (DataSource, User, QueryResult, Query, Dashboard, Visualization, Widget, ActivityLog)
def create_db(create_tables, drop_tables):
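QueryResult.get_latest() replaces the raw-SQL cache lookup that used to live in the data manager: it hashes the query and returns the newest result for that hash on the given data source that is still within the ttl window. A minimal usage sketch follows; the data source values are illustrative, and the call shape follows the model above and the controller change in this commit.

# Minimal usage sketch of the new model-level cache lookup (illustrative values).
from redash import models

data_source = models.DataSource.create(name="Example", type="pg",
                                       options="dbname=postgres")

# Newest cached result for this query on this data source, at most one hour old:
cached = models.QueryResult.get_latest(data_source, "SELECT 1", ttl=3600)
if cached is None:
    print "No fresh result cached; a job would be enqueued instead."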


@@ -46,19 +46,8 @@ STATSD_PREFIX = os.environ.get('REDASH_STATSD_PREFIX', "redash")
NAME = os.environ.get('REDASH_NAME', 're:dash')
# "pg", "graphite", "mysql", "bigquery" or "script"
# The following is kept for backward compatibility, and shouldn't be used any more.
CONNECTION_ADAPTER = os.environ.get("REDASH_CONNECTION_ADAPTER", "pg")
# Connection string for the database that is used to run queries against. Examples:
# -- mysql: CONNECTION_STRING = "Server=;User=;Pwd=;Database="
# -- pg: CONNECTION_STRING = "user= password= host= port=5439 dbname="
# -- graphite: CONNECTION_STRING = {"url": "https://graphite.yourcompany.com", "auth": ["user", "password"], "verify": true}
# -- bigquery: CONNECTION_STRING = {"serviceAccount" : "43242343247-fjdfakljr3r2@developer.gserviceaccount.com", "privateKey" : "/somewhere/23fjkfjdsfj21312-privatekey.p12", "projectId" : "myproject-123" }
# to obtain bigquery credentials follow the guidelines at https://developers.google.com/bigquery/authorization#service-accounts
# -- script: CONNECTION_STRING = "PATH TO ALL SCRIPTS" (i.e. /home/user/redash_scripts/)
# all scripts must have the executable flag set and reside in the path configured in CONNECTION_STRING.
# The output of the scripts must be in the output format defined here: <TODO: Add link to documentation of output format>
# -- url: CONNECTION_STRING = "base URL" (i.e. http://myserver/somewhere)
# If CONNECTION_STRING is set, the query should be a relative URL. If it is not set, a full URL can be used.
CONNECTION_STRING = os.environ.get("REDASH_CONNECTION_STRING", "user= password= host= port=5439 dbname=")
# Connection settings for re:dash's own database (where we store the queries, results, etc)


@@ -12,7 +12,7 @@ atfork==0.1.2
blinker==1.3
flask-peewee==0.6.5
itsdangerous==0.23
peewee==2.2.0
peewee==2.2.2
psycopg2==2.5.1
python-dateutil==2.1
pytz==2013.9


@@ -1,3 +1,4 @@
import logging
from unittest import TestCase
from redash import settings, db, app
import redash.models
@@ -11,6 +12,8 @@ settings.DATABASE_CONFIG = {
app.config['DATABASE'] = settings.DATABASE_CONFIG
db.load_database()
logging.getLogger('peewee').setLevel(logging.INFO)
for model in redash.models.all_models:
model._meta.database = db.database


@@ -1,5 +1,6 @@
import datetime
import redash.models
from redash.utils import gen_query_hash
class ModelFactory(object):
@@ -43,6 +44,12 @@ user_factory = ModelFactory(redash.models.User,
is_admin=False)
data_source_factory = ModelFactory(redash.models.DataSource,
name='Test',
type='pg',
options='')
dashboard_factory = ModelFactory(redash.models.Dashboard,
name='test', user=user_factory.create, layout='[]')
@@ -52,14 +59,16 @@ query_factory = ModelFactory(redash.models.Query,
description='',
query='SELECT 1',
ttl=-1,
user=user_factory.create)
user=user_factory.create,
data_source=data_source_factory.create)
query_result_factory = ModelFactory(redash.models.QueryResult,
data='{"columns":{}, "rows":[]}',
runtime=1,
retrieved_at=datetime.datetime.now(),
query=query_factory.create,
query_hash='')
retrieved_at=datetime.datetime.utcnow,
query="SELECT 1",
query_hash=gen_query_hash('SELECT 1'),
data_source=data_source_factory.create)
visualization_factory = ModelFactory(redash.models.Visualization,
type='CHART',
@@ -73,4 +82,4 @@ widget_factory = ModelFactory(redash.models.Widget,
width=1,
options='{}',
dashboard=dashboard_factory.create,
visualization=visualization_factory.create)
visualization=visualization_factory.create)


@@ -7,7 +7,7 @@ from flask.ext.login import current_user
from mock import patch
from tests import BaseTestCase
from tests.factories import dashboard_factory, widget_factory, visualization_factory, query_factory, \
query_result_factory, user_factory
query_result_factory, user_factory, data_source_factory
from redash import app, models, settings
from redash.utils import json_dumps
from redash.authentication import sign
@@ -211,10 +211,12 @@ class QueryAPITest(BaseTestCase, AuthenticationTestMixin):
def test_create_query(self):
user = user_factory.create()
data_source = data_source_factory.create()
query_data = {
'name': 'Testing',
'query': 'SELECT 1',
'ttl': 3600
'ttl': 3600,
'data_source_id': data_source.id
}
with app.test_client() as c, authenticated_user(c, user=user):
@@ -304,12 +306,13 @@ class CsvQueryResultAPITest(BaseTestCase, AuthenticationTestMixin):
super(CsvQueryResultAPITest, self).setUp()
self.paths = []
self.query_result = query_result_factory.create()
self.path = '/api/queries/{0}/results/{1}.csv'.format(self.query_result.query.id, self.query_result.id)
self.query = query_factory.create()
self.path = '/api/queries/{0}/results/{1}.csv'.format(self.query.id, self.query_result.id)
# TODO: factor out the HMAC authentication tests
def signature(self, expires):
return sign(self.query_result.query.api_key, self.path, expires)
return sign(self.query.api_key, self.path, expires)
def test_redirect_when_unauthenticated(self):
with app.test_client() as c:
@@ -318,34 +321,34 @@ class CsvQueryResultAPITest(BaseTestCase, AuthenticationTestMixin):
def test_redirect_for_wrong_signature(self):
with app.test_client() as c:
rv = c.get('/api/queries/{0}/results/{1}.csv'.format(self.query_result.query.id, self.query_result.id), query_string={'signature': 'whatever', 'expires': 0})
rv = c.get('/api/queries/{0}/results/{1}.csv'.format(self.query.id, self.query_result.id), query_string={'signature': 'whatever', 'expires': 0})
self.assertEquals(rv.status_code, 302)
def test_redirect_for_correct_signature_and_wrong_expires(self):
with app.test_client() as c:
rv = c.get('/api/queries/{0}/results/{1}.csv'.format(self.query_result.query.id, self.query_result.id), query_string={'signature': self.signature(0), 'expires': 0})
rv = c.get('/api/queries/{0}/results/{1}.csv'.format(self.query.id, self.query_result.id), query_string={'signature': self.signature(0), 'expires': 0})
self.assertEquals(rv.status_code, 302)
def test_redirect_for_correct_signature_and_no_expires(self):
with app.test_client() as c:
rv = c.get('/api/queries/{0}/results/{1}.csv'.format(self.query_result.query.id, self.query_result.id), query_string={'signature': self.signature(time.time()+3600)})
rv = c.get('/api/queries/{0}/results/{1}.csv'.format(self.query.id, self.query_result.id), query_string={'signature': self.signature(time.time()+3600)})
self.assertEquals(rv.status_code, 302)
def test_redirect_for_correct_signature_and_expires_too_long(self):
with app.test_client() as c:
expires = time.time()+(10*3600)
rv = c.get('/api/queries/{0}/results/{1}.csv'.format(self.query_result.query.id, self.query_result.id), query_string={'signature': self.signature(expires), 'expires': expires})
rv = c.get('/api/queries/{0}/results/{1}.csv'.format(self.query.id, self.query_result.id), query_string={'signature': self.signature(expires), 'expires': expires})
self.assertEquals(rv.status_code, 302)
def test_returns_200_for_correct_signature(self):
with app.test_client() as c:
expires = time.time()+1800
rv = c.get('/api/queries/{0}/results/{1}.csv'.format(self.query_result.query.id, self.query_result.id), query_string={'signature': self.signature(expires), 'expires': expires})
rv = c.get('/api/queries/{0}/results/{1}.csv'.format(self.query.id, self.query_result.id), query_string={'signature': self.signature(expires), 'expires': expires})
self.assertEquals(rv.status_code, 200)
def test_returns_200_for_authenticated_user(self):
with app.test_client() as c, authenticated_user(c):
rv = c.get('/api/queries/{0}/results/{1}.csv'.format(self.query_result.query.id, self.query_result.id))
rv = c.get('/api/queries/{0}/results/{1}.csv'.format(self.query.id, self.query_result.id))
self.assertEquals(rv.status_code, 200)


@@ -3,7 +3,7 @@ import os.path
from tests import BaseTestCase
from redash import models
from redash import import_export
from factories import user_factory, dashboard_factory
from factories import user_factory, dashboard_factory, data_source_factory
class ImportTest(BaseTestCase):
@@ -15,7 +15,7 @@ class ImportTest(BaseTestCase):
self.user = user_factory.create()
def test_imports_dashboard_correctly(self):
importer = import_export.Importer()
importer = import_export.Importer(data_source=data_source_factory.create())
dashboard = importer.import_dashboard(self.user, self.dashboard)
self.assertIsNotNone(dashboard)
@@ -31,7 +31,7 @@ class ImportTest(BaseTestCase):
self.assertEqual(models.QueryResult.select().count(), dashboard.widgets.count()-1)
def test_imports_updates_existing_models(self):
importer = import_export.Importer()
importer = import_export.Importer(data_source=data_source_factory.create())
importer.import_dashboard(self.user, self.dashboard)
self.dashboard['name'] = 'Testing #2'
@@ -47,7 +47,7 @@ class ImportTest(BaseTestCase):
}
}
importer = import_export.Importer(object_mapping=mapping)
importer = import_export.Importer(object_mapping=mapping, data_source=data_source_factory.create())
imported_dashboard = importer.import_dashboard(self.user, self.dashboard)
self.assertEqual(imported_dashboard, dashboard)

tests/test_job.py (new file, 90 lines)

@@ -0,0 +1,90 @@
import time
from unittest import TestCase
from mock import patch
from redash.data.worker import Job
from redash import redis_connection
from redash.utils import gen_query_hash
class TestJob(TestCase):
def setUp(self):
self.priority = 1
self.query = "SELECT 1"
self.query_hash = gen_query_hash(self.query)
def test_job_creation(self):
now = time.time()
with patch('time.time', return_value=now):
job = Job(redis_connection, query=self.query, priority=self.priority)
self.assertIsNotNone(job.id)
self.assertTrue(job.new_job)
self.assertEquals(0, job.wait_time)
self.assertEquals(0, job.query_time)
self.assertEquals(None, job.process_id)
self.assertEquals(Job.WAITING, job.status)
self.assertEquals(self.priority, job.priority)
self.assertEquals(self.query, job.query)
self.assertEquals(self.query_hash, job.query_hash)
self.assertIsNone(job.error)
self.assertIsNone(job.query_result_id)
def test_job_loading(self):
job = Job(redis_connection, query=self.query, priority=self.priority)
job.save()
loaded_job = Job.load(redis_connection, job.id)
self.assertFalse(loaded_job.new_job)
self.assertEquals(loaded_job.id, job.id)
self.assertEquals(loaded_job.wait_time, job.wait_time)
self.assertEquals(loaded_job.query_time, job.query_time)
self.assertEquals(loaded_job.process_id, job.process_id)
self.assertEquals(loaded_job.status, job.status)
self.assertEquals(loaded_job.priority, job.priority)
self.assertEquals(loaded_job.query_hash, job.query_hash)
self.assertEquals(loaded_job.query, job.query)
self.assertEquals(loaded_job.error, job.error)
self.assertEquals(loaded_job.query_result_id, job.query_result_id)
def test_update(self):
job = Job(redis_connection, query=self.query, priority=self.priority)
job.update(process_id=1)
self.assertEquals(1, job.process_id)
self.assertEquals(self.query, job.query)
self.assertEquals(self.priority, job.priority)
def test_processing(self):
job = Job(redis_connection, query=self.query, priority=self.priority)
updated_at = job.updated_at
now = time.time()+10
with patch('time.time', return_value=now):
job.processing(10)
job = Job.load(redis_connection, job.id)
self.assertEquals(10, job.process_id)
self.assertEquals(Job.PROCESSING, job.status)
self.assertEquals(now, job.updated_at)
self.assertEquals(now - updated_at, job.wait_time)
def test_done(self):
job = Job(redis_connection, query=self.query, priority=self.priority)
updated_at = job.updated_at
now = time.time()+10
with patch('time.time', return_value=now):
job.done(1, None)
job = Job.load(redis_connection, job.id)
self.assertEquals(Job.DONE, job.status)
self.assertEquals(1, job.query_result_id)
self.assertEquals(now, job.updated_at)
self.assertEquals(now - updated_at, job.query_time)
self.assertIsNone(job.error)
def test_done_failed(self):
pass
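Taken together, these assertions pin down the job lifecycle: a job starts in WAITING, processing() records the worker pid and how long the job waited, and done() records the result id and how long the query ran. A self-contained sketch of that state machine, inferred from the tests above rather than copied from redash/data/worker.py (the constant values are placeholders):

import time

class JobSketch(object):
    # Status constants mirroring the ones asserted above; numeric values are
    # illustrative only.
    WAITING, PROCESSING, DONE = 1, 2, 3

    def __init__(self):
        self.status = self.WAITING
        self.updated_at = time.time()
        self.wait_time = 0
        self.query_time = 0
        self.process_id = None
        self.query_result_id = None
        self.error = None

    def processing(self, process_id):
        # WAITING -> PROCESSING: remember which worker picked the job up and
        # how long it sat in the queue.
        self.process_id = process_id
        self.status = self.PROCESSING
        now = time.time()
        self.wait_time = now - self.updated_at
        self.updated_at = now

    def done(self, query_result_id, error):
        # PROCESSING -> DONE: store the result id (or the error) and how long
        # the query took; the failure path is still a stub in test_done_failed.
        self.query_result_id = query_result_id
        self.error = error
        self.status = self.DONE
        now = time.time()
        self.query_time = now - self.updated_at
        self.updated_at = now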

149
tests/test_manager.py Normal file
View File

@@ -0,0 +1,149 @@
import datetime
from mock import patch, call
from tests import BaseTestCase
from redash.data import worker
from redash import data_manager, models
from tests.factories import query_factory, query_result_factory, data_source_factory
from redash.utils import gen_query_hash
class TestManagerRefresh(BaseTestCase):
def test_enqueues_outdated_queries(self):
query = query_factory.create(ttl=60)
retrieved_at = datetime.datetime.utcnow() - datetime.timedelta(minutes=10)
query_result = query_result_factory.create(retrieved_at=retrieved_at, query=query.query,
query_hash=query.query_hash)
query.latest_query_data = query_result
query.save()
with patch('redash.data.Manager.add_job') as add_job_mock:
data_manager.refresh_queries()
add_job_mock.assert_called_with(query.query, worker.Job.LOW_PRIORITY, query.data_source)
def test_skips_fresh_queries(self):
query = query_factory.create(ttl=1200)
retrieved_at = datetime.datetime.utcnow() - datetime.timedelta(minutes=10)
query_result = query_result_factory.create(retrieved_at=retrieved_at, query=query.query,
query_hash=query.query_hash)
with patch('redash.data.Manager.add_job') as add_job_mock:
data_manager.refresh_queries()
self.assertFalse(add_job_mock.called)
def test_skips_queries_with_no_ttl(self):
query = query_factory.create(ttl=-1)
retrieved_at = datetime.datetime.utcnow() - datetime.timedelta(minutes=10)
query_result = query_result_factory.create(retrieved_at=retrieved_at, query=query.query,
query_hash=query.query_hash)
with patch('redash.data.Manager.add_job') as add_job_mock:
data_manager.refresh_queries()
self.assertFalse(add_job_mock.called)
def test_enqueues_query_only_once(self):
query = query_factory.create(ttl=60)
query2 = query_factory.create(ttl=60, query=query.query, query_hash=query.query_hash,
data_source=query.data_source)
retrieved_at = datetime.datetime.utcnow() - datetime.timedelta(minutes=10)
query_result = query_result_factory.create(retrieved_at=retrieved_at, query=query.query,
query_hash=query.query_hash)
query.latest_query_data = query_result
query2.latest_query_data = query_result
query.save()
query2.save()
with patch('redash.data.Manager.add_job') as add_job_mock:
data_manager.refresh_queries()
add_job_mock.assert_called_once_with(query.query, worker.Job.LOW_PRIORITY, query.data_source)
def test_enqueues_query_with_correct_data_source(self):
query = query_factory.create(ttl=60)
query2 = query_factory.create(ttl=60, query=query.query, query_hash=query.query_hash)
retrieved_at = datetime.datetime.utcnow() - datetime.timedelta(minutes=10)
query_result = query_result_factory.create(retrieved_at=retrieved_at, query=query.query,
query_hash=query.query_hash)
query.latest_query_data = query_result
query2.latest_query_data = query_result
query.save()
query2.save()
with patch('redash.data.Manager.add_job') as add_job_mock:
data_manager.refresh_queries()
add_job_mock.assert_has_calls([call(query2.query, worker.Job.LOW_PRIORITY, query2.data_source), call(query.query, worker.Job.LOW_PRIORITY, query.data_source)], any_order=True)
self.assertEquals(2, add_job_mock.call_count)
def test_enqueues_only_for_relevant_data_source(self):
query = query_factory.create(ttl=60)
query2 = query_factory.create(ttl=3600, query=query.query, query_hash=query.query_hash)
retrieved_at = datetime.datetime.utcnow() - datetime.timedelta(minutes=10)
query_result = query_result_factory.create(retrieved_at=retrieved_at, query=query.query,
query_hash=query.query_hash)
query.latest_query_data = query_result
query2.latest_query_data = query_result
query.save()
query2.save()
with patch('redash.data.Manager.add_job') as add_job_mock:
data_manager.refresh_queries()
add_job_mock.assert_called_once_with(query.query, worker.Job.LOW_PRIORITY, query.data_source)
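The refresh tests above describe the scheduling rule: a query is re-enqueued when it has a positive ttl and its latest result is older than that ttl, and at most once per (query text, data source) pair even when several query rows share the same SQL. A rough sketch of that selection logic with hypothetical helper names (the real Manager.refresh_queries likely expresses this as a database query rather than a Python loop):

import datetime

LOW_PRIORITY = 1  # stand-in for worker.Job.LOW_PRIORITY

def should_refresh(query, now=None):
    # Outdated = positive ttl and a latest result older than ttl seconds.
    # (Queries with no stored result are outside what these tests exercise.)
    if query.ttl <= 0 or query.latest_query_data is None:
        return False
    now = now or datetime.datetime.utcnow()
    age = (now - query.latest_query_data.retrieved_at).total_seconds()
    return age > query.ttl

def refresh_queries(manager, queries):
    # Enqueue each outdated (query text, data source) pair exactly once.
    seen = set()
    for query in queries:
        if not should_refresh(query):
            continue
        key = (query.query_hash, query.data_source.id)
        if key in seen:
            continue
        seen.add(key)
        manager.add_job(query.query, LOW_PRIORITY, query.data_source)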
class TestManagerStoreResults(BaseTestCase):
def setUp(self):
super(TestManagerStoreResults, self).setUp()
self.data_source = data_source_factory.create()
self.query = "SELECT 1"
self.query_hash = gen_query_hash(self.query)
self.runtime = 123
self.utcnow = datetime.datetime.utcnow()
self.data = "data"
def test_stores_the_result(self):
query_result_id = data_manager.store_query_result(self.data_source.id, self.query,
self.data, self.runtime, self.utcnow)
query_result = models.QueryResult.get_by_id(query_result_id)
self.assertEqual(query_result.data, self.data)
self.assertEqual(query_result.runtime, self.runtime)
self.assertEqual(query_result.retrieved_at, self.utcnow)
self.assertEqual(query_result.query, self.query)
self.assertEqual(query_result.query_hash, self.query_hash)
self.assertEqual(query_result.data_source, self.data_source)
def test_updates_existing_queries(self):
query1 = query_factory.create(query=self.query, data_source=self.data_source)
query2 = query_factory.create(query=self.query, data_source=self.data_source)
query3 = query_factory.create(query=self.query, data_source=self.data_source)
query_result_id = data_manager.store_query_result(self.data_source.id, self.query,
self.data, self.runtime, self.utcnow)
self.assertEqual(models.Query.get_by_id(query1.id)._data['latest_query_data'], query_result_id)
self.assertEqual(models.Query.get_by_id(query2.id)._data['latest_query_data'], query_result_id)
self.assertEqual(models.Query.get_by_id(query3.id)._data['latest_query_data'], query_result_id)
def test_doesnt_update_queries_with_different_hash(self):
query1 = query_factory.create(query=self.query, data_source=self.data_source)
query2 = query_factory.create(query=self.query, data_source=self.data_source)
query3 = query_factory.create(query=self.query + "123", data_source=self.data_source)
query_result_id = data_manager.store_query_result(self.data_source.id, self.query,
self.data, self.runtime, self.utcnow)
self.assertEqual(models.Query.get_by_id(query1.id)._data['latest_query_data'], query_result_id)
self.assertEqual(models.Query.get_by_id(query2.id)._data['latest_query_data'], query_result_id)
self.assertNotEqual(models.Query.get_by_id(query3.id)._data['latest_query_data'], query_result_id)
def test_doesnt_update_queries_with_different_data_source(self):
query1 = query_factory.create(query=self.query, data_source=self.data_source)
query2 = query_factory.create(query=self.query, data_source=self.data_source)
query3 = query_factory.create(query=self.query, data_source=data_source_factory.create())
query_result_id = data_manager.store_query_result(self.data_source.id, self.query,
self.data, self.runtime, self.utcnow)
self.assertEqual(models.Query.get_by_id(query1.id)._data['latest_query_data'], query_result_id)
self.assertEqual(models.Query.get_by_id(query2.id)._data['latest_query_data'], query_result_id)
self.assertNotEqual(models.Query.get_by_id(query3.id)._data['latest_query_data'], query_result_id)
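What TestManagerStoreResults asserts is that storing a result creates one QueryResult row and then repoints every query with the same query hash and the same data source at it, leaving queries with a different hash or a different data source alone. A rough peewee-style equivalent of that behaviour (a sketch, not the actual Manager method):

from redash import models
from redash.utils import gen_query_hash

def store_query_result(data_source_id, query, data, run_time, retrieved_at):
    # Persist the new result...
    query_result = models.QueryResult.create(query_hash=gen_query_hash(query),
                                             query=query,
                                             runtime=run_time,
                                             data_source=data_source_id,
                                             retrieved_at=retrieved_at,
                                             data=data)
    # ...and point every query with the same hash and the same data source at
    # it; queries against other data sources keep their old results.
    models.Query.update(latest_query_data=query_result).where(
        (models.Query.query_hash == query_result.query_hash) &
        (models.Query.data_source == data_source_id)).execute()
    return query_result.id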

View File

@@ -1,6 +1,7 @@
import datetime
from tests import BaseTestCase
from redash import models
from factories import dashboard_factory, query_factory
from factories import dashboard_factory, query_factory, data_source_factory, query_result_factory
class DashboardTest(BaseTestCase):
@@ -25,4 +26,58 @@ class QueryTest(BaseTestCase):
q = models.Query.get_by_id(q.id)
self.assertNotEquals(old_hash, q.query_hash)
class QueryResultTest(BaseTestCase):
def setUp(self):
super(QueryResultTest, self).setUp()
def test_get_latest_returns_none_if_not_found(self):
ds = data_source_factory.create()
found_query_result = models.QueryResult.get_latest(ds, "SELECT 1", 60)
self.assertIsNone(found_query_result)
def test_get_latest_returns_when_found(self):
qr = query_result_factory.create()
found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query, 60)
self.assertEqual(qr, found_query_result)
def test_get_latest_works_with_data_source_id(self):
qr = query_result_factory.create()
found_query_result = models.QueryResult.get_latest(qr.data_source.id, qr.query, 60)
self.assertEqual(qr, found_query_result)
def test_get_latest_doesnt_return_query_from_different_data_source(self):
qr = query_result_factory.create()
data_source = data_source_factory.create()
found_query_result = models.QueryResult.get_latest(data_source, qr.query, 60)
self.assertIsNone(found_query_result)
def test_get_latest_doesnt_return_if_ttl_expired(self):
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
qr = query_result_factory.create(retrieved_at=yesterday)
found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query, ttl=60)
self.assertIsNone(found_query_result)
def test_get_latest_returns_if_ttl_not_expired(self):
yesterday = datetime.datetime.now() - datetime.timedelta(seconds=30)
qr = query_result_factory.create(retrieved_at=yesterday)
found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query, ttl=120)
self.assertEqual(found_query_result, qr)
def test_get_latest_returns_the_most_recent_result(self):
yesterday = datetime.datetime.now() - datetime.timedelta(seconds=30)
old_qr = query_result_factory.create(retrieved_at=yesterday)
qr = query_result_factory.create()
found_query_result = models.QueryResult.get_latest(qr.data_source, qr.query, 60)
self.assertEqual(found_query_result.id, qr.id)
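Finally, the get_latest tests define the reuse rule for cached results: return the most recent QueryResult for the same query text (matched by hash) and the same data source, accepting either a DataSource instance or its id, and only if it was retrieved within the last ttl seconds. One way this could be written with peewee (a sketch, not the exact model code; time-zone handling in particular is simplified):

import datetime

from redash import models
from redash.utils import gen_query_hash

def get_latest_result(data_source, query, ttl=0):
    # data_source may be a DataSource instance or its id; peewee compares the
    # foreign key by id either way. Only results newer than ttl seconds
    # qualify, and the newest one wins.
    oldest_allowed = datetime.datetime.now() - datetime.timedelta(seconds=ttl)
    return (models.QueryResult
            .select()
            .where(models.QueryResult.query_hash == gen_query_hash(query),
                   models.QueryResult.data_source == data_source,
                   models.QueryResult.retrieved_at >= oldest_allowed)
            .order_by(models.QueryResult.retrieved_at.desc())
            .first())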