Encapsulate data source/query runner configuration in an object.

This is a step towards adding more complex logic in configuration
handling, like encryption of secrets.
This commit is contained in:
Arik Fraimovich
2016-02-18 17:32:49 +02:00
parent f1e90fde31
commit ed99b8452c
13 changed files with 292 additions and 102 deletions

View File

@@ -1,13 +1,31 @@
import json
import jsonschema
from jsonschema import ValidationError
from redash import query_runner
from redash.models import DataSource
def validate_configuration(query_runner_type, configuration_json):
query_runner_class = query_runner.query_runners.get(query_runner_type, None)
if query_runner_class is None:
return False
try:
if isinstance(configuration_json, basestring):
configuration = json.loads(configuration_json)
else:
configuration = configuration_json
jsonschema.validate(configuration, query_runner_class.configuration_schema())
except (ValidationError, ValueError):
return False
return True
def update(data_source):
print "[%s] Old options: %s" % (data_source.name, data_source.options)
if query_runner.validate_configuration(data_source.type, data_source.options):
if validate_configuration(data_source.type, data_source.options):
print "[%s] configuration already valid. skipping." % data_source.name
return

View File

@@ -2,7 +2,8 @@ import json
import click
from flask.ext.script import Manager
from redash import models
from redash.query_runner import query_runners, validate_configuration
from redash.query_runner import query_runners, get_configuration_schema_for_type
from redash.utils.configuration import ConfigurationContainer
manager = Manager(help="Data sources management commands.")
@@ -23,12 +24,6 @@ def validate_data_source_type(type):
exit()
def validate_data_source_options(type, options):
if not validate_configuration(type, options):
print "Error: invalid configuration."
exit()
@manager.command
def new(name=None, type=None, options=None):
"""Create new data source"""
@@ -76,9 +71,10 @@ def new(name=None, type=None, options=None):
if value != default_value:
options_obj[k] = value
options = json.dumps(options_obj)
validate_data_source_options(type, options)
options = ConfigurationContainer(options_obj, schema)
if not options.is_valid():
print "Error: invalid configuration."
exit()
print "Creating {} data source ({}) with options:\n{}".format(type, name, options)
@@ -120,7 +116,10 @@ def edit(name, new_name=None, options=None, type=None):
data_source = models.DataSource.get(models.DataSource.name==name)
if options is not None:
validate_data_source_options(data_source.type, options)
schema = get_configuration_schema_for_type(data_source.type)
options = json.loads(options)
data_source.options.set_schema(schema)
data_source.options.update(options)
update_attr(data_source, "name", new_name)
update_attr(data_source, "type", type)

View File

@@ -1,13 +1,12 @@
import json
from flask import make_response, request
from flask.ext.restful import abort
from funcy import project
from redash import models
from redash.wsgi import api
from redash.utils.configuration import ConfigurationContainer, ValidationError
from redash.permissions import require_admin
from redash.query_runner import query_runners, validate_configuration
from redash.query_runner import query_runners, get_configuration_schema_for_type
from redash.handlers.base import BaseResource, get_object_or_404
@@ -30,14 +29,18 @@ class DataSourceAPI(BaseResource):
data_source = models.DataSource.get_by_id_and_org(data_source_id, self.current_org)
req = request.get_json(True)
data_source.replace_secret_placeholders(req['options'])
if not validate_configuration(req['type'], req['options']):
schema = get_configuration_schema_for_type(req['type'])
if schema is None:
abort(400)
data_source.name = req['name']
data_source.options = json.dumps(req['options'])
try:
data_source.options.set_schema(schema)
data_source.options.update(req['options'])
except ValidationError:
abort(400)
data_source.type = req['type']
data_source.name = req['name']
data_source.save()
return data_source.to_dict(all=True)
@@ -76,12 +79,18 @@ class DataSourceListAPI(BaseResource):
if f not in req:
abort(400)
if not validate_configuration(req['type'], req['options']):
schema = get_configuration_schema_for_type(req['type'])
if schema is None:
abort(400)
config = ConfigurationContainer(req['options'], schema)
if not config.is_valid():
abort(400)
datasource = models.DataSource.create_with_group(org=self.current_org,
name=req['name'],
type=req['type'], options=json.dumps(req['options']))
type=req['type'],
options=config)
return datasource.to_dict(all=True)

View File

