Encapsulate data source/query runner configuration in an object.

This is a step towards adding more complex logic in configuration
handling, like encryption of secrets.
This commit is contained in:
Arik Fraimovich
2016-02-18 17:32:49 +02:00
parent f1e90fde31
commit ed99b8452c
13 changed files with 292 additions and 102 deletions

View File

@@ -1,13 +1,31 @@
import json import json
import jsonschema
from jsonschema import ValidationError
from redash import query_runner from redash import query_runner
from redash.models import DataSource from redash.models import DataSource
def validate_configuration(query_runner_type, configuration_json):
query_runner_class = query_runner.query_runners.get(query_runner_type, None)
if query_runner_class is None:
return False
try:
if isinstance(configuration_json, basestring):
configuration = json.loads(configuration_json)
else:
configuration = configuration_json
jsonschema.validate(configuration, query_runner_class.configuration_schema())
except (ValidationError, ValueError):
return False
return True
def update(data_source): def update(data_source):
print "[%s] Old options: %s" % (data_source.name, data_source.options) print "[%s] Old options: %s" % (data_source.name, data_source.options)
if query_runner.validate_configuration(data_source.type, data_source.options): if validate_configuration(data_source.type, data_source.options):
print "[%s] configuration already valid. skipping." % data_source.name print "[%s] configuration already valid. skipping." % data_source.name
return return

View File

@@ -2,7 +2,8 @@ import json
import click import click
from flask.ext.script import Manager from flask.ext.script import Manager
from redash import models from redash import models
from redash.query_runner import query_runners, validate_configuration from redash.query_runner import query_runners, get_configuration_schema_for_type
from redash.utils.configuration import ConfigurationContainer
manager = Manager(help="Data sources management commands.") manager = Manager(help="Data sources management commands.")
@@ -23,12 +24,6 @@ def validate_data_source_type(type):
exit() exit()
def validate_data_source_options(type, options):
if not validate_configuration(type, options):
print "Error: invalid configuration."
exit()
@manager.command @manager.command
def new(name=None, type=None, options=None): def new(name=None, type=None, options=None):
"""Create new data source""" """Create new data source"""
@@ -76,9 +71,10 @@ def new(name=None, type=None, options=None):
if value != default_value: if value != default_value:
options_obj[k] = value options_obj[k] = value
options = json.dumps(options_obj) options = ConfigurationContainer(options_obj, schema)
if not options.is_valid():
validate_data_source_options(type, options) print "Error: invalid configuration."
exit()
print "Creating {} data source ({}) with options:\n{}".format(type, name, options) print "Creating {} data source ({}) with options:\n{}".format(type, name, options)
@@ -120,7 +116,10 @@ def edit(name, new_name=None, options=None, type=None):
data_source = models.DataSource.get(models.DataSource.name==name) data_source = models.DataSource.get(models.DataSource.name==name)
if options is not None: if options is not None:
validate_data_source_options(data_source.type, options) schema = get_configuration_schema_for_type(data_source.type)
options = json.loads(options)
data_source.options.set_schema(schema)
data_source.options.update(options)
update_attr(data_source, "name", new_name) update_attr(data_source, "name", new_name)
update_attr(data_source, "type", type) update_attr(data_source, "type", type)

View File

@@ -1,13 +1,12 @@
import json
from flask import make_response, request from flask import make_response, request
from flask.ext.restful import abort from flask.ext.restful import abort
from funcy import project from funcy import project
from redash import models from redash import models
from redash.wsgi import api from redash.wsgi import api
from redash.utils.configuration import ConfigurationContainer, ValidationError
from redash.permissions import require_admin from redash.permissions import require_admin
from redash.query_runner import query_runners, validate_configuration from redash.query_runner import query_runners, get_configuration_schema_for_type
from redash.handlers.base import BaseResource, get_object_or_404 from redash.handlers.base import BaseResource, get_object_or_404
@@ -30,14 +29,18 @@ class DataSourceAPI(BaseResource):
data_source = models.DataSource.get_by_id_and_org(data_source_id, self.current_org) data_source = models.DataSource.get_by_id_and_org(data_source_id, self.current_org)
req = request.get_json(True) req = request.get_json(True)
data_source.replace_secret_placeholders(req['options']) schema = get_configuration_schema_for_type(req['type'])
if schema is None:
if not validate_configuration(req['type'], req['options']):
abort(400) abort(400)
data_source.name = req['name'] try:
data_source.options = json.dumps(req['options']) data_source.options.set_schema(schema)
data_source.options.update(req['options'])
except ValidationError:
abort(400)
data_source.type = req['type']
data_source.name = req['name']
data_source.save() data_source.save()
return data_source.to_dict(all=True) return data_source.to_dict(all=True)
@@ -76,12 +79,18 @@ class DataSourceListAPI(BaseResource):
if f not in req: if f not in req:
abort(400) abort(400)
if not validate_configuration(req['type'], req['options']): schema = get_configuration_schema_for_type(req['type'])
if schema is None:
abort(400)
config = ConfigurationContainer(req['options'], schema)
if not config.is_valid():
abort(400) abort(400)
datasource = models.DataSource.create_with_group(org=self.current_org, datasource = models.DataSource.create_with_group(org=self.current_org,
name=req['name'], name=req['name'],
type=req['type'], options=json.dumps(req['options'])) type=req['type'],
options=config)
return datasource.to_dict(all=True) return datasource.to_dict(all=True)

