# canaille-globuzma/canaille/core/account.py
#
# 842 lines
# 25 KiB
# Python

import datetime
import io
from dataclasses import astuple
from dataclasses import dataclass
from typing import List
import pkg_resources
import wtforms
from canaille.app import b64_to_obj
from canaille.app import default_fields
from canaille.app import models
from canaille.app import obj_to_b64
from canaille.app import profile_hash
from canaille.app.flask import current_user
from canaille.app.flask import permissions_needed
from canaille.app.flask import registration_needed
from canaille.app.flask import render_htmx_template
from canaille.app.flask import request_is_htmx
from canaille.app.flask import smtp_needed
from canaille.app.flask import user_needed
from canaille.app.forms import TableForm
from canaille.backends import Backend
from flask import abort
from flask import Blueprint
from flask import current_app
from flask import flash
from flask import redirect
from flask import request
from flask import send_file
from flask import session
from flask import url_for
from flask_babel import gettext as _
from flask_babel import refresh
from flask_themer import render_template
from werkzeug.datastructures import CombinedMultiDict
from werkzeug.datastructures import FileStorage
from .forms import FirstLoginForm
from .forms import ForgottenPasswordForm
from .forms import InvitationForm
from .forms import JoinForm
from .forms import LoginForm
from .forms import MINIMUM_PASSWORD_LENGTH
from .forms import OnboardingForm
from .forms import PasswordForm
from .forms import PasswordResetForm
from .forms import profile_form
from .mails import send_invitation_mail
from .mails import send_password_initialization_mail
from .mails import send_password_reset_mail
from .mails import send_registration_mail
# Flask blueprint on which every view of this module is registered.
bp = Blueprint("account", __name__)
@bp.route("/")
def index():
    """Redirect the visitor to the most relevant landing page.

    Anonymous visitors are sent to the onboarding page when both the
    ``SMTP`` and ``REGISTRATION`` configuration entries exist, and to the
    login page otherwise. Logged-in users are sent to their profile, to
    their consents, or to the about page, depending on their permissions.
    """
    user = current_user()

    if not user:
        config = current_app.config
        registration_available = "SMTP" in config and "REGISTRATION" in config
        target = "account.onboarding" if registration_available else "account.login"
        return redirect(url_for(target))

    if user.can_edit_self or user.can_manage_users:
        own_profile = url_for(
            "account.profile_edition", username=current_user().user_name[0]
        )
        return redirect(own_profile)

    if user.can_use_oidc:
        return redirect(url_for("oidc.consents.consents"))

    return redirect(url_for("account.about"))
@bp.route("/onboarding", methods=("GET", "POST"))
@smtp_needed()
@registration_needed()
def onboarding():
    """Let an anonymous visitor choose between signing up and signing in.

    Logged-in users are redirected to their own profile. A submitted form
    redirects according to its ``answer`` value; any other answer simply
    renders the page again.
    """
    if current_user():
        own_profile = url_for(
            "account.profile_edition", username=current_user().user_name[0]
        )
        return redirect(own_profile)

    form = OnboardingForm(request.form or None)

    if request.form:
        answer = request.form["answer"]
        if answer == "sign-up":
            return redirect(url_for("account.join"))
        if answer == "sign-in":
            return redirect(url_for("account.login"))

    return render_template("onboarding.html", form=form)
@bp.route("/join", methods=("GET", "POST"))
@smtp_needed()
@registration_needed()
def join():
    """Self-registration: email a registration link to the visitor.

    On a valid submission an :class:`Invitation` is built from the form and
    from the ``REGISTRATION`` configuration, and a link to the registration
    view is sent by email. When the email is sent the visitor is redirected
    to the login page; otherwise the form is rendered again.
    """
    if current_user():
        own_profile = url_for(
            "account.profile_edition", username=current_user().user_name[0]
        )
        return redirect(own_profile)

    form = JoinForm(request.form or None)
    email_sent = None
    form_validated = bool(request.form and form.validate())

    if form_validated:
        registration_config = current_app.config["REGISTRATION"]
        invitation = Invitation(
            creation_date_isoformat=datetime.datetime.now(
                datetime.timezone.utc
            ).isoformat(),
            user_name=form.user_name.data,
            user_name_editable=registration_config.get("CAN_EDIT_USERNAME", False),
            email=form.email.data,
            groups=registration_config.get("GROUPS", []),
        )
        registration_url = url_for(
            "account.registration",
            data=invitation.b64(),
            hash=invitation.profile_hash(),
            _external=True,
        )
        email_sent = send_registration_mail(form.email.data, registration_url)

        if email_sent:
            message = _(
                "You've received an email to continue the registration process."
            )
            flash(message, "success")
            return redirect(url_for("account.login"))

    template_context = {
        "form": form,
        "form_validated": form_validated,
        "email_sent": email_sent,
        "edited_user": None,
        "self_deletion": False,
        "menuitem": "users",
    }
    return render_template("profile_add.html", **template_context)
