Replace Tornado with Flask

This commit is contained in:
Arik Fraimovich
2014-01-17 11:05:54 +02:00
parent d84d047470
commit b31c5be70e
3 changed files with 349 additions and 387 deletions

View File

@@ -31,6 +31,7 @@
</button>
<a class="navbar-brand" href="/"><strong>re:dash</strong></a>
</div>
{% raw %}
<div class="collapse navbar-collapse navbar-ex1-collapse">
<ul class="nav navbar-nav">
<li class="active" ng-show="pageTitle"><a class="page-title" ng-bind="pageTitle"></a></li>
@@ -42,13 +43,13 @@
<a href="#" ng-bind="name"></a>
<ul class="dropdown-menu">
<li ng-repeat="dashboard in group" role="presentation">
<a role="menu-item" ng-href="/dashboard/{{!dashboard.slug}}" ng-bind="dashboard.name"></a>
<a role="menu-item" ng-href="/dashboard/{{dashboard.slug}}" ng-bind="dashboard.name"></a>
</li>
</ul>
</li>
</span>
<li ng-repeat="dashboard in otherDashboards">
<a role="menu-item" ng-href="/dashboard/{{!dashboard.slug}}" ng-bind="dashboard.name"></a>
<a role="menu-item" ng-href="/dashboard/{{dashboard.slug}}" ng-bind="dashboard.name"></a>
</li>
<li class="divider"></li>
<li><a data-toggle="modal" href="#new_dashboard_dialog">New Dashboard</a></li>
@@ -64,10 +65,11 @@
</ul>
<ul class="nav navbar-nav navbar-right">
<p class="navbar-text avatar">
<img ng-src="{{!currentUser.gravatar_url}}" class="img-circle" alt="{{!currentUser.name}}" width="40" height="40"/>
<img ng-src="{{currentUser.gravatar_url}}" class="img-circle" alt="{{currentUser.name}}" width="40" height="40"/>
</p>
</ul>
</div>
{% endraw %}
</div>
</nav>
@@ -119,13 +121,13 @@
<!-- endbuild -->
<script>
var currentUser = {% raw user %};
var currentUser = {{ user|safe }};
currentUser.canEdit = function(object) {
return object.user && (object.user.indexOf(currentUser.name) != -1);
};
{% raw analytics %}
{{ analytics|safe }}
</script>
</body>

342
redash/api.py Normal file
View File