View File

@@ -10,14 +10,16 @@ from funcy import project
import peewee import peewee
from passlib.apps import custom_app_context as pwd_context from passlib.apps import custom_app_context as pwd_context
from playhouse.postgres_ext import ArrayField, DateTimeTZField, PostgresqlExtDatabase from playhouse.postgres_ext import ArrayField, DateTimeTZField
from flask.ext.login import UserMixin, AnonymousUserMixin from flask.ext.login import UserMixin, AnonymousUserMixin
from permissions import has_access, view_only from permissions import has_access, view_only
from redash import utils, settings, redis_connection from redash import utils, settings, redis_connection
from redash.query_runner import get_query_runner from redash.query_runner import get_query_runner
from redash.metrics.database import MeteredPostgresqlExtDatabase, MeteredModel from redash.metrics.database import MeteredPostgresqlExtDatabase, MeteredModel
from utils import generate_token from redash.utils import generate_token
from redash.utils.configuration import ConfigurationContainer
class Database(object): class Database(object):
@@ -314,14 +316,20 @@ class User(ModelTimestampsMixin, BaseModel, BelongsToOrgMixin, UserMixin, Permis
return self.password_hash and pwd_context.verify(password, self.password_hash) return self.password_hash and pwd_context.verify(password, self.password_hash)
class DataSource(BelongsToOrgMixin, BaseModel): class ConfigurationField(peewee.TextField):
SECRET_PLACEHOLDER = '--------' def db_value(self, value):
return value.to_json()
def python_value(self, value):
return ConfigurationContainer.from_json(value)
class DataSource(BelongsToOrgMixin, BaseModel):
id = peewee.PrimaryKeyField() id = peewee.PrimaryKeyField()
org = peewee.ForeignKeyField(Organization, related_name="data_sources") org = peewee.ForeignKeyField(Organization, related_name="data_sources")
name = peewee.CharField() name = peewee.CharField()
type = peewee.CharField() type = peewee.CharField()
options = peewee.TextField() options = ConfigurationField()
queue_name = peewee.CharField(default="queries") queue_name = peewee.CharField(default="queries")
scheduled_queue_name = peewee.CharField(default="scheduled_queries") scheduled_queue_name = peewee.CharField(default="scheduled_queries")
created_at = DateTimeTZField(default=datetime.datetime.now) created_at = DateTimeTZField(default=datetime.datetime.now)
@@ -342,7 +350,7 @@ class DataSource(BelongsToOrgMixin, BaseModel):
} }
if all: if all:
d['options'] = self.configuration d['options'] = self.options.to_dict(mask_secrets=True)
d['queue_name'] = self.queue_name d['queue_name'] = self.queue_name
d['scheduled_queue_name'] = self.scheduled_queue_name d['scheduled_queue_name'] = self.scheduled_queue_name
d['groups'] = self.groups d['groups'] = self.groups
@@ -361,23 +369,6 @@ class DataSource(BelongsToOrgMixin, BaseModel):
DataSourceGroup.create(data_source=data_source, group=data_source.org.default_group) DataSourceGroup.create(data_source=data_source, group=data_source.org.default_group)
return data_source return data_source
@property
def configuration(self):
configuration = json.loads(self.options)
schema = self.query_runner.configuration_schema()
for prop in schema.get('secret', []):
if prop in configuration and configuration[prop]:
configuration[prop] = self.SECRET_PLACEHOLDER
return configuration
def replace_secret_placeholders(self, configuration):
current_configuration = json.loads(self.options)
schema = self.query_runner.configuration_schema()
for prop in schema.get('secret', []):
if prop in configuration and configuration[prop] == self.SECRET_PLACEHOLDER:
configuration[prop] = current_configuration[prop]
def get_schema(self, refresh=False): def get_schema(self, refresh=False):
key = "data_source:schema:{}".format(self.id) key = "data_source:schema:{}".format(self.id)