@bp.route("/about")
def about():
    """Render the about page with the installed ``canaille`` version.

    The version falls back to ``"git"`` when no installed distribution is
    found.
    """
    try:
        distribution = pkg_resources.get_distribution("canaille")
    except pkg_resources.DistributionNotFound:  # pragma: no cover
        version = "git"
    else:
        version = distribution.version

    return render_template("about.html", version=version)
@bp.route("/login", methods=("GET", "POST"))
def login():
    """First login step: ask for the login identifier.

    A known user without a password is redirected to the first-login view.
    On a valid submission the login is stored in the session under
    ``attempt_login`` and the visitor is sent to the password step.
    """
    if current_user():
        own_profile = url_for(
            "account.profile_edition", username=current_user().user_name[0]
        )
        return redirect(own_profile)

    form = LoginForm(request.form or None)
    form["login"].render_kw["placeholder"] = Backend.get().login_placeholder()

    # Nothing submitted yet: just display the form.
    if not request.form:
        return render_template("login.html", form=form)

    user = models.User.get_from_login(form.login.data)
    if user and not user.has_password():
        first_login_url = url_for("account.firstlogin", user_name=user.user_name[0])
        return redirect(first_login_url)

    if form.validate():
        session["attempt_login"] = form.login.data
        return redirect(url_for("account.password"))

    models.User.logout()
    flash(_("Login failed, please check your information"), "error")
    return render_template("login.html", form=form)
@bp.route("/password", methods=("GET", "POST"))
def password():
    """Second login step: check the password of the pending login.

    The login comes from the ``attempt_login`` session entry; without it
    the visitor is sent back to the login step. On success the entry is
    removed from the session and the user is logged in.
    """
    if "attempt_login" not in session:
        return redirect(url_for("account.login"))

    form = PasswordForm(request.form or None)

    def password_page():
        # Read the session at render time, exactly where each caller needs it.
        return render_template(
            "password.html", form=form, username=session["attempt_login"]
        )

    if not request.form:
        return password_page()

    user = models.User.get_from_login(session["attempt_login"])
    if user and not user.has_password():
        first_login_url = url_for("account.firstlogin", user_name=user.user_name[0])
        return redirect(first_login_url)

    if not form.validate() or not user:
        models.User.logout()
        flash(_("Login failed, please check your information"), "error")
        return password_page()

    success, message = user.check_password(form.password.data)
    if not success:
        models.User.logout()
        flash(message or _("Login failed, please check your information"), "error")
        return password_page()

    del session["attempt_login"]
    user.login()
    welcome = _("Connection successful. Welcome %(user)s", user=user.formatted_name[0])
    flash(welcome, "success")
    return redirect(url_for("account.index"))
@bp.route("/logout")
def logout():
    """Log the current user out, if any, and redirect to the site root."""
    user = current_user()
    if not user:
        return redirect("/")

    farewell = _(
        "You have been disconnected. See you next time %(user)s",
        user=user.formatted_name[0],
    )
    flash(farewell, "success")
    user.logout()
    return redirect("/")
@bp.route("/firstlogin/<user_name>", methods=("GET", "POST"))
def firstlogin(user_name):
    """Offer a user without a password a password initialization email.

    Responds with 404 when the user is unknown or already has a password.
    """
    user = models.User.get_from_login(user_name)
    if not user or user.has_password():
        abort(404)

    form = FirstLoginForm(request.form or None)

    if request.form:
        # NOTE(review): the validation result is not used; the mail is sent
        # whatever it returns. Confirm this is intentional.
        form.validate()

        if send_password_initialization_mail(user):
            message = _(
                "A password initialization link has been sent at your email address. You should receive it within a few minutes."
            )
            category = "success"
        else:
            message = _("Could not send the password initialization email")
            category = "error"
        flash(message, category)

    return render_template("firstlogin.html", form=form, user_name=user_name)
