Merge pull request #1085 from getredash/feature/pause-api

Feature: API to pause a data source
This commit is contained in:
Arik Fraimovich
2016-05-31 09:08:03 +03:00
13 changed files with 210 additions and 13 deletions

View File

@@ -406,6 +406,8 @@
}, function(error) { }, function(error) {
if (error.status === 403) { if (error.status === 403) {
queryResult.update(error.data); queryResult.update(error.data);
} else if (error.status === 400 && 'job' in error.data) {
queryResult.update(error.data);
} }
}); });

View File

@@ -6,7 +6,7 @@ from redash.utils import json_dumps
from redash.handlers.base import org_scoped_rule from redash.handlers.base import org_scoped_rule
from redash.handlers.alerts import AlertResource, AlertListResource, AlertSubscriptionListResource, AlertSubscriptionResource from redash.handlers.alerts import AlertResource, AlertListResource, AlertSubscriptionListResource, AlertSubscriptionResource
from redash.handlers.dashboards import DashboardListResource, RecentDashboardsResource, DashboardResource, DashboardShareResource from redash.handlers.dashboards import DashboardListResource, RecentDashboardsResource, DashboardResource, DashboardShareResource
from redash.handlers.data_sources import DataSourceTypeListResource, DataSourceListResource, DataSourceSchemaResource, DataSourceResource from redash.handlers.data_sources import DataSourceTypeListResource, DataSourceListResource, DataSourceSchemaResource, DataSourceResource, DataSourcePauseResource
from redash.handlers.events import EventResource from redash.handlers.events import EventResource
from redash.handlers.queries import QueryRefreshResource, QueryListResource, QueryRecentResource, QuerySearchResource, QueryResource from redash.handlers.queries import QueryRefreshResource, QueryListResource, QueryRecentResource, QuerySearchResource, QueryResource
from redash.handlers.query_results import QueryResultListResource, QueryResultResource, JobResource from redash.handlers.query_results import QueryResultListResource, QueryResultResource, JobResource
@@ -49,6 +49,7 @@ api.add_org_resource(DashboardShareResource, '/api/dashboards/<dashboard_id>/sha
api.add_org_resource(DataSourceTypeListResource, '/api/data_sources/types', endpoint='data_source_types') api.add_org_resource(DataSourceTypeListResource, '/api/data_sources/types', endpoint='data_source_types')
api.add_org_resource(DataSourceListResource, '/api/data_sources', endpoint='data_sources') api.add_org_resource(DataSourceListResource, '/api/data_sources', endpoint='data_sources')
api.add_org_resource(DataSourceSchemaResource, '/api/data_sources/<data_source_id>/schema') api.add_org_resource(DataSourceSchemaResource, '/api/data_sources/<data_source_id>/schema')
api.add_org_resource(DataSourcePauseResource, '/api/data_sources/<data_source_id>/pause')
api.add_org_resource(DataSourceResource, '/api/data_sources/<data_source_id>', endpoint='data_source') api.add_org_resource(DataSourceResource, '/api/data_sources/<data_source_id>', endpoint='data_source')
api.add_org_resource(GroupListResource, '/api/groups', endpoint='groups') api.add_org_resource(GroupListResource, '/api/groups', endpoint='groups')

View File

@@ -106,3 +106,38 @@ class DataSourceSchemaResource(BaseResource):
return schema return schema
class DataSourcePauseResource(BaseResource):
@require_admin
def post(self, data_source_id):
data_source = get_object_or_404(models.DataSource.get_by_id_and_org, data_source_id, self.current_org)
data = request.get_json(force=True, silent=True)
if data:
reason = data.get('reason')
else:
reason = request.args.get('reason')
data_source.pause(reason)
data_source.save()
self.record_event({
'action': 'pause',
'object_id': data_source.id,
'object_type': 'datasource'
})
return data_source.to_dict()
@require_admin
def delete(self, data_source_id):
data_source = get_object_or_404(models.DataSource.get_by_id_and_org, data_source_id, self.current_org)
data_source.resume()
data_source.save()
self.record_event({
'action': 'resume',
'object_id': data_source.id,
'object_type': 'datasource'
})
return data_source.to_dict()

View File

@@ -16,12 +16,23 @@ from redash.utils import collect_query_parameters, collect_parameters_from_reque
from redash.tasks.queries import enqueue_query from redash.tasks.queries import enqueue_query
def error_response(message):
return {'job': {'status': 4, 'error': message}}, 400
def run_query(data_source, parameter_values, query_text, query_id, max_age=0): def run_query(data_source, parameter_values, query_text, query_id, max_age=0):
query_parameters = set(collect_query_parameters(query_text)) query_parameters = set(collect_query_parameters(query_text))
missing_params = set(query_parameters) - set(parameter_values.keys()) missing_params = set(query_parameters) - set(parameter_values.keys())
if missing_params: if missing_params:
return {'job': {'status': 4, return error_response('Missing parameter value for: {}'.format(", ".join(missing_params)))
'error': 'Missing parameter value for: {}'.format(", ".join(missing_params))}}, 400
if data_source.paused:
if data_source.pause_reason:
message = '{} is paused ({}). Please try later.'.format(data_source.name, data_source.pause_reason)
else:
message = '{} is paused. Please try later.'.format(data_source.name)
return error_response(message)
if query_parameters: if query_parameters:
query_text = pystache.render(query_text, parameter_values) query_text = pystache.render(query_text, parameter_values)

View File

@@ -372,7 +372,9 @@ class DataSource(BelongsToOrgMixin, BaseModel):
'id': self.id, 'id': self.id,
'name': self.name, 'name': self.name,
'type': self.type, 'type': self.type,
'syntax': self.query_runner.syntax 'syntax': self.query_runner.syntax,
'paused': self.paused,
'pause_reason': self.pause_reason
} }
if all: if all:
@@ -414,6 +416,23 @@ class DataSource(BelongsToOrgMixin, BaseModel):
return schema return schema
def _pause_key(self):
return 'ds:{}:pause'.format(self.id)
@property
def paused(self):
return redis_connection.exists(self._pause_key())
@property
def pause_reason(self):
return redis_connection.get(self._pause_key())
def pause(self, reason=None):
redis_connection.set(self._pause_key(), reason)
def resume(self):
redis_connection.delete(self._pause_key())
def add_group(self, group, view_only=False): def add_group(self, group, view_only=False):
dsg = DataSourceGroup.create(group=group, data_source=self, view_only=view_only) dsg = DataSourceGroup.create(group=group, data_source=self, view_only=view_only)
setattr(self, 'data_source_groups', dsg) setattr(self, 'data_source_groups', dsg)