@@ -0,0 +1,342 @@
"""
Flask-restful based API implementation for re:dash.
Also at the moment the Flask server is used to serve the static assets (and the Angular.js app),
but this is only due to configuration issues and temporary.
Usage:
python api.py [--port=8888] [--debug] [--static=..]
port - port to listen to
debug - enable debug mode (extensive logging, restart on code change)
static - static assets path
If static option isn't specified it will be taken from settings.py.
"""
import csv
import hashlib
import json
import numbers
import urlparse
import cStringIO
import datetime
import dateutil
from flask import Flask, g, render_template, send_from_directory, make_response, request, jsonify
from flask_googleauth import GoogleFederated
from flask.ext.restful import Api, Resource, abort
import redis
import sqlparse
import settings
from data import utils
import data
app = Flask(__name__,
template_folder=settings.STATIC_ASSETS_PATH,
static_folder=settings.STATIC_ASSETS_PATH,
static_path='/static')
api = Api(app)
@api.representation('application/json')
def json_representation(data, code, headers=None):
resp = make_response(json.dumps(data, cls=utils.JSONEncoder), code)
resp.headers.extend(headers or {})
return resp
# TODO: move this out
url = urlparse.urlparse(settings.REDIS_URL)
redis_connection = redis.StrictRedis(host=url.hostname, port=url.port, db=0, password=url.password)
data_manager = data.Manager(redis_connection, settings.INTERNAL_DB_CONNECTION_STRING, settings.MAX_CONNECTIONS)
auth = GoogleFederated(settings.GOOGLE_APPS_DOMAIN, app)
from werkzeug.contrib.fixers import ProxyFix
app.wsgi_app = ProxyFix(app.wsgi_app)
app.secret_key = settings.COOKIE_SECRET
auth.force_auth_on_every_request = True
@app.route('/ping', methods=['GET'])
def ping():
return 'PONG.'
@app.route('/admin/<anything>')
@app.route('/dashboard/<anything>')
@app.route('/queries/<anything>')
@app.route('/')
@auth.required
def index(anything=None):
email_md5 = hashlib.md5(g.user['email'].lower()).hexdigest()
gravatar_url = "https://www.gravatar.com/avatar/%s?s=40" % email_md5
user = {
'gravatar_url': gravatar_url,
'is_admin': g.user['email'] in settings.ADMINS,
'name': g.user['email']
}
return render_template("index.html", user=json.dumps(user), analytics=settings.ANALYTICS)
@app.route('/status.json')
@auth.required
def status_api():
status = {}
info = redis_connection.info()
status['redis_used_memory'] = info['used_memory_human']
status['queries_count'] = data.models.Query.objects.count()
status['query_results_count'] = data.models.QueryResult.objects.count()
status['dashboards_count'] = data.models.Dashboard.objects.count()
status['widgets_count'] = data.models.Widget.objects.count()
status['workers'] = [redis_connection.hgetall(w)
for w in redis_connection.smembers('workers')]
manager_status = redis_connection.hgetall('manager:status')
status['manager'] = manager_status
status['manager']['queue_size'] = redis_connection.zcard('jobs')
return jsonify(status)
@app.route('/api/queries/format', methods=['POST'])
@auth.required
def format_sql_query(self):
arguments = json.loads(self.request.body)
query = arguments.get("query", "")
return sqlparse.format(query, reindent=True, keyword_case='upper')
class BaseResource(Resource):
decorators = [auth.required]
@property
def current_user(self):
return g.user['email']
class DashboardListAPI(BaseResource):
def get(self):
dashboards = [d.to_dict() for d in
data.models.Dashboard.objects.filter(is_archived=False)]
return dashboards
def post(self):
dashboard_properties = json.loads(self.request.body)
dashboard = data.models.Dashboard(name=dashboard_properties['name'],
user=self.current_user,
layout='[]')
dashboard.save()
return dashboard.to_dict()
class DashboardAPI(BaseResource):
def get(self, dashboard_slug=None):
dashboard = data.models.Dashboard.objects.prefetch_related('widgets__query__latest_query_data').get(slug=dashboard_slug)
return dashboard.to_dict(with_widgets=True)
def post(self, dashboard_id):
dashboard_properties = request.json
dashboard = data.models.Dashboard.objects.get(pk=dashboard_id)
dashboard.layout = dashboard_properties['layout']
dashboard.name = dashboard_properties['name']
dashboard.save()
return dashboard.to_dict(with_widgets=True)
def delete(self, dashboard_slug):
dashboard = data.models.Dashboard.objects.get(slug=dashboard_slug)
dashboard.is_archived = True
dashboard.save()
api.add_resource(DashboardListAPI, '/api/dashboards', endpoint='dashboards')
api.add_resource(DashboardAPI, '/api/dashboards/<dashboard_slug>', endpoint='dashboard')
class WidgetListAPI(BaseResource):
def post(self):
widget_properties = request.json
widget_properties['options'] = json.dumps(widget_properties['options'])
widget = data.models.Widget(**widget_properties)
widget.save()
layout = json.loads(widget.dashboard.layout)
new_row = True
if len(layout) == 0 or widget.width == 2:
layout.append([widget.id])
elif len(layout[-1]) == 1:
neighbour_widget = data.models.Widget.objects.get(pk=layout[-1][0])
if neighbour_widget.width == 1:
layout[-1].append(widget.id)
new_row = False
else:
layout.append([widget.id])
else:
layout.append([widget.id])
widget.dashboard.layout = json.dumps(layout)
widget.dashboard.save()
return {'widget': widget.to_dict(), 'layout': layout, 'new_row': new_row}
class WidgetAPI(BaseResource):
def delete(self, widget_id):
widget = data.models.Widget.objects.get(pk=widget_id)
# TODO: reposition existing ones
layout = json.loads(widget.dashboard.layout)
layout = map(lambda row: filter(lambda w: w != widget_id, row), layout)
layout = filter(lambda row: len(row) > 0, layout)
widget.dashboard.layout = json.dumps(layout)
widget.dashboard.save()
widget.delete()
api.add_resource(WidgetListAPI, '/api/widgets', endpoint='widgets')
api.add_resource(WidgetAPI, '/api/widgets/<int:widget_id>', endpoint='widget')
class QueryListAPI(BaseResource):
def post(self):
query_def = request.json
if 'created_at' in query_def:
query_def['created_at'] = dateutil.parser.parse(query_def['created_at'])
query_def.pop('latest_query_data', None)
query_def['user'] = self.current_user
query = data.models.Query(**query_def)
query.save()
return query.to_dict(with_result=False)
def get(self):
return [q.to_dict(with_result=False, with_stats=True) for q in data.models.Query.all_queries()]
class QueryAPI(BaseResource):
def post(self, query_id):
query_def = request.json
if 'created_at' in query_def:
query_def['created_at'] = dateutil.parser.parse(query_def['created_at'])
query_def.pop('latest_query_data', None)
query = data.models.Query(**query_def)
fields = query_def.keys()
fields.remove('id')
query.save(update_fields=fields)
return query.to_dict(with_result=False)
def get(self, query_id):
q = data.models.Query.objects.get(pk=query_id)
if q:
return q.to_dict()
else:
abort(404, message="Query not found.")
api.add_resource(QueryListAPI, '/api/queries', endpoint='queries')
api.add_resource(QueryAPI, '/api/queries/<query_id>', endpoint='query')
class QueryResultListAPI(BaseResource):
def post(self):
params = request.json
if params['ttl'] == 0:
query_result = None
else:
query_result = data_manager.get_query_result(params['query'], int(params['ttl']))
if query_result:
return {'query_result': query_result.to_dict(parse_data=True)}
else:
job = data_manager.add_job(params['query'], data.Job.HIGH_PRIORITY)
return {'job': job.to_dict()}
class QueryResultAPI(BaseResource):
def get(self, query_result_id):
query_result = data_manager.get_query_result_by_id(query_result_id)
if query_result:
return {'query_result': query_result.to_dict(parse_data=True)}
else:
abort(404)
class CsvQueryResultsAPI(BaseResource):
# TODO: bring this functionality:
#def get_current_user(self):
# user = super(CsvQueryResultsHandler, self).get_current_user()
# if not user:
# api_key = self.get_argument("api_key", None)
# query = data.models.Query.objects.get(pk=self.path_args[0])
#
# if query.api_key and query.api_key == api_key:
# user = "API-Key=%s" % api_key
#
# return user
def get(self, query_id, query_result_id=None):
if not query_result_id:
query = data.models.Query.objects.get(pk=query_id)
if query:
query_result_id = query.latest_query_data_id
query_result = query_result_id and data_manager.get_query_result_by_id(query_result_id)
if query_result:
s = cStringIO.StringIO()
query_data = json.loads(query_result.data)
writer = csv.DictWriter(s, fieldnames=[col['name'] for col in query_data['columns']])
writer.writer = utils.UnicodeWriter(s)
writer.writeheader()
for row in query_data['rows']:
for k, v in row.iteritems():
if isinstance(v, numbers.Number) and (v > 1000 * 1000 * 1000 * 100):
row[k] = datetime.datetime.fromtimestamp(v/1000.0)
writer.writerow(row)
return make_response(s.getvalue(), 200, {'Content-Type': "text/csv; charset=UTF-8"})
else:
abort(404)
api.add_resource(CsvQueryResultsAPI, '/api/queries/<query_id>/results/<query_result_id>.csv',
'/api/queries/<query_id>/results.csv',
endpoint='csv_query_results')
api.add_resource(QueryResultListAPI, '/api/query_results', endpoint='query_results')
api.add_resource(QueryResultAPI, '/api/query_results/<query_result_id>', endpoint='query_result')
class JobAPI(BaseResource):
def get(self, job_id):
# TODO: if finished, include the query result
job = data.Job.load(data_manager.redis_connection, job_id)
return {'job': job.to_dict()}
def delete(self, job_id):
job = data.Job.load(data_manager.redis_connection, job_id)
job.cancel()
api.add_resource(JobAPI, '/api/jobs/<job_id>', endpoint='job')
@app.route('/<path:filename>')
@auth.required
def send_static(filename):
return send_from_directory(settings.STATIC_ASSETS_PATH, filename)
if __name__ == '__main__':
app.run(debug=True)