@bp.route("/users", methods=["GET", "POST"])
@permissions_needed("manage_users")
def users(user):
    """List the users in a table limited to the fields the manager can access.

    Responds with 404 when a submitted table form does not validate.
    """
    visible_fields = user.read | user.write
    table_form = TableForm(models.User, fields=visible_fields, formdata=request.form)

    if request.form and not table_form.validate():
        abort(404)

    return render_htmx_template("users.html", menuitem="users", table_form=table_form)
@dataclass
class Invitation:
    """Data carried by an invitation or registration link.

    Instances are serialized positionally with ``astuple``, so the order of
    the fields below is part of the link format.
    """

    creation_date_isoformat: str
    user_name: str
    user_name_editable: bool
    email: str
    groups: List[str]

    @property
    def creation_date(self):
        """Creation date parsed from its ISO 8601 string."""
        return datetime.datetime.fromisoformat(self.creation_date_isoformat)

    def has_expired(self):
        """Tell whether the invitation is older than its allowed lifetime.

        The lifetime in seconds is read from the ``INVITATION_EXPIRATION``
        configuration entry and defaults to two days.
        """
        default_lifetime = 2 * 24 * 60 * 60
        age = datetime.datetime.now(datetime.timezone.utc) - self.creation_date
        lifetime = current_app.config.get("INVITATION_EXPIRATION", default_lifetime)
        return age > datetime.timedelta(seconds=lifetime)

    def b64(self):
        """Serialize the invitation fields with ``obj_to_b64``."""
        values = astuple(self)
        return obj_to_b64(values)

    def profile_hash(self):
        """Hash of the invitation fields, used to authenticate the link."""
        values = astuple(self)
        return profile_hash(*values)
@bp.route("/invite", methods=["GET", "POST"])
@smtp_needed()
@permissions_needed("manage_users")
def user_invitation(user):
    """Let a user manager create a registration link for a new user.

    On a valid submission an :class:`Invitation` is built from the form and
    turned into an external registration URL. The URL is emailed only when
    the submitted ``action`` is ``send``; it is always handed to the
    template so it can be displayed.

    :param user: the logged-in manager, injected by ``permissions_needed``.
    """
    form = InvitationForm(request.form or None)
    email_sent = None
    registration_url = None
    form_validated = False

    if request.form and form.validate():
        form_validated = True
        invitation = Invitation(
            datetime.datetime.now(datetime.timezone.utc).isoformat(),
            form.user_name.data,
            form.user_name_editable.data,
            form.email.data,
            form.groups.data,
        )
        registration_url = url_for(
            "account.registration",
            data=invitation.b64(),
            hash=invitation.profile_hash(),
            _external=True,
        )

        if request.form["action"] == "send":
            email_sent = send_invitation_mail(form.email.data, registration_url)

    return render_template(
        "invite.html",
        form=form,
        # Every other view of this module passes ``menuitem`` (singular);
        # the former ``menuitems`` keyword did not match that name.
        menuitem="users",
        form_validated=form_validated,
        email_sent=email_sent,
        registration_url=registration_url,
    )
