Implements a flask User converter

This commit is contained in:
Éloi Rivard 2023-06-28 17:56:49 +02:00 committed by Éloi Rivard
parent 251e114bc0
commit f504bb3a66
24 changed files with 112 additions and 116 deletions

View file

@ -155,6 +155,13 @@ def setup_flask(app):
return render_template("error.html", description=error, error_code=500), 500
def setup_flask_converters(app):
from canaille.app.flask import model_converter
from canaille.app import models
app.url_map.converters["user"] = model_converter(models.User)
def create_app(config=None, validate=True, backend=None):
from .oidc.oauth import setup_oauth
from .app.i18n import setup_i18n
@ -168,6 +175,7 @@ def create_app(config=None, validate=True, backend=None):
setup_logging(app)
setup_backend(app, backend)
setup_oauth(app)
setup_flask_converters(app)
setup_blueprints(app)
setup_jinja(app)
setup_i18n(app)

View file

@ -10,6 +10,7 @@ from flask import render_template
from flask import request
from flask import session
from flask_babel import gettext as _
from werkzeug.routing import BaseConverter
def current_user():
@ -99,3 +100,19 @@ def render_htmx_template(template, htmx_template=None, **kwargs):
(htmx_template or f"partial/{template}") if request_is_htmx() else template
)
return render_template(template, **kwargs)
def model_converter(model):
class ModelConverter(BaseConverter):
def to_url(self, instance):
return instance.identifier
def to_python(self, identifier):
current_app.backend.setup()
instance = model.get(identifier)
if not instance:
abort(404)
return instance
return ModelConverter

View file

@ -79,6 +79,9 @@ class Backend(BaseBackend):
)
def setup(self):
if self.connection:
return
try: # pragma: no cover
if request.endpoint == "static":
return

View file

@ -78,8 +78,8 @@ class User(canaille.core.models.User, LDAPObject):
return filter_
@classmethod
def get(cls, **kwargs):
user = super().get(**kwargs)
def get(cls, *args, **kwargs):
user = super().get(*args, **kwargs)
if user:
user.load_permissions()

View file

@ -213,7 +213,7 @@ PUBLIC_KEY = "canaille/conf/public.pem"
# PREFERRED_USERNAME = "{{ user.display_name }}"
# LOCALE = "{{ user.preferred_language }}"
# ADDRESS = "{{ user.formatted_address[0] }}"
# PICTURE = "{% if user.photo %}{{ url_for('account.photo', user_name=user.user_name[0], field='photo', _external=True) }}{% endif %}"
# PICTURE = "{% if user.photo %}{{ url_for('account.photo', user=user, field='photo', _external=True) }}{% endif %}"
# WEBSITE = "{{ user.profile_url[0] }}"
# The SMTP server options. If not set, mail related features such as

View file