View File

@@ -264,9 +264,13 @@ def refresh_queries():
with statsd_client.timer('manager.outdated_queries_lookup'): with statsd_client.timer('manager.outdated_queries_lookup'):
for query in models.Query.outdated_queries(): for query in models.Query.outdated_queries():
enqueue_query(query.query, query.data_source, if query.data_source.paused:
scheduled=True, logging.info("Skipping refresh of %s because datasource - %s is paused (%s).", query.id, query.data_source.name, query.data_source.pause_reason)
metadata={'Query ID': query.id, 'Username': 'Scheduled'}) else:
enqueue_query(query.query, query.data_source,
scheduled=True,
metadata={'Query ID': query.id, 'Username': 'Scheduled'})
query_ids.append(query.id) query_ids.append(query.id)
outdated_queries_count += 1 outdated_queries_count += 1
@@ -344,11 +348,14 @@ def refresh_schemas():
Refreshes the data sources schemas. Refreshes the data sources schemas.
""" """
for ds in models.DataSource.select(): for ds in models.DataSource.select():
logger.info("Refreshing schema for: {}".format(ds.name)) if ds.paused:
try: logger.info(u"Skipping refresh schema of %s because it is paused (%s).", ds.name, ds.pause_reason)
ds.get_schema(refresh=True) else:
except Exception: logger.info(u"Refreshing schema for: {}".format(ds.name))
logger.exception("Failed refreshing schema for the data source: %s", ds.name) try:
ds.get_schema(refresh=True)
except Exception:
logger.exception(u"Failed refreshing schema for the data source: %s", ds.name)
def signal_handler(*args): def signal_handler(*args):

