Merge branch 'issue-117-management-commands' into 'main'

model management commands

Closes #117 et #54

See merge request yaal/canaille!174
This commit is contained in:
Éloi Rivard 2024-05-13 12:17:43 +00:00
commit 72931c1a0e
9 changed files with 769 additions and 2 deletions

View file

@ -4,6 +4,7 @@
Added
^^^^^
- Group member removal can be achieved from the group edition page :issue:`192`
- Model management commands :issue:`117` :issue:`54`
Changed
^^^^^^^

View file

@ -0,0 +1,283 @@
import datetime
import inspect
import json
import typing
import click
from flask.cli import AppGroup
from flask.cli import with_appcontext
from canaille.app.commands import with_backendcontext
from canaille.app.models import MODELS
from canaille.backends import Backend
from canaille.backends.models import Model
class ModelCommand(AppGroup):
"""CLI commands that takes a model subcommand."""
def __init__(self, *args, factory, **kwargs):
super().__init__(*args, **kwargs)
self.factory = factory
@with_appcontext
def list_commands(self, ctx):
base = super().list_commands(ctx)
lazy = sorted(MODELS.keys())
return base + lazy
@with_appcontext
def get_command(self, ctx, cmd_name):
model = MODELS.get(cmd_name)
return self.factory(model)
def model_getter(model):
"""Return a method that gets a model from its id."""
model_name = model.__name__.lower()
model = MODELS.get(model_name)
@with_backendcontext
def wrapped(id):
return Backend.instance.get(model, id) if id else None
return wrapped
def click_type(attribute_type):
"""Find click type for a given model attribute type."""
if typing.get_origin(attribute_type):
attribute_type = typing.get_args(attribute_type)[0]
if typing.get_origin(attribute_type) is typing.Annotated:
attribute_type = typing.get_args(attribute_type)[0]
if issubclass(attribute_type, Model):
return model_getter(attribute_type)
if attribute_type is datetime.datetime:
return datetime.datetime.fromisoformat
return attribute_type
def is_multiple(attribute_type):
return typing.get_origin(attribute_type) is list
def register(cli):
"""Generate commands using factories that each have one subcommand per
available model."""
factories = [get_factory, set_factory, create_factory, delete_factory]
for factory in factories:
command_help = inspect.getdoc(factory)
name = factory.__name__.replace("_factory", "")
@cli.command(cls=ModelCommand, factory=factory, name=name, help=command_help)
def factory_command(): ...
def serialize(instance):
"""Quick and dirty serialization method.
This can probably be made simpler when we will use pydantic models.
"""
def serialize_attribute(attribute_name, value):
multiple = is_multiple(instance.attributes[attribute_name])
if multiple and isinstance(value, list):
return [serialize_attribute(attribute_name, v) for v in value]
model, _ = instance.get_model_annotations(attribute_name)
if model:
return value.id
anonymized = ("password",)
if attribute_name in anonymized and value:
return "***"
if isinstance(value, datetime.datetime):
return value.isoformat()
return value
result = {}
for attribute in instance.attributes:
if serialized := serialize_attribute(attribute, getattr(instance, attribute)):
result[attribute] = serialized
return result
def get_factory(model):
"""Read informations about models.
Options can be used to filter models::
canaille get user --given-name John --last-name Doe
Displays the matching models in JSON format in the standard output.
"""
command_help = f"""Search for {model.__name__.lower()}s and display the
matching models as JSON."""
@click.command(name=model.__name__.lower(), help=command_help)
@with_appcontext
@with_backendcontext
def command(*args, **kwargs):
filter = {
attribute: value for attribute, value in kwargs.items() if value is not None
}
items = Backend.instance.query(model, **filter)
output = json.dumps([serialize(item) for item in items])
click.echo(output)
for attribute, attribute_type in model.attributes.items():
slug = attribute.replace("_", "-")
click.option(f"--{slug}", type=click_type(attribute_type))(command)
return command
def set_factory(model):
"""Update models.
The command takes an model ID and edit one or several attributes::
canaille set user 229d112e-1bb5-452f-b2ac-f7680ffe7fb8 --given-name Jack
Displays the edited model in JSON format in the standard output.
"""
command_help = f"""Update a {model.__name__.lower()} and display the
edited model in JSON format in the standard output.
IDENTIFIER should be a {model.__name__.lower()} id or
{model.identifier_attribute}
"""
@click.command(name=model.__name__.lower(), help=command_help)
@with_appcontext
@with_backendcontext
@click.argument("identifier")
def command(*args, identifier, **kwargs):
instance = Backend.instance.get(model, identifier)
if not instance:
raise click.ClickException(
f"No {model.__name__.lower()} with id '{identifier}'"
)
for attribute, value in kwargs.items():
multiple = is_multiple(model.attributes[attribute])
if multiple:
if value != ():
value = [v for v in value if v]
setattr(instance, attribute, value)
elif value is not None:
setattr(instance, attribute, value or None)
try:
Backend.instance.save(instance)
except Exception as exc: # pragma: no cover
raise click.ClickException(exc) from exc
output = json.dumps(serialize(instance))
click.echo(output)
attributes = dict(model.attributes)
del attributes["id"]
for attribute, attribute_type in attributes.items():
slug = attribute.replace("_", "-")
click.option(
f"--{slug}",
type=click_type(attribute_type),
multiple=is_multiple(attribute_type),
)(command)
return command
def create_factory(model):
"""Create models.
The model attributes can be passed as command options::
canaille create user --given-name John --last-name Doe
Displays the created model in JSON format in the standard output.
"""
command_help = f"""Create a new {model.__name__.lower()} and display the
created model in JSON format in the standard output.
"""
@click.command(name=model.__name__.lower(), help=command_help)
@with_appcontext
@with_backendcontext
def command(*args, **kwargs):
attributes = {}
for attribute, value in kwargs.items():
multiple = is_multiple(model.attributes[attribute])
if multiple:
value = list(value)
if value is not None and value != []:
attributes[attribute] = value
instance = model(**attributes)
try:
Backend.instance.save(instance)
except Exception as exc: # pragma: no cover
raise click.ClickException(exc) from exc
output = json.dumps(serialize(instance))
click.echo(output)
attributes = dict(model.attributes)
del attributes["id"]
for attribute, attribute_type in attributes.items():
slug = attribute.replace("_", "-")
click.option(
f"--{slug}",
type=click_type(attribute_type),
multiple=is_multiple(attribute_type),
)(command)
return command
def delete_factory(model):
"""Delete models.
The command takes a model ID and deletes it::
canaille delete user --id 229d112e-1bb5-452f-b2ac-f7680ffe7fb8
"""
command_help = f"""Delete a {model.__name__.lower()}.
IDENTIFIER should be a {model.__name__.lower()} id or
{model.identifier_attribute}
"""
@click.command(name=model.__name__.lower(), help=command_help)
@with_appcontext
@with_backendcontext
@click.argument("identifier")
def command(*args, identifier, **kwargs):
instance = Backend.instance.get(model, identifier)
if not instance:
raise click.ClickException(
f"No {model.__name__.lower()} with id '{identifier}'"
)
try:
Backend.instance.delete(instance)
except Exception as exc: # pragma: no cover
raise click.ClickException(exc) from exc
return command