@@ -10,14 +10,16 @@ from funcy import project
import peewee
from passlib.apps import custom_app_context as pwd_context
from playhouse.postgres_ext import ArrayField, DateTimeTZField, PostgresqlExtDatabase
from playhouse.postgres_ext import ArrayField, DateTimeTZField
from flask.ext.login import UserMixin, AnonymousUserMixin
from permissions import has_access, view_only
from redash import utils, settings, redis_connection
from redash.query_runner import get_query_runner
from redash.metrics.database import MeteredPostgresqlExtDatabase, MeteredModel
from utils import generate_token
from redash.utils import generate_token
from redash.utils.configuration import ConfigurationContainer
class Database(object):
@@ -314,14 +316,20 @@ class User(ModelTimestampsMixin, BaseModel, BelongsToOrgMixin, UserMixin, Permis
return self.password_hash and pwd_context.verify(password, self.password_hash)
class DataSource(BelongsToOrgMixin, BaseModel):
SECRET_PLACEHOLDER = '--------'
class ConfigurationField(peewee.TextField):
def db_value(self, value):
return value.to_json()
def python_value(self, value):
return ConfigurationContainer.from_json(value)
class DataSource(BelongsToOrgMixin, BaseModel):
id = peewee.PrimaryKeyField()
org = peewee.ForeignKeyField(Organization, related_name="data_sources")
name = peewee.CharField()
type = peewee.CharField()
options = peewee.TextField()
options = ConfigurationField()
queue_name = peewee.CharField(default="queries")
scheduled_queue_name = peewee.CharField(default="scheduled_queries")
created_at = DateTimeTZField(default=datetime.datetime.now)
@@ -342,7 +350,7 @@ class DataSource(BelongsToOrgMixin, BaseModel):
}
if all:
d['options'] = self.configuration
d['options'] = self.options.to_dict(mask_secrets=True)
d['queue_name'] = self.queue_name
d['scheduled_queue_name'] = self.scheduled_queue_name
d['groups'] = self.groups
@@ -361,23 +369,6 @@ class DataSource(BelongsToOrgMixin, BaseModel):
DataSourceGroup.create(data_source=data_source, group=data_source.org.default_group)
return data_source
@property
def configuration(self):
configuration = json.loads(self.options)
schema = self.query_runner.configuration_schema()
for prop in schema.get('secret', []):
if prop in configuration and configuration[prop]:
configuration[prop] = self.SECRET_PLACEHOLDER
return configuration
def replace_secret_placeholders(self, configuration):
current_configuration = json.loads(self.options)
schema = self.query_runner.configuration_schema()
for prop in schema.get('secret', []):
if prop in configuration and configuration[prop] == self.SECRET_PLACEHOLDER:
configuration[prop] = current_configuration[prop]
def get_schema(self, refresh=False):
key = "data_source:schema:{}".format(self.id)

View File