View File

@@ -1,14 +1,11 @@
import logging import logging
import json import json
import jsonschema
from jsonschema import ValidationError
from redash import settings from redash import settings
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
__all__ = [ __all__ = [
'ValidationError',
'BaseQueryRunner', 'BaseQueryRunner',
'InterruptException', 'InterruptException',
'BaseSQLQueryRunner', 'BaseSQLQueryRunner',
@@ -41,12 +38,13 @@ SUPPORTED_COLUMN_TYPES = set([
TYPE_DATE TYPE_DATE
]) ])
class InterruptException(Exception): class InterruptException(Exception):
pass pass
class BaseQueryRunner(object): class BaseQueryRunner(object):
def __init__(self, configuration): def __init__(self, configuration):
jsonschema.validate(configuration, self.configuration_schema())
self.syntax = 'sql' self.syntax = 'sql'
self.configuration = configuration self.configuration = configuration
@@ -142,29 +140,20 @@ def register(query_runner_class):
logger.warning("%s query runner enabled but not supported, not registering. Either disable or install missing dependencies.", query_runner_class.name()) logger.warning("%s query runner enabled but not supported, not registering. Either disable or install missing dependencies.", query_runner_class.name())
def get_query_runner(query_runner_type, configuration_json): def get_query_runner(query_runner_type, configuration):
query_runner_class = query_runners.get(query_runner_type, None) query_runner_class = query_runners.get(query_runner_type, None)
if query_runner_class is None: if query_runner_class is None:
return None return None
return query_runner_class(json.loads(configuration_json)) return query_runner_class(configuration)
def validate_configuration(query_runner_type, configuration_json): def get_configuration_schema_for_type(query_runner_type):
query_runner_class = query_runners.get(query_runner_type, None) query_runner_class = query_runners.get(query_runner_type, None)
if query_runner_class is None: if query_runner_class is None:
return False return None
try: return query_runner_class.configuration_schema()
if isinstance(configuration_json, basestring):
configuration = json.loads(configuration_json)
else:
configuration = configuration_json
jsonschema.validate(configuration, query_runner_class.configuration_schema())
except (ValidationError, ValueError):
return False
return True
def import_query_runners(query_runner_imports): def import_query_runners(query_runner_imports):

View File

@@ -144,9 +144,7 @@ class Python(BaseQueryRunner):
except models.DataSource.DoesNotExist: except models.DataSource.DoesNotExist:
raise Exception("Wrong data source name/id: %s." % data_source_name_or_id) raise Exception("Wrong data source name/id: %s." % data_source_name_or_id)
query_runner = get_query_runner(data_source.type, data_source.options) data, error = data_source.query_runner.run_query(query)
data, error = query_runner.run_query(query)
if error is not None: if error is not None:
raise Exception(error) raise Exception(error)

View File