@bp.route("/register/<data>/<hash>", methods=["GET", "POST"])
def registration(data, hash):
    """Create an account from an invitation link.

    The link is rejected, with a flash message and a redirection to the
    index, when the payload cannot be decoded, the invitation has expired,
    the account already exists, the visitor is already logged in, or the
    hash does not match the payload. Otherwise a profile form pre-filled
    from the invitation is displayed; a valid submission creates the user
    and logs them in.

    :param data: the serialized invitation taken from the URL.
    :param hash: the hash expected to match the invitation content.
    """
    try:
        invitation = Invitation(*b64_to_obj(data))
    except Exception:
        # The payload comes from the URL and may be anything. ``Exception``
        # instead of a bare ``except`` lets SystemExit and KeyboardInterrupt
        # propagate.
        flash(
            _("The invitation link that brought you here was invalid."),
            "error",
        )
        return redirect(url_for("account.index"))

    if invitation.has_expired():
        flash(
            _("The invitation link that brought you here has expired."),
            "error",
        )
        return redirect(url_for("account.index"))

    if models.User.get_from_login(invitation.user_name):
        flash(
            _("Your account has already been created."),
            "error",
        )
        return redirect(url_for("account.index"))

    if current_user():
        flash(
            _("You are already logged in, you cannot create an account."),
            "error",
        )
        return redirect(url_for("account.index"))

    if hash != invitation.profile_hash():
        flash(
            _("The invitation link that brought you here was invalid."),
            "error",
        )
        return redirect(url_for("account.index"))

    # Initial form values; kept apart from the ``data`` route parameter.
    initial_data = {
        "user_name": invitation.user_name,
        "email": invitation.email,
        "groups": invitation.groups,
    }

    readable_fields, writable_fields = default_fields()

    form = profile_form(writable_fields, readable_fields)
    if "groups" not in form and invitation.groups:
        # The invitation carries groups: show them read-only even when the
        # default fields do not include the groups field.
        form["groups"] = wtforms.SelectMultipleField(
            _("Groups"),
            choices=[(group.id, group.display_name) for group in models.Group.query()],
            render_kw={"readonly": "true"},
        )
    form.process(
        CombinedMultiDict((request.files, request.form)) or None, data=initial_data
    )

    if "readonly" in form["user_name"].render_kw and invitation.user_name_editable:
        del form["user_name"].render_kw["readonly"]

    # A password is mandatory when the account is created through this view.
    for field_name in ("password1", "password2"):
        form[field_name].validators = [
            wtforms.validators.DataRequired(),
            wtforms.validators.Length(min=MINIMUM_PASSWORD_LENGTH),
        ]
        form[field_name].flags.required = True

    if request.form:
        if not form.validate():
            flash(_("User account creation failed."), "error")
        else:
            user = profile_create(current_app, form)
            user.login()
            flash(_("Your account has been created successfully."), "success")
            return redirect(
                url_for("account.profile_edition", username=user.user_name[0])
            )

    return render_template(
        "profile_add.html",
        form=form,
        menuitem="users",
        edited_user=None,
        self_deletion=False,
    )
@bp.route("/profile", methods=("GET", "POST"))
@permissions_needed("manage_users")
def profile_creation(user):
    """Let a user manager create a new account.

    The form is built from the manager's readable and writable fields, and
    every field is made editable by dropping its ``readonly`` attribute.
    """
    form = profile_form(user.write, user.read)
    form.process(CombinedMultiDict((request.files, request.form)) or None)

    for field in form:
        render_kw = field.render_kw
        if render_kw and "readonly" in render_kw:
            del render_kw["readonly"]

    if request.form:
        if form.validate():
            created_user = profile_create(current_app, form)
            created_profile = url_for(
                "account.profile_edition", username=created_user.user_name[0]
            )
            return redirect(created_profile)

        flash(_("User account creation failed."), "error")

    template_context = {
        "form": form,
        "menuitem": "users",
        "edited_user": None,
        "self_deletion": False,
    }
    return render_template("profile_add.html", **template_context)
def profile_create(current_app, form):
    """Create and save a user from a validated profile form.

    Form fields whose name is a user attribute are copied onto the user;
    uploaded files are stored as their raw content. The formatted name is
    built from the given and family names. The password is set only when
    the form provides one.

    :param current_app: the application object (not used by this function).
    :param form: the validated profile form.
    :return: the saved user.
    """
    user = models.User()

    for field in form:
        if field.name not in user.attributes:
            continue

        value = field.data
        if isinstance(value, FileStorage):
            value = value.stream.read()
        setattr(user, field.name, value)

    if "photo" in form and form["photo_delete"].data:
        del user.photo

    first = user.given_name[0] if user.given_name else ""
    last = user.family_name[0] if user.family_name else ""
    user.formatted_name = ["{} {}".format(first, last).strip()]
    user.save()

    new_password = form["password1"].data
    if new_password:
        user.set_password(new_password)
        user.save()

    flash(_("User account creation succeed."), "success")
    return user
