forked from Github-Mirrors/canaille
feat: model management commands
This commit is contained in:
parent
5beee67a61
commit
25f2b6dedd
9 changed files with 769 additions and 2 deletions
|
@ -4,6 +4,7 @@
|
||||||
Added
|
Added
|
||||||
^^^^^
|
^^^^^
|
||||||
- Group member removal can be achieved from the group edition page :issue:`192`
|
- Group member removal can be achieved from the group edition page :issue:`192`
|
||||||
|
- Model management commands :issue:`117` :issue:`54`
|
||||||
|
|
||||||
Changed
|
Changed
|
||||||
^^^^^^^
|
^^^^^^^
|
||||||
|
|
283
canaille/backends/commands.py
Normal file
283
canaille/backends/commands.py
Normal file
|
@ -0,0 +1,283 @@
|
||||||
|
import datetime
|
||||||
|
import inspect
|
||||||
|
import json
|
||||||
|
import typing
|
||||||
|
|
||||||
|
import click
|
||||||
|
from flask.cli import AppGroup
|
||||||
|
from flask.cli import with_appcontext
|
||||||
|
|
||||||
|
from canaille.app.commands import with_backendcontext
|
||||||
|
from canaille.app.models import MODELS
|
||||||
|
from canaille.backends import Backend
|
||||||
|
from canaille.backends.models import Model
|
||||||
|
|
||||||
|
|
||||||
|
class ModelCommand(AppGroup):
|
||||||
|
"""CLI commands that takes a model subcommand."""
|
||||||
|
|
||||||
|
def __init__(self, *args, factory, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self.factory = factory
|
||||||
|
|
||||||
|
@with_appcontext
|
||||||
|
def list_commands(self, ctx):
|
||||||
|
base = super().list_commands(ctx)
|
||||||
|
lazy = sorted(MODELS.keys())
|
||||||
|
return base + lazy
|
||||||
|
|
||||||
|
@with_appcontext
|
||||||
|
def get_command(self, ctx, cmd_name):
|
||||||
|
model = MODELS.get(cmd_name)
|
||||||
|
return self.factory(model)
|
||||||
|
|
||||||
|
|
||||||
|
def model_getter(model):
|
||||||
|
"""Return a method that gets a model from its id."""
|
||||||
|
model_name = model.__name__.lower()
|
||||||
|
model = MODELS.get(model_name)
|
||||||
|
|
||||||
|
@with_backendcontext
|
||||||
|
def wrapped(id):
|
||||||
|
return Backend.instance.get(model, id) if id else None
|
||||||
|
|
||||||
|
return wrapped
|
||||||
|
|
||||||
|
|
||||||
|
def click_type(attribute_type):
|
||||||
|
"""Find click type for a given model attribute type."""
|
||||||
|
if typing.get_origin(attribute_type):
|
||||||
|
attribute_type = typing.get_args(attribute_type)[0]
|
||||||
|
|
||||||
|
if typing.get_origin(attribute_type) is typing.Annotated:
|
||||||
|
attribute_type = typing.get_args(attribute_type)[0]
|
||||||
|
|
||||||
|
if issubclass(attribute_type, Model):
|
||||||
|
return model_getter(attribute_type)
|
||||||
|
|
||||||
|
if attribute_type is datetime.datetime:
|
||||||
|
return datetime.datetime.fromisoformat
|
||||||
|
|
||||||
|
return attribute_type
|
||||||
|
|
||||||
|
|
||||||
|
def is_multiple(attribute_type):
|
||||||
|
return typing.get_origin(attribute_type) is list
|
||||||
|
|
||||||
|
|
||||||
|
def register(cli):
|
||||||
|
"""Generate commands using factories that each have one subcommand per
|
||||||
|
available model."""
|
||||||
|
factories = [get_factory, set_factory, create_factory, delete_factory]
|
||||||
|
|
||||||
|
for factory in factories:
|
||||||
|
command_help = inspect.getdoc(factory)
|
||||||
|
name = factory.__name__.replace("_factory", "")
|
||||||
|
|
||||||
|
@cli.command(cls=ModelCommand, factory=factory, name=name, help=command_help)
|
||||||
|
def factory_command(): ...
|
||||||
|
|
||||||
|
|
||||||
|
def serialize(instance):
|
||||||
|
"""Quick and dirty serialization method.
|
||||||
|
|
||||||
|
This can probably be made simpler when we will use pydantic models.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def serialize_attribute(attribute_name, value):
|
||||||
|
multiple = is_multiple(instance.attributes[attribute_name])
|
||||||
|
if multiple and isinstance(value, list):
|
||||||
|
return [serialize_attribute(attribute_name, v) for v in value]
|
||||||
|
|
||||||
|
model, _ = instance.get_model_annotations(attribute_name)
|
||||||
|
if model:
|
||||||
|
return value.id
|
||||||
|
|
||||||
|
anonymized = ("password",)
|
||||||
|
if attribute_name in anonymized and value:
|
||||||
|
return "***"
|
||||||
|
|
||||||
|
if isinstance(value, datetime.datetime):
|
||||||
|
return value.isoformat()
|
||||||
|
|
||||||
|
return value
|
||||||
|
|
||||||
|
result = {}
|
||||||
|
for attribute in instance.attributes:
|
||||||
|
if serialized := serialize_attribute(attribute, getattr(instance, attribute)):
|
||||||
|
result[attribute] = serialized
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def get_factory(model):
|
||||||
|
"""Read informations about models.
|
||||||
|
|
||||||
|
Options can be used to filter models::
|
||||||
|
|
||||||
|
canaille get user --given-name John --last-name Doe
|
||||||
|
|
||||||
|
Displays the matching models in JSON format in the standard output.
|
||||||
|
"""
|
||||||
|
|
||||||
|
command_help = f"""Search for {model.__name__.lower()}s and display the
|
||||||
|
matching models as JSON."""
|
||||||
|
|
||||||
|
@click.command(name=model.__name__.lower(), help=command_help)
|
||||||
|
@with_appcontext
|
||||||
|
@with_backendcontext
|
||||||
|
def command(*args, **kwargs):
|
||||||
|
filter = {
|
||||||
|
attribute: value for attribute, value in kwargs.items() if value is not None
|
||||||
|
}
|
||||||
|
items = Backend.instance.query(model, **filter)
|
||||||
|
output = json.dumps([serialize(item) for item in items])
|
||||||
|
click.echo(output)
|
||||||
|
|
||||||
|
for attribute, attribute_type in model.attributes.items():
|
||||||
|
slug = attribute.replace("_", "-")
|
||||||
|
click.option(f"--{slug}", type=click_type(attribute_type))(command)
|
||||||
|
|
||||||
|
return command
|
||||||
|
|
||||||
|
|
||||||
|
def set_factory(model):
|
||||||
|
"""Update models.
|
||||||
|
|
||||||
|
The command takes an model ID and edit one or several attributes::
|
||||||
|
|
||||||
|
canaille set user 229d112e-1bb5-452f-b2ac-f7680ffe7fb8 --given-name Jack
|
||||||
|
|
||||||
|
Displays the edited model in JSON format in the standard output.
|
||||||
|
"""
|
||||||
|
|
||||||
|
command_help = f"""Update a {model.__name__.lower()} and display the
|
||||||
|
edited model in JSON format in the standard output.
|
||||||
|
|
||||||
|
IDENTIFIER should be a {model.__name__.lower()} id or
|
||||||
|
{model.identifier_attribute}
|
||||||
|
"""
|
||||||
|
|
||||||
|
@click.command(name=model.__name__.lower(), help=command_help)
|
||||||
|
@with_appcontext
|
||||||
|
@with_backendcontext
|
||||||
|
@click.argument("identifier")
|
||||||
|
def command(*args, identifier, **kwargs):
|
||||||
|
instance = Backend.instance.get(model, identifier)
|
||||||
|
if not instance:
|
||||||
|
raise click.ClickException(
|
||||||
|
f"No {model.__name__.lower()} with id '{identifier}'"
|
||||||
|
)
|
||||||
|
|
||||||
|
for attribute, value in kwargs.items():
|
||||||
|
multiple = is_multiple(model.attributes[attribute])
|
||||||
|
if multiple:
|
||||||
|
if value != ():
|
||||||
|
value = [v for v in value if v]
|
||||||
|
setattr(instance, attribute, value)
|
||||||
|
|
||||||
|
elif value is not None:
|
||||||
|
setattr(instance, attribute, value or None)
|
||||||
|
|
||||||
|
try:
|
||||||
|
Backend.instance.save(instance)
|
||||||
|
except Exception as exc: # pragma: no cover
|
||||||
|
raise click.ClickException(exc) from exc
|
||||||
|
|
||||||
|
output = json.dumps(serialize(instance))
|
||||||
|
click.echo(output)
|
||||||
|
|
||||||
|
attributes = dict(model.attributes)
|
||||||
|
del attributes["id"]
|
||||||
|
for attribute, attribute_type in attributes.items():
|
||||||
|
slug = attribute.replace("_", "-")
|
||||||
|
click.option(
|
||||||
|
f"--{slug}",
|
||||||
|
type=click_type(attribute_type),
|
||||||
|
multiple=is_multiple(attribute_type),
|
||||||
|
)(command)
|
||||||
|
|
||||||
|
return command
|
||||||
|
|
||||||
|
|
||||||
|
def create_factory(model):
|
||||||
|
"""Create models.
|
||||||
|
|
||||||
|
The model attributes can be passed as command options::
|
||||||
|
|
||||||
|
canaille create user --given-name John --last-name Doe
|
||||||
|
|
||||||
|
Displays the created model in JSON format in the standard output.
|
||||||
|
"""
|
||||||
|
|
||||||
|
command_help = f"""Create a new {model.__name__.lower()} and display the
|
||||||
|
created model in JSON format in the standard output.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@click.command(name=model.__name__.lower(), help=command_help)
|
||||||
|
@with_appcontext
|
||||||
|
@with_backendcontext
|
||||||
|
def command(*args, **kwargs):
|
||||||
|
attributes = {}
|
||||||
|
for attribute, value in kwargs.items():
|
||||||
|
multiple = is_multiple(model.attributes[attribute])
|
||||||
|
if multiple:
|
||||||
|
value = list(value)
|
||||||
|
|
||||||
|
if value is not None and value != []:
|
||||||
|
attributes[attribute] = value
|
||||||
|
|
||||||
|
instance = model(**attributes)
|
||||||
|
|
||||||
|
try:
|
||||||
|
Backend.instance.save(instance)
|
||||||
|
except Exception as exc: # pragma: no cover
|
||||||
|
raise click.ClickException(exc) from exc
|
||||||
|
|
||||||
|
output = json.dumps(serialize(instance))
|
||||||
|
click.echo(output)
|
||||||
|
|
||||||
|
attributes = dict(model.attributes)
|
||||||
|
del attributes["id"]
|
||||||
|
for attribute, attribute_type in attributes.items():
|
||||||
|
slug = attribute.replace("_", "-")
|
||||||
|
click.option(
|
||||||
|
f"--{slug}",
|
||||||
|
type=click_type(attribute_type),
|
||||||
|
multiple=is_multiple(attribute_type),
|
||||||
|
)(command)
|
||||||
|
|
||||||
|
return command
|
||||||
|
|
||||||
|
|
||||||
|
def delete_factory(model):
|
||||||
|
"""Delete models.
|
||||||
|
|
||||||
|
The command takes a model ID and deletes it::
|
||||||
|
|
||||||
|
canaille delete user --id 229d112e-1bb5-452f-b2ac-f7680ffe7fb8
|
||||||
|
"""
|
||||||
|
|
||||||
|
command_help = f"""Delete a {model.__name__.lower()}.
|
||||||
|
|
||||||
|
IDENTIFIER should be a {model.__name__.lower()} id or
|
||||||
|
{model.identifier_attribute}
|
||||||
|
"""
|
||||||
|
|
||||||
|
@click.command(name=model.__name__.lower(), help=command_help)
|
||||||
|
@with_appcontext
|
||||||
|
@with_backendcontext
|
||||||
|
@click.argument("identifier")
|
||||||
|
def command(*args, identifier, **kwargs):
|
||||||
|
instance = Backend.instance.get(model, identifier)
|
||||||
|
if not instance:
|
||||||
|
raise click.ClickException(
|
||||||
|
f"No {model.__name__.lower()} with id '{identifier}'"
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
Backend.instance.delete(instance)
|
||||||
|
except Exception as exc: # pragma: no cover
|
||||||
|
raise click.ClickException(exc) from exc
|
||||||
|
|
||||||
|
return command
|
|
@ -2,6 +2,7 @@ import click
|
||||||
from flask.cli import FlaskGroup
|
from flask.cli import FlaskGroup
|
||||||
|
|
||||||
import canaille.app.commands
|
import canaille.app.commands
|
||||||
|
import canaille.backends.commands
|
||||||
import canaille.core.commands
|
import canaille.core.commands
|
||||||
import canaille.oidc.commands
|
import canaille.oidc.commands
|
||||||
from canaille import create_app
|
from canaille import create_app
|
||||||
|
@ -18,5 +19,6 @@ def cli():
|
||||||
|
|
||||||
|
|
||||||
canaille.app.commands.register(cli)
|
canaille.app.commands.register(cli)
|
||||||
|
canaille.backends.commands.register(cli)
|
||||||
canaille.core.commands.register(cli)
|
canaille.core.commands.register(cli)
|
||||||
canaille.oidc.commands.register(cli)
|
canaille.oidc.commands.register(cli)
|
||||||
|
|
60
doc/commands.py
Normal file
60
doc/commands.py
Normal file
|
@ -0,0 +1,60 @@
|
||||||
|
"""Temporary workaround for https://github.com/click-contrib/sphinx-click/issues/139"""
|
||||||
|
|
||||||
|
import inspect
|
||||||
|
|
||||||
|
import click
|
||||||
|
|
||||||
|
from canaille.backends.commands import create_factory
|
||||||
|
from canaille.backends.commands import delete_factory
|
||||||
|
from canaille.backends.commands import get_factory
|
||||||
|
from canaille.backends.commands import set_factory
|
||||||
|
from canaille.core.models import Group
|
||||||
|
from canaille.core.models import User
|
||||||
|
from canaille.oidc.basemodels import AuthorizationCode
|
||||||
|
from canaille.oidc.basemodels import Client
|
||||||
|
from canaille.oidc.basemodels import Consent
|
||||||
|
from canaille.oidc.basemodels import Token
|
||||||
|
|
||||||
|
MODELS = {
|
||||||
|
"user": User,
|
||||||
|
"group": Group,
|
||||||
|
"client": Client,
|
||||||
|
"authorizationcode": AuthorizationCode,
|
||||||
|
"token": Token,
|
||||||
|
"consent": Consent,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class ModelCommand(click.Group):
|
||||||
|
def __init__(self, *args, factory, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self.factory = factory
|
||||||
|
|
||||||
|
def list_commands(self, ctx):
|
||||||
|
base = super().list_commands(ctx)
|
||||||
|
lazy = sorted(MODELS.keys())
|
||||||
|
return base + lazy
|
||||||
|
|
||||||
|
def get_command(self, ctx, cmd_name):
|
||||||
|
model = MODELS.get(cmd_name)
|
||||||
|
return self.factory(model)
|
||||||
|
|
||||||
|
|
||||||
|
@click.command(cls=ModelCommand, factory=get_factory, help=inspect.getdoc(get_factory))
|
||||||
|
def get(): ...
|
||||||
|
|
||||||
|
|
||||||
|
@click.command(cls=ModelCommand, factory=set_factory, help=inspect.getdoc(set_factory))
|
||||||
|
def set(): ...
|
||||||
|
|
||||||
|
|
||||||
|
@click.command(
|
||||||
|
cls=ModelCommand, factory=create_factory, help=inspect.getdoc(create_factory)
|
||||||
|
)
|
||||||
|
def create(): ...
|
||||||
|
|
||||||
|
|
||||||
|
@click.command(
|
||||||
|
cls=ModelCommand, factory=delete_factory, help=inspect.getdoc(delete_factory)
|
||||||
|
)
|
||||||
|
def delete(): ...
|
|
@ -1,6 +1,36 @@
|
||||||
Command Line Interface
|
Command Line Interface
|
||||||
======================
|
======================
|
||||||
|
|
||||||
.. click:: canaille.commands:cli
|
Canaille provide several commands to help administrator manage their data.
|
||||||
:prog: canaille
|
|
||||||
|
.. click:: canaille.app.commands:check
|
||||||
|
:prog: canaille check
|
||||||
|
:nested: full
|
||||||
|
|
||||||
|
.. click:: canaille.oidc.commands:clean
|
||||||
|
:prog: canaille clean
|
||||||
|
:nested: full
|
||||||
|
|
||||||
|
.. click:: canaille.app.commands:install
|
||||||
|
:prog: canaille install
|
||||||
|
:nested: full
|
||||||
|
|
||||||
|
.. click:: canaille.core.commands:populate
|
||||||
|
:prog: canaille populate
|
||||||
|
:nested: full
|
||||||
|
|
||||||
|
.. click:: doc.commands:get
|
||||||
|
:prog: canaille get
|
||||||
|
:nested: full
|
||||||
|
|
||||||
|
.. click:: doc.commands:set
|
||||||
|
:prog: canaille set
|
||||||
|
:nested: full
|
||||||
|
|
||||||
|
.. click:: doc.commands:create
|
||||||
|
:prog: canaille create
|
||||||
|
:nested: full
|
||||||
|
|
||||||
|
.. click:: doc.commands:delete
|
||||||
|
:prog: canaille delete
|
||||||
:nested: full
|
:nested: full
|
||||||
|
|
59
tests/app/commands/test_create.py
Normal file
59
tests/app/commands/test_create.py
Normal file
|
@ -0,0 +1,59 @@
|
||||||
|
import datetime
|
||||||
|
import json
|
||||||
|
from unittest import mock
|
||||||
|
|
||||||
|
from canaille.app import models
|
||||||
|
from canaille.commands import cli
|
||||||
|
|
||||||
|
|
||||||
|
def test_create(testclient, backend, foo_group):
|
||||||
|
"""Nominal case test for model create command."""
|
||||||
|
|
||||||
|
runner = testclient.app.test_cli_runner()
|
||||||
|
res = runner.invoke(
|
||||||
|
cli,
|
||||||
|
[
|
||||||
|
"create",
|
||||||
|
"user",
|
||||||
|
"--formatted-name",
|
||||||
|
"Johnny",
|
||||||
|
"--emails",
|
||||||
|
"foo@example.org",
|
||||||
|
"--emails",
|
||||||
|
"bar@example.org",
|
||||||
|
"--given-name",
|
||||||
|
"John",
|
||||||
|
"--family-name",
|
||||||
|
"Doe",
|
||||||
|
"--user-name",
|
||||||
|
"johnny",
|
||||||
|
"--groups",
|
||||||
|
foo_group.id,
|
||||||
|
"--lock-date",
|
||||||
|
"2050-01-01T10:10:10+00:00",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
assert res.exit_code == 0, res.stdout
|
||||||
|
output = json.loads(res.stdout)
|
||||||
|
assert output == {
|
||||||
|
"formatted_name": "Johnny",
|
||||||
|
"created": mock.ANY,
|
||||||
|
"last_modified": mock.ANY,
|
||||||
|
"emails": [
|
||||||
|
"foo@example.org",
|
||||||
|
"bar@example.org",
|
||||||
|
],
|
||||||
|
"family_name": "Doe",
|
||||||
|
"given_name": "John",
|
||||||
|
"id": mock.ANY,
|
||||||
|
"user_name": "johnny",
|
||||||
|
"groups": [foo_group.id],
|
||||||
|
"lock_date": "2050-01-01T10:10:10+00:00",
|
||||||
|
}
|
||||||
|
user = backend.get(models.User, output["id"])
|
||||||
|
backend.reload(foo_group)
|
||||||
|
assert user.groups == [foo_group]
|
||||||
|
assert user.lock_date == datetime.datetime(
|
||||||
|
2050, 1, 1, 10, 10, 10, tzinfo=datetime.timezone.utc
|
||||||
|
)
|
||||||
|
backend.delete(user)
|
45
tests/app/commands/test_delete.py
Normal file
45
tests/app/commands/test_delete.py
Normal file
|
@ -0,0 +1,45 @@
|
||||||
|
from canaille.app import models
|
||||||
|
from canaille.commands import cli
|
||||||
|
|
||||||
|
|
||||||
|
def test_delete_by_id(testclient, backend):
|
||||||
|
"""Remove a model identified by its id."""
|
||||||
|
user = models.User(
|
||||||
|
formatted_name="Foo bar",
|
||||||
|
family_name="Bar",
|
||||||
|
emails=["foobar@example.org"],
|
||||||
|
user_name="foobar",
|
||||||
|
)
|
||||||
|
backend.save(user)
|
||||||
|
|
||||||
|
runner = testclient.app.test_cli_runner()
|
||||||
|
res = runner.invoke(cli, ["delete", "user", user.id])
|
||||||
|
assert res.exit_code == 0, res.stdout
|
||||||
|
|
||||||
|
assert not backend.get(models.User, user_name="foobar")
|
||||||
|
|
||||||
|
|
||||||
|
def test_delete_by_identifier(testclient, backend):
|
||||||
|
"""Remove a model identified by its identifier."""
|
||||||
|
user = models.User(
|
||||||
|
formatted_name="Foo bar",
|
||||||
|
family_name="Bar",
|
||||||
|
emails=["foobar@example.org"],
|
||||||
|
user_name="foobar",
|
||||||
|
)
|
||||||
|
backend.save(user)
|
||||||
|
|
||||||
|
runner = testclient.app.test_cli_runner()
|
||||||
|
res = runner.invoke(cli, ["delete", "user", "foobar"])
|
||||||
|
assert res.exit_code == 0, res.stdout
|
||||||
|
|
||||||
|
assert not backend.get(models.User, user_name="foobar")
|
||||||
|
|
||||||
|
|
||||||
|
def test_delete_unknown_id(testclient, backend):
|
||||||
|
"""Error case for trying to set a value for an invalid object."""
|
||||||
|
|
||||||
|
runner = testclient.app.test_cli_runner()
|
||||||
|
res = runner.invoke(cli, ["delete", "user", "invalid"])
|
||||||
|
assert res.exit_code == 1, res.stdout
|
||||||
|
assert res.stdout == "Error: No user with id 'invalid'\n"
|
106
tests/app/commands/test_get.py
Normal file
106
tests/app/commands/test_get.py
Normal file
|
@ -0,0 +1,106 @@
|
||||||
|
import json
|
||||||
|
from unittest import mock
|
||||||
|
|
||||||
|
from canaille.commands import cli
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_list_models(testclient, backend, user):
|
||||||
|
"""Nominal case test for model get command."""
|
||||||
|
|
||||||
|
runner = testclient.app.test_cli_runner()
|
||||||
|
res = runner.invoke(cli, ["get"])
|
||||||
|
assert res.exit_code == 0, res.stdout
|
||||||
|
models = ("user", "group")
|
||||||
|
for model in models:
|
||||||
|
assert model in res.stdout
|
||||||
|
|
||||||
|
|
||||||
|
def test_get(testclient, backend, user):
|
||||||
|
"""Nominal case test for model get command."""
|
||||||
|
|
||||||
|
runner = testclient.app.test_cli_runner()
|
||||||
|
res = runner.invoke(cli, ["get", "user"])
|
||||||
|
assert res.exit_code == 0, res.stdout
|
||||||
|
assert json.loads(res.stdout) == [
|
||||||
|
{
|
||||||
|
"created": mock.ANY,
|
||||||
|
"display_name": "Johnny",
|
||||||
|
"emails": [
|
||||||
|
"john@doe.com",
|
||||||
|
],
|
||||||
|
"family_name": "Doe",
|
||||||
|
"formatted_address": "1235, somewhere",
|
||||||
|
"formatted_name": "John (johnny) Doe",
|
||||||
|
"given_name": "John",
|
||||||
|
"id": user.id,
|
||||||
|
"last_modified": mock.ANY,
|
||||||
|
"password": "***",
|
||||||
|
"phone_numbers": [
|
||||||
|
"555-000-000",
|
||||||
|
],
|
||||||
|
"preferred_language": "en",
|
||||||
|
"profile_url": "https://john.example",
|
||||||
|
"user_name": "user",
|
||||||
|
},
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_model_filter(testclient, backend, user, admin, foo_group):
|
||||||
|
"""Test model get filter."""
|
||||||
|
|
||||||
|
runner = testclient.app.test_cli_runner()
|
||||||
|
res = runner.invoke(cli, ["get", "user", "--groups", foo_group.id])
|
||||||
|
assert res.exit_code == 0, res.stdout
|
||||||
|
assert json.loads(res.stdout) == [
|
||||||
|
{
|
||||||
|
"created": mock.ANY,
|
||||||
|
"display_name": "Johnny",
|
||||||
|
"emails": [
|
||||||
|
"john@doe.com",
|
||||||
|
],
|
||||||
|
"family_name": "Doe",
|
||||||
|
"formatted_address": "1235, somewhere",
|
||||||
|
"formatted_name": "John (johnny) Doe",
|
||||||
|
"given_name": "John",
|
||||||
|
"id": user.id,
|
||||||
|
"last_modified": mock.ANY,
|
||||||
|
"password": "***",
|
||||||
|
"phone_numbers": [
|
||||||
|
"555-000-000",
|
||||||
|
],
|
||||||
|
"preferred_language": "en",
|
||||||
|
"profile_url": "https://john.example",
|
||||||
|
"user_name": "user",
|
||||||
|
"groups": [foo_group.id],
|
||||||
|
},
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_datetime_filter(testclient, backend, user):
|
||||||
|
"""Test model get filter."""
|
||||||
|
|
||||||
|
runner = testclient.app.test_cli_runner()
|
||||||
|
res = runner.invoke(cli, ["get", "user", "--created", user.created.isoformat()])
|
||||||
|
assert res.exit_code == 0, res.stdout
|
||||||
|
assert json.loads(res.stdout) == [
|
||||||
|
{
|
||||||
|
"created": mock.ANY,
|
||||||
|
"display_name": "Johnny",
|
||||||
|
"emails": [
|
||||||
|
"john@doe.com",
|
||||||
|
],
|
||||||
|
"family_name": "Doe",
|
||||||
|
"formatted_address": "1235, somewhere",
|
||||||
|
"formatted_name": "John (johnny) Doe",
|
||||||
|
"given_name": "John",
|
||||||
|
"id": user.id,
|
||||||
|
"last_modified": mock.ANY,
|
||||||
|
"password": "***",
|
||||||
|
"phone_numbers": [
|
||||||
|
"555-000-000",
|
||||||
|
],
|
||||||
|
"preferred_language": "en",
|
||||||
|
"profile_url": "https://john.example",
|
||||||
|
"user_name": "user",
|
||||||
|
},
|
||||||
|
]
|
181
tests/app/commands/test_set.py
Normal file
181
tests/app/commands/test_set.py
Normal file
|
@ -0,0 +1,181 @@
|
||||||
|
import json
|
||||||
|
from unittest import mock
|
||||||
|
|
||||||
|
from canaille.commands import cli
|
||||||
|
|
||||||
|
|
||||||
|
def test_set_string_by_id(testclient, backend, user):
|
||||||
|
"""Set a string attribute to a model identifier by its id."""
|
||||||
|
|
||||||
|
runner = testclient.app.test_cli_runner()
|
||||||
|
res = runner.invoke(cli, ["set", "user", user.id, "--given-name", "foobar"])
|
||||||
|
assert res.exit_code == 0, res.stdout
|
||||||
|
assert json.loads(res.stdout) == {
|
||||||
|
"created": mock.ANY,
|
||||||
|
"display_name": "Johnny",
|
||||||
|
"emails": [
|
||||||
|
"john@doe.com",
|
||||||
|
],
|
||||||
|
"family_name": "Doe",
|
||||||
|
"formatted_address": "1235, somewhere",
|
||||||
|
"formatted_name": "John (johnny) Doe",
|
||||||
|
"given_name": "foobar",
|
||||||
|
"id": user.id,
|
||||||
|
"last_modified": mock.ANY,
|
||||||
|
"password": "***",
|
||||||
|
"phone_numbers": [
|
||||||
|
"555-000-000",
|
||||||
|
],
|
||||||
|
"preferred_language": "en",
|
||||||
|
"profile_url": "https://john.example",
|
||||||
|
"user_name": "user",
|
||||||
|
}
|
||||||
|
backend.reload(user)
|
||||||
|
assert user.given_name == "foobar"
|
||||||
|
|
||||||
|
|
||||||
|
def test_set_string_by_identifier(testclient, backend, user):
|
||||||
|
"""Set a string attribute to a model identifier by its identifier."""
|
||||||
|
|
||||||
|
runner = testclient.app.test_cli_runner()
|
||||||
|
res = runner.invoke(cli, ["set", "user", "user", "--given-name", "foobar"])
|
||||||
|
assert res.exit_code == 0, res.stdout
|
||||||
|
assert json.loads(res.stdout) == {
|
||||||
|
"created": mock.ANY,
|
||||||
|
"display_name": "Johnny",
|
||||||
|
"emails": [
|
||||||
|
"john@doe.com",
|
||||||
|
],
|
||||||
|
"family_name": "Doe",
|
||||||
|
"formatted_address": "1235, somewhere",
|
||||||
|
"formatted_name": "John (johnny) Doe",
|
||||||
|
"given_name": "foobar",
|
||||||
|
"id": user.id,
|
||||||
|
"last_modified": mock.ANY,
|
||||||
|
"password": "***",
|
||||||
|
"phone_numbers": [
|
||||||
|
"555-000-000",
|
||||||
|
],
|
||||||
|
"preferred_language": "en",
|
||||||
|
"profile_url": "https://john.example",
|
||||||
|
"user_name": "user",
|
||||||
|
}
|
||||||
|
backend.reload(user)
|
||||||
|
assert user.given_name == "foobar"
|
||||||
|
|
||||||
|
|
||||||
|
def test_set_multiple(testclient, backend, user):
|
||||||
|
"""Test setting several emails."""
|
||||||
|
|
||||||
|
runner = testclient.app.test_cli_runner()
|
||||||
|
res = runner.invoke(
|
||||||
|
cli,
|
||||||
|
[
|
||||||
|
"set",
|
||||||
|
"user",
|
||||||
|
user.id,
|
||||||
|
"--emails",
|
||||||
|
"foo@example.org",
|
||||||
|
"--emails",
|
||||||
|
"bar@example.org",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
assert res.exit_code == 0, res.stdout
|
||||||
|
assert json.loads(res.stdout) == {
|
||||||
|
"created": mock.ANY,
|
||||||
|
"display_name": "Johnny",
|
||||||
|
"emails": [
|
||||||
|
"foo@example.org",
|
||||||
|
"bar@example.org",
|
||||||
|
],
|
||||||
|
"family_name": "Doe",
|
||||||
|
"formatted_address": "1235, somewhere",
|
||||||
|
"formatted_name": "John (johnny) Doe",
|
||||||
|
"given_name": "John",
|
||||||
|
"id": user.id,
|
||||||
|
"last_modified": mock.ANY,
|
||||||
|
"password": "***",
|
||||||
|
"phone_numbers": [
|
||||||
|
"555-000-000",
|
||||||
|
],
|
||||||
|
"preferred_language": "en",
|
||||||
|
"profile_url": "https://john.example",
|
||||||
|
"user_name": "user",
|
||||||
|
}
|
||||||
|
backend.reload(user)
|
||||||
|
assert user.emails == [
|
||||||
|
"foo@example.org",
|
||||||
|
"bar@example.org",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_set_unknown_id(testclient, backend):
|
||||||
|
"""Error case for trying to set a value for an invalid object."""
|
||||||
|
|
||||||
|
runner = testclient.app.test_cli_runner()
|
||||||
|
res = runner.invoke(cli, ["set", "user", "invalid", "--given-name", "foobar"])
|
||||||
|
assert res.exit_code == 1, res.stdout
|
||||||
|
assert res.stdout == "Error: No user with id 'invalid'\n"
|
||||||
|
|
||||||
|
|
||||||
|
def test_set_remove_simple_attribute(testclient, backend, user, admin):
|
||||||
|
"""Test to remove a non multiple attribute."""
|
||||||
|
assert user.formatted_address is not None
|
||||||
|
|
||||||
|
runner = testclient.app.test_cli_runner()
|
||||||
|
res = runner.invoke(cli, ["set", "user", user.id, "--formatted-address", ""])
|
||||||
|
assert res.exit_code == 0, res.stdout
|
||||||
|
assert json.loads(res.stdout) == {
|
||||||
|
"created": mock.ANY,
|
||||||
|
"display_name": "Johnny",
|
||||||
|
"emails": [
|
||||||
|
"john@doe.com",
|
||||||
|
],
|
||||||
|
"family_name": "Doe",
|
||||||
|
"formatted_name": "John (johnny) Doe",
|
||||||
|
"given_name": "John",
|
||||||
|
"id": user.id,
|
||||||
|
"last_modified": mock.ANY,
|
||||||
|
"password": "***",
|
||||||
|
"phone_numbers": [
|
||||||
|
"555-000-000",
|
||||||
|
],
|
||||||
|
"preferred_language": "en",
|
||||||
|
"profile_url": "https://john.example",
|
||||||
|
"user_name": "user",
|
||||||
|
}
|
||||||
|
backend.reload(user)
|
||||||
|
assert user.formatted_address is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_set_remove_multiple_attribute(testclient, backend, user, admin, foo_group):
|
||||||
|
"""Test to remove a non multiple attribute."""
|
||||||
|
foo_group.members = [user, admin]
|
||||||
|
backend.save(foo_group)
|
||||||
|
assert user.groups == [foo_group]
|
||||||
|
|
||||||
|
runner = testclient.app.test_cli_runner()
|
||||||
|
res = runner.invoke(cli, ["set", "user", user.id, "--groups", ""])
|
||||||
|
assert res.exit_code == 0, res.stdout
|
||||||
|
assert json.loads(res.stdout) == {
|
||||||
|
"created": mock.ANY,
|
||||||
|
"display_name": "Johnny",
|
||||||
|
"emails": [
|
||||||
|
"john@doe.com",
|
||||||
|
],
|
||||||
|
"family_name": "Doe",
|
||||||
|
"formatted_name": "John (johnny) Doe",
|
||||||
|
"formatted_address": "1235, somewhere",
|
||||||
|
"given_name": "John",
|
||||||
|
"id": user.id,
|
||||||
|
"last_modified": mock.ANY,
|
||||||
|
"password": "***",
|
||||||
|
"phone_numbers": [
|
||||||
|
"555-000-000",
|
||||||
|
],
|
||||||
|
"preferred_language": "en",
|
||||||
|
"profile_url": "https://john.example",
|
||||||
|
"user_name": "user",
|
||||||
|
}
|
||||||
|
backend.reload(user)
|
||||||
|
assert user.groups == []
|
Loading…
Reference in a new issue