View file

@ -2,6 +2,7 @@ import click
from flask.cli import FlaskGroup
import canaille.app.commands
import canaille.backends.commands
import canaille.core.commands
import canaille.oidc.commands
from canaille import create_app
@ -18,5 +19,6 @@ def cli():
canaille.app.commands.register(cli)
canaille.backends.commands.register(cli)
canaille.core.commands.register(cli)
canaille.oidc.commands.register(cli)

60
doc/commands.py Normal file
View file

@ -0,0 +1,60 @@
"""Temporary workaround for https://github.com/click-contrib/sphinx-click/issues/139"""
import inspect
import click
from canaille.backends.commands import create_factory
from canaille.backends.commands import delete_factory
from canaille.backends.commands import get_factory
from canaille.backends.commands import set_factory
from canaille.core.models import Group
from canaille.core.models import User
from canaille.oidc.basemodels import AuthorizationCode
from canaille.oidc.basemodels import Client
from canaille.oidc.basemodels import Consent
from canaille.oidc.basemodels import Token
MODELS = {
"user": User,
"group": Group,
"client": Client,
"authorizationcode": AuthorizationCode,
"token": Token,
"consent": Consent,
}
class ModelCommand(click.Group):
def __init__(self, *args, factory, **kwargs):
super().__init__(*args, **kwargs)
self.factory = factory
def list_commands(self, ctx):
base = super().list_commands(ctx)
lazy = sorted(MODELS.keys())
return base + lazy
def get_command(self, ctx, cmd_name):
model = MODELS.get(cmd_name)
return self.factory(model)
@click.command(cls=ModelCommand, factory=get_factory, help=inspect.getdoc(get_factory))
def get(): ...
@click.command(cls=ModelCommand, factory=set_factory, help=inspect.getdoc(set_factory))
def set(): ...
@click.command(
cls=ModelCommand, factory=create_factory, help=inspect.getdoc(create_factory)
)
def create(): ...
@click.command(
cls=ModelCommand, factory=delete_factory, help=inspect.getdoc(delete_factory)
)
def delete(): ...