@bp.route("/profile/<username>", methods=("GET", "POST"))
@user_needed()
def profile_edition(user, username):
    """Display and update the profile of ``username``.

    Access is granted to user managers and to users allowed to edit their
    own profile; anyone else gets a 403. An unknown ``username`` gives a
    404. Only attributes the editor is allowed to write are updated.

    :param user: the logged-in user, injected by ``user_needed``.
    :param username: login of the profile being edited.
    """
    editor = user

    allowed = editor.can_manage_users or (
        editor.can_edit_self and username == editor.user_name[0]
    )
    if not allowed:
        abort(403)

    is_own_profile = username == editor.user_name[0]
    menuitem = "profile" if is_own_profile else "users"
    fields = editor.read | editor.write

    user = editor if is_own_profile else models.User.get_from_login(username)
    if not user:
        abort(404)

    # Fields handled by this view; the other ones belong to the settings view.
    available_fields = {
        "formatted_name",
        "title",
        "given_name",
        "family_name",
        "display_name",
        "email",
        "phone_number",
        "formatted_address",
        "street",
        "postal_code",
        "locality",
        "region",
        "photo",
        "photo_delete",
        "employee_number",
        "department",
        "profile_url",
        "preferred_language",
        "organization",
    }

    # Initial form values: the first item of list attributes, the raw value
    # otherwise, and an empty string for anything falsy.
    data = {}
    for name in fields:
        if not hasattr(user, name) or name not in available_fields:
            continue
        current = getattr(user, name)
        if current and isinstance(current, list):
            data[name] = current[0]
        else:
            data[name] = current or ""

    form = profile_form(
        editor.write & available_fields, editor.read & available_fields, user
    )
    form.process(CombinedMultiDict((request.files, request.form)) or None, data=data)

    if request.form:
        if form.validate():
            for field in form:
                if field.name in user.attributes and field.name in editor.write:
                    value = field.data
                    if isinstance(value, FileStorage):
                        value = value.stream.read()
                    setattr(user, field.name, value)

            if "photo" in form and form["photo_delete"].data:
                del user.photo

            if "preferred_language" in request.form:
                # Refresh the babel cache in case the lang is updated
                refresh()

                if form["preferred_language"].data == "auto":
                    user.preferred_language = None

            user.save()
            flash(_("Profile updated successfully."), "success")
            return redirect(url_for("account.profile_edition", username=username))

        flash(_("Profile edition failed."), "error")

    return render_template(
        "profile_edit.html",
        form=form,
        menuitem=menuitem,
        edited_user=user,
    )
@bp.route("/profile/<username>/settings", methods=("GET", "POST"))
@user_needed()
def profile_settings(user, username):
    """Dispatch the account settings actions for ``username``.

    Display and edition are delegated to ``profile_settings_edit`` and
    deletion to ``profile_delete``. The remaining actions send password
    emails or lock and unlock the account. An unsupported action gives a
    400 response.

    :param user: the logged-in user, injected by ``user_needed``.
    :param username: login of the account being managed.
    """
    allowed = user.can_manage_users or (
        user.can_edit_self and username == user.user_name[0]
    )
    if not allowed:
        abort(403)

    edited_user = models.User.get_from_login(username)
    if not edited_user:
        abort(404)

    action = request.form.get("action")

    if request.method == "GET" or action == "edit" or request_is_htmx():
        return profile_settings_edit(user, edited_user)

    if action == "delete":
        return profile_delete(user, edited_user)

    if action == "password-initialization-mail":
        if send_password_initialization_mail(edited_user):
            message = _(
                "A password initialization link has been sent at the user email address. It should be received within a few minutes."
            )
            flash(message, "success")
        else:
            flash(_("Could not send the password initialization email"), "error")

        return profile_settings_edit(user, edited_user)

    if action == "password-reset-mail":
        if send_password_reset_mail(edited_user):
            message = _(
                "A password reset link has been sent at the user email address. It should be received within a few minutes."
            )
            flash(message, "success")
        else:
            flash(_("Could not send the password reset email"), "error")

        return profile_settings_edit(user, edited_user)

    # Locking is only honoured when the backend supports it and the account
    # is not already in the requested state; otherwise fall through to 400.
    if (
        action == "lock"
        and Backend.get().has_account_lockability()
        and not edited_user.locked
    ):
        flash(_("The account has been locked"), "success")
        edited_user.lock_date = datetime.datetime.now(datetime.timezone.utc)
        edited_user.save()
        return profile_settings_edit(user, edited_user)

    if (
        action == "unlock"
        and Backend.get().has_account_lockability()
        and edited_user.locked
    ):
        flash(_("The account has been unlocked"), "success")
        del edited_user.lock_date
        edited_user.save()
        return profile_settings_edit(user, edited_user)

    abort(400)
