mirror of
https://github.com/getredash/redash.git
synced 2025-12-19 17:37:19 -05:00
Switch from flask_script to click, add CLI unit tests.
This commit is contained in:
@@ -12,8 +12,7 @@ dependencies:
|
||||
- pip install -r requirements_dev.txt
|
||||
- pip install -r requirements.txt
|
||||
- pip install pymongo==3.2.1
|
||||
- if [ "$CIRCLE_BRANCH" = "master" ]; then make deps; fi
|
||||
- if [ "$CIRCLE_BRANCH" = "webpack" ]; then make deps; fi
|
||||
- make deps
|
||||
cache_directories:
|
||||
- node_modules/
|
||||
- client/node_modules/
|
||||
|
||||
57
manage.py
57
manage.py
@@ -4,60 +4,73 @@ CLI to manage redash.
|
||||
"""
|
||||
import json
|
||||
|
||||
from flask_script import Manager
|
||||
|
||||
from redash import settings, models, __version__
|
||||
from redash.wsgi import app
|
||||
import click
|
||||
from flask.cli import FlaskGroup, run_command
|
||||
|
||||
from redash import create_app, settings, __version__
|
||||
from redash.cli import users, groups, database, data_sources, organization
|
||||
from redash.monitor import get_status
|
||||
|
||||
manager = Manager(app)
|
||||
manager.add_command("database", database.manager)
|
||||
manager.add_command("users", users.manager)
|
||||
manager.add_command("groups", groups.manager)
|
||||
manager.add_command("ds", data_sources.manager)
|
||||
manager.add_command("org", organization.manager)
|
||||
|
||||
def create(group):
|
||||
app = create_app()
|
||||
group.app = app
|
||||
return app
|
||||
|
||||
|
||||
@click.group(cls=FlaskGroup, create_app=create)
|
||||
def manager():
|
||||
"Management script for redash"
|
||||
|
||||
@manager.command
|
||||
|
||||
manager.add_command(database.manager, "database")
|
||||
manager.add_command(users.manager, "users")
|
||||
manager.add_command(groups.manager, "groups")
|
||||
manager.add_command(data_sources.manager, "ds")
|
||||
manager.add_command(organization.manager, "org")
|
||||
manager.add_command(run_command, "runserver")
|
||||
|
||||
|
||||
@manager.command()
|
||||
def version():
|
||||
"""Displays re:dash version."""
|
||||
print __version__
|
||||
|
||||
@manager.command
|
||||
|
||||
@manager.command()
|
||||
def status():
|
||||
print json.dumps(get_status(), indent=2)
|
||||
|
||||
@manager.command
|
||||
|
||||
@manager.command()
|
||||
def runworkers():
|
||||
"""Start workers (deprecated)."""
|
||||
print "** This command is deprecated. Please use Celery's CLI to control the workers. **"
|
||||
|
||||
|
||||
@manager.shell
|
||||
def make_shell_context():
|
||||
from redash.models import db
|
||||
return dict(app=app, db=db, models=models)
|
||||
|
||||
|
||||
@manager.command
|
||||
@manager.command()
|
||||
def check_settings():
|
||||
"""Show the settings as re:dash sees them (useful for debugging)."""
|
||||
for name, item in settings.all_settings().iteritems():
|
||||
print "{} = {}".format(name, item)
|
||||
|
||||
|
||||
@manager.option('email', default=None, help="Email address to send test message to (default: the address you defined in MAIL_DEFAULT_SENDER)")
|
||||
@manager.command()
|
||||
@click.argument('email', default=settings.MAIL_DEFAULT_SENDER, required=False)
|
||||
def send_test_mail(email=None):
|
||||
"""
|
||||
Send test message to EMAIL (default: the address you defined in MAIL_DEFAULT_SENDER)
|
||||
"""
|
||||
from redash import mail
|
||||
from flask_mail import Message
|
||||
|
||||
if email is None:
|
||||
email = settings.MAIL_DEFAULT_SENDER
|
||||
|
||||
mail.send(Message(subject="Test Message from re:dash", recipients=[email], body="Test message."))
|
||||
mail.send(Message(subject="Test Message from re:dash", recipients=[email],
|
||||
body="Test message."))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
manager.run()
|
||||
manager()
|
||||
|
||||
@@ -1,14 +1,20 @@
|
||||
from sys import exit
|
||||
import json
|
||||
|
||||
import click
|
||||
from flask_script import Manager
|
||||
|
||||
from redash import models
|
||||
from redash.query_runner import query_runners, get_configuration_schema_for_query_runner_type
|
||||
from redash.query_runner import query_runners
|
||||
from redash.query_runner import get_configuration_schema_for_query_runner_type
|
||||
from redash.utils.configuration import ConfigurationContainer
|
||||
|
||||
manager = Manager(help="Data sources management commands.")
|
||||
manager = click.Group(help="Data sources management commands.")
|
||||
|
||||
|
||||
@manager.option('--org', dest='organization', default=None, help="The organization the user belongs to (leave blank for all organizations).")
|
||||
@manager.command()
|
||||
@click.option('--org', 'organization', default=None,
|
||||
help="The organization the user belongs to (leave blank for "
|
||||
"all organizations).")
|
||||
def list(organization=None):
|
||||
"""List currently configured data sources."""
|
||||
if organization:
|
||||
@@ -20,21 +26,24 @@ def list(organization=None):
|
||||
if i > 0:
|
||||
print "-" * 20
|
||||
|
||||
print "Id: {}\nName: {}\nType: {}\nOptions: {}".format(ds.id, ds.name, ds.type, ds.options)
|
||||
print "Id: {}\nName: {}\nType: {}\nOptions: {}".format(
|
||||
ds.id, ds.name, ds.type, ds.options.to_json())
|
||||
|
||||
|
||||
def validate_data_source_type(type):
|
||||
if type not in query_runners.keys():
|
||||
print "Error: the type \"{}\" is not supported (supported types: {}).".format(type, ", ".join(query_runners.keys()))
|
||||
print ("Error: the type \"{}\" is not supported (supported types: {})."
|
||||
.format(type, ", ".join(query_runners.keys())))
|
||||
exit()
|
||||
|
||||
|
||||
@manager.option('name', default=None, help="name of data source to test")
|
||||
@manager.option('--org', dest='organization', default='default',
|
||||
@manager.command()
|
||||
@click.argument('name')
|
||||
@click.option('--org', 'organization', default='default',
|
||||
help="The organization the user belongs to "
|
||||
"(leave blank for 'default').")
|
||||
def test(name, organization='default'):
|
||||
"""Test connection to data source."""
|
||||
"""Test connection to data source by issuing a trivial query."""
|
||||
try:
|
||||
org = models.Organization.get_by_slug(organization)
|
||||
data_source = models.DataSource.get(
|
||||
@@ -47,18 +56,26 @@ def test(name, organization='default'):
|
||||
data_source.query_runner.test_connection()
|
||||
except Exception, e:
|
||||
print "Failure: {}".format(e)
|
||||
exit(1)
|
||||
else:
|
||||
print "Success"
|
||||
except models.DataSource.DoesNotExist:
|
||||
print "Couldn't find data source named: {}".format(name)
|
||||
exit(1)
|
||||
|
||||
|
||||
@manager.option('name', default=None, help="name of data source to create")
|
||||
@manager.option('--type', dest='type', default=None, help="new type for the data source")
|
||||
@manager.option('--options', dest='options', default=None, help="updated options for the data source")
|
||||
@manager.option('--org', dest='organization', default='default', help="The organization the user belongs to (leave blank for 'default').")
|
||||
@manager.command()
|
||||
@click.argument('name', default=None, required=False)
|
||||
@click.option('--type', default=None,
|
||||
help="new type for the data source")
|
||||
@click.option('--options', default=None,
|
||||
help="updated options for the data source")
|
||||
@click.option('--org', 'organization', default='default',
|
||||
help="The organization the user belongs to (leave blank for "
|
||||
"'default').")
|
||||
def new(name=None, type=None, options=None, organization='default'):
|
||||
"""Create new data source."""
|
||||
|
||||
if name is None:
|
||||
name = click.prompt("Name")
|
||||
|
||||
@@ -69,7 +86,8 @@ def new(name=None, type=None, options=None, organization='default'):
|
||||
|
||||
idx = 0
|
||||
while idx < 1 or idx > len(query_runners.keys()):
|
||||
idx = click.prompt("[{}-{}]".format(1, len(query_runners.keys())), type=int)
|
||||
idx = click.prompt("[{}-{}]".format(1, len(query_runners.keys())),
|
||||
type=int)
|
||||
|
||||
type = query_runners.keys()[idx - 1]
|
||||
else:
|
||||
@@ -99,7 +117,8 @@ def new(name=None, type=None, options=None, organization='default'):
|
||||
else:
|
||||
prompt = "{} (optional)".format(prompt)
|
||||
|
||||
value = click.prompt(prompt, default=default_value, type=types[prop['type']], show_default=False)
|
||||
value = click.prompt(prompt, default=default_value,
|
||||
type=types[prop['type']], show_default=False)
|
||||
if value != default_value:
|
||||
options_obj[k] = value
|
||||
|
||||
@@ -111,17 +130,20 @@ def new(name=None, type=None, options=None, organization='default'):
|
||||
print "Error: invalid configuration."
|
||||
exit()
|
||||
|
||||
print "Creating {} data source ({}) with options:\n{}".format(type, name, options.to_json())
|
||||
print "Creating {} data source ({}) with options:\n{}".format(
|
||||
type, name, options.to_json())
|
||||
|
||||
data_source = models.DataSource.create_with_group(name=name,
|
||||
type=type,
|
||||
options=options,
|
||||
data_source = models.DataSource.create_with_group(
|
||||
name=name, type=type, options=options,
|
||||
org=models.Organization.get_by_slug(organization))
|
||||
print "Id: {}".format(data_source.id)
|
||||
|
||||
|
||||
@manager.option('name', default=None, help="name of data source to delete")
|
||||
@manager.option('--org', dest='organization', default='default', help="The organization the user belongs to (leave blank for 'default').")
|
||||
@manager.command()
|
||||
@click.argument('name')
|
||||
@click.option('--org', 'organization', default='default',
|
||||
help="The organization the user belongs to (leave blank for "
|
||||
"'default').")
|
||||
def delete(name, organization='default'):
|
||||
"""Delete data source by name."""
|
||||
try:
|
||||
@@ -134,6 +156,7 @@ def delete(name, organization='default'):
|
||||
data_source.delete_instance(recursive=True)
|
||||
except models.DataSource.DoesNotExist:
|
||||
print "Couldn't find data source named: {}".format(name)
|
||||
exit(1)
|
||||
|
||||
|
||||
def update_attr(obj, attr, new_value):
|
||||
@@ -143,31 +166,37 @@ def update_attr(obj, attr, new_value):
|
||||
setattr(obj, attr, new_value)
|
||||
|
||||
|
||||
@manager.option('name', default=None, help="name of data source to edit")
|
||||
@manager.option('--name', dest='new_name', default=None, help="new name for the data source")
|
||||
@manager.option('--options', dest='options', default=None, help="updated options for the data source")
|
||||
@manager.option('--type', dest='type', default=None, help="new type for the data source")
|
||||
@manager.option('--org', dest='organization', default='default', help="The organization the user belongs to (leave blank for 'default').")
|
||||
@manager.command()
|
||||
@click.argument('name')
|
||||
@click.option('--name', 'new_name', default=None,
|
||||
help="new name for the data source")
|
||||
@click.option('--options', default=None,
|
||||
help="updated options for the data source")
|
||||
@click.option('--type', default=None,
|
||||
help="new type for the data source")
|
||||
@click.option('--org', 'organization', default='default',
|
||||
help="The organization the user belongs to (leave blank for "
|
||||
"'default').")
|
||||
def edit(name, new_name=None, options=None, type=None, organization='default'):
|
||||
"""Edit data source settings (name, options, type)."""
|
||||
try:
|
||||
if type is not None:
|
||||
validate_data_source_type(type)
|
||||
|
||||
org = models.Organization.get_by_slug(organization)
|
||||
data_source = models.DataSource.get(
|
||||
models.DataSource.name==name,
|
||||
models.DataSource.org==org,
|
||||
)
|
||||
update_attr(data_source, "name", new_name)
|
||||
update_attr(data_source, "type", type)
|
||||
|
||||
if options is not None:
|
||||
schema = get_configuration_schema_for_query_runner_type(data_source.type)
|
||||
schema = get_configuration_schema_for_query_runner_type(
|
||||
data_source.type)
|
||||
options = json.loads(options)
|
||||
data_source.options.set_schema(schema)
|
||||
data_source.options.update(options)
|
||||
|
||||
update_attr(data_source, "name", new_name)
|
||||
update_attr(data_source, "type", type)
|
||||
update_attr(data_source, "options", options)
|
||||
data_source.save()
|
||||
|
||||
except models.DataSource.DoesNotExist:
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
from flask_script import Manager
|
||||
from click import Group
|
||||
|
||||
manager = Manager(help="Manage the database (create/drop tables).")
|
||||
manager = Group(help="Manage the database (create/drop tables).")
|
||||
|
||||
@manager.command
|
||||
|
||||
@manager.command()
|
||||
def create_tables():
|
||||
"""Create the database tables."""
|
||||
from redash.models import create_db, init_db
|
||||
@@ -10,7 +11,8 @@ def create_tables():
|
||||
create_db(True, False)
|
||||
init_db()
|
||||
|
||||
@manager.command
|
||||
|
||||
@manager.command()
|
||||
def drop_tables():
|
||||
"""Drop the database tables."""
|
||||
from redash.models import create_db
|
||||
|
||||
@@ -1,11 +1,22 @@
|
||||
from flask_script import Manager, prompt_pass
|
||||
from sys import exit
|
||||
|
||||
from click import Group, argument, option
|
||||
from redash import models
|
||||
|
||||
manager = Manager(help="Groups management commands.")
|
||||
manager = Group(help="Groups management commands.")
|
||||
|
||||
@manager.option('name', help="Group's name")
|
||||
@manager.option('--org', dest='organization', default='default', help="The organization the user belongs to (leave blank for 'default').")
|
||||
@manager.option('--permissions', dest='permissions', default=None, help="Comma seperated list of permissions ('create_dashboard', 'create_query', 'edit_dashboard', 'edit_query', 'view_query', 'view_source', 'execute_query', 'list_users', 'schedule_query', 'list_dashboards', 'list_alerts', 'list_data_sources') (leave blank for default).")
|
||||
|
||||
@manager.command()
|
||||
@argument('name')
|
||||
@option('--org', 'organization', default='default',
|
||||
help="The organization the user belongs to (leave blank for "
|
||||
"'default').")
|
||||
@option('--permissions', default=None,
|
||||
help="Comma separated list of permissions ('create_dashboard',"
|
||||
" 'create_query', 'edit_dashboard', 'edit_query', "
|
||||
"'view_query', 'view_source', 'execute_query', 'list_users',"
|
||||
" 'schedule_query', 'list_dashboards', 'list_alerts',"
|
||||
" 'list_data_sources') (leave blank for default).")
|
||||
def create(name, permissions=None, organization='default'):
|
||||
print "Creating group (%s)..." % (name)
|
||||
|
||||
@@ -19,20 +30,29 @@ def create(name, permissions=None, organization='default'):
|
||||
models.Group.create(name=name, org=org, permissions=permissions)
|
||||
except Exception, e:
|
||||
print "Failed create group: %s" % e.message
|
||||
exit(1)
|
||||
|
||||
@manager.option('id', help="Group's id")
|
||||
@manager.option('--permissions', dest='permissions', default=None, help="Comma seperated list of permissions ('create_dashboard', 'create_query', 'edit_dashboard', 'edit_query', 'view_query', 'view_source', 'execute_query', 'list_users', 'schedule_query', 'list_dashboards', 'list_alerts', 'list_data_sources') (leave blank for default).")
|
||||
def change_permissions(id, permissions=None):
|
||||
print "Change permissions of group %s ..." % id
|
||||
|
||||
@manager.command()
|
||||
@argument('group_id')
|
||||
@option('--permissions', default=None,
|
||||
help="Comma separated list of permissions ('create_dashboard',"
|
||||
" 'create_query', 'edit_dashboard', 'edit_query',"
|
||||
" 'view_query', 'view_source', 'execute_query', 'list_users',"
|
||||
" 'schedule_query', 'list_dashboards', 'list_alerts',"
|
||||
" 'list_data_sources') (leave blank for default).")
|
||||
def change_permissions(group_id, permissions=None):
|
||||
print "Change permissions of group %s ..." % group_id
|
||||
|
||||
try:
|
||||
group = models.Group.get_by_id(id)
|
||||
group = models.Group.get_by_id(group_id)
|
||||
except models.Group.DoesNotExist:
|
||||
print "User [%s] not found." % id
|
||||
return
|
||||
print "User [%s] not found." % group_id
|
||||
exit(1)
|
||||
|
||||
permissions = extract_permissions_string(permissions)
|
||||
print "current permissions [%s] will be modify to [%s]" % (",".join(group.permissions), ",".join(permissions))
|
||||
print "current permissions [%s] will be modify to [%s]" % (
|
||||
",".join(group.permissions), ",".join(permissions))
|
||||
|
||||
group.permissions = permissions
|
||||
|
||||
@@ -40,6 +60,7 @@ def change_permissions(id, permissions=None):
|
||||
group.save()
|
||||
except Exception, e:
|
||||
print "Failed change permission: %s" % e.message
|
||||
exit(1)
|
||||
|
||||
|
||||
def extract_permissions_string(permissions):
|
||||
@@ -51,7 +72,9 @@ def extract_permissions_string(permissions):
|
||||
return permissions
|
||||
|
||||
|
||||
@manager.option('--org', dest='organization', default=None, help="The organization to limit to (leave blank for all).")
|
||||
@manager.command()
|
||||
@option('--org', 'organization', default=None,
|
||||
help="The organization to limit to (leave blank for all).")
|
||||
def list(organization=None):
|
||||
"""List all groups"""
|
||||
if organization:
|
||||
@@ -64,4 +87,5 @@ def list(organization=None):
|
||||
if i > 0:
|
||||
print "-" * 20
|
||||
|
||||
print "Id: {}\nName: {}\nType: {}\nOrganization: {}".format(group.id, group.name, group.type, group.org.slug)
|
||||
print "Id: {}\nName: {}\nType: {}\nOrganization: {}".format(
|
||||
group.id, group.name, group.type, group.org.slug)
|
||||
|
||||
@@ -1,26 +1,32 @@
|
||||
from flask_script import Manager
|
||||
from click import Group, argument
|
||||
from redash import models
|
||||
|
||||
manager = Manager(help="Organization management commands.")
|
||||
manager = Group(help="Organization management commands.")
|
||||
|
||||
|
||||
@manager.option('domains', help="comma separated list of domains to allow")
|
||||
@manager.command()
|
||||
@argument('domains')
|
||||
def set_google_apps_domains(domains):
|
||||
"""
|
||||
Sets the allowable domains to the comma separated list DOMAINS.
|
||||
"""
|
||||
organization = models.Organization.select().first()
|
||||
|
||||
organization.settings[models.Organization.SETTING_GOOGLE_APPS_DOMAINS] = domains.split(',')
|
||||
k = models.Organization.SETTING_GOOGLE_APPS_DOMAINS
|
||||
organization.settings[k] = domains.split(',')
|
||||
organization.save()
|
||||
|
||||
print "Updated list of allowed domains to: {}".format(organization.google_apps_domains)
|
||||
print "Updated list of allowed domains to: {}".format(
|
||||
organization.google_apps_domains)
|
||||
|
||||
|
||||
@manager.command
|
||||
@manager.command()
|
||||
def show_google_apps_domains():
|
||||
organization = models.Organization.select().first()
|
||||
print "Current list of Google Apps domains: {}".format(organization.google_apps_domains)
|
||||
print "Current list of Google Apps domains: {}".format(
|
||||
', '.join(organization.google_apps_domains))
|
||||
|
||||
|
||||
@manager.command
|
||||
@manager.command()
|
||||
def list():
|
||||
"""List all organizations"""
|
||||
orgs = models.Organization.select()
|
||||
|
||||
@@ -1,15 +1,17 @@
|
||||
from flask_script import Manager, prompt_pass
|
||||
from sys import exit
|
||||
|
||||
from click import BOOL, Group, argument, option, prompt
|
||||
from peewee import IntegrityError
|
||||
|
||||
from redash import models
|
||||
from redash.handlers.users import invite_user
|
||||
|
||||
manager = Manager(help="Users management commands.")
|
||||
manager = Group(help="Users management commands.")
|
||||
|
||||
|
||||
def build_groups(org, groups, is_admin):
|
||||
if isinstance(groups, basestring):
|
||||
groups= groups.split(',')
|
||||
groups = groups.split(',')
|
||||
groups.remove('') # in case it was empty string
|
||||
groups = [int(g) for g in groups]
|
||||
|
||||
@@ -21,9 +23,16 @@ def build_groups(org, groups, is_admin):
|
||||
|
||||
return groups
|
||||
|
||||
@manager.option('email', help="email address of the user to grant admin to")
|
||||
@manager.option('--org', dest='organization', default='default', help="the organization the user belongs to, (leave blank for 'default').")
|
||||
|
||||
@manager.command()
|
||||
@argument('email')
|
||||
@option('--org', 'organization', default='default',
|
||||
help="the organization the user belongs to, (leave blank for "
|
||||
"'default').")
|
||||
def grant_admin(email, organization='default'):
|
||||
"""
|
||||
Grant admin access to user EMAIL.
|
||||
"""
|
||||
try:
|
||||
org = models.Organization.get_by_slug(organization)
|
||||
admin_group = org.admin_group
|
||||
@@ -40,15 +49,29 @@ def grant_admin(email, organization='default'):
|
||||
print "User [%s] not found." % email
|
||||
|
||||
|
||||
@manager.option('email', help="User's email")
|
||||
@manager.option('name', help="User's full name")
|
||||
@manager.option('--org', dest='organization', default='default', help="The organization the user belongs to (leave blank for 'default').")
|
||||
@manager.option('--admin', dest='is_admin', action="store_true", default=False, help="set user as admin")
|
||||
@manager.option('--google', dest='google_auth', action="store_true", default=False, help="user uses Google Auth to login")
|
||||
@manager.option('--password', dest='password', default=None, help="Password for users who don't use Google Auth (leave blank for prompt).")
|
||||
@manager.option('--groups', dest='groups', default=None, help="Comma seperated list of groups (leave blank for default).")
|
||||
def create(email, name, groups, is_admin=False, google_auth=False, password=None, organization='default'):
|
||||
print "Creating user (%s, %s) in organization %s..." % (email, name, organization)
|
||||
@manager.command()
|
||||
@argument('email')
|
||||
@argument('name')
|
||||
@option('--org', 'organization', default='default',
|
||||
help="The organization the user belongs to (leave blank for "
|
||||
"'default').")
|
||||
@option('--admin', 'is_admin', is_flag=True, default=False,
|
||||
help="set user as admin")
|
||||
@option('--google', 'google_auth', is_flag=True,
|
||||
default=False, help="user uses Google Auth to login")
|
||||
@option('--password', 'password', default=None,
|
||||
help="Password for users who don't use Google Auth "
|
||||
"(leave blank for prompt).")
|
||||
@option('--groups', 'groups', default=None,
|
||||
help="Comma separated list of groups (leave blank for "
|
||||
"default).")
|
||||
def create(email, name, groups, is_admin=False, google_auth=False,
|
||||
password=None, organization='default'):
|
||||
"""
|
||||
Create user EMAIL with display name NAME.
|
||||
"""
|
||||
print "Creating user (%s, %s) in organization %s..." % (email, name,
|
||||
organization)
|
||||
print "Admin: %r" % is_admin
|
||||
print "Login with Google Auth: %r\n" % google_auth
|
||||
|
||||
@@ -56,19 +79,28 @@ def create(email, name, groups, is_admin=False, google_auth=False, password=None
|
||||
groups = build_groups(org, groups, is_admin)
|
||||
|
||||
user = models.User(org=org, email=email, name=name, groups=groups)
|
||||
if not password and not google_auth:
|
||||
password = prompt("Password", hide_input=True,
|
||||
confirmation_prompt=True)
|
||||
if not google_auth:
|
||||
password = password or prompt_pass("Password")
|
||||
user.hash_password(password)
|
||||
|
||||
try:
|
||||
user.save()
|
||||
except Exception, e:
|
||||
print "Failed creating user: %s" % e.message
|
||||
exit(1)
|
||||
|
||||
|
||||
@manager.option('email', help="email address of user to delete")
|
||||
@manager.option('--org', dest='organization', default=None, help="The organization the user belongs to (leave blank for all organizations).")
|
||||
@manager.command()
|
||||
@argument('email')
|
||||
@option('--org', 'organization', default=None,
|
||||
help="The organization the user belongs to (leave blank for all"
|
||||
" organizations).")
|
||||
def delete(email, organization=None):
|
||||
"""
|
||||
Delete user EMAIL.
|
||||
"""
|
||||
if organization:
|
||||
org = models.Organization.get_by_slug(organization)
|
||||
deleted_count = models.User.delete().where(
|
||||
@@ -80,11 +112,16 @@ def delete(email, organization=None):
|
||||
print "Deleted %d users." % deleted_count
|
||||
|
||||
|
||||
@manager.option('password', help="new password for the user")
|
||||
@manager.option('email', help="email address of the user to change password for")
|
||||
@manager.option('--org', dest='organization', default=None, help="The organization the user belongs to (leave blank for all organizations).")
|
||||
@manager.command()
|
||||
@argument('email')
|
||||
@argument('password')
|
||||
@option('--org', 'organization', default=None,
|
||||
help="The organization the user belongs to (leave blank for all "
|
||||
"organizations).")
|
||||
def password(email, password, organization=None):
|
||||
try:
|
||||
"""
|
||||
Resets password for EMAIL to PASSWORD.
|
||||
"""
|
||||
if organization:
|
||||
org = models.Organization.get_by_slug(organization)
|
||||
user = models.User.select().where(
|
||||
@@ -94,21 +131,30 @@ def password(email, password, organization=None):
|
||||
else:
|
||||
user = models.User.select().where(models.User.email == email).first()
|
||||
|
||||
if user is not None:
|
||||
user.hash_password(password)
|
||||
user.save()
|
||||
|
||||
print "User updated."
|
||||
except models.User.DoesNotExist:
|
||||
else:
|
||||
print "User [%s] not found." % email
|
||||
exit(1)
|
||||
|
||||
|
||||
@manager.option('email', help="The invitee's email")
|
||||
@manager.option('name', help="The invitee's full name")
|
||||
@manager.option('inviter_email', help="The email of the inviter")
|
||||
@manager.option('--org', dest='organization', default='default', help="The organization the user belongs to (leave blank for 'default')")
|
||||
@manager.option('--admin', dest='is_admin', action="store_true", default=False, help="set user as admin")
|
||||
@manager.option('--groups', dest='groups', default=None, help="Comma seperated list of groups (leave blank for default).")
|
||||
def invite(email, name, inviter_email, groups, is_admin=False, organization='default'):
|
||||
@manager.command()
|
||||
@argument('email')
|
||||
@argument('name')
|
||||
@argument('inviter_email')
|
||||
@option('--org', 'organization', default='default',
|
||||
help="The organization the user belongs to (leave blank for 'default')")
|
||||
@option('--admin', 'is_admin', type=BOOL, default=False,
|
||||
help="set user as admin")
|
||||
@option('--groups', 'groups', default=None,
|
||||
help="Comma seperated list of groups (leave blank for default).")
|
||||
def invite(email, name, inviter_email, groups, is_admin=False,
|
||||
organization='default'):
|
||||
"""
|
||||
Sends an invitation to the given NAME and EMAIL from INVITER_EMAIL.
|
||||
"""
|
||||
org = models.Organization.get_by_slug(organization)
|
||||
groups = build_groups(org, groups, is_admin)
|
||||
try:
|
||||
@@ -125,10 +171,13 @@ def invite(email, name, inviter_email, groups, is_admin=False, organization='def
|
||||
else:
|
||||
print e
|
||||
except models.User.DoesNotExist:
|
||||
print "The inviter [%s] was not found." % inviterEmail
|
||||
print "The inviter [%s] was not found." % inviter_email
|
||||
|
||||
|
||||
@manager.option('--org', dest='organization', default=None, help="The organization the user belongs to (leave blank for all organizations)")
|
||||
@manager.command()
|
||||
@option('--org', 'organization', default=None,
|
||||
help="The organization the user belongs to (leave blank for all"
|
||||
" organizations)")
|
||||
def list(organization=None):
|
||||
"""List all users"""
|
||||
if organization:
|
||||
@@ -140,4 +189,5 @@ def list(organization=None):
|
||||
if i > 0:
|
||||
print "-" * 20
|
||||
|
||||
print "Id: {}\nName: {}\nEmail: {}\nOrganization: {}".format(user.id, user.name.encode('utf-8'), user.email, user.org.name)
|
||||
print "Id: {}\nName: {}\nEmail: {}\nOrganization: {}".format(
|
||||
user.id, user.name.encode('utf-8'), user.email, user.org.name)
|
||||
|
||||
@@ -12,7 +12,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Sqlite(BaseSQLQueryRunner):
|
||||
noop_query = "SELECT 1"
|
||||
noop_query = "pragma quick_check"
|
||||
|
||||
@classmethod
|
||||
def configuration_schema(cls):
|
||||
|
||||
@@ -32,7 +32,7 @@ class ConfigurationContainer(object):
|
||||
jsonschema.validate(self._config, self._schema)
|
||||
|
||||
def to_json(self):
|
||||
return json.dumps(self._config)
|
||||
return json.dumps(self._config, sort_keys=True)
|
||||
|
||||
def iteritems(self):
|
||||
return self._config.iteritems()
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
httplib2==0.9.2
|
||||
Flask==0.10.1
|
||||
Flask==0.11.1
|
||||
Flask-Admin==1.1.0
|
||||
Flask-RESTful==0.3.5
|
||||
Flask-Login==0.3.2
|
||||
@@ -22,13 +22,12 @@ requests==2.11.1
|
||||
six==1.10.0
|
||||
sqlparse==0.1.8
|
||||
wsgiref==0.1.2
|
||||
Flask-Script==0.6.6
|
||||
honcho==0.5.0
|
||||
statsd==2.1.2
|
||||
gunicorn==19.4.5
|
||||
celery==3.1.23
|
||||
jsonschema==2.4.0
|
||||
click==3.3
|
||||
click==6.6
|
||||
RestrictedPython==3.6.0
|
||||
wtf-peewee==0.2.3
|
||||
pysaml2==2.4.0
|
||||
|
||||
423
tests/test_cli.py
Normal file
423
tests/test_cli.py
Normal file
@@ -0,0 +1,423 @@
|
||||
import textwrap
|
||||
|
||||
from click.testing import CliRunner
|
||||
import mock
|
||||
|
||||
from tests import BaseTestCase
|
||||
from redash.utils.configuration import ConfigurationContainer
|
||||
from redash.query_runner import query_runners
|
||||
from redash.cli.data_sources import (edit, delete as delete_ds,
|
||||
list as list_ds, new, test)
|
||||
from redash.cli.groups import (change_permissions, create as create_group,
|
||||
list as list_group)
|
||||
from redash.cli.organization import (list as list_org, set_google_apps_domains,
|
||||
show_google_apps_domains)
|
||||
from redash.cli.users import (create as create_user, delete as delete_user,
|
||||
grant_admin, invite, list as list_user, password)
|
||||
from redash.models import DataSource, Group, Organization, User
|
||||
|
||||
|
||||
class DataSourceCommandTests(BaseTestCase):
|
||||
def test_interactive_new(self):
|
||||
runner = CliRunner()
|
||||
pg_i = query_runners.keys().index('pg') + 1
|
||||
result = runner.invoke(
|
||||
new,
|
||||
input="test\n%s\n\n\nexample.com\n\ntestdb\n" % (pg_i,))
|
||||
self.assertFalse(result.exception)
|
||||
self.assertEqual(result.exit_code, 0)
|
||||
self.assertEqual(DataSource.select().count(), 1)
|
||||
ds = DataSource.select().first()
|
||||
self.assertEqual(ds.name, 'test')
|
||||
self.assertEqual(ds.type, 'pg')
|
||||
self.assertEqual(ds.options['dbname'], 'testdb')
|
||||
|
||||
def test_options_new(self):
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(
|
||||
new, ['test', '--options',
|
||||
'{"host": "example.com", "dbname": "testdb"}',
|
||||
'--type', 'pg'])
|
||||
self.assertFalse(result.exception)
|
||||
self.assertEqual(result.exit_code, 0)
|
||||
self.assertEqual(DataSource.select().count(), 1)
|
||||
ds = DataSource.select().first()
|
||||
self.assertEqual(ds.name, 'test')
|
||||
self.assertEqual(ds.type, 'pg')
|
||||
self.assertEqual(ds.options['host'], 'example.com')
|
||||
self.assertEqual(ds.options['dbname'], 'testdb')
|
||||
|
||||
def test_bad_type_new(self):
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(
|
||||
new, ['test', '--type', 'wrong'])
|
||||
self.assertTrue(result.exception)
|
||||
self.assertEqual(result.exit_code, 1)
|
||||
self.assertIn('not supported', result.output)
|
||||
self.assertEqual(DataSource.select().count(), 0)
|
||||
|
||||
def test_bad_options_new(self):
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(
|
||||
new, ['test', '--options',
|
||||
'{"host": 12345, "dbname": "testdb"}',
|
||||
'--type', 'pg'])
|
||||
self.assertTrue(result.exception)
|
||||
self.assertEqual(result.exit_code, 1)
|
||||
self.assertIn('invalid configuration', result.output)
|
||||
self.assertEqual(DataSource.select().count(), 0)
|
||||
|
||||
def test_list(self):
|
||||
self.factory.create_data_source(
|
||||
name='test1', type='pg',
|
||||
options=ConfigurationContainer({"host": "example.com",
|
||||
"dbname": "testdb1"}))
|
||||
self.factory.create_data_source(
|
||||
name='test2', type='sqlite',
|
||||
options=ConfigurationContainer({"dbpath": "/tmp/test.db"}))
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(list_ds)
|
||||
self.assertFalse(result.exception)
|
||||
self.assertEqual(result.exit_code, 0)
|
||||
expected_output = """
|
||||
Id: 1
|
||||
Name: test1
|
||||
Type: pg
|
||||
Options: {"dbname": "testdb1", "host": "example.com"}
|
||||
--------------------
|
||||
Id: 2
|
||||
Name: test2
|
||||
Type: sqlite
|
||||
Options: {"dbpath": "/tmp/test.db"}
|
||||
"""
|
||||
self.assertMultiLineEqual(result.output,
|
||||
textwrap.dedent(expected_output).lstrip())
|
||||
|
||||
def test_connection_test(self):
|
||||
self.factory.create_data_source(
|
||||
name='test1', type='sqlite',
|
||||
options=ConfigurationContainer({"dbpath": "/tmp/test.db"}))
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(test, ['test1'])
|
||||
self.assertFalse(result.exception)
|
||||
self.assertEqual(result.exit_code, 0)
|
||||
self.assertIn('Success', result.output)
|
||||
|
||||
def test_connection_bad_test(self):
|
||||
self.factory.create_data_source(
|
||||
name='test1', type='sqlite',
|
||||
options=ConfigurationContainer({"dbpath": __file__}))
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(test, ['test1'])
|
||||
self.assertTrue(result.exception)
|
||||
self.assertEqual(result.exit_code, 1)
|
||||
self.assertIn('Failure', result.output)
|
||||
|
||||
def test_connection_delete(self):
|
||||
self.factory.create_data_source(
|
||||
name='test1', type='sqlite',
|
||||
options=ConfigurationContainer({"dbpath": "/tmp/test.db"}))
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(delete_ds, ['test1'])
|
||||
self.assertFalse(result.exception)
|
||||
self.assertEqual(result.exit_code, 0)
|
||||
self.assertIn('Deleting', result.output)
|
||||
self.assertEqual(DataSource.select().count(), 0)
|
||||
|
||||
def test_connection_bad_delete(self):
|
||||
self.factory.create_data_source(
|
||||
name='test1', type='sqlite',
|
||||
options=ConfigurationContainer({"dbpath": "/tmp/test.db"}))
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(delete_ds, ['wrong'])
|
||||
self.assertTrue(result.exception)
|
||||
self.assertEqual(result.exit_code, 1)
|
||||
self.assertIn("Couldn't find", result.output)
|
||||
self.assertEqual(DataSource.select().count(), 1)
|
||||
|
||||
def test_options_edit(self):
|
||||
self.factory.create_data_source(
|
||||
name='test1', type='sqlite',
|
||||
options=ConfigurationContainer({"dbpath": "/tmp/test.db"}))
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(
|
||||
edit, ['test1', '--options',
|
||||
'{"host": "example.com", "dbname": "testdb"}',
|
||||
'--name', 'test2',
|
||||
'--type', 'pg'])
|
||||
self.assertFalse(result.exception)
|
||||
self.assertEqual(result.exit_code, 0)
|
||||
self.assertEqual(DataSource.select().count(), 1)
|
||||
ds = DataSource.select().first()
|
||||
self.assertEqual(ds.name, 'test2')
|
||||
self.assertEqual(ds.type, 'pg')
|
||||
self.assertEqual(ds.options['host'], 'example.com')
|
||||
self.assertEqual(ds.options['dbname'], 'testdb')
|
||||
|
||||
def test_bad_type_edit(self):
|
||||
self.factory.create_data_source(
|
||||
name='test1', type='sqlite',
|
||||
options=ConfigurationContainer({"dbpath": "/tmp/test.db"}))
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(
|
||||
edit, ['test', '--type', 'wrong'])
|
||||
self.assertTrue(result.exception)
|
||||
self.assertEqual(result.exit_code, 1)
|
||||
self.assertIn('not supported', result.output)
|
||||
ds = DataSource.select().first()
|
||||
self.assertEqual(ds.type, 'sqlite')
|
||||
|
||||
def test_bad_options_edit(self):
|
||||
ds = self.factory.create_data_source(
|
||||
name='test1', type='sqlite',
|
||||
options=ConfigurationContainer({"dbpath": "/tmp/test.db"}))
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(
|
||||
new, ['test', '--options',
|
||||
'{"host": 12345, "dbname": "testdb"}',
|
||||
'--type', 'pg'])
|
||||
self.assertTrue(result.exception)
|
||||
self.assertEqual(result.exit_code, 1)
|
||||
self.assertIn('invalid configuration', result.output)
|
||||
ds = DataSource.select().first()
|
||||
self.assertEqual(ds.type, 'sqlite')
|
||||
self.assertEqual(ds.options._config, {"dbpath": "/tmp/test.db"})
|
||||
|
||||
|
||||
class GroupCommandTests(BaseTestCase):
|
||||
|
||||
def test_create(self):
|
||||
gcount = Group.select().count()
|
||||
perms = ['create_query', 'edit_query', 'view_query']
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(
|
||||
create_group, ['test', '--permissions', ','.join(perms)])
|
||||
print result.output
|
||||
self.assertFalse(result.exception)
|
||||
self.assertEqual(result.exit_code, 0)
|
||||
self.assertEqual(Group.select().count(), gcount + 1)
|
||||
g = Group.select().order_by(Group.id.desc()).first()
|
||||
self.assertEqual(g.org, self.factory.org)
|
||||
self.assertEqual(g.permissions, perms)
|
||||
|
||||
def test_change_permissions(self):
|
||||
g = self.factory.create_group(permissions=['list_dashboards'])
|
||||
g_id = g.id
|
||||
perms = ['create_query', 'edit_query', 'view_query']
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(
|
||||
change_permissions, [str(g_id), '--permissions', ','.join(perms)])
|
||||
self.assertFalse(result.exception)
|
||||
self.assertEqual(result.exit_code, 0)
|
||||
g = Group.select().where(Group.id == g_id).first()
|
||||
self.assertEqual(g.permissions, perms)
|
||||
|
||||
def test_list(self):
|
||||
self.factory.create_group(name='test', permissions=['list_dashboards'])
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(list_group, [])
|
||||
self.assertFalse(result.exception)
|
||||
self.assertEqual(result.exit_code, 0)
|
||||
output = """
|
||||
Id: 1
|
||||
Name: admin
|
||||
Type: builtin
|
||||
Organization: default
|
||||
--------------------
|
||||
Id: 2
|
||||
Name: default
|
||||
Type: builtin
|
||||
Organization: default
|
||||
--------------------
|
||||
Id: 3
|
||||
Name: test
|
||||
Type: regular
|
||||
Organization: default
|
||||
"""
|
||||
self.assertMultiLineEqual(result.output,
|
||||
textwrap.dedent(output).lstrip())
|
||||
|
||||
|
||||
class OrganizationCommandTests(BaseTestCase):
|
||||
def test_set_google_apps_domains(self):
|
||||
domains = ['example.org', 'example.com']
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(set_google_apps_domains, [','.join(domains)])
|
||||
self.assertFalse(result.exception)
|
||||
self.assertEqual(result.exit_code, 0)
|
||||
o = Organization.select().where(
|
||||
Organization.id == self.factory.org.id).first()
|
||||
self.assertEqual(o.google_apps_domains, domains)
|
||||
|
||||
def test_show_google_apps_domains(self):
|
||||
self.factory.org.settings[Organization.SETTING_GOOGLE_APPS_DOMAINS] = [
|
||||
'example.org', 'example.com']
|
||||
self.factory.org.save()
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(show_google_apps_domains, [])
|
||||
self.assertFalse(result.exception)
|
||||
self.assertEqual(result.exit_code, 0)
|
||||
output = """
|
||||
Current list of Google Apps domains: example.org, example.com
|
||||
"""
|
||||
self.assertMultiLineEqual(result.output,
|
||||
textwrap.dedent(output).lstrip())
|
||||
|
||||
def test_list(self):
|
||||
self.factory.create_org(name='test', slug='test_org')
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(list_org, [])
|
||||
self.assertFalse(result.exception)
|
||||
self.assertEqual(result.exit_code, 0)
|
||||
output = """
|
||||
Id: 1
|
||||
Name: Default
|
||||
Slug: default
|
||||
--------------------
|
||||
Id: 2
|
||||
Name: test
|
||||
Slug: test_org
|
||||
"""
|
||||
self.assertMultiLineEqual(result.output,
|
||||
textwrap.dedent(output).lstrip())
|
||||
|
||||
|
||||
class UserCommandTests(BaseTestCase):
|
||||
def test_create_basic(self):
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(
|
||||
create_user, ['foobar@example.com', 'Fred Foobar'],
|
||||
input="password1\npassword1\n")
|
||||
self.assertFalse(result.exception)
|
||||
self.assertEqual(result.exit_code, 0)
|
||||
u = User.select().where(User.email == "foobar@example.com").first()
|
||||
self.assertEqual(u.name, "Fred Foobar")
|
||||
self.assertTrue(u.verify_password('password1'))
|
||||
self.assertEqual(u.groups, [self.factory.default_group.id])
|
||||
|
||||
def test_create_admin(self):
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(
|
||||
create_user, ['foobar@example.com', 'Fred Foobar',
|
||||
'--password', 'password1', '--admin'])
|
||||
self.assertFalse(result.exception)
|
||||
self.assertEqual(result.exit_code, 0)
|
||||
u = User.select().where(User.email == "foobar@example.com").first()
|
||||
self.assertEqual(u.name, "Fred Foobar")
|
||||
self.assertTrue(u.verify_password('password1'))
|
||||
self.assertEqual(u.groups, [self.factory.default_group.id,
|
||||
self.factory.admin_group.id])
|
||||
|
||||
def test_create_googleauth(self):
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(
|
||||
create_user, ['foobar@example.com', 'Fred Foobar', '--google'])
|
||||
self.assertFalse(result.exception)
|
||||
self.assertEqual(result.exit_code, 0)
|
||||
u = User.select().where(User.email == "foobar@example.com").first()
|
||||
self.assertEqual(u.name, "Fred Foobar")
|
||||
self.assertIsNone(u.password_hash)
|
||||
self.assertEqual(u.groups, [self.factory.default_group.id])
|
||||
|
||||
def test_create_bad(self):
|
||||
self.factory.create_user(email='foobar@example.com')
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(
|
||||
create_user, ['foobar@example.com', 'Fred Foobar'],
|
||||
input="password1\npassword1\n")
|
||||
self.assertTrue(result.exception)
|
||||
self.assertEqual(result.exit_code, 1)
|
||||
self.assertIn('Failed', result.output)
|
||||
|
||||
def test_delete(self):
|
||||
self.factory.create_user(email='foobar@example.com')
|
||||
ucount = User.select().count()
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(
|
||||
delete_user, ['foobar@example.com'])
|
||||
self.assertFalse(result.exception)
|
||||
self.assertEqual(result.exit_code, 0)
|
||||
self.assertEqual(User.select().where(User.email ==
|
||||
"foobar@example.com").count(), 0)
|
||||
self.assertEqual(User.select().count(), ucount - 1)
|
||||
|
||||
def test_delete_bad(self):
|
||||
ucount = User.select().count()
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(
|
||||
delete_user, ['foobar@example.com'])
|
||||
self.assertIn('Deleted 0 users', result.output)
|
||||
self.assertEqual(User.select().count(), ucount)
|
||||
|
||||
def test_password(self):
|
||||
self.factory.create_user(email='foobar@example.com')
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(
|
||||
password, ['foobar@example.com', 'xyzzy'])
|
||||
self.assertFalse(result.exception)
|
||||
self.assertEqual(result.exit_code, 0)
|
||||
u = User.select().where(User.email == "foobar@example.com").first()
|
||||
self.assertTrue(u.verify_password('xyzzy'))
|
||||
|
||||
def test_password_bad(self):
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(
|
||||
password, ['foobar@example.com', 'xyzzy'])
|
||||
self.assertTrue(result.exception)
|
||||
self.assertEqual(result.exit_code, 1)
|
||||
self.assertIn('not found', result.output)
|
||||
|
||||
def test_password_bad_org(self):
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(
|
||||
password, ['foobar@example.com', 'xyzzy', '--org', 'default'])
|
||||
self.assertTrue(result.exception)
|
||||
self.assertEqual(result.exit_code, 1)
|
||||
self.assertIn('not found', result.output)
|
||||
|
||||
def test_invite(self):
|
||||
admin = self.factory.create_user(email='redash-admin@example.com')
|
||||
runner = CliRunner()
|
||||
with mock.patch('redash.cli.users.invite_user') as iu:
|
||||
result = runner.invoke(
|
||||
invite, ['foobar@example.com', 'Fred Foobar',
|
||||
'redash-admin@example.com'])
|
||||
self.assertFalse(result.exception)
|
||||
self.assertEqual(result.exit_code, 0)
|
||||
self.assertTrue(iu.called)
|
||||
c = iu.call_args[0]
|
||||
self.assertEqual(c[0].id, self.factory.org.id)
|
||||
self.assertEqual(c[1].id, admin.id)
|
||||
self.assertEqual(c[2].email, 'foobar@example.com')
|
||||
|
||||
|
||||
def test_list(self):
|
||||
self.factory.create_user(name='Fred Foobar',
|
||||
email='foobar@example.com',
|
||||
organization=self.factory.org)
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(list_user, [])
|
||||
self.assertFalse(result.exception)
|
||||
self.assertEqual(result.exit_code, 0)
|
||||
output = """
|
||||
Id: 1
|
||||
Name: Fred Foobar
|
||||
Email: foobar@example.com
|
||||
Organization: Default
|
||||
"""
|
||||
self.assertMultiLineEqual(result.output,
|
||||
textwrap.dedent(output).lstrip())
|
||||
|
||||
def test_grant_admin(self):
|
||||
self.factory.create_user(name='Fred Foobar',
|
||||
email='foobar@example.com',
|
||||
org=self.factory.org,
|
||||
groups=[self.factory.default_group.id])
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(
|
||||
grant_admin, ['foobar@example.com'])
|
||||
self.assertFalse(result.exception)
|
||||
self.assertEqual(result.exit_code, 0)
|
||||
u = User.select().order_by(User.id.desc()).first()
|
||||
self.assertEqual(u.groups, [self.factory.default_group.id,
|
||||
self.factory.admin_group.id])
|
||||
Reference in New Issue
Block a user