View File

@@ -1,382 +0,0 @@
"""
Tornado based API implementation for re:dash.
Also at the moment the Tornado server is used to serve the static assets (and the Angular.js app),
but this is only due to configuration issues and temporary.
Usage:
python server.py [--port=8888] [--debug] [--static=..]
port - port to listen to
debug - enable debug mode (extensive logging, restart on code change)
static - static assets path
If static option isn't specified it will be taken from settings.py.
"""
import csv
import hashlib
import json
import numbers
import os
import urlparse
import logging
import cStringIO
import datetime
import dateutil.parser
import redis
import sqlparse
import tornado.ioloop
import tornado.web
import tornado.auth
import tornado.options
import settings
import time
from data import utils
import data
class BaseHandler(tornado.web.RequestHandler):
def initialize(self):
self.data_manager = self.application.settings.get('data_manager', None)
self.redis_connection = self.application.settings['redis_connection']
def get_current_user(self):
user = self.get_secure_cookie("user")
return user
def write_json(self, response, encode=True):
if encode:
response = json.dumps(response, cls=utils.JSONEncoder)
self.set_header("Content-Type", "application/json; charset=UTF-8")
self.write(response)
class BaseAuthenticatedHandler(BaseHandler):
@tornado.web.authenticated
def prepare(self):
pass
class PingHandler(tornado.web.RequestHandler):
def get(self):
self.write("PONG")
class GoogleLoginHandler(tornado.web.RequestHandler,
tornado.auth.GoogleMixin):
@tornado.web.asynchronous
@tornado.gen.coroutine
def get(self):
if self.get_argument("openid.mode", None):
user = yield self.get_authenticated_user()
if user['email'] in settings.ALLOWED_USERS or user['email'].endswith("@%s" % settings.GOOGLE_APPS_DOMAIN):
logging.info("Authenticated: %s", user['email'])
self.set_secure_cookie("user", user['email'])
self.redirect("/")
else:
logging.error("Failed logging in with: %s", user)
self.authenticate_redirect()
else:
self.authenticate_redirect()
class MainHandler(BaseAuthenticatedHandler):
def get(self, *args):
email_md5 = hashlib.md5(self.current_user.lower()).hexdigest()
gravatar_url = "https://www.gravatar.com/avatar/%s?s=40" % email_md5
user = {
'gravatar_url': gravatar_url,
'is_admin': self.current_user in settings.ADMINS,
'name': self.current_user
}
self.render("index.html", user=json.dumps(user), analytics=settings.ANALYTICS)
class QueryFormatHandler(BaseAuthenticatedHandler):
def post(self):
arguments = json.loads(self.request.body)
query = arguments.get("query", "")
self.write(sqlparse.format(query, reindent=True, keyword_case='upper'))
class StatusHandler(BaseAuthenticatedHandler):
def get(self):
status = {}
info = self.redis_connection.info()
status['redis_used_memory'] = info['used_memory_human']
status['queries_count'] = data.models.Query.objects.count()
status['query_results_count'] = data.models.QueryResult.objects.count()
status['dashboards_count'] = data.models.Dashboard.objects.count()
status['widgets_count'] = data.models.Widget.objects.count()
status['workers'] = [self.redis_connection.hgetall(w)
for w in self.redis_connection.smembers('workers')]
manager_status = self.redis_connection.hgetall('manager:status')
status['manager'] = manager_status
status['manager']['queue_size'] = self.redis_connection.zcard('jobs')
self.write_json(status)
class WidgetsHandler(BaseAuthenticatedHandler):
def post(self, widget_id=None):
widget_properties = json.loads(self.request.body)
widget_properties['options'] = json.dumps(widget_properties['options'])
widget = data.models.Widget(**widget_properties)
widget.save()
layout = json.loads(widget.dashboard.layout)
new_row = True
if len(layout) == 0 or widget.width == 2:
layout.append([widget.id])
elif len(layout[-1]) == 1:
neighbour_widget = data.models.Widget.objects.get(pk=layout[-1][0])
if neighbour_widget.width == 1:
layout[-1].append(widget.id)
new_row = False
else:
layout.append([widget.id])
else:
layout.append([widget.id])
widget.dashboard.layout = json.dumps(layout)
widget.dashboard.save()
self.write_json({'widget': widget.to_dict(), 'layout': layout, 'new_row': new_row})
def delete(self, widget_id):
widget_id = int(widget_id)
widget = data.models.Widget.objects.get(pk=widget_id)
# TODO: reposition existing ones
layout = json.loads(widget.dashboard.layout)
layout = map(lambda row: filter(lambda w: w != widget_id, row), layout)
layout = filter(lambda row: len(row) > 0, layout)
widget.dashboard.layout = json.dumps(layout)
widget.dashboard.save()
widget.delete()
class DashboardHandler(BaseAuthenticatedHandler):
def get(self, dashboard_slug=None):
if dashboard_slug:
dashboard = data.models.Dashboard.objects.prefetch_related('widgets__visualization__query__latest_query_data').get(slug=dashboard_slug)
self.write_json(dashboard.to_dict(with_widgets=True))
else:
dashboards = [d.to_dict() for d in
data.models.Dashboard.objects.filter(is_archived=False)]
self.write_json(dashboards)
def post(self, dashboard_id):
if dashboard_id:
dashboard_properties = json.loads(self.request.body)
dashboard = data.models.Dashboard.objects.get(pk=dashboard_id)
dashboard.layout = dashboard_properties['layout']
dashboard.name = dashboard_properties['name']
dashboard.save()
self.write_json(dashboard.to_dict(with_widgets=True))
else:
dashboard_properties = json.loads(self.request.body)
dashboard = data.models.Dashboard(name=dashboard_properties['name'],
user=self.current_user,
layout='[]')
dashboard.save()
self.write_json(dashboard.to_dict())
def delete(self, dashboard_slug):
dashboard = data.models.Dashboard.objects.get(slug=dashboard_slug)
dashboard.is_archived = True
dashboard.save()
class QueriesHandler(BaseAuthenticatedHandler):
def post(self, id=None):
query_def = json.loads(self.request.body)
if 'created_at' in query_def:
query_def['created_at'] = dateutil.parser.parse(query_def['created_at'])
query_def.pop('latest_query_data', None)
query_def.pop('visualizations', None)
if id:
query = data.models.Query(**query_def)
fields = query_def.keys()
fields.remove('id')
query.save(update_fields=fields)
else:
query_def['user'] = self.current_user
query = data.models.Query(**query_def)
query.save()
query.create_default_visualizations()
self.write_json(query.to_dict(with_result=False))
def get(self, id=None):
if id:
q = data.models.Query.objects.get(pk=id)
if q:
self.write_json(q.to_dict(with_visualizations=True))
else:
self.send_error(404)
else:
self.write_json([q.to_dict(with_result=False, with_stats=True) for q in data.models.Query.all_queries()])
class QueryResultsHandler(BaseAuthenticatedHandler):
def get(self, query_result_id):
query_result = self.data_manager.get_query_result_by_id(query_result_id)
if query_result:
self.write_json({'query_result': query_result.to_dict(parse_data=True)})
else:
self.send_error(404)
def post(self, _):
params = json.loads(self.request.body)
if params['ttl'] == 0:
query_result = None
else:
query_result = self.data_manager.get_query_result(params['query'], int(params['ttl']))
if query_result:
self.write_json({'query_result': query_result.to_dict(parse_data=True)})
else:
job = self.data_manager.add_job(params['query'], data.Job.HIGH_PRIORITY)
self.write({'job': job.to_dict()})
class VisualizationHandler(BaseAuthenticatedHandler):
def get(self, id):
pass
def post(self, id=None):
kwargs = json.loads(self.request.body)
kwargs['options'] = json.dumps(kwargs['options'])
if id:
vis = data.models.Visualization(**kwargs)
fields = kwargs.keys()
fields.remove('id')
vis.save(update_fields=fields)
else:
vis = data.models.Visualization(**kwargs)
vis.save()
self.write_json(vis.to_dict(with_query=False))
def delete(self, id):
vis = data.models.Visualization.objects.get(pk=id)
vis.delete()
class CsvQueryResultsHandler(BaseAuthenticatedHandler):
def get_current_user(self):
user = super(CsvQueryResultsHandler, self).get_current_user()
if not user:
api_key = self.get_argument("api_key", None)
query = data.models.Query.objects.get(pk=self.path_args[0])
if query.api_key and query.api_key == api_key:
user = "API-Key=%s" % api_key
return user
def get(self, query_id, result_id=None):
if not result_id:
query = data.models.Query.objects.get(pk=query_id)
if query:
result_id = query.latest_query_data_id
query_result = result_id and self.data_manager.get_query_result_by_id(result_id)
if query_result:
self.set_header("Content-Type", "text/csv; charset=UTF-8")
s = cStringIO.StringIO()
query_data = json.loads(query_result.data)
writer = csv.DictWriter(s, fieldnames=[col['name'] for col in query_data['columns']])
writer.writer = utils.UnicodeWriter(s)
writer.writeheader()
for row in query_data['rows']:
for k, v in row.iteritems():
if isinstance(v, numbers.Number) and (v > 1000 * 1000 * 1000 * 100):
row[k] = datetime.datetime.fromtimestamp(v/1000.0)
writer.writerow(row)
self.write(s.getvalue())
else:
self.send_error(404)
class JobsHandler(BaseAuthenticatedHandler):
def get(self, job_id=None):
if job_id:
# TODO: if finished, include the query result
job = data.Job.load(self.data_manager.redis_connection, job_id)
self.write({'job': job.to_dict()})
else:
raise NotImplemented
def delete(self, job_id):
job = data.Job.load(self.data_manager.redis_connection, job_id)
job.cancel()
def get_application(static_path, is_debug, redis_connection, data_manager):
return tornado.web.Application([(r"/", MainHandler),
(r"/ping", PingHandler),
(r"/api/queries/([0-9]*)/results(?:/([0-9]*))?.csv", CsvQueryResultsHandler),
(r"/api/queries/format", QueryFormatHandler),
(r"/api/queries(?:/([0-9]*))?", QueriesHandler),
(r"/api/query_results(?:/([0-9]*))?", QueryResultsHandler),
(r"/api/jobs/(.*)", JobsHandler),
(r"/api/visualizations(?:/([0-9]*))?", VisualizationHandler),
(r"/api/widgets(?:/([0-9]*))?", WidgetsHandler),
(r"/api/dashboards(?:/(.*))?", DashboardHandler),
(r"/admin/(.*)", MainHandler),
(r"/dashboard/(.*)", MainHandler),
(r"/queries(.*)", MainHandler),
(r"/login", GoogleLoginHandler),
(r"/status.json", StatusHandler),
(r"/(.*)", tornado.web.StaticFileHandler,
{"path": static_path})],
template_path=static_path,
static_path=static_path,
debug=is_debug,
login_url="/login",
cookie_secret=settings.COOKIE_SECRET,
redis_connection=redis_connection,
data_manager=data_manager)
if __name__ == '__main__':
tornado.options.define("port", default=8888, type=int)
tornado.options.define("debug", default=False, type=bool)
tornado.options.define("static", default=settings.STATIC_ASSETS_PATH, type=str)
tornado.options.parse_command_line()
root_path = os.path.dirname(__file__)
static_path = os.path.abspath(os.path.join(root_path, tornado.options.options.static))
url = urlparse.urlparse(settings.REDIS_URL)
redis_connection = redis.StrictRedis(host=url.hostname, port=url.port, db=0, password=url.password)
data_manager = data.Manager(redis_connection, settings.INTERNAL_DB_CONNECTION_STRING,
settings.MAX_CONNECTIONS)
logging.info("re:dash web server stating on port: %d...", tornado.options.options.port)
logging.info("UI assets path: %s...", static_path)
application = get_application(static_path, tornado.options.options.debug,
redis_connection, data_manager)
application.listen(tornado.options.options.port)
tornado.ioloop.IOLoop.instance().start()