View file

@ -1,6 +1,36 @@
Command Line Interface
======================
.. click:: canaille.commands:cli
:prog: canaille
Canaille provide several commands to help administrator manage their data.
.. click:: canaille.app.commands:check
:prog: canaille check
:nested: full
.. click:: canaille.oidc.commands:clean
:prog: canaille clean
:nested: full
.. click:: canaille.app.commands:install
:prog: canaille install
:nested: full
.. click:: canaille.core.commands:populate
:prog: canaille populate
:nested: full
.. click:: doc.commands:get
:prog: canaille get
:nested: full
.. click:: doc.commands:set
:prog: canaille set
:nested: full
.. click:: doc.commands:create
:prog: canaille create
:nested: full
.. click:: doc.commands:delete
:prog: canaille delete
:nested: full

View file

@ -0,0 +1,59 @@
import datetime
import json
from unittest import mock
from canaille.app import models
from canaille.commands import cli
def test_create(testclient, backend, foo_group):
"""Nominal case test for model create command."""
runner = testclient.app.test_cli_runner()
res = runner.invoke(
cli,
[
"create",
"user",
"--formatted-name",
"Johnny",
"--emails",
"foo@example.org",
"--emails",
"bar@example.org",
"--given-name",
"John",
"--family-name",
"Doe",
"--user-name",
"johnny",
"--groups",
foo_group.id,
"--lock-date",
"2050-01-01T10:10:10+00:00",
],
)
assert res.exit_code == 0, res.stdout
output = json.loads(res.stdout)
assert output == {
"formatted_name": "Johnny",
"created": mock.ANY,
"last_modified": mock.ANY,
"emails": [
"foo@example.org",
"bar@example.org",
],
"family_name": "Doe",
"given_name": "John",
"id": mock.ANY,
"user_name": "johnny",
"groups": [foo_group.id],
"lock_date": "2050-01-01T10:10:10+00:00",
}
user = backend.get(models.User, output["id"])
backend.reload(foo_group)
assert user.groups == [foo_group]
assert user.lock_date == datetime.datetime(
2050, 1, 1, 10, 10, 10, tzinfo=datetime.timezone.utc
)
backend.delete(user)

View file

@ -0,0 +1,45 @@
from canaille.app import models
from canaille.commands import cli
def test_delete_by_id(testclient, backend):
"""Remove a model identified by its id."""
user = models.User(
formatted_name="Foo bar",
family_name="Bar",
emails=["foobar@example.org"],
user_name="foobar",
)
backend.save(user)
runner = testclient.app.test_cli_runner()
res = runner.invoke(cli, ["delete", "user", user.id])
assert res.exit_code == 0, res.stdout
assert not backend.get(models.User, user_name="foobar")
def test_delete_by_identifier(testclient, backend):
"""Remove a model identified by its identifier."""
user = models.User(
formatted_name="Foo bar",
family_name="Bar",
emails=["foobar@example.org"],
user_name="foobar",
)
backend.save(user)
runner = testclient.app.test_cli_runner()
res = runner.invoke(cli, ["delete", "user", "foobar"])
assert res.exit_code == 0, res.stdout
assert not backend.get(models.User, user_name="foobar")
def test_delete_unknown_id(testclient, backend):
"""Error case for trying to set a value for an invalid object."""
runner = testclient.app.test_cli_runner()
res = runner.invoke(cli, ["delete", "user", "invalid"])
assert res.exit_code == 1, res.stdout
assert res.stdout == "Error: No user with id 'invalid'\n"