@ -59,9 +59,7 @@ def index():
return redirect(url_for("account.login"))
if user.can_edit_self or user.can_manage_users:
return redirect(
url_for("account.profile_edition", identifier=current_user().identifier)
)
return redirect(url_for("account.profile_edition", edited_user=user))
if user.can_use_oidc:
return redirect(url_for("oidc.consents.consents"))
@ -81,9 +79,7 @@ def about():
@bp.route("/login", methods=("GET", "POST"))
def login():
if current_user():
return redirect(
url_for("account.profile_edition", identifier=current_user().identifier)
)
return redirect(url_for("account.profile_edition", edited_user=current_user()))
form = LoginForm(request.form or None)
form["login"].render_kw["placeholder"] = BaseBackend.get().login_placeholder()
@ -93,7 +89,7 @@ def login():
user = models.User.get_from_login(form.login.data)
if user and not user.has_password():
return redirect(url_for("account.firstlogin", identifier=user.identifier))
return redirect(url_for("account.firstlogin", user=user))
if not form.validate():
models.User.logout()
@ -104,7 +100,6 @@ def login():
return redirect(url_for("account.password"))
@bp.route("/password", methods=("GET", "POST"))
def password():
if "attempt_login" not in session:
@ -119,7 +114,7 @@ def password():
user = models.User.get_from_login(session["attempt_login"])
if user and not user.has_password():
return redirect(url_for("account.firstlogin", identifier=user.identifier))
return redirect(url_for("account.firstlogin", user=user))
if not form.validate() or not user:
models.User.logout()
@ -148,6 +143,7 @@ def password():
@bp.route("/logout")
def logout():
user = current_user()
if user:
flash(
_(
@ -160,29 +156,29 @@ def logout():
return redirect("/")
@bp.route("/firstlogin/<identifier>", methods=("GET", "POST"))
def firstlogin(identifier):
user = models.User.get_from_login(identifier)
if not user or user.has_password():
@bp.route("/firstlogin/<user:user>", methods=("GET", "POST"))
def firstlogin(user):
if user.has_password():
abort(404)
form = FirstLoginForm(request.form or None)
if not request.form:
return render_template("firstlogin.html", form=form, identifier=identifier)
return render_template("firstlogin.html", form=form, user=user)
form.validate()
if send_password_initialization_mail(user):
flash(
_(
"A password initialization link has been sent at your email address. You should receive it within a few minutes."
"A password initialization link has been sent at your email address. "
"You should receive it within a few minutes."
),
"success",
)
else:
flash(_("Could not send the password initialization email"), "error")
return render_template("firstlogin.html", form=form, identifier=identifier)
return render_template("firstlogin.html", form=form)
@bp.route("/users", methods=["GET", "POST"])
@ -360,7 +356,7 @@ def registration(data, hash):
user = profile_create(current_app, form)
user.login()
flash(_("Your account has been created successfully."), "success")
return redirect(url_for("account.profile_edition", identifier=user.identifier))
return redirect(url_for("account.profile_edition", edited_user=user))
@bp.route("/profile", methods=("GET", "POST"))
@ -393,7 +389,7 @@ def profile_creation(user):
)
user = profile_create(current_app, form)
return redirect(url_for("account.profile_edition", identifier=user.identifier))
return redirect(url_for("account.profile_edition", edited_user=user))
def profile_create(current_app, form):
@ -424,25 +420,15 @@ def profile_create(current_app, form):
return user
@bp.route("/profile/<identifier>", methods=("GET", "POST"))
@bp.route("/profile/<user:edited_user>", methods=("GET", "POST"))
@user_needed()
def profile_edition(user, identifier):
editor = user
if not user.can_manage_users and not (
user.can_edit_self and identifier == user.identifier
):
abort(403)
menuitem = "profile" if identifier == editor.identifier else "users"
fields = editor.read | editor.write
if identifier != editor.identifier:
user = models.User.get_from_login(identifier)
else:
user = editor
if not user:
def profile_edition(user, edited_user):
if not user.can_manage_users and not (user.can_edit_self and edited_user == user):
abort(404)
menuitem = "profile" if edited_user == user else "users"
fields = user.read | user.write
available_fields = {
"formatted_name",
"title",
@ -465,25 +451,25 @@ def profile_edition(user, identifier):
"organization",
}
data = {
field: getattr(user, field)[0]
if getattr(user, field)
and isinstance(getattr(user, field), list)
field: getattr(edited_user, field)[0]
if getattr(edited_user, field)
and isinstance(getattr(edited_user, field), list)
and not PROFILE_FORM_FIELDS[field].field_class == wtforms.FieldList
else getattr(user, field) or ""
else getattr(edited_user, field) or ""
for field in fields
if hasattr(user, field) and field in available_fields
if hasattr(edited_user, field) and field in available_fields
}
form = profile_form(
editor.write & available_fields, editor.read & available_fields, user
user.write & available_fields, user.read & available_fields, edited_user
)
form.process(CombinedMultiDict((request.files, request.form)) or None, data=data)
if request_is_htmx():
form.render_field_macro_file = "macro/profile.html"
form.render_field_extra_context = {
"user": editor,
"edited_user": user,
"user": user,
"edited_user": edited_user,
}
if not request.form or form.form_control():
@ -491,7 +477,7 @@ def profile_edition(user, identifier):
"profile_edit.html",
form=form,
menuitem=menuitem,
edited_user=user,
edited_user=edited_user,
)
if not form.validate():
@ -500,43 +486,37 @@ def profile_edition(user, identifier):
"profile_edit.html",
form=form,
menuitem=menuitem,
edited_user=user,
edited_user=edited_user,
)
for attribute in form:
if attribute.name in user.attributes and attribute.name in editor.write:
if attribute.name in edited_user.attributes and attribute.name in user.write:
if isinstance(attribute.data, FileStorage):
data = attribute.data.stream.read()
else:
data = attribute.data
setattr(user, attribute.name, data)
setattr(edited_user, attribute.name, data)
if "photo" in form and form["photo_delete"].data:
del user.photo
del edited_user.photo
if "preferred_language" in request.form:
# Refresh the babel cache in case the lang is updated
refresh()
if form["preferred_language"].data == "auto":
user.preferred_language = None
edited_user.preferred_language = None
user.save()
edited_user.save()
flash(_("Profile updated successfully."), "success")
return redirect(url_for("account.profile_edition", identifier=identifier))
return redirect(url_for("account.profile_edition", edited_user=edited_user))
@bp.route("/profile/<identifier>/settings", methods=("GET", "POST"))
@bp.route("/profile/<user:edited_user>/settings", methods=("GET", "POST"))
@user_needed()
def profile_settings(user, identifier):
if not user.can_manage_users and not (
user.can_edit_self and identifier == user.identifier
):
abort(403)
edited_user = models.User.get_from_login(identifier)
if not edited_user:
def profile_settings(user, edited_user):
if not user.can_manage_users and not (user.can_edit_self and edited_user == user):
abort(404)
if (
@ -640,7 +620,7 @@ def profile_settings_edit(editor, edited_user):
edited_user.save()
flash(_("Profile updated successfully."), "success")
return redirect(
url_for("account.profile_settings", identifier=edited_user.identifier)
url_for("account.profile_settings", edited_user=edited_user)
)
return render_template(
@ -671,15 +651,10 @@ def profile_delete(user, edited_user):
return redirect(url_for("account.users"))
@bp.route("/impersonate/<identifier>")
@bp.route("/impersonate/<user:puppet>")
@permissions_needed("impersonate_users")
def impersonate(user, identifier):
puppet = models.User.get_from_login(identifier)
if not puppet:
abort(404)
def impersonate(user, puppet):
puppet.login()
flash(
_("Connection successful. Welcome %(user)s", user=puppet.formatted_name),
"success",
@ -735,13 +710,12 @@ def forgotten():
return render_template("forgotten-password.html", form=form)
@bp.route("/reset/<identifier>/<hash>", methods=["GET", "POST"])
def reset(identifier, hash):
@bp.route("/reset/<user:user>/<hash>", methods=["GET", "POST"])
def reset(user, hash):
if not current_app.config.get("ENABLE_PASSWORD_RECOVERY", True):
abort(404)
form = PasswordResetForm(request.form)
user = models.User.get_from_login(identifier)
if not user or hash != profile_hash(
user.identifier,
@ -759,27 +733,21 @@ def reset(identifier, hash):
user.login()
flash(_("Your password has been updated successfully"), "success")
return redirect(url_for("account.profile_edition", identifier=identifier))
return redirect(url_for("account.profile_edition", edited_user=user))
return render_template(
"reset-password.html", form=form, identifier=identifier, hash=hash
)
return render_template("reset-password.html", form=form, user=user, hash=hash)
@bp.route("/profile/<identifier>/<field>")
def photo(identifier, field):
@bp.route("/profile/<user:user>/<field>")
def photo(user, field):
if field.lower() != "photo":
abort(404)
user = models.User.get_from_login(identifier)
if not user:
abort(404)
etag = None
if request.if_modified_since and request.if_modified_since >= user.last_modified:
return "", 304
etag = profile_hash(identifier, user.last_modified.isoformat())
etag = profile_hash(user.identifier, user.last_modified.isoformat())
if request.if_none_match and etag in request.if_none_match:
return "", 304

View file

@ -75,7 +75,7 @@ def password_init_html(user):
base_url = url_for("account.index", _external=True)
reset_url = url_for(
"account.reset",
identifier=user.identifier,
user=user,
hash=profile_hash(user.identifier, user.preferred_email, user.password[0]),
_external=True,
)
@ -98,7 +98,7 @@ def password_init_txt(user):
base_url = url_for("account.index", _external=True)
reset_url = url_for(
"account.reset",
identifier=user.identifier,
user=user,
hash=profile_hash(user.identifier, user.preferred_email, user.password[0]),
_external=True,
)
@ -117,7 +117,7 @@ def password_reset_html(user):
base_url = url_for("account.index", _external=True)
reset_url = url_for(
"account.reset",
identifier=user.identifier,
user=user,
hash=profile_hash(user.identifier, user.preferred_email, user.password[0]),
_external=True,
)
@ -140,7 +140,7 @@ def password_reset_txt(user):
base_url = url_for("account.index", _external=True)
reset_url = url_for(
"account.reset",
identifier=user.identifier,
user=user,
hash=profile_hash(user.identifier, user.preferred_email, user.password[0]),
_external=True,
)

View file

@ -39,7 +39,7 @@ def send_password_reset_mail(user):
base_url = url_for("account.index", _external=True)
reset_url = url_for(
"account.reset",
identifier=user.identifier,
user=user,
hash=profile_hash(
user.identifier,
user.preferred_email,
@ -79,7 +79,7 @@ def send_password_initialization_mail(user):
base_url = url_for("account.index", _external=True)
reset_url = url_for(
"account.reset",
identifier=user.identifier,
user=user,
hash=profile_hash(
user.identifier,
user.preferred_email,

View file

@ -45,7 +45,7 @@ DEFAULT_JWT_MAPPING = {
"PREFERRED_USERNAME": "{% if user.display_name %}{{ user.display_name }}{% endif %}",
"LOCALE": "{% if user.preferred_language %}{{ user.preferred_language }}{% endif %}",
"ADDRESS": "{% if user.formatted_address %}{{ user.formatted_address[0] }}{% endif %}",
"PICTURE": "{% if user.photo %}{{ url_for('account.photo', user_name=user.user_name[0], field='photo', _external=True) }}{% endif %}",
"PICTURE": "{% if user.photo %}{{ url_for('account.photo', user=user, field='photo', _external=True) }}{% endif %}",
"WEBSITE": "{% if user.profile_url %}{{ user.profile_url[0] }}{% endif %}",
}

View file

@ -70,7 +70,7 @@
<tr>
<td>{{ _("Subject") }}</td>
<td>
<a href="{{ url_for("account.profile_edition", identifier=token.subject.identifier) }}">
<a href="{{ url_for("account.profile_edition", edited_user=token.subject) }}">
{{ token.subject.name }} - {{ token.subject.user_name[0] }}
</a>
</td>

View file

@ -14,7 +14,7 @@
<td><a href="{{ url_for('oidc.authorizations.view', authorization_id=authorization.authorization_code_id) }}">{{ authorization.authorization_code_id }}</a></td>
<td><a href="{{ url_for('oidc.clients.edit', client_id=authorization.client.client_id) }}">{{ authorization.client.client_id }}</a></td>
<td>
<a href="{{ url_for("account.profile_edition", identifier=authorization.subject.identifier) }}">
<a href="{{ url_for("account.profile_edition", edited_user=authorization.subject) }}">
{{ authorization.subject.user_name[0] }}
</a>
</td>

View file

@ -22,7 +22,7 @@
</a>
</td>
<td>
<a href="{{ url_for("account.profile_edition", identifier=token.subject.identifier) }}">
<a href="{{ url_for("account.profile_edition", edited_user=token.subject) }}">
{{ token.subject.user_name[0] }}
</a>
</td>

View file

@ -24,11 +24,11 @@
<tr>
{% if user.can_read("photo") %}
<td>
<a href="{{ url_for('account.profile_edition', identifier=watched_user.identifier) }}">
<a href="{{ url_for('account.profile_edition', edited_user=watched_user) }}">
{% if user.can_manage_users and watched_user.locked %}
<i class="lock circle big black icon" title="{% trans %}This account is locked{% endtrans %}"></i>
{% elif watched_user.photo and watched_user.photo[0] %}
<img class="ui avatar image" src="{{ url_for("account.photo", user_name=watched_user.user_name[0], field="photo") }}" alt="User photo">
<img class="ui avatar image" src="{{ url_for("account.photo", user=watched, field="photo") }}" alt="User photo">
{% else %}
<i class="user circle big black icon"></i>
{% endif %}
@ -36,7 +36,7 @@
</td>
{% endif %}
{% if user.can_read("user_name") %}
<td><a href="{{ url_for('account.profile_edition', identifier=watched_user.identifier) }}">{{ watched_user.user_name[0] }}</a></td>
<td><a href="{{ url_for('account.profile_edition', edited_user=watched_user) }}">{{ watched_user.user_name[0] }}</a></td>
{% endif %}
{% if user.can_read("family_name") or user.can_read("given_name") %}
<td>{{ watched_user.formatted_name[0] }}</td>

View file

@ -18,11 +18,11 @@
{% block submenu %}
<nav class="ui bottom attached two item borderless menu">
<a class="active item" href="{{ url_for('account.profile_edition', identifier=edited_user.identifier) }}">
<a class="active item" href="{{ url_for('account.profile_edition', edited_user=edited_user) }}">
<i class="id card icon"></i>
{% trans %}Personal information{% endtrans %}
</a>
<a class="item" href="{{ url_for('account.profile_settings', identifier=edited_user.identifier) }}">
<a class="item" href="{{ url_for('account.profile_settings', edited_user=edited_user) }}">
<i class="tools icon"></i>
{% trans %}Account information{% endtrans %}
</a>
@ -66,7 +66,7 @@
<a class="ui right corner label photo-delete-icon" title="{{ _("Delete the photo") }}">
<i class="times icon"></i>
</a>
<img src="{% if photo %}{{ url_for("account.photo", identifier=edited_user.identifier, field="photo") }}{% endif %}" alt="User photo">
<img src="{% if photo %}{{ url_for("account.photo", user=edited_user, field="photo") }}{% endif %}" alt="User photo">
</label>
<label
class="ui centered photo-placeholder"

View file

@ -15,11 +15,11 @@
{% block submenu %}
<nav class="ui bottom attached two item borderless menu">
<a class="item" href="{{ url_for('account.profile_edition', identifier=edited_user.identifier) }}">
<a class="item" href="{{ url_for('account.profile_edition', edited_user=edited_user) }}">
<i class="id card icon"></i>
{% trans %}Personal information{% endtrans %}
</a>
<a class="active item" href="{{ url_for('account.profile_settings', identifier=edited_user.identifier) }}">
<a class="active item" href="{{ url_for('account.profile_settings', edited_user=edited_user) }}">
<i class="tools icon"></i>
{% trans %}Account information{% endtrans %}
</a>
@ -189,7 +189,7 @@
{% endif %}
{% if user.can_impersonate_users and user.identifier != edited_user.identifier %}
<a href="{{ url_for('account.impersonate', identifier=edited_user.identifier) }}" class="ui right floated basic button" name="action" value="impersonate" id="impersonate" hx-boost="false">
<a href="{{ url_for('account.impersonate', puppet=edited_user) }}" class="ui right floated basic button" name="action" value="impersonate" id="impersonate" hx-boost="false">
{{ _("Impersonate") }}
</a>
{% endif %}

View file

@ -13,7 +13,7 @@
</h3>
<div class="ui attached clearing segment">
{{ fui.render_form(form, _("Password reset"), action=url_for("account.reset", identifier=identifier, hash=hash)) }}
{{ fui.render_form(form, _("Password reset"), action=url_for("account.reset", user=user, hash=hash)) }}
</div>
</div>
{% endblock %}

View file

@ -31,7 +31,7 @@
{% if user.can_edit_self %}
<a class="item {% if menuitem == "profile" %}active{% endif %}"
href="{{ url_for('account.profile_edition', identifier=user.identifier) }}">
href="{{ url_for('account.profile_edition', edited_user=user) }}">
<i class="id card icon"></i>
{% trans %}Profile{% endtrans %}
</a>

View file

@ -218,7 +218,7 @@ PUBLIC_KEY = "conf/public.pem"
# PREFERRED_USERNAME = "{{ user.display_name }}"
# LOCALE = "{{ user.preferred_language }}"
# ADDRESS = "{{ user.formatted_address[0] }}"
# PICTURE = "{% if user.photo %}{{ url_for('account.photo', user_name=user.user_name[0], field='photo', _external=True) }}{% endif %}"
# PICTURE = "{% if user.photo %}{{ url_for('account.photo', user=user, field='photo', _external=True) }}{% endif %}"
# WEBSITE = "{{ user.profile_url[0] }}"
# The SMTP server options. If not set, mail related features such as

View file

@ -219,7 +219,7 @@ PUBLIC_KEY = "conf/public.pem"
# PREFERRED_USERNAME = "{{ user.display_name }}"
# LOCALE = "{{ user.preferred_language }}"
# ADDRESS = "{{ user.formatted_address[0] }}"
# PICTURE = "{% if user.photo %}{{ url_for('account.photo', user_name=user.user_name[0], field='photo', _external=True) }}{% endif %}"
# PICTURE = "{% if user.photo %}{{ url_for('account.photo', user=user, field='photo', _external=True) }}{% endif %}"
# WEBSITE = "{{ user.profile_url[0] }}"
# The SMTP server options. If not set, mail related features such as

View file

@ -28,7 +28,7 @@
</div>
{% endif %}
<a class="item {% if menuitem == "profile" %}active{% endif %}"
href="{{ url_for('account.profile_edition', identifier=user.identifier) }}">
href="{{ url_for('account.profile_edition', edited_user=user) }}">
<i class="id card icon"></i>
{% trans %}My profile{% endtrans %}
</a>

View file

@ -246,7 +246,7 @@ def test_user_deleted_in_session(testclient, backend):
testclient.get("/profile/jake", status=200)
u.delete()
testclient.get("/profile/jake", status=403)
testclient.get("/profile/jake", status=404)
with testclient.session_transaction() as session:
assert not session.get("user_id")

View file

@ -5,7 +5,7 @@ def test_password_forgotten_disabled(smtpd, testclient, user):
testclient.app.config["ENABLE_PASSWORD_RECOVERY"] = False
testclient.get("/reset", status=404)
testclient.get("/reset/user_name/hash", status=404)
testclient.get("/reset/user/hash", status=404)
res = testclient.get("/login")
res.mustcontain(no="Forgotten password")

View file

@ -88,7 +88,7 @@ def test_edition_permission(
admin,
):
testclient.app.config["ACL"]["DEFAULT"]["PERMISSIONS"] = []
testclient.get("/profile/user", status=403)
testclient.get("/profile/user", status=404)
testclient.app.config["ACL"]["DEFAULT"]["PERMISSIONS"] = ["edit_self"]
testclient.get("/profile/user", status=200)
@ -253,18 +253,18 @@ def test_field_permissions_write(testclient, logged_user):
assert logged_user.phone_numbers == ["000-000-000"]
def test_simple_user_cannot_edit_other(testclient, logged_user):
def test_simple_user_cannot_edit_other(testclient, admin, logged_user):
res = testclient.get("/profile/user", status=200)
testclient.get("/profile/admin", status=403)
testclient.get("/profile/admin", status=404)
testclient.post(
"/profile/admin",
{"action": "edit", "csrf_token": res.form["csrf_token"].value},
status=403,
status=404,
)
testclient.post(
"/profile/admin",
{"action": "delete", "csrf_token": res.form["csrf_token"].value},
status=403,
status=404,
)
testclient.get("/users", status=403)

View file

@ -299,7 +299,7 @@ def test_edition_permission(
admin,
):
testclient.app.config["ACL"]["DEFAULT"]["PERMISSIONS"] = []
testclient.get("/profile/user/settings", status=403)
testclient.get("/profile/user/settings", status=404)
testclient.app.config["ACL"]["DEFAULT"]["PERMISSIONS"] = ["edit_self"]
testclient.get("/profile/user/settings", status=200)