def profile_settings_edit(editor, edited_user):
    """Display and update the account settings of ``edited_user``.

    Only the password, groups, user name and lock date are handled here,
    restricted to what ``editor`` is allowed to read and write.

    :param editor: the logged-in user performing the edition.
    :param edited_user: the user whose settings are displayed or changed.
    :return: a redirection after a successful update, otherwise the
        rendered settings page.
    """
    # Highlight the "profile" menu entry only when users edit their own
    # account. The former check compared ``editor.id`` with itself and was
    # therefore always true.
    menuitem = "profile" if editor.id == edited_user.id else "users"
    fields = editor.read | editor.write

    available_fields = {"password", "groups", "user_name", "lock_date"}

    # Initial form values: the first item of list attributes, the raw value
    # otherwise, and an empty string for anything falsy.
    data = {
        k: getattr(edited_user, k)[0]
        if getattr(edited_user, k) and isinstance(getattr(edited_user, k), list)
        else getattr(edited_user, k) or ""
        for k in fields
        if hasattr(edited_user, k) and k in available_fields
    }

    if "groups" in fields:
        data["groups"] = [g.id for g in edited_user.groups]

    form = profile_form(
        editor.write & available_fields, editor.read & available_fields, edited_user
    )
    form.process(CombinedMultiDict((request.files, request.form)) or None, data=data)

    # Explicit grouping of the original ``a and b or c`` condition.
    is_edit_submission = bool(request.form) and request.form.get("action") == "edit"
    if is_edit_submission or request_is_htmx():
        if not form.validate():
            flash(_("Profile edition failed."), "error")

        else:
            for attribute in form:
                if attribute.name in available_fields & editor.write:
                    setattr(edited_user, attribute.name, attribute.data)

            if (
                "password1" in request.form
                and form["password1"].data
                and request.form["action"] == "edit"
            ):
                edited_user.set_password(form["password1"].data)

            edited_user.save()
            flash(_("Profile updated successfully."), "success")
            return redirect(
                url_for("account.profile_settings", username=edited_user.user_name[0])
            )

    return render_template(
        "profile_settings.html",
        form=form,
        menuitem=menuitem,
        edited_user=edited_user,
        self_deletion=edited_user.can_delete_account,
    )
def profile_delete(user, edited_user):
    """Delete ``edited_user`` and redirect according to who was deleted.

    Users deleting their own account are logged out first and sent to the
    index; otherwise the redirection goes to the user list.

    :param user: the logged-in user requesting the deletion.
    :param edited_user: the user to delete.
    """
    self_deletion = user.id == edited_user.id

    if self_deletion:
        user.logout()

    confirmation = _(
        "The user %(user)s has been sucessfuly deleted",
        user=edited_user.formatted_name[0],
    )
    flash(confirmation, "success")
    edited_user.delete()

    endpoint = "account.index" if self_deletion else "account.users"
    return redirect(url_for(endpoint))
@bp.route("/impersonate/<username>")
@permissions_needed("impersonate_users")
def impersonate(user, username):
    """Log the current session in as ``username``.

    Responds with 404 when the target user does not exist.

    :param user: the logged-in user, injected by ``permissions_needed``.
    :param username: login of the user to impersonate.
    """
    puppet = models.User.get_from_login(username)
    if not puppet:
        abort(404)

    puppet.login()

    # ``formatted_name`` is a list in this module; greet with its first
    # item, as the regular login view does, not with the list itself.
    flash(
        _("Connection successful. Welcome %(user)s", user=puppet.formatted_name[0]),
        "success",
    )
    return redirect(url_for("account.index"))
