mirror of
https://github.com/getredash/redash.git
synced 2025-12-19 17:37:19 -05:00
Merge pull request #1085 from getredash/feature/pause-api
Feature: API to pause a data source
This commit is contained in:
@@ -406,6 +406,8 @@
|
|||||||
}, function(error) {
|
}, function(error) {
|
||||||
if (error.status === 403) {
|
if (error.status === 403) {
|
||||||
queryResult.update(error.data);
|
queryResult.update(error.data);
|
||||||
|
} else if (error.status === 400 && 'job' in error.data) {
|
||||||
|
queryResult.update(error.data);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ from redash.utils import json_dumps
|
|||||||
from redash.handlers.base import org_scoped_rule
|
from redash.handlers.base import org_scoped_rule
|
||||||
from redash.handlers.alerts import AlertResource, AlertListResource, AlertSubscriptionListResource, AlertSubscriptionResource
|
from redash.handlers.alerts import AlertResource, AlertListResource, AlertSubscriptionListResource, AlertSubscriptionResource
|
||||||
from redash.handlers.dashboards import DashboardListResource, RecentDashboardsResource, DashboardResource, DashboardShareResource
|
from redash.handlers.dashboards import DashboardListResource, RecentDashboardsResource, DashboardResource, DashboardShareResource
|
||||||
from redash.handlers.data_sources import DataSourceTypeListResource, DataSourceListResource, DataSourceSchemaResource, DataSourceResource
|
from redash.handlers.data_sources import DataSourceTypeListResource, DataSourceListResource, DataSourceSchemaResource, DataSourceResource, DataSourcePauseResource
|
||||||
from redash.handlers.events import EventResource
|
from redash.handlers.events import EventResource
|
||||||
from redash.handlers.queries import QueryRefreshResource, QueryListResource, QueryRecentResource, QuerySearchResource, QueryResource
|
from redash.handlers.queries import QueryRefreshResource, QueryListResource, QueryRecentResource, QuerySearchResource, QueryResource
|
||||||
from redash.handlers.query_results import QueryResultListResource, QueryResultResource, JobResource
|
from redash.handlers.query_results import QueryResultListResource, QueryResultResource, JobResource
|
||||||
@@ -49,6 +49,7 @@ api.add_org_resource(DashboardShareResource, '/api/dashboards/<dashboard_id>/sha
|
|||||||
api.add_org_resource(DataSourceTypeListResource, '/api/data_sources/types', endpoint='data_source_types')
|
api.add_org_resource(DataSourceTypeListResource, '/api/data_sources/types', endpoint='data_source_types')
|
||||||
api.add_org_resource(DataSourceListResource, '/api/data_sources', endpoint='data_sources')
|
api.add_org_resource(DataSourceListResource, '/api/data_sources', endpoint='data_sources')
|
||||||
api.add_org_resource(DataSourceSchemaResource, '/api/data_sources/<data_source_id>/schema')
|
api.add_org_resource(DataSourceSchemaResource, '/api/data_sources/<data_source_id>/schema')
|
||||||
|
api.add_org_resource(DataSourcePauseResource, '/api/data_sources/<data_source_id>/pause')
|
||||||
api.add_org_resource(DataSourceResource, '/api/data_sources/<data_source_id>', endpoint='data_source')
|
api.add_org_resource(DataSourceResource, '/api/data_sources/<data_source_id>', endpoint='data_source')
|
||||||
|
|
||||||
api.add_org_resource(GroupListResource, '/api/groups', endpoint='groups')
|
api.add_org_resource(GroupListResource, '/api/groups', endpoint='groups')
|
||||||
|
|||||||
@@ -106,3 +106,38 @@ class DataSourceSchemaResource(BaseResource):
|
|||||||
|
|
||||||
return schema
|
return schema
|
||||||
|
|
||||||
|
|
||||||
|
class DataSourcePauseResource(BaseResource):
|
||||||
|
@require_admin
|
||||||
|
def post(self, data_source_id):
|
||||||
|
data_source = get_object_or_404(models.DataSource.get_by_id_and_org, data_source_id, self.current_org)
|
||||||
|
data = request.get_json(force=True, silent=True)
|
||||||
|
if data:
|
||||||
|
reason = data.get('reason')
|
||||||
|
else:
|
||||||
|
reason = request.args.get('reason')
|
||||||
|
|
||||||
|
data_source.pause(reason)
|
||||||
|
data_source.save()
|
||||||
|
|
||||||
|
self.record_event({
|
||||||
|
'action': 'pause',
|
||||||
|
'object_id': data_source.id,
|
||||||
|
'object_type': 'datasource'
|
||||||
|
})
|
||||||
|
|
||||||
|
return data_source.to_dict()
|
||||||
|
|
||||||
|
@require_admin
|
||||||
|
def delete(self, data_source_id):
|
||||||
|
data_source = get_object_or_404(models.DataSource.get_by_id_and_org, data_source_id, self.current_org)
|
||||||
|
data_source.resume()
|
||||||
|
data_source.save()
|
||||||
|
|
||||||
|
self.record_event({
|
||||||
|
'action': 'resume',
|
||||||
|
'object_id': data_source.id,
|
||||||
|
'object_type': 'datasource'
|
||||||
|
})
|
||||||
|
|
||||||
|
return data_source.to_dict()
|
||||||
|
|||||||
@@ -16,12 +16,23 @@ from redash.utils import collect_query_parameters, collect_parameters_from_reque
|
|||||||
from redash.tasks.queries import enqueue_query
|
from redash.tasks.queries import enqueue_query
|
||||||
|
|
||||||
|
|
||||||
|
def error_response(message):
|
||||||
|
return {'job': {'status': 4, 'error': message}}, 400
|
||||||
|
|
||||||
|
|
||||||
def run_query(data_source, parameter_values, query_text, query_id, max_age=0):
|
def run_query(data_source, parameter_values, query_text, query_id, max_age=0):
|
||||||
query_parameters = set(collect_query_parameters(query_text))
|
query_parameters = set(collect_query_parameters(query_text))
|
||||||
missing_params = set(query_parameters) - set(parameter_values.keys())
|
missing_params = set(query_parameters) - set(parameter_values.keys())
|
||||||
if missing_params:
|
if missing_params:
|
||||||
return {'job': {'status': 4,
|
return error_response('Missing parameter value for: {}'.format(", ".join(missing_params)))
|
||||||
'error': 'Missing parameter value for: {}'.format(", ".join(missing_params))}}, 400
|
|
||||||
|
if data_source.paused:
|
||||||
|
if data_source.pause_reason:
|
||||||
|
message = '{} is paused ({}). Please try later.'.format(data_source.name, data_source.pause_reason)
|
||||||
|
else:
|
||||||
|
message = '{} is paused. Please try later.'.format(data_source.name)
|
||||||
|
|
||||||
|
return error_response(message)
|
||||||
|
|
||||||
if query_parameters:
|
if query_parameters:
|
||||||
query_text = pystache.render(query_text, parameter_values)
|
query_text = pystache.render(query_text, parameter_values)
|
||||||
|
|||||||
@@ -372,7 +372,9 @@ class DataSource(BelongsToOrgMixin, BaseModel):
|
|||||||
'id': self.id,
|
'id': self.id,
|
||||||
'name': self.name,
|
'name': self.name,
|
||||||
'type': self.type,
|
'type': self.type,
|
||||||
'syntax': self.query_runner.syntax
|
'syntax': self.query_runner.syntax,
|
||||||
|
'paused': self.paused,
|
||||||
|
'pause_reason': self.pause_reason
|
||||||
}
|
}
|
||||||
|
|
||||||
if all:
|
if all:
|
||||||
@@ -414,6 +416,23 @@ class DataSource(BelongsToOrgMixin, BaseModel):
|
|||||||
|
|
||||||
return schema
|
return schema
|
||||||
|
|
||||||
|
def _pause_key(self):
|
||||||
|
return 'ds:{}:pause'.format(self.id)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def paused(self):
|
||||||
|
return redis_connection.exists(self._pause_key())
|
||||||
|
|
||||||
|
@property
|
||||||
|
def pause_reason(self):
|
||||||
|
return redis_connection.get(self._pause_key())
|
||||||
|
|
||||||
|
def pause(self, reason=None):
|
||||||
|
redis_connection.set(self._pause_key(), reason)
|
||||||
|
|
||||||
|
def resume(self):
|
||||||
|
redis_connection.delete(self._pause_key())
|
||||||
|
|
||||||
def add_group(self, group, view_only=False):
|
def add_group(self, group, view_only=False):
|
||||||
dsg = DataSourceGroup.create(group=group, data_source=self, view_only=view_only)
|
dsg = DataSourceGroup.create(group=group, data_source=self, view_only=view_only)
|
||||||
setattr(self, 'data_source_groups', dsg)
|
setattr(self, 'data_source_groups', dsg)
|
||||||
|
|||||||
@@ -264,9 +264,13 @@ def refresh_queries():
|
|||||||
|
|
||||||
with statsd_client.timer('manager.outdated_queries_lookup'):
|
with statsd_client.timer('manager.outdated_queries_lookup'):
|
||||||
for query in models.Query.outdated_queries():
|
for query in models.Query.outdated_queries():
|
||||||
enqueue_query(query.query, query.data_source,
|
if query.data_source.paused:
|
||||||
scheduled=True,
|
logging.info("Skipping refresh of %s because datasource - %s is paused (%s).", query.id, query.data_source.name, query.data_source.pause_reason)
|
||||||
metadata={'Query ID': query.id, 'Username': 'Scheduled'})
|
else:
|
||||||
|
enqueue_query(query.query, query.data_source,
|
||||||
|
scheduled=True,
|
||||||
|
metadata={'Query ID': query.id, 'Username': 'Scheduled'})
|
||||||
|
|
||||||
query_ids.append(query.id)
|
query_ids.append(query.id)
|
||||||
outdated_queries_count += 1
|
outdated_queries_count += 1
|
||||||
|
|
||||||
@@ -344,11 +348,14 @@ def refresh_schemas():
|
|||||||
Refreshes the data sources schemas.
|
Refreshes the data sources schemas.
|
||||||
"""
|
"""
|
||||||
for ds in models.DataSource.select():
|
for ds in models.DataSource.select():
|
||||||
logger.info("Refreshing schema for: {}".format(ds.name))
|
if ds.paused:
|
||||||
try:
|
logger.info(u"Skipping refresh schema of %s because it is paused (%s).", ds.name, ds.pause_reason)
|
||||||
ds.get_schema(refresh=True)
|
else:
|
||||||
except Exception:
|
logger.info(u"Refreshing schema for: {}".format(ds.name))
|
||||||
logger.exception("Failed refreshing schema for the data source: %s", ds.name)
|
try:
|
||||||
|
ds.get_schema(refresh=True)
|
||||||
|
except Exception:
|
||||||
|
logger.exception(u"Failed refreshing schema for the data source: %s", ds.name)
|
||||||
|
|
||||||
|
|
||||||
def signal_handler(*args):
|
def signal_handler(*args):
|
||||||
|
|||||||
@@ -63,6 +63,9 @@ class ConfigurationContainer(object):
|
|||||||
def get(self, *args, **kwargs):
|
def get(self, *args, **kwargs):
|
||||||
return self._config.get(*args, **kwargs)
|
return self._config.get(*args, **kwargs)
|
||||||
|
|
||||||
|
def __setitem__(self, key, value):
|
||||||
|
self._config[key] = value
|
||||||
|
|
||||||
def __getitem__(self, item):
|
def __getitem__(self, item):
|
||||||
if item in self._config:
|
if item in self._config:
|
||||||
return self._config[item]
|
return self._config[item]
|
||||||
|
|||||||
@@ -52,7 +52,8 @@ org_factory = ModelFactory(redash.models.Organization,
|
|||||||
data_source_factory = ModelFactory(redash.models.DataSource,
|
data_source_factory = ModelFactory(redash.models.DataSource,
|
||||||
name=Sequence('Test {}'),
|
name=Sequence('Test {}'),
|
||||||
type='pg',
|
type='pg',
|
||||||
options=ConfigurationContainer.from_json('{"dbname": "test"}'),
|
# If we don't use lambda here it will reuse the same options between tests:
|
||||||
|
options=lambda: ConfigurationContainer.from_json('{"dbname": "test"}'),
|
||||||
org=1)
|
org=1)
|
||||||
|
|
||||||
dashboard_factory = ModelFactory(redash.models.Dashboard,
|
dashboard_factory = ModelFactory(redash.models.Dashboard,
|
||||||
|
|||||||
@@ -96,3 +96,39 @@ class TestDataSourceListAPIPost(BaseTestCase):
|
|||||||
data={'name': 'DS 1', 'type': 'pg', 'options': {"dbname": "redash"}}, user=admin)
|
data={'name': 'DS 1', 'type': 'pg', 'options': {"dbname": "redash"}}, user=admin)
|
||||||
|
|
||||||
self.assertEqual(rv.status_code, 200)
|
self.assertEqual(rv.status_code, 200)
|
||||||
|
|
||||||
|
|
||||||
|
class TestDataSourcePausePost(BaseTestCase):
|
||||||
|
def test_pauses_data_source(self):
|
||||||
|
admin = self.factory.create_admin()
|
||||||
|
rv = self.make_request('post', '/api/data_sources/{}/pause'.format(self.factory.data_source.id), user=admin)
|
||||||
|
self.assertEqual(rv.status_code, 200)
|
||||||
|
self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).paused, True)
|
||||||
|
|
||||||
|
def test_pause_sets_reason(self):
|
||||||
|
admin = self.factory.create_admin()
|
||||||
|
rv = self.make_request('post', '/api/data_sources/{}/pause'.format(self.factory.data_source.id), user=admin, data={'reason': 'testing'})
|
||||||
|
self.assertEqual(rv.status_code, 200)
|
||||||
|
self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).paused, True)
|
||||||
|
self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).pause_reason, 'testing')
|
||||||
|
|
||||||
|
rv = self.make_request('post', '/api/data_sources/{}/pause?reason=test'.format(self.factory.data_source.id), user=admin)
|
||||||
|
self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).pause_reason, 'test')
|
||||||
|
|
||||||
|
def test_requires_admin(self):
|
||||||
|
rv = self.make_request('post', '/api/data_sources/{}/pause'.format(self.factory.data_source.id))
|
||||||
|
self.assertEqual(rv.status_code, 403)
|
||||||
|
|
||||||
|
|
||||||
|
class TestDataSourcePauseDelete(BaseTestCase):
|
||||||
|
def test_resumes_data_source(self):
|
||||||
|
admin = self.factory.create_admin()
|
||||||
|
self.factory.data_source.pause()
|
||||||
|
self.factory.data_source.save()
|
||||||
|
rv = self.make_request('delete', '/api/data_sources/{}/pause'.format(self.factory.data_source.id), user=admin)
|
||||||
|
self.assertEqual(rv.status_code, 200)
|
||||||
|
self.assertEqual(DataSource.get_by_id(self.factory.data_source.id).paused, False)
|
||||||
|
|
||||||
|
def test_requires_admin(self):
|
||||||
|
rv = self.make_request('delete', '/api/data_sources/{}/pause'.format(self.factory.data_source.id))
|
||||||
|
self.assertEqual(rv.status_code, 403)
|
||||||
|
|||||||
@@ -73,6 +73,18 @@ class TestQueryResultListAPI(BaseTestCase):
|
|||||||
self.assertEquals(rv.status_code, 200)
|
self.assertEquals(rv.status_code, 200)
|
||||||
self.assertIn('job', rv.json)
|
self.assertIn('job', rv.json)
|
||||||
|
|
||||||
|
def test_execute_on_paused_data_source(self):
|
||||||
|
self.factory.data_source.pause()
|
||||||
|
|
||||||
|
rv = self.make_request('post', '/api/query_results',
|
||||||
|
data={'data_source_id': self.factory.data_source.id,
|
||||||
|
'query': 'SELECT 1',
|
||||||
|
'max_age': 0})
|
||||||
|
|
||||||
|
self.assertEquals(rv.status_code, 400)
|
||||||
|
self.assertNotIn('query_result', rv.json)
|
||||||
|
self.assertIn('job', rv.json)
|
||||||
|
|
||||||
|
|
||||||
class TestQueryResultAPI(BaseTestCase):
|
class TestQueryResultAPI(BaseTestCase):
|
||||||
def test_has_no_access_to_data_source(self):
|
def test_has_no_access_to_data_source(self):
|
||||||
|
|||||||
@@ -7,3 +7,29 @@ class TestDataSourceCreate(BaseTestCase):
|
|||||||
def test_adds_data_source_to_default_group(self):
|
def test_adds_data_source_to_default_group(self):
|
||||||
data_source = DataSource.create_with_group(org=self.factory.org, name='test', options=ConfigurationContainer.from_json('{"dbname": "test"}'), type='pg')
|
data_source = DataSource.create_with_group(org=self.factory.org, name='test', options=ConfigurationContainer.from_json('{"dbname": "test"}'), type='pg')
|
||||||
self.assertIn(self.factory.org.default_group.id, data_source.groups)
|
self.assertIn(self.factory.org.default_group.id, data_source.groups)
|
||||||
|
|
||||||
|
|
||||||
|
class TestDataSourceIsPaused(BaseTestCase):
|
||||||
|
def test_returns_false_by_default(self):
|
||||||
|
self.assertFalse(self.factory.data_source.paused)
|
||||||
|
|
||||||
|
def test_persists_selection(self):
|
||||||
|
self.factory.data_source.pause()
|
||||||
|
self.assertTrue(self.factory.data_source.paused)
|
||||||
|
|
||||||
|
self.factory.data_source.resume()
|
||||||
|
self.assertFalse(self.factory.data_source.paused)
|
||||||
|
|
||||||
|
def test_allows_setting_reason(self):
|
||||||
|
reason = "Some good reason."
|
||||||
|
self.factory.data_source.pause(reason)
|
||||||
|
self.assertTrue(self.factory.data_source.paused)
|
||||||
|
self.assertEqual(self.factory.data_source.pause_reason, reason)
|
||||||
|
|
||||||
|
def test_resume_clears_reason(self):
|
||||||
|
self.factory.data_source.pause("Reason")
|
||||||
|
self.factory.data_source.resume()
|
||||||
|
self.assertEqual(self.factory.data_source.pause_reason, None)
|
||||||
|
|
||||||
|
def test_reason_is_none_by_default(self):
|
||||||
|
self.assertEqual(self.factory.data_source.pause_reason, None)
|
||||||
|
|||||||
@@ -21,6 +21,26 @@ class TestRefreshQueries(BaseTestCase):
|
|||||||
refresh_queries()
|
refresh_queries()
|
||||||
add_job_mock.assert_called_with(query.query, query.data_source, scheduled=True, metadata=ANY)
|
add_job_mock.assert_called_with(query.query, query.data_source, scheduled=True, metadata=ANY)
|
||||||
|
|
||||||
|
def test_doesnt_enqueue_outdated_queries_for_paused_data_source(self):
|
||||||
|
query = self.factory.create_query(schedule="60")
|
||||||
|
retrieved_at = utcnow() - datetime.timedelta(minutes=10)
|
||||||
|
query_result = self.factory.create_query_result(retrieved_at=retrieved_at, query=query.query,
|
||||||
|
query_hash=query.query_hash)
|
||||||
|
query.latest_query_data = query_result
|
||||||
|
query.save()
|
||||||
|
|
||||||
|
query.data_source.pause()
|
||||||
|
|
||||||
|
with patch('redash.tasks.queries.enqueue_query') as add_job_mock:
|
||||||
|
refresh_queries()
|
||||||
|
add_job_mock.assert_not_called()
|
||||||
|
|
||||||
|
query.data_source.resume()
|
||||||
|
|
||||||
|
with patch('redash.tasks.queries.enqueue_query') as add_job_mock:
|
||||||
|
refresh_queries()
|
||||||
|
add_job_mock.assert_called_with(query.query, query.data_source, scheduled=True, metadata=ANY)
|
||||||
|
|
||||||
def test_skips_fresh_queries(self):
|
def test_skips_fresh_queries(self):
|
||||||
query = self.factory.create_query(schedule="1200")
|
query = self.factory.create_query(schedule="1200")
|
||||||
retrieved_at = utcnow() - datetime.timedelta(minutes=10)
|
retrieved_at = utcnow() - datetime.timedelta(minutes=10)
|
||||||
24
tests/tasks/test_refresh_schemas.py
Normal file
24
tests/tasks/test_refresh_schemas.py
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
import datetime
|
||||||
|
from mock import patch, call, ANY
|
||||||
|
from tests import BaseTestCase
|
||||||
|
from redash.tasks import refresh_schemas
|
||||||
|
|
||||||
|
|
||||||
|
class TestRefreshSchemas(BaseTestCase):
|
||||||
|
def test_calls_refresh_of_all_data_sources(self):
|
||||||
|
with patch('redash.models.DataSource.get_schema') as get_schema:
|
||||||
|
refresh_schemas()
|
||||||
|
get_schema.assert_called_with(refresh=True)
|
||||||
|
|
||||||
|
def test_skips_paused_data_sources(self):
|
||||||
|
self.factory.data_source.pause()
|
||||||
|
|
||||||
|
with patch('redash.models.DataSource.get_schema') as get_schema:
|
||||||
|
refresh_schemas()
|
||||||
|
get_schema.assert_not_called()
|
||||||
|
|
||||||
|
self.factory.data_source.resume()
|
||||||
|
|
||||||
|
with patch('redash.models.DataSource.get_schema') as get_schema:
|
||||||
|
refresh_schemas()
|
||||||
|
get_schema.assert_called_with(refresh=True)
|
||||||
Reference in New Issue
Block a user