refactor: store user profile in g.user

This commit is contained in:
Éloi Rivard 2023-08-13 22:08:28 +02:00
parent 1f9d148c0d
commit c895366684
No known key found for this signature in database
GPG key ID: 7EDA204EA57DD184
10 changed files with 45 additions and 8 deletions

View file

@ -6,6 +6,7 @@ from urllib.parse import urlunsplit
from canaille.app import models from canaille.app import models
from flask import abort from flask import abort
from flask import current_app from flask import current_app
from flask import g
from flask import render_template from flask import render_template
from flask import request from flask import request
from flask import session from flask import session
@ -14,12 +15,16 @@ from werkzeug.routing import BaseConverter
def current_user(): def current_user():
if "user" in g:
return g.user
for user_id in session.get("user_id", [])[::-1]: for user_id in session.get("user_id", [])[::-1]:
user = models.User.get(id=user_id) user = models.User.get(id=user_id)
if user and ( if user and (
not current_app.backend.has_account_lockability() or not user.locked not current_app.backend.has_account_lockability() or not user.locked
): ):
return user g.user = user
return g.user
session["user_id"].remove(user_id) session["user_id"].remove(user_id)

View file

@ -174,6 +174,9 @@ class User(canaille.core.models.User, LDAPObject):
def load_permissions(self): def load_permissions(self):
conn = Backend.get().connection conn = Backend.get().connection
self.permissions = set()
self.read = set()
self.write = set()
for access_group_name, details in current_app.config["ACL"].items(): for access_group_name, details in current_app.config["ACL"].items():
filter_ = self.acl_filter_to_ldap_filter(details.get("FILTER")) filter_ = self.acl_filter_to_ldap_filter(details.get("FILTER"))

View file

@ -26,6 +26,7 @@ from flask import abort
from flask import Blueprint from flask import Blueprint
from flask import current_app from flask import current_app
from flask import flash from flask import flash
from flask import g
from flask import redirect from flask import redirect
from flask import request from flask import request
from flask import send_file from flask import send_file
@ -494,6 +495,8 @@ def profile_create(current_app, form):
user.set_password(form["password1"].data) user.set_password(form["password1"].data)
user.save() user.save()
user.load_permissions()
flash(_("User account creation succeed."), "success") flash(_("User account creation succeed."), "success")
return user return user
@ -568,6 +571,7 @@ def profile_edition_main_form_validation(user, edited_user, profile_form):
edited_user.preferred_language = None edited_user.preferred_language = None
edited_user.save() edited_user.save()
g.user.reload()
def profile_edition_emails_form(user, edited_user, has_smtp): def profile_edition_emails_form(user, edited_user, has_smtp):
@ -611,10 +615,12 @@ def profile_edition_remove_email(user, edited_user, email):
@bp.route("/profile/<user:edited_user>", methods=("GET", "POST")) @bp.route("/profile/<user:edited_user>", methods=("GET", "POST"))
@user_needed() @user_needed()
def profile_edition(user, edited_user): def profile_edition(user, edited_user):
if not user.can_manage_users and not (user.can_edit_self and edited_user == user): if not user.can_manage_users and not (
user.can_edit_self and edited_user.id == user.id
):
abort(404) abort(404)
menuitem = "profile" if edited_user == user else "users" menuitem = "profile" if edited_user.id == user.id else "users"
has_smtp = "SMTP" in current_app.config has_smtp = "SMTP" in current_app.config
has_email_confirmation = current_app.config.get("EMAIL_CONFIRMATION") is True or ( has_email_confirmation = current_app.config.get("EMAIL_CONFIRMATION") is True or (
current_app.config.get("EMAIL_CONFIRMATION") is None and has_smtp current_app.config.get("EMAIL_CONFIRMATION") is None and has_smtp
@ -681,7 +687,9 @@ def profile_edition(user, edited_user):
@bp.route("/profile/<user:edited_user>/settings", methods=("GET", "POST")) @bp.route("/profile/<user:edited_user>/settings", methods=("GET", "POST"))
@user_needed() @user_needed()
def profile_settings(user, edited_user): def profile_settings(user, edited_user):
if not user.can_manage_users and not (user.can_edit_self and edited_user == user): if not user.can_manage_users and not (
user.can_edit_self and edited_user.id == user.id
):
abort(404) abort(404)
if ( if (

View file

@ -1,5 +1,6 @@
import datetime import datetime
from flask import g
from flask import session from flask import session
@ -15,6 +16,7 @@ class User:
raise NotImplementedError() raise NotImplementedError()
def login(self): def login(self):
g.user = self
try: try:
previous = ( previous = (
session["user_id"] session["user_id"]
@ -29,6 +31,7 @@ class User:
def logout(self): def logout(self):
try: try:
session["user_id"].pop() session["user_id"].pop()
del g.user
if not session["user_id"]: if not session["user_id"]:
del session["user_id"] del session["user_id"]
except (IndexError, KeyError): except (IndexError, KeyError):

View file

@ -150,6 +150,7 @@ def user(app, backend):
formatted_address="1235, somewhere", formatted_address="1235, somewhere",
) )
u.save() u.save()
u.load_permissions()
yield u yield u
u.delete() u.delete()
@ -164,6 +165,7 @@ def admin(app, backend):
password="admin", password="admin",
) )
u.save() u.save()
u.load_permissions()
yield u yield u
u.delete() u.delete()
@ -178,6 +180,7 @@ def moderator(app, backend):
password="moderator", password="moderator",
) )
u.save() u.save()
u.load_permissions()
yield u yield u
u.delete() u.delete()