View File

@@ -63,6 +63,9 @@ class ConfigurationContainer(object):
def get(self, *args, **kwargs): def get(self, *args, **kwargs):
return self._config.get(*args, **kwargs) return self._config.get(*args, **kwargs)
def __setitem__(self, key, value):
self._config[key] = value
def __getitem__(self, item): def __getitem__(self, item):
if item in self._config: if item in self._config:
return self._config[item] return self._config[item]

View File

@@ -52,7 +52,8 @@ org_factory = ModelFactory(redash.models.Organization,
data_source_factory = ModelFactory(redash.models.DataSource, data_source_factory = ModelFactory(redash.models.DataSource,
name=Sequence('Test {}'), name=Sequence('Test {}'),
type='pg', type='pg',
options=ConfigurationContainer.from_json('{"dbname": "test"}'), # If we don't use lambda here it will reuse the same options between tests:
options=lambda: ConfigurationContainer.from_json('{"dbname": "test"}'),
org=1) org=1)
dashboard_factory = ModelFactory(redash.models.Dashboard, dashboard_factory = ModelFactory(redash.models.Dashboard,

View File

@@ -96,3 +96,39 @@ class TestDataSourceListAPIPost(BaseTestCase):
data={'name': 'DS 1', 'type': 'pg', 'options': {"dbname": "redash"}}, user=admin) data={'name': 'DS 1', 'type': 'pg', 'options': {"dbname": "redash"}}, user=admin)
self.assertEqual(rv.status_code, 200) self.assertEqual(rv.status_code, 200)
class TestDataSourcePausePost(BaseTestCase):
def test_pauses_data_source(self):
admin = self.factory.create_admin()
rv = self.make_request('post', '/api/data_sources/{}/pause'.format(self.factory.data_source.id), user=admin)
self.assertEqual(rv.status_code, 200)
self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).paused, True)
def test_pause_sets_reason(self):
admin = self.factory.create_admin()
rv = self.make_request('post', '/api/data_sources/{}/pause'.format(self.factory.data_source.id), user=admin, data={'reason': 'testing'})
self.assertEqual(rv.status_code, 200)
self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).paused, True)
self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).pause_reason, 'testing')
rv = self.make_request('post', '/api/data_sources/{}/pause?reason=test'.format(self.factory.data_source.id), user=admin)
self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).pause_reason, 'test')
def test_requires_admin(self):
rv = self.make_request('post', '/api/data_sources/{}/pause'.format(self.factory.data_source.id))
self.assertEqual(rv.status_code, 403)
class TestDataSourcePauseDelete(BaseTestCase):
def test_resumes_data_source(self):
admin = self.factory.create_admin()
self.factory.data_source.pause()
self.factory.data_source.save()
rv = self.make_request('delete', '/api/data_sources/{}/pause'.format(self.factory.data_source.id), user=admin)
self.assertEqual(rv.status_code, 200)
self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).paused, False)
def test_requires_admin(self):
rv = self.make_request('delete', '/api/data_sources/{}/pause'.format(self.factory.data_source.id))
self.assertEqual(rv.status_code, 403)

View File

@@ -73,6 +73,18 @@ class TestQueryResultListAPI(BaseTestCase):
self.assertEquals(rv.status_code, 200) self.assertEquals(rv.status_code, 200)
self.assertIn('job', rv.json) self.assertIn('job', rv.json)
def test_execute_on_paused_data_source(self):
self.factory.data_source.pause()
rv = self.make_request('post', '/api/query_results',
data={'data_source_id': self.factory.data_source.id,
'query': 'SELECT 1',
'max_age': 0})
self.assertEquals(rv.status_code, 400)
self.assertNotIn('query_result', rv.json)
self.assertIn('job', rv.json)
class TestQueryResultAPI(BaseTestCase): class TestQueryResultAPI(BaseTestCase):
def test_has_no_access_to_data_source(self): def test_has_no_access_to_data_source(self):