@@ -14,7 +14,7 @@ from celery.utils.log import get_task_logger
from redash import redis_connection, models, statsd_client, settings, utils, mail from redash import redis_connection, models, statsd_client, settings, utils, mail
from redash.utils import gen_query_hash from redash.utils import gen_query_hash
from redash.worker import celery from redash.worker import celery
from redash.query_runner import get_query_runner, InterruptException from redash.query_runner import InterruptException
from version_check import run_version_check from version_check import run_version_check
logger = get_task_logger(__name__) logger = get_task_logger(__name__)
@@ -270,7 +270,7 @@ def execute_query(self, query, data_source_id, metadata):
logger.debug("Executing query:\n%s", query) logger.debug("Executing query:\n%s", query)
query_hash = gen_query_hash(query) query_hash = gen_query_hash(query)
query_runner = get_query_runner(data_source.type, data_source.options) query_runner = data_source.query_runner
logger.info("task=execute_query state=before query_hash=%s type=%s ds_id=%d task_id=%s queue=%s query_id=%s username=%s", logger.info("task=execute_query state=before query_hash=%s type=%s ds_id=%d task_id=%s queue=%s query_id=%s username=%s",
query_hash, data_source.type, data_source.id, self.request.id, self.request.delivery_info['routing_key'], query_hash, data_source.type, data_source.id, self.request.id, self.request.delivery_info['routing_key'],

View File

@@ -0,0 +1,77 @@
import json
import jsonschema
from jsonschema import ValidationError
SECRET_PLACEHOLDER = '--------'
class ConfigurationContainer(object):
def __init__(self, config, schema=None):
self._config = config
self.set_schema(schema)
def set_schema(self, schema):
self._schema = schema
@property
def schema(self):
if self._schema is None:
raise RuntimeError("Schema missing.")
return self._schema
def is_valid(self):
try:
self.validate()
except (ValidationError, ValueError):
return False
return True
def validate(self):
jsonschema.validate(self._config, self._schema)
def to_json(self):
return json.dumps(self._config)
def iteritems(self):
return self._config.iteritems()
def to_dict(self, mask_secrets=False):
if mask_secrets is False:
return self._config
config = self._config.copy()
for key in config:
if key in self.schema['secret']:
config[key] = SECRET_PLACEHOLDER
return config
def update(self, new_config):
jsonschema.validate(new_config, self.schema)
config = {}
for k, v in new_config.iteritems():
if k in self.schema['secret'] and v == SECRET_PLACEHOLDER:
config[k] = self[k]
else:
config[k] = v
self._config = config
def get(self, *args, **kwargs):
return self._config.get(*args, **kwargs)
def __getitem__(self, item):
if item in self._config:
return self._config[item]
raise KeyError(item)
def __contains__(self, item):
return item in self._config
@classmethod
def from_json(cls, config_in_json):
return cls(json.loads(config_in_json))

View File

@@ -1,5 +1,6 @@
import redash.models import redash.models
from redash.utils import gen_query_hash, utcnow from redash.utils import gen_query_hash, utcnow
from redash.utils.configuration import ConfigurationContainer
class ModelFactory(object): class ModelFactory(object):
@@ -51,7 +52,7 @@ org_factory = ModelFactory(redash.models.Organization,
data_source_factory = ModelFactory(redash.models.DataSource, data_source_factory = ModelFactory(redash.models.DataSource,
name=Sequence('Test {}'), name=Sequence('Test {}'),
type='pg', type='pg',
options='{"dbname": "test"}', options=ConfigurationContainer.from_json('{"dbname": "test"}'),
org=1) org=1)
dashboard_factory = ModelFactory(redash.models.Dashboard, dashboard_factory = ModelFactory(redash.models.Dashboard,

View File

@@ -1,4 +1,6 @@
import json
from tests import BaseTestCase from tests import BaseTestCase
from redash.models import DataSource
class TestDataSourceGetSchema(BaseTestCase): class TestDataSourceGetSchema(BaseTestCase):
@@ -23,3 +25,66 @@ class TestDataSourceListGet(BaseTestCase):
self.assertEqual(len(response.json), 1) self.assertEqual(len(response.json), 1)
class DataSourceTypesTest(BaseTestCase):
def test_returns_data_for_admin(self):
admin = self.factory.create_admin()
rv = self.make_request('get', "/api/data_sources/types", user=admin)
self.assertEqual(rv.status_code, 200)
def test_returns_403_for_non_admin(self):
rv = self.make_request('get', "/api/data_sources/types")
self.assertEqual(rv.status_code, 403)
class TestDataSourceAPIPost(BaseTestCase):
def setUp(self):
super(TestDataSourceAPIPost, self).setUp()
self.path = "/api/data_sources/{}".format(self.factory.data_source.id)
def test_returns_400_when_configuration_invalid(self):
admin = self.factory.create_admin()
rv = self.make_request('post', self.path,
data={'name': 'DS 1', 'type': 'pg', 'options': '{}'}, user=admin)
self.assertEqual(rv.status_code, 400)
def test_updates_data_source(self):
admin = self.factory.create_admin()
new_name = 'New Name'
new_options = {"dbname": "newdb"}
rv = self.make_request('post', self.path,
data={'name': new_name, 'type': 'pg', 'options': new_options},
user=admin)
self.assertEqual(rv.status_code, 200)
data_source = DataSource.get_by_id(self.factory.data_source.id)
self.assertEqual(data_source.name, new_name)
self.assertEqual(data_source.options.to_dict(), new_options)
class TestDataSourceListAPIPost(BaseTestCase):
def test_returns_400_when_missing_fields(self):
admin = self.factory.create_admin()
rv = self.make_request('post', "/api/data_sources", user=admin)
self.assertEqual(rv.status_code, 400)
rv = self.make_request('post', "/api/data_sources", data={'name': 'DS 1'}, user=admin)
self.assertEqual(rv.status_code, 400)
def test_returns_400_when_configuration_invalid(self):
admin = self.factory.create_admin()
rv = self.make_request('post', '/api/data_sources',
data={'name': 'DS 1', 'type': 'pg', 'options': '{}'}, user=admin)
self.assertEqual(rv.status_code, 400)
def test_creates_data_source(self):
admin = self.factory.create_admin()
rv = self.make_request('post', '/api/data_sources',
data={'name': 'DS 1', 'type': 'pg', 'options': {"dbname": "redash"}}, user=admin)
self.assertEqual(rv.status_code, 200)

View File

@@ -0,0 +1,76 @@
from unittest import TestCase
from jsonschema import ValidationError
from redash.utils.configuration import ConfigurationContainer
configuration_schema = {
"type": "object",
"properties": {
"a": {
"type": "integer"
},
"e": {
"type": "integer"
},
"b": {
"type": "string"
}
},
"required": ["a"],
"secret": ["b"]
}
class TestConfigurationToJson(TestCase):
def setUp(self):
self.config = {'a': 1, 'b': 'test'}
self.container = ConfigurationContainer(self.config, configuration_schema)
def test_returns_plain_dict(self):
self.assertDictEqual(self.config, self.container.to_dict())
def test_raises_exception_when_no_schema_set(self):
self.container.set_schema(None)
self.assertRaises(RuntimeError, lambda: self.container.to_dict(mask_secrets=True))
def test_returns_dict_with_masked_secrets(self):
d = self.container.to_dict(mask_secrets=True)
self.assertEqual(d['a'], self.config['a'])
self.assertNotEqual(d['b'], self.config['b'])
self.assertEqual(self.config['b'], self.container['b'])
class TestConfigurationUpdate(TestCase):
def setUp(self):
self.config = {'a': 1, 'b': 'test'}
self.container = ConfigurationContainer(self.config, configuration_schema)
def test_rejects_invalid_new_config(self):
self.assertRaises(ValidationError, lambda: self.container.update({'c': 3}))
def test_fails_if_no_schema_set(self):
self.container.set_schema(None)
self.assertRaises(RuntimeError, lambda: self.container.update({'c': 3}))
def test_ignores_secret_placehodler(self):
self.container.update(self.container.to_dict(mask_secrets=True))
self.assertEqual(self.container['b'], self.config['b'])
def test_updates_secret(self):
new_config = {'a': 2, 'b': 'new'}
self.container.update(new_config)
self.assertDictEqual(self.container._config, new_config)
def test_doesnt_leave_leftovers(self):
container = ConfigurationContainer({'a': 1, 'b': 'test', 'e': 3}, configuration_schema)
new_config = container.to_dict(mask_secrets=True)
new_config.pop('e')
container.update(new_config)
self.assertEqual(container['a'], 1)
self.assertEqual('test', container['b'])
self.assertNotIn('e', container)

View File

@@ -420,37 +420,4 @@ class TestLogout(BaseTestCase):
self.assertFalse(current_user.is_authenticated) self.assertFalse(current_user.is_authenticated)
class DataSourceTypesTest(BaseTestCase):
def test_returns_data_for_admin(self):
admin = self.factory.create_admin()
rv = self.make_request('get', "/api/data_sources/types", user=admin)
self.assertEqual(rv.status_code, 200)
def test_returns_403_for_non_admin(self):
rv = self.make_request('get', "/api/data_sources/types")
self.assertEqual(rv.status_code, 403)
class DataSourceTest(BaseTestCase):
def test_returns_400_when_missing_fields(self):
admin = self.factory.create_admin()
rv = self.make_request('post', "/api/data_sources", user=admin)
self.assertEqual(rv.status_code, 400)
rv = self.make_request('post', '/api/data_sources', data={'name': 'DS 1'}, user=admin)
self.assertEqual(rv.status_code, 400)
def test_returns_400_when_configuration_invalid(self):
admin = self.factory.create_admin()
rv = self.make_request('post', '/api/data_sources',
data={'name': 'DS 1', 'type': 'pg', 'options': '{}'}, user=admin)
self.assertEqual(rv.status_code, 400)
def test_creates_data_source(self):
admin = self.factory.create_admin()
rv = self.make_request('post', '/api/data_sources',
data={'name': 'DS 1', 'type': 'pg', 'options': {"dbname": "redash"}}, user=admin)
self.assertEqual(rv.status_code, 200)