View file

@ -0,0 +1,106 @@
import json
from unittest import mock
from canaille.commands import cli
def test_get_list_models(testclient, backend, user):
"""Nominal case test for model get command."""
runner = testclient.app.test_cli_runner()
res = runner.invoke(cli, ["get"])
assert res.exit_code == 0, res.stdout
models = ("user", "group")
for model in models:
assert model in res.stdout
def test_get(testclient, backend, user):
"""Nominal case test for model get command."""
runner = testclient.app.test_cli_runner()
res = runner.invoke(cli, ["get", "user"])
assert res.exit_code == 0, res.stdout
assert json.loads(res.stdout) == [
{
"created": mock.ANY,
"display_name": "Johnny",
"emails": [
"john@doe.com",
],
"family_name": "Doe",
"formatted_address": "1235, somewhere",
"formatted_name": "John (johnny) Doe",
"given_name": "John",
"id": user.id,
"last_modified": mock.ANY,
"password": "***",
"phone_numbers": [
"555-000-000",
],
"preferred_language": "en",
"profile_url": "https://john.example",
"user_name": "user",
},
]
def test_get_model_filter(testclient, backend, user, admin, foo_group):
"""Test model get filter."""
runner = testclient.app.test_cli_runner()
res = runner.invoke(cli, ["get", "user", "--groups", foo_group.id])
assert res.exit_code == 0, res.stdout
assert json.loads(res.stdout) == [
{
"created": mock.ANY,
"display_name": "Johnny",
"emails": [
"john@doe.com",
],
"family_name": "Doe",
"formatted_address": "1235, somewhere",
"formatted_name": "John (johnny) Doe",
"given_name": "John",
"id": user.id,
"last_modified": mock.ANY,
"password": "***",
"phone_numbers": [
"555-000-000",
],
"preferred_language": "en",
"profile_url": "https://john.example",
"user_name": "user",
"groups": [foo_group.id],
},
]
def test_get_datetime_filter(testclient, backend, user):
"""Test model get filter."""
runner = testclient.app.test_cli_runner()
res = runner.invoke(cli, ["get", "user", "--created", user.created.isoformat()])
assert res.exit_code == 0, res.stdout
assert json.loads(res.stdout) == [
{
"created": mock.ANY,
"display_name": "Johnny",
"emails": [
"john@doe.com",
],
"family_name": "Doe",
"formatted_address": "1235, somewhere",
"formatted_name": "John (johnny) Doe",
"given_name": "John",
"id": user.id,
"last_modified": mock.ANY,
"password": "***",
"phone_numbers": [
"555-000-000",
],
"preferred_language": "en",
"profile_url": "https://john.example",
"user_name": "user",
},
]

View file