View File

@@ -7,3 +7,29 @@ class TestDataSourceCreate(BaseTestCase):
def test_adds_data_source_to_default_group(self): def test_adds_data_source_to_default_group(self):
data_source = DataSource.create_with_group(org=self.factory.org, name='test', options=ConfigurationContainer.from_json('{"dbname": "test"}'), type='pg') data_source = DataSource.create_with_group(org=self.factory.org, name='test', options=ConfigurationContainer.from_json('{"dbname": "test"}'), type='pg')
self.assertIn(self.factory.org.default_group.id, data_source.groups) self.assertIn(self.factory.org.default_group.id, data_source.groups)
class TestDataSourceIsPaused(BaseTestCase):
def test_returns_false_by_default(self):
self.assertFalse(self.factory.data_source.paused)
def test_persists_selection(self):
self.factory.data_source.pause()
self.assertTrue(self.factory.data_source.paused)
self.factory.data_source.resume()
self.assertFalse(self.factory.data_source.paused)
def test_allows_setting_reason(self):
reason = "Some good reason."
self.factory.data_source.pause(reason)
self.assertTrue(self.factory.data_source.paused)
self.assertEqual(self.factory.data_source.pause_reason, reason)
def test_resume_clears_reason(self):
self.factory.data_source.pause("Reason")
self.factory.data_source.resume()
self.assertEqual(self.factory.data_source.pause_reason, None)
def test_reason_is_none_by_default(self):
self.assertEqual(self.factory.data_source.pause_reason, None)

View File

@@ -21,6 +21,26 @@ class TestRefreshQueries(BaseTestCase):
refresh_queries() refresh_queries()
add_job_mock.assert_called_with(query.query, query.data_source, scheduled=True, metadata=ANY) add_job_mock.assert_called_with(query.query, query.data_source, scheduled=True, metadata=ANY)
def test_doesnt_enqueue_outdated_queries_for_paused_data_source(self):
query = self.factory.create_query(schedule="60")
retrieved_at = utcnow() - datetime.timedelta(minutes=10)
query_result = self.factory.create_query_result(retrieved_at=retrieved_at, query=query.query,
query_hash=query.query_hash)
query.latest_query_data = query_result
query.save()
query.data_source.pause()
with patch('redash.tasks.queries.enqueue_query') as add_job_mock:
refresh_queries()
add_job_mock.assert_not_called()
query.data_source.resume()
with patch('redash.tasks.queries.enqueue_query') as add_job_mock:
refresh_queries()
add_job_mock.assert_called_with(query.query, query.data_source, scheduled=True, metadata=ANY)
def test_skips_fresh_queries(self): def test_skips_fresh_queries(self):
query = self.factory.create_query(schedule="1200") query = self.factory.create_query(schedule="1200")
retrieved_at = utcnow() - datetime.timedelta(minutes=10) retrieved_at = utcnow() - datetime.timedelta(minutes=10)

View File

@@ -0,0 +1,24 @@
import datetime
from mock import patch, call, ANY
from tests import BaseTestCase
from redash.tasks import refresh_schemas
class TestRefreshSchemas(BaseTestCase):
def test_calls_refresh_of_all_data_sources(self):
with patch('redash.models.DataSource.get_schema') as get_schema:
refresh_schemas()
get_schema.assert_called_with(refresh=True)
def test_skips_paused_data_sources(self):
self.factory.data_source.pause()
with patch('redash.models.DataSource.get_schema') as get_schema:
refresh_schemas()
get_schema.assert_not_called()
self.factory.data_source.resume()
with patch('redash.models.DataSource.get_schema') as get_schema:
refresh_schemas()
get_schema.assert_called_with(refresh=True)