@bp.route("/reset", methods=["GET", "POST"])
@smtp_needed()
def forgotten():
    """Send a password reset link to the user matching the submitted login.

    Responds with 404 when ``ENABLE_PASSWORD_RECOVERY`` is disabled. When
    ``HIDE_INVALID_LOGINS`` is enabled (the default), an unknown login or a
    user who cannot edit their own profile gets the same success message as
    a real sending, so the form does not reveal which logins exist.
    """
    if not current_app.config.get("ENABLE_PASSWORD_RECOVERY", True):
        abort(404)

    form = ForgottenPasswordForm(request.form)
    if not request.form:
        return render_template("forgotten-password.html", form=form)

    if not form.validate():
        flash(_("Could not send the password reset link."), "error")
        return render_template("forgotten-password.html", form=form)

    user = models.User.get_from_login(form.login.data)
    success_message = _(
        "A password reset link has been sent at your email address. You should receive it within a few minutes."
    )
    if current_app.config.get("HIDE_INVALID_LOGINS", True) and (
        not user or not user.can_edit_self
    ):
        flash(success_message, "success")
        return render_template("forgotten-password.html", form=form)

    if not user:
        # Reached only when invalid logins are not hidden. Without this
        # guard the permission check below would dereference ``None``.
        flash(_("Could not send the password reset link."), "error")
        return render_template("forgotten-password.html", form=form)

    if not user.can_edit_self:
        flash(
            _(
                "The user '%(user)s' does not have permissions to update their password. "
                "We cannot send a password reset email.",
                user=user.formatted_name[0],
            ),
            "error",
        )
        return render_template("forgotten-password.html", form=form)

    success = send_password_reset_mail(user)

    if success:
        flash(success_message, "success")
    else:
        flash(
            _("We encountered an issue while we sent the password recovery email."),
            "error",
        )

    return render_template("forgotten-password.html", form=form)
@bp.route("/reset/<user_name>/<hash>", methods=["GET", "POST"])
def reset(user_name, hash):
    """Let a user choose a new password from a password reset link.

    Responds with 404 when ``ENABLE_PASSWORD_RECOVERY`` is disabled. The
    link is valid when ``hash`` matches the hash of the user's name, first
    email and current password (an empty string when there is none). On a
    valid submission the password is changed and the user is logged in.
    """
    if not current_app.config.get("ENABLE_PASSWORD_RECOVERY", True):
        abort(404)

    form = PasswordResetForm(request.form)
    user = models.User.get_from_login(user_name)

    link_is_valid = False
    if user:
        expected_hash = profile_hash(
            user.user_name[0],
            user.email[0],
            user.password[0] if user.has_password() else "",
        )
        link_is_valid = not (hash != expected_hash)

    if not link_is_valid:
        message = _("The password reset link that brought you here was invalid.")
        flash(message, "error")
        return redirect(url_for("account.index"))

    if request.form and form.validate():
        user.set_password(form.password.data)
        user.login()

        flash(_("Your password has been updated successfully"), "success")
        return redirect(url_for("account.profile_edition", username=user_name))

    return render_template(
        "reset-password.html", form=form, user_name=user_name, hash=hash
    )
@bp.route("/profile/<user_name>/<field>")
def photo(user_name, field):
    """Serve the photo of ``user_name`` as a JPEG, with cache validation.

    Responds with 404 when ``field`` is not "photo" (case-insensitively),
    when the user is unknown, or when the user has no photo. Responds with
    an empty 304 when the ``If-Modified-Since`` or ``If-None-Match``
    request headers show that the client copy is still current.
    """
    if field.lower() != "photo":
        abort(404)

    user = models.User.get_from_login(user_name)
    if not user:
        abort(404)

    modified_since = request.if_modified_since
    if modified_since and modified_since >= user.last_modified:
        return "", 304

    # The entity tag changes whenever the user entry is modified.
    etag = profile_hash(user_name, user.last_modified.isoformat())
    client_etags = request.if_none_match
    if client_etags and etag in client_etags:
        return "", 304

    photos = getattr(user, field)
    if not photos:
        abort(404)

    return send_file(
        io.BytesIO(photos[0]),
        mimetype="image/jpeg",
        last_modified=user.last_modified,
        etag=etag,
    )