@@ -1,14 +1,11 @@
import logging
import json
import jsonschema
from jsonschema import ValidationError
from redash import settings
logger = logging.getLogger(__name__)
__all__ = [
'ValidationError',
'BaseQueryRunner',
'InterruptException',
'BaseSQLQueryRunner',
@@ -41,12 +38,13 @@ SUPPORTED_COLUMN_TYPES = set([
TYPE_DATE
])
class InterruptException(Exception):
pass
class BaseQueryRunner(object):
def __init__(self, configuration):
jsonschema.validate(configuration, self.configuration_schema())
self.syntax = 'sql'
self.configuration = configuration
@@ -142,29 +140,20 @@ def register(query_runner_class):
logger.warning("%s query runner enabled but not supported, not registering. Either disable or install missing dependencies.", query_runner_class.name())
def get_query_runner(query_runner_type, configuration_json):
def get_query_runner(query_runner_type, configuration):
query_runner_class = query_runners.get(query_runner_type, None)
if query_runner_class is None:
return None
return query_runner_class(json.loads(configuration_json))
return query_runner_class(configuration)
def validate_configuration(query_runner_type, configuration_json):
def get_configuration_schema_for_type(query_runner_type):
query_runner_class = query_runners.get(query_runner_type, None)
if query_runner_class is None:
return False
return None
try:
if isinstance(configuration_json, basestring):
configuration = json.loads(configuration_json)
else:
configuration = configuration_json
jsonschema.validate(configuration, query_runner_class.configuration_schema())
except (ValidationError, ValueError):
return False
return True
return query_runner_class.configuration_schema()
def import_query_runners(query_runner_imports):

View File

@@ -144,9 +144,7 @@ class Python(BaseQueryRunner):
except models.DataSource.DoesNotExist:
raise Exception("Wrong data source name/id: %s." % data_source_name_or_id)
query_runner = get_query_runner(data_source.type, data_source.options)
data, error = query_runner.run_query(query)
data, error = data_source.query_runner.run_query(query)
if error is not None:
raise Exception(error)

View File

@@ -14,7 +14,7 @@ from celery.utils.log import get_task_logger
from redash import redis_connection, models, statsd_client, settings, utils, mail
from redash.utils import gen_query_hash
from redash.worker import celery
from redash.query_runner import get_query_runner, InterruptException
from redash.query_runner import InterruptException
from version_check import run_version_check
logger = get_task_logger(__name__)
@@ -270,7 +270,7 @@ def execute_query(self, query, data_source_id, metadata):
logger.debug("Executing query:\n%s", query)
query_hash = gen_query_hash(query)
query_runner = get_query_runner(data_source.type, data_source.options)
query_runner = data_source.query_runner
logger.info("task=execute_query state=before query_hash=%s type=%s ds_id=%d task_id=%s queue=%s query_id=%s username=%s",
query_hash, data_source.type, data_source.id, self.request.id, self.request.delivery_info['routing_key'],

View File

@@ -0,0 +1,77 @@
import json
import jsonschema
from jsonschema import ValidationError
SECRET_PLACEHOLDER = '--------'
class ConfigurationContainer(object):
def __init__(self, config, schema=None):
self._config = config
self.set_schema(schema)
def set_schema(self, schema):
self._schema = schema
@property
def schema(self):
if self._schema is None:
raise RuntimeError("Schema missing.")
return self._schema
def is_valid(self):
try:
self.validate()
except (ValidationError, ValueError):
return False
return True
def validate(self):
jsonschema.validate(self._config, self._schema)
def to_json(self):
return json.dumps(self._config)
def iteritems(self):
return self._config.iteritems()
def to_dict(self, mask_secrets=False):
if mask_secrets is False:
return self._config
config = self._config.copy()
for key in config:
if key in self.schema['secret']:
config[key] = SECRET_PLACEHOLDER
return config
def update(self, new_config):
jsonschema.validate(new_config, self.schema)
config = {}
for k, v in new_config.iteritems():
if k in self.schema['secret'] and v == SECRET_PLACEHOLDER:
config[k] = self[k]
else:
config[k] = v
self._config = config
def get(self, *args, **kwargs):
return self._config.get(*args, **kwargs)
def __getitem__(self, item):
if item in self._config:
return self._config[item]
raise KeyError(item)
def __contains__(self, item):
return item in self._config
@classmethod
def from_json(cls, config_in_json):
return cls(json.loads(config_in_json))

View File

@@ -1,5 +1,6 @@
import redash.models
from redash.utils import gen_query_hash, utcnow
from redash.utils.configuration import ConfigurationContainer
class ModelFactory(object):
@@ -51,7 +52,7 @@ org_factory = ModelFactory(redash.models.Organization,
data_source_factory = ModelFactory(redash.models.DataSource,
name=Sequence('Test {}'),
type='pg',
options='{"dbname": "test"}',
options=ConfigurationContainer.from_json('{"dbname": "test"}'),
org=1)
dashboard_factory = ModelFactory(redash.models.Dashboard,

View File

@@ -1,4 +1,6 @@
import json
from tests import BaseTestCase
from redash.models import DataSource
class TestDataSourceGetSchema(BaseTestCase):
@@ -23,3 +25,66 @@ class TestDataSourceListGet(BaseTestCase):
self.assertEqual(len(response.json), 1)
class DataSourceTypesTest(BaseTestCase):
def test_returns_data_for_admin(self):
admin = self.factory.create_admin()
rv = self.make_request('get', "/api/data_sources/types", user=admin)
self.assertEqual(rv.status_code, 200)
def test_returns_403_for_non_admin(self):
rv = self.make_request('get', "/api/data_sources/types")
self.assertEqual(rv.status_code, 403)
class TestDataSourceAPIPost(BaseTestCase):
def setUp(self):
super(TestDataSourceAPIPost, self).setUp()
self.path = "/api/data_sources/{}".format(self.factory.data_source.id)
def test_returns_400_when_configuration_invalid(self):
admin = self.factory.create_admin()
rv = self.make_request('post', self.path,
data={'name': 'DS 1', 'type': 'pg', 'options': '{}'}, user=admin)
self.assertEqual(rv.status_code, 400)
def test_updates_data_source(self):
admin = self.factory.create_admin()
new_name = 'New Name'
new_options = {"dbname": "newdb"}
rv = self.make_request('post', self.path,
data={'name': new_name, 'type': 'pg', 'options': new_options},
user=admin)
self.assertEqual(rv.status_code, 200)
data_source = DataSource.get_by_id(self.factory.data_source.id)
self.assertEqual(data_source.name, new_name)
self.assertEqual(data_source.options.to_dict(), new_options)
class TestDataSourceListAPIPost(BaseTestCase):
def test_returns_400_when_missing_fields(self):
admin = self.factory.create_admin()
rv = self.make_request('post', "/api/data_sources", user=admin)
self.assertEqual(rv.status_code, 400)
rv = self.make_request('post', "/api/data_sources", data={'name': 'DS 1'}, user=admin)
self.assertEqual(rv.status_code, 400)
def test_returns_400_when_configuration_invalid(self):
admin = self.factory.create_admin()
rv = self.make_request('post', '/api/data_sources',
data={'name': 'DS 1', 'type': 'pg', 'options': '{}'}, user=admin)
self.assertEqual(rv.status_code, 400)
def test_creates_data_source(self):
admin = self.factory.create_admin()
rv = self.make_request('post', '/api/data_sources',
data={'name': 'DS 1', 'type': 'pg', 'options': {"dbname": "redash"}}, user=admin)
self.assertEqual(rv.status_code, 200)

View File

@@ -0,0 +1,76 @@
from unittest import TestCase
from jsonschema import ValidationError
from redash.utils.configuration import ConfigurationContainer
configuration_schema = {
"type": "object",
"properties": {
"a": {
"type": "integer"
},
"e": {
"type": "integer"
},
"b": {
"type": "string"
}
},
"required": ["a"],
"secret": ["b"]
}
class TestConfigurationToJson(TestCase):
def setUp(self):
self.config = {'a': 1, 'b': 'test'}
self.container = ConfigurationContainer(self.config, configuration_schema)
def test_returns_plain_dict(self):
self.assertDictEqual(self.config, self.container.to_dict())
def test_raises_exception_when_no_schema_set(self):
self.container.set_schema(None)
self.assertRaises(RuntimeError, lambda: self.container.to_dict(mask_secrets=True))
def test_returns_dict_with_masked_secrets(self):
d = self.container.to_dict(mask_secrets=True)
self.assertEqual(d['a'], self.config['a'])
self.assertNotEqual(d['b'], self.config['b'])
self.assertEqual(self.config['b'], self.container['b'])
class TestConfigurationUpdate(TestCase):
def setUp(self):
self.config = {'a': 1, 'b': 'test'}
self.container = ConfigurationContainer(self.config, configuration_schema)
def test_rejects_invalid_new_config(self):
self.assertRaises(ValidationError, lambda: self.container.update({'c': 3}))
def test_fails_if_no_schema_set(self):
self.container.set_schema(None)
self.assertRaises(RuntimeError, lambda: self.container.update({'c': 3}))
def test_ignores_secret_placehodler(self):
self.container.update(self.container.to_dict(mask_secrets=True))
self.assertEqual(self.container['b'], self.config['b'])
def test_updates_secret(self):
new_config = {'a': 2, 'b': 'new'}
self.container.update(new_config)
self.assertDictEqual(self.container._config, new_config)
def test_doesnt_leave_leftovers(self):
container = ConfigurationContainer({'a': 1, 'b': 'test', 'e': 3}, configuration_schema)
new_config = container.to_dict(mask_secrets=True)
new_config.pop('e')
container.update(new_config)
self.assertEqual(container['a'], 1)
self.assertEqual('test', container['b'])
self.assertNotIn('e', container)

View File

@@ -420,37 +420,4 @@ class TestLogout(BaseTestCase):
self.assertFalse(current_user.is_authenticated)
class DataSourceTypesTest(BaseTestCase):
def test_returns_data_for_admin(self):
admin = self.factory.create_admin()
rv = self.make_request('get', "/api/data_sources/types", user=admin)
self.assertEqual(rv.status_code, 200)
def test_returns_403_for_non_admin(self):
rv = self.make_request('get', "/api/data_sources/types")
self.assertEqual(rv.status_code, 403)
class DataSourceTest(BaseTestCase):
def test_returns_400_when_missing_fields(self):
admin = self.factory.create_admin()
rv = self.make_request('post', "/api/data_sources", user=admin)
self.assertEqual(rv.status_code, 400)
rv = self.make_request('post', '/api/data_sources', data={'name': 'DS 1'}, user=admin)
self.assertEqual(rv.status_code, 400)
def test_returns_400_when_configuration_invalid(self):
admin = self.factory.create_admin()
rv = self.make_request('post', '/api/data_sources',
data={'name': 'DS 1', 'type': 'pg', 'options': '{}'}, user=admin)
self.assertEqual(rv.status_code, 400)
def test_creates_data_source(self):
admin = self.factory.create_admin()
rv = self.make_request('post', '/api/data_sources',
data={'name': 'DS 1', 'type': 'pg', 'options': {"dbname": "redash"}}, user=admin)
self.assertEqual(rv.status_code, 200)