View file

@ -2,22 +2,24 @@ import datetime
from unittest import mock from unittest import mock
from canaille.app import models from canaille.app import models
from flask import g
def test_index(testclient, user): def test_index(testclient, user):
res = testclient.get("/", status=302) res = testclient.get("/", status=302)
assert res.location == "/login" assert res.location == "/login"
with testclient.session_transaction() as sess: g.user = user
sess["user_id"] = [user.id]
res = testclient.get("/", status=302) res = testclient.get("/", status=302)
assert res.location == "/profile/user" assert res.location == "/profile/user"
testclient.app.config["ACL"]["DEFAULT"]["PERMISSIONS"] = ["use_oidc"] testclient.app.config["ACL"]["DEFAULT"]["PERMISSIONS"] = ["use_oidc"]
g.user.reload()
res = testclient.get("/", status=302) res = testclient.get("/", status=302)
assert res.location == "/consent/" assert res.location == "/consent/"
testclient.app.config["ACL"]["DEFAULT"]["PERMISSIONS"] = [] testclient.app.config["ACL"]["DEFAULT"]["PERMISSIONS"] = []
g.user.reload()
res = testclient.get("/", status=302) res = testclient.get("/", status=302)
assert res.location == "/about" assert res.location == "/about"
@ -243,7 +245,6 @@ def test_user_deleted_in_session(testclient, backend):
with testclient.session_transaction() as session: with testclient.session_transaction() as session:
session["user_id"] = [u.id] session["user_id"] = [u.id]
testclient.get("/profile/jake", status=200)
u.delete() u.delete()
testclient.get("/profile/jake", status=404) testclient.get("/profile/jake", status=404)
@ -425,7 +426,6 @@ def test_signin_locked_account(testclient, user):
def test_account_locked_during_session(testclient, logged_user): def test_account_locked_during_session(testclient, logged_user):
testclient.get("/profile/user/settings", status=200)
logged_user.lock_date = datetime.datetime.now(datetime.timezone.utc) logged_user.lock_date = datetime.datetime.now(datetime.timezone.utc)
logged_user.save() logged_user.save()
testclient.get("/profile/user/settings", status=403) testclient.get("/profile/user/settings", status=403)

View file