@ -0,0 +1,181 @@
import json
from unittest import mock
from canaille.commands import cli
def test_set_string_by_id(testclient, backend, user):
"""Set a string attribute to a model identifier by its id."""
runner = testclient.app.test_cli_runner()
res = runner.invoke(cli, ["set", "user", user.id, "--given-name", "foobar"])
assert res.exit_code == 0, res.stdout
assert json.loads(res.stdout) == {
"created": mock.ANY,
"display_name": "Johnny",
"emails": [
"john@doe.com",
],
"family_name": "Doe",
"formatted_address": "1235, somewhere",
"formatted_name": "John (johnny) Doe",
"given_name": "foobar",
"id": user.id,
"last_modified": mock.ANY,
"password": "***",
"phone_numbers": [
"555-000-000",
],
"preferred_language": "en",
"profile_url": "https://john.example",
"user_name": "user",
}
backend.reload(user)
assert user.given_name == "foobar"
def test_set_string_by_identifier(testclient, backend, user):
"""Set a string attribute to a model identifier by its identifier."""
runner = testclient.app.test_cli_runner()
res = runner.invoke(cli, ["set", "user", "user", "--given-name", "foobar"])
assert res.exit_code == 0, res.stdout
assert json.loads(res.stdout) == {
"created": mock.ANY,
"display_name": "Johnny",
"emails": [
"john@doe.com",
],
"family_name": "Doe",
"formatted_address": "1235, somewhere",
"formatted_name": "John (johnny) Doe",
"given_name": "foobar",
"id": user.id,
"last_modified": mock.ANY,
"password": "***",
"phone_numbers": [
"555-000-000",
],
"preferred_language": "en",
"profile_url": "https://john.example",
"user_name": "user",
}
backend.reload(user)
assert user.given_name == "foobar"
def test_set_multiple(testclient, backend, user):
"""Test setting several emails."""
runner = testclient.app.test_cli_runner()
res = runner.invoke(
cli,
[
"set",
"user",
user.id,
"--emails",
"foo@example.org",
"--emails",
"bar@example.org",
],
)
assert res.exit_code == 0, res.stdout
assert json.loads(res.stdout) == {
"created": mock.ANY,
"display_name": "Johnny",
"emails": [
"foo@example.org",
"bar@example.org",
],
"family_name": "Doe",
"formatted_address": "1235, somewhere",
"formatted_name": "John (johnny) Doe",
"given_name": "John",
"id": user.id,
"last_modified": mock.ANY,
"password": "***",
"phone_numbers": [
"555-000-000",
],
"preferred_language": "en",
"profile_url": "https://john.example",
"user_name": "user",
}
backend.reload(user)
assert user.emails == [
"foo@example.org",
"bar@example.org",
]
def test_set_unknown_id(testclient, backend):
"""Error case for trying to set a value for an invalid object."""
runner = testclient.app.test_cli_runner()
res = runner.invoke(cli, ["set", "user", "invalid", "--given-name", "foobar"])
assert res.exit_code == 1, res.stdout
assert res.stdout == "Error: No user with id 'invalid'\n"
def test_set_remove_simple_attribute(testclient, backend, user, admin):
"""Test to remove a non multiple attribute."""
assert user.formatted_address is not None
runner = testclient.app.test_cli_runner()
res = runner.invoke(cli, ["set", "user", user.id, "--formatted-address", ""])
assert res.exit_code == 0, res.stdout
assert json.loads(res.stdout) == {
"created": mock.ANY,
"display_name": "Johnny",
"emails": [
"john@doe.com",
],
"family_name": "Doe",
"formatted_name": "John (johnny) Doe",
"given_name": "John",
"id": user.id,
"last_modified": mock.ANY,
"password": "***",
"phone_numbers": [
"555-000-000",
],
"preferred_language": "en",
"profile_url": "https://john.example",
"user_name": "user",
}
backend.reload(user)
assert user.formatted_address is None
def test_set_remove_multiple_attribute(testclient, backend, user, admin, foo_group):
"""Test to remove a non multiple attribute."""
foo_group.members = [user, admin]
backend.save(foo_group)
assert user.groups == [foo_group]
runner = testclient.app.test_cli_runner()
res = runner.invoke(cli, ["set", "user", user.id, "--groups", ""])
assert res.exit_code == 0, res.stdout
assert json.loads(res.stdout) == {
"created": mock.ANY,
"display_name": "Johnny",
"emails": [
"john@doe.com",
],
"family_name": "Doe",
"formatted_name": "John (johnny) Doe",
"formatted_address": "1235, somewhere",
"given_name": "John",
"id": user.id,
"last_modified": mock.ANY,
"password": "***",
"phone_numbers": [
"555-000-000",
],
"preferred_language": "en",
"profile_url": "https://john.example",
"user_name": "user",
}
backend.reload(user)
assert user.groups == []