@ -2,6 +2,7 @@ import datetime
from canaille.app import models from canaille.app import models
from canaille.core.account import Invitation from canaille.core.account import Invitation
from flask import g
def test_invitation(testclient, logged_admin, foo_group, smtpd): def test_invitation(testclient, logged_admin, foo_group, smtpd):
@ -19,9 +20,11 @@ def test_invitation(testclient, logged_admin, foo_group, smtpd):
url = res.pyquery(".copy-text")[0].value url = res.pyquery(".copy-text")[0].value
# logout # logout
g.user = None
with testclient.session_transaction() as sess: with testclient.session_transaction() as sess:
del sess["user_id"] del sess["user_id"]
testclient.get("/logout")
res = testclient.get(url, status=200) res = testclient.get(url, status=200)
assert res.form["user_name"].value == "someone" assert res.form["user_name"].value == "someone"
@ -68,6 +71,7 @@ def test_invitation_editable_user_name(testclient, logged_admin, foo_group, smtp
url = res.pyquery(".copy-text")[0].value url = res.pyquery(".copy-text")[0].value
# logout # logout
g.user = None
with testclient.session_transaction() as sess: with testclient.session_transaction() as sess:
del sess["user_id"] del sess["user_id"]
@ -114,6 +118,7 @@ def test_generate_link(testclient, logged_admin, foo_group, smtpd):
url = res.pyquery(".copy-text")[0].value url = res.pyquery(".copy-text")[0].value
# logout # logout
g.user = None
with testclient.session_transaction() as sess: with testclient.session_transaction() as sess:
del sess["user_id"] del sess["user_id"]

View file

@ -1,5 +1,6 @@
import pytest import pytest
from canaille.core.populate import fake_users from canaille.core.populate import fake_users
from flask import g
from webtest import Upload from webtest import Upload
@ -86,6 +87,7 @@ def test_user_list_search_only_allowed_fields(
res.mustcontain(no=moderator.formatted_name[0]) res.mustcontain(no=moderator.formatted_name[0])
testclient.app.config["ACL"]["DEFAULT"]["READ"].remove("user_name") testclient.app.config["ACL"]["DEFAULT"]["READ"].remove("user_name")
g.user.reload()
form = res.forms["search"] form = res.forms["search"]
form["query"] = "user" form["query"] = "user"
@ -105,6 +107,7 @@ def test_edition_permission(
testclient.get("/profile/user", status=404) testclient.get("/profile/user", status=404)
testclient.app.config["ACL"]["DEFAULT"]["PERMISSIONS"] = ["edit_self"] testclient.app.config["ACL"]["DEFAULT"]["PERMISSIONS"] = ["edit_self"]
g.user.reload()
testclient.get("/profile/user", status=200) testclient.get("/profile/user", status=200)
@ -204,6 +207,7 @@ def test_field_permissions_none(testclient, logged_user):
"PERMISSIONS": ["edit_self"], "PERMISSIONS": ["edit_self"],
} }
g.user.reload()
res = testclient.get("/profile/user", status=200) res = testclient.get("/profile/user", status=200)
form = res.forms["baseform"] form = res.forms["baseform"]
assert "phone_numbers-0" not in form.fields assert "phone_numbers-0" not in form.fields
@ -230,6 +234,7 @@ def test_field_permissions_read(testclient, logged_user):
"WRITE": [], "WRITE": [],
"PERMISSIONS": ["edit_self"], "PERMISSIONS": ["edit_self"],
} }
g.user.reload()
res = testclient.get("/profile/user", status=200) res = testclient.get("/profile/user", status=200)
form = res.forms["baseform"] form = res.forms["baseform"]
assert "phone_numbers-0" in form.fields assert "phone_numbers-0" in form.fields
@ -256,6 +261,7 @@ def test_field_permissions_write(testclient, logged_user):
"WRITE": ["phone_numbers"], "WRITE": ["phone_numbers"],
"PERMISSIONS": ["edit_self"], "PERMISSIONS": ["edit_self"],
} }
g.user.reload()
res = testclient.get("/profile/user", status=200) res = testclient.get("/profile/user", status=200)
form = res.forms["baseform"] form = res.forms["baseform"]
assert "phone_numbers-0" in form.fields assert "phone_numbers-0" in form.fields

View file

@ -2,6 +2,7 @@ import datetime
from unittest import mock from unittest import mock
from canaille.app import models from canaille.app import models
from flask import g
def test_edition( def test_edition(
@ -300,6 +301,7 @@ def test_edition_permission(
testclient.get("/profile/user/settings", status=404) testclient.get("/profile/user/settings", status=404)
testclient.app.config["ACL"]["DEFAULT"]["PERMISSIONS"] = ["edit_self"] testclient.app.config["ACL"]["DEFAULT"]["PERMISSIONS"] = ["edit_self"]
g.user.reload()
testclient.get("/profile/user/settings", status=200) testclient.get("/profile/user/settings", status=200)

View file

@ -7,6 +7,7 @@ from authlib.jose import jwt
from authlib.oauth2.rfc7636 import create_s256_code_challenge from authlib.oauth2.rfc7636 import create_s256_code_challenge
from canaille.app import models from canaille.app import models
from canaille.oidc.oauth import setup_oauth from canaille.oidc.oauth import setup_oauth
from flask import g
from werkzeug.security import gen_salt from werkzeug.security import gen_salt
from . import client_credentials from . import client_credentials
@ -234,6 +235,7 @@ def test_logout_login(testclient, logged_user, client):
) )
res = res.form.submit(name="answer", value="logout", status=302) res = res.form.submit(name="answer", value="logout", status=302)
g.user = None
res = res.follow(status=200) res = res.follow(status=200)
res.form["login"] = logged_user.user_name[0] res.form["login"] = logged_user.user_name[0]