"""Account views: registration, invitation, profile edition and settings."""
import binascii
import datetime
import io
from dataclasses import astuple
from dataclasses import dataclass
from typing import List

import pkg_resources
import wtforms
from canaille.app import b64_to_obj
from canaille.app import build_hash
from canaille.app import default_fields
from canaille.app import models
from canaille.app import obj_to_b64
from canaille.app.flask import current_user
from canaille.app.flask import permissions_needed
from canaille.app.flask import render_htmx_template
from canaille.app.flask import request_is_htmx
from canaille.app.flask import smtp_needed
from canaille.app.flask import user_needed
from canaille.app.forms import is_readonly
from canaille.app.forms import set_readonly
from canaille.app.forms import set_writable
from canaille.app.forms import TableForm
from canaille.app.themes import render_template
from canaille.backends import BaseBackend
from flask import abort
from flask import Blueprint
from flask import current_app
from flask import flash
from flask import g
from flask import redirect
from flask import request
from flask import send_file
from flask import url_for
from flask_babel import gettext as _
from flask_babel import refresh
from werkzeug.datastructures import CombinedMultiDict
from werkzeug.datastructures import FileStorage

from .forms import build_profile_form
from .forms import EmailConfirmationForm
from .forms import InvitationForm
from .forms import JoinForm
from .forms import MINIMUM_PASSWORD_LENGTH
from .forms import PROFILE_FORM_FIELDS
from .mails import send_confirmation_email
from .mails import send_invitation_mail
from .mails import send_password_initialization_mail
from .mails import send_password_reset_mail
from .mails import send_registration_mail


bp = Blueprint("account", __name__)

# Exceptions that mean "the payload carried by the URL is unusable".
# binascii.Error is itself a ValueError subclass; it is listed for readability.
# TypeError covers a decoded object that cannot be unpacked into the payload
# dataclass (wrong number of items, or not iterable at all).
_INVALID_PAYLOAD_ERRORS = (binascii.Error, ValueError, TypeError)


@bp.route("/")
def index():
    """Redirect to the most relevant page for the current visitor.

    Anonymous visitors go to the login page; users who can edit themselves or
    manage users go to their profile; users who can use OIDC go to their
    consents; everybody else lands on the about page.
    """
    user = current_user()

    if not user:
        return redirect(url_for("core.auth.login"))

    if user.can_edit_self or user.can_manage_users:
        return redirect(url_for("core.account.profile_edition", edited_user=user))

    if user.can_use_oidc:
        return redirect(url_for("oidc.consents.consents"))

    return redirect(url_for("core.account.about"))


@bp.route("/join", methods=("GET", "POST"))
def join():
    """Start the self-registration process by asking for an email address.

    Aborts with 404 when ``ENABLE_REGISTRATION`` is off, redirects straight to
    the registration form when ``EMAIL_CONFIRMATION`` is off, and aborts with
    403 for users who are already logged in.
    """
    if not current_app.config.get("ENABLE_REGISTRATION", False):
        abort(404)

    if not current_app.config.get("EMAIL_CONFIRMATION", True):
        return redirect(url_for(".registration"))

    if current_user():
        abort(403)

    form = JoinForm(request.form or None)
    if request.form and form.validate():
        if models.User.query(emails=form.email.data):
            # Same message as the success path below, and no mail is sent:
            # the response does not tell whether the address is already known.
            flash(
                _(
                    "You will receive soon an email to continue the registration process."
                ),
                "success",
            )
            return render_template("join.html", form=form)

        payload = RegistrationPayload(
            creation_date_isoformat=datetime.datetime.now(
                datetime.timezone.utc
            ).isoformat(),
            user_name="",
            user_name_editable=True,
            email=form.email.data,
            groups=[],
        )

        registration_url = url_for(
            "core.account.registration",
            data=payload.b64(),
            hash=payload.build_hash(),
            _external=True,
        )

        if send_registration_mail(form.email.data, registration_url):
            flash(
                _(
                    "You will receive soon an email to continue the registration process."
                ),
                "success",
            )
        else:
            flash(
                _(
                    "An error happened while sending your registration mail. "
                    "Please try again in a few minutes. "
                    "If this still happens, please contact the administrators."
                ),
                "error",
            )

    return render_template("join.html", form=form)


@bp.route("/about")
def about():
    """Render the about page with the installed canaille version."""
    try:
        version = pkg_resources.get_distribution("canaille").version
    except pkg_resources.DistributionNotFound:  # pragma: no cover
        version = "git"
    return render_template("about.html", version=version)


@bp.route("/users", methods=["GET", "POST"])
@permissions_needed("manage_users")
def users(user):
    """Render the users table; aborts with 404 if the table form is invalid."""
    table_form = TableForm(
        models.User, fields=user.read | user.write, formdata=request.form
    )
    if request.form and not table_form.validate():
        abort(404)

    return render_htmx_template(
        "users.html",
        menuitem="users",
        table_form=table_form,
    )


@dataclass
class VerificationPayload:
    """Base class for data embedded in verification links sent by mail.

    The payload is serialized with :meth:`b64` and accompanied in the URL by
    the value of :meth:`build_hash`; views compare that value to detect links
    whose data has been altered.
    """

    # ISO 8601 representation of the moment the payload was created.
    creation_date_isoformat: str

    @property
    def creation_date(self):
        """Return the creation date parsed from its ISO representation."""
        return datetime.datetime.fromisoformat(self.creation_date_isoformat)

    def has_expired(self):
        """Tell whether the payload is older than ``INVITATION_EXPIRATION``.

        The configuration value is read in seconds and defaults to two days.
        """
        DEFAULT_INVITATION_DURATION = 2 * 24 * 60 * 60
        max_age = datetime.timedelta(
            seconds=current_app.config.get(
                "INVITATION_EXPIRATION", DEFAULT_INVITATION_DURATION
            )
        )
        age = datetime.datetime.now(datetime.timezone.utc) - self.creation_date
        return age > max_age

    def b64(self):
        """Return the payload fields serialized with ``obj_to_b64``."""
        return obj_to_b64(astuple(self))

    def build_hash(self):
        """Return the hash of the payload fields computed by ``build_hash``."""
        return build_hash(*astuple(self))


@dataclass
class EmailConfirmationPayload(VerificationPayload):
    """Payload of the link confirming a new email address for a user."""

    # Identifier of the user the address will be attached to.
    identifier: str
    # Address to confirm.
    email: str


@dataclass
class RegistrationPayload(VerificationPayload):
    """Payload of a registration or invitation link."""

    # Pre-filled login; empty when the user chooses it.
    user_name: str
    # Whether the user may change the pre-filled login.
    user_name_editable: bool
    # Email address the link was sent to.
    email: str
    # Identifiers of the groups the new account will belong to.
    groups: List[str]


@bp.route("/invite", methods=["GET", "POST"])
@smtp_needed()
@permissions_needed("manage_users")
def user_invitation(user):
    """Build a registration link for a future user and optionally mail it."""
    form = InvitationForm(request.form or None)
    email_sent = None
    registration_url = None
    form_validated = False

    if request.form and form.validate():
        form_validated = True
        payload = RegistrationPayload(
            datetime.datetime.now(datetime.timezone.utc).isoformat(),
            form.user_name.data,
            form.user_name_editable.data,
            form.email.data,
            form.groups.data,
        )
        registration_url = url_for(
            "core.account.registration",
            data=payload.b64(),
            hash=payload.build_hash(),
            _external=True,
        )

        if request.form["action"] == "send":
            email_sent = send_invitation_mail(form.email.data, registration_url)

    return render_template(
        "invite.html",
        form=form,
        # Every other view of this module passes ``menuitem`` (singular).
        menuitem="users",
        form_validated=form_validated,
        email_sent=email_sent,
        registration_url=registration_url,
    )


@bp.route("/register", methods=["GET", "POST"])
@bp.route("/register/<data>/<hash>", methods=["GET", "POST"])
def registration(data=None, hash=None):
    """Display and handle the account creation form.

    :param data: serialized :class:`RegistrationPayload` coming from an
        invitation or registration link, or ``None`` for open registration.
    :param hash: value expected to equal ``payload.build_hash()``.

    Without ``data`` the view is only available when ``ENABLE_REGISTRATION``
    is on and ``EMAIL_CONFIRMATION`` is off (403 otherwise). With ``data`` the
    link is rejected when it cannot be decoded, has expired, targets an
    already existing login or carries a wrong hash.
    """
    if not data:
        payload = None
        if not current_app.config.get(
            "ENABLE_REGISTRATION", False
        ) or current_app.config.get("EMAIL_CONFIRMATION", True):
            abort(403)

    else:
        try:
            payload = RegistrationPayload(*b64_to_obj(data))
        except _INVALID_PAYLOAD_ERRORS:
            # ``data`` comes from the URL and may be anything.
            flash(
                _("The registration link that brought you here was invalid."),
                "error",
            )
            return redirect(url_for("core.account.index"))

        if payload.has_expired():
            flash(
                _("The registration link that brought you here has expired."),
                "error",
            )
            return redirect(url_for("core.account.index"))

        if payload.user_name and models.User.get_from_login(payload.user_name):
            flash(
                _("Your account has already been created."),
                "error",
            )
            return redirect(url_for("core.account.index"))

        if hash != payload.build_hash():
            flash(
                _("The registration link that brought you here was invalid."),
                "error",
            )
            return redirect(url_for("core.account.index"))

    if current_user():
        flash(
            _("You are already logged in, you cannot create an account."),
            "error",
        )
        return redirect(url_for("core.account.index"))

    if payload:
        # From here on ``data`` holds the initial values of the form.
        data = {
            "user_name": payload.user_name,
            "emails": [payload.email],
            "groups": payload.groups,
        }

    has_smtp = "SMTP" in current_app.config
    emails_readonly = current_app.config.get("EMAIL_CONFIRMATION") is True or (
        current_app.config.get("EMAIL_CONFIRMATION") is None and has_smtp
    )
    readable_fields, writable_fields = default_fields()

    form = build_profile_form(writable_fields, readable_fields)
    if "groups" not in form and payload and payload.groups:
        # Show the groups chosen by the inviter even if the default
        # permissions do not expose the field; it is displayed read-only.
        form["groups"] = wtforms.SelectMultipleField(
            _("Groups"),
            choices=[(group.id, group.display_name) for group in models.Group.query()],
        )
        set_readonly(form["groups"])

    form.process(CombinedMultiDict((request.files, request.form)) or None, data=data)

    if is_readonly(form["user_name"]) and (not payload or payload.user_name_editable):
        set_writable(form["user_name"])

    if not is_readonly(form["emails"]) and emails_readonly:
        set_readonly(form["emails"])

    # A password is mandatory at account creation.
    for password_field in ("password1", "password2"):
        form[password_field].validators = [
            wtforms.validators.DataRequired(),
            wtforms.validators.Length(min=MINIMUM_PASSWORD_LENGTH),
        ]
        form[password_field].flags.required = True

    if not request.form or form.form_control():
        return render_template(
            "profile_add.html",
            form=form,
            menuitem="users",
            edited_user=None,
            self_deletion=False,
        )

    if not form.validate():
        flash(_("User account creation failed."), "error")
        return render_template(
            "profile_add.html",
            form=form,
            menuitem="users",
            edited_user=None,
            self_deletion=False,
        )

    user = profile_create(current_app, form)
    user.login()
    flash(_("Your account has been created successfully."), "success")
    return redirect(url_for("core.account.profile_edition", edited_user=user))


@bp.route("/email-confirmation/<data>/<hash>")
def email_confirmation(data, hash):
    """Attach a confirmed email address to a user account.

    :param data: serialized :class:`EmailConfirmationPayload`.
    :param hash: value expected to equal ``confirmation_obj.build_hash()``.

    Always redirects to the index page, with a flash message describing the
    outcome.
    """
    try:
        confirmation_obj = EmailConfirmationPayload(*b64_to_obj(data))
    except _INVALID_PAYLOAD_ERRORS:
        # Was a bare ``except:``, which also caught SystemExit and
        # KeyboardInterrupt.
        flash(
            _("The email confirmation link that brought you here is invalid."),
            "error",
        )
        return redirect(url_for("core.account.index"))

    if confirmation_obj.has_expired():
        flash(
            _("The email confirmation link that brought you here has expired."),
            "error",
        )
        return redirect(url_for("core.account.index"))

    if hash != confirmation_obj.build_hash():
        flash(
            _("The invitation link that brought you here was invalid."),
            "error",
        )
        return redirect(url_for("core.account.index"))

    user = models.User.get(confirmation_obj.identifier)
    if not user:
        flash(
            _("The email confirmation link that brought you here is invalid."),
            "error",
        )
        return redirect(url_for("core.account.index"))

    if confirmation_obj.email in user.emails:
        flash(
            _("This address email have already been confirmed."),
            "error",
        )
        return redirect(url_for("core.account.index"))

    if models.User.query(emails=confirmation_obj.email):
        flash(
            _("This address email is already associated with another account."),
            "error",
        )
        return redirect(url_for("core.account.index"))

    user.emails = user.emails + [confirmation_obj.email]
    user.save()
    flash(_("Your email address have been confirmed."), "success")
    return redirect(url_for("core.account.index"))


@bp.route("/profile", methods=("GET", "POST"))
@permissions_needed("manage_users")
def profile_creation(user):
    """Let a user manager create a new account; every field is writable."""
    form = build_profile_form(user.write, user.read)
    form.process(CombinedMultiDict((request.files, request.form)) or None)

    for field in form:
        if is_readonly(field):
            set_writable(field)

    if not request.form or form.form_control():
        return render_template(
            "profile_add.html",
            form=form,
            menuitem="users",
            edited_user=None,
            self_deletion=False,
        )

    if not form.validate():
        flash(_("User account creation failed."), "error")
        return render_template(
            "profile_add.html",
            form=form,
            menuitem="users",
            edited_user=None,
            self_deletion=False,
        )

    user = profile_create(current_app, form)
    flash(_("User account creation succeed."), "success")
    return redirect(url_for("core.account.profile_edition", edited_user=user))


def profile_create(current_app, form):
    """Create, save and return a user built from a validated profile form.

    :param current_app: unused; kept for compatibility with existing callers.
    :param form: validated form whose fields named after user attributes are
        copied onto the new user. Uploaded files are stored as raw bytes.
    :return: the saved user, with its permissions loaded.
    """
    user = models.User()
    for attribute in form:
        if attribute.name in user.attributes:
            if isinstance(attribute.data, FileStorage):
                data = attribute.data.stream.read()
            else:
                data = attribute.data

            setattr(user, attribute.name, data)

    if "photo" in form and form["photo_delete"].data:
        del user.photo

    given_name = user.given_name[0] if user.given_name else ""
    family_name = user.family_name[0] if user.family_name else ""
    user.formatted_name = [f"{given_name} {family_name}".strip()]
    user.save()

    # The user is saved once before the password is set.
    if form["password1"].data:
        user.set_password(form["password1"].data)
        user.save()

    user.load_permissions()

    return user


def profile_edition_main_form(user, edited_user, emails_readonly):
    """Build the main profile form, pre-filled from ``edited_user``.

    :param user: the user performing the edition; their ``read`` and ``write``
        sets decide which fields are exposed.
    :param edited_user: the user whose profile is displayed.
    :param emails_readonly: when true, the ``emails`` field is left out of
        this form.
    """
    available_fields = {
        "formatted_name",
        "title",
        "given_name",
        "family_name",
        "display_name",
        "emails",
        "phone_numbers",
        "formatted_address",
        "street",
        "postal_code",
        "locality",
        "region",
        "photo",
        "photo_delete",
        "employee_number",
        "department",
        "profile_url",
        "preferred_language",
        "organization",
    }
    if emails_readonly:
        available_fields.remove("emails")

    readable_fields = user.read & available_fields
    writable_fields = user.write & available_fields

    # List-valued attributes are reduced to their first item unless the form
    # field is itself a FieldList; empty values become "".
    data = {
        field: getattr(edited_user, field)[0]
        if getattr(edited_user, field)
        and isinstance(getattr(edited_user, field), list)
        and not PROFILE_FORM_FIELDS[field].field_class == wtforms.FieldList
        else getattr(edited_user, field) or ""
        for field in writable_fields | readable_fields
        if hasattr(edited_user, field)
    }

    request_data = CombinedMultiDict((request.files, request.form))
    profile_form = build_profile_form(writable_fields, readable_fields)
    profile_form.process(request_data or None, data=data)
    profile_form.user = edited_user
    profile_form.render_field_macro_file = "partial/profile_field.html"
    profile_form.render_field_extra_context = {
        "user": user,
        "edited_user": edited_user,
    }
    return profile_form


def profile_edition_main_form_validation(user, edited_user, profile_form):
    """Copy the validated profile form onto ``edited_user`` and save it.

    Only attributes that ``user`` is allowed to write are updated.
    """
    for attribute in profile_form:
        if attribute.name in edited_user.attributes and attribute.name in user.write:
            if isinstance(attribute.data, FileStorage):
                data = attribute.data.stream.read()
            else:
                data = attribute.data

            setattr(edited_user, attribute.name, data)

    if "photo" in profile_form and profile_form["photo_delete"].data:
        del edited_user.photo

    if "preferred_language" in request.form:
        # Refresh the babel cache in case the lang is updated
        refresh()

        if profile_form["preferred_language"].data == "auto":
            edited_user.preferred_language = None

    edited_user.save()
    g.user.reload()


def profile_edition_emails_form(user, edited_user, has_smtp):
    """Build the form listing current emails and allowing to add a new one."""
    emails_form = EmailConfirmationForm(
        request.form or None, data={"old_emails": edited_user.emails}
    )
    emails_form.add_email_button = has_smtp
    return emails_form


def profile_edition_add_email(user, edited_user, emails_form):
    """Send a confirmation link to the address entered in ``emails_form``.

    :return: the result of ``send_confirmation_email``.
    """
    email_confirmation = EmailConfirmationPayload(
        datetime.datetime.now(datetime.timezone.utc).isoformat(),
        edited_user.identifier,
        emails_form.new_email.data,
    )
    email_confirmation_url = url_for(
        "core.account.email_confirmation",
        data=email_confirmation.b64(),
        hash=email_confirmation.build_hash(),
        _external=True,
    )
    current_app.logger.debug(
        f"Attempt to send a verification mail with link: {email_confirmation_url}"
    )
    return send_confirmation_email(emails_form.new_email.data, email_confirmation_url)


def profile_edition_remove_email(user, edited_user, email):
    """Remove ``email`` from ``edited_user`` and save.

    :return: ``False`` without changing anything when the address is unknown
        or is the user's only one, ``True`` otherwise.
    """
    if email not in edited_user.emails:
        return False

    if len(edited_user.emails) == 1:
        return False

    edited_user.emails = [m for m in edited_user.emails if m != email]
    edited_user.save()
    return True


# NOTE(review): the ``user`` URL converter is not defined in this module; it is
# expected to be registered on the application -- confirm. ``url_for`` is called
# throughout this module with ``edited_user=<user object>``.
@bp.route("/profile/<user:edited_user>", methods=("GET", "POST"))
@user_needed()
def profile_edition(user, edited_user):
    """Display and handle the profile edition page of ``edited_user``.

    Aborts with 404 unless ``user`` can manage users or is editing their own
    profile with the right to do so. The POSTed ``action`` selects between
    profile edition and email addition; ``email_remove`` removes an address.
    Any other POST aborts with 400.
    """
    if not user.can_manage_users and not (
        user.can_edit_self and edited_user.id == user.id
    ):
        abort(404)

    menuitem = "profile" if edited_user.id == user.id else "users"
    has_smtp = "SMTP" in current_app.config
    has_email_confirmation = current_app.config.get("EMAIL_CONFIRMATION") is True or (
        current_app.config.get("EMAIL_CONFIRMATION") is None and has_smtp
    )
    # User managers can always edit emails directly in the main form.
    emails_readonly = has_email_confirmation and not user.can_manage_users

    profile_form = profile_edition_main_form(user, edited_user, emails_readonly)
    emails_form = (
        profile_edition_emails_form(user, edited_user, has_smtp)
        if emails_readonly
        else None
    )

    render_context = {
        "menuitem": menuitem,
        "edited_user": edited_user,
        "profile_form": profile_form,
        "emails_form": emails_form,
    }

    if not request.form or profile_form.form_control():
        return render_template("profile_edit.html", **render_context)

    if request.form.get("action") == "edit-profile":
        if not profile_form.validate():
            flash(_("Profile edition failed."), "error")
            return render_template("profile_edit.html", **render_context)

        profile_edition_main_form_validation(user, edited_user, profile_form)
        flash(_("Profile updated successfully."), "success")
        return redirect(
            url_for("core.account.profile_edition", edited_user=edited_user)
        )

    if request.form.get("action") == "add_email":
        if not emails_form.validate():
            flash(_("Email addition failed."), "error")
            return render_template("profile_edit.html", **render_context)

        if profile_edition_add_email(user, edited_user, emails_form):
            flash(
                _(
                    "An email has been sent to the email address. "
                    "Please check your inbox and click on the verification link it contains"
                ),
                "success",
            )
        else:
            flash(_("Could not send the verification email"), "error")

        return redirect(
            url_for("core.account.profile_edition", edited_user=edited_user)
        )

    if request.form.get("email_remove"):
        if not profile_edition_remove_email(
            user, edited_user, request.form.get("email_remove")
        ):
            flash(_("Email deletion failed."), "error")
            return render_template("profile_edit.html", **render_context)

        flash(_("The email have been successfully deleted."), "success")
        return redirect(
            url_for("core.account.profile_edition", edited_user=edited_user)
        )

    abort(400, f"bad form action: {request.form.get('action')}")


# NOTE(review): relies on the application-level ``user`` URL converter -- confirm.
@bp.route("/profile/<user:edited_user>/settings", methods=("GET", "POST"))
@user_needed()
def profile_settings(user, edited_user):
    """Dispatch the actions of the settings page of ``edited_user``.

    Aborts with 404 on insufficient permissions and with 400 when the POSTed
    ``action`` matches none of the handled cases. Lock-related actions are
    only handled when the backend reports account lockability.
    """
    if not user.can_manage_users and not (
        user.can_edit_self and edited_user.id == user.id
    ):
        abort(404)

    action = request.form.get("action")

    if request.method == "GET" or action == "edit-settings" or request_is_htmx():
        return profile_settings_edit(user, edited_user)

    if (
        action == "confirm-delete"
        and BaseBackend.get().has_account_lockability()
        and not edited_user.locked
    ):
        return render_template("modals/delete-account.html", edited_user=edited_user)

    if action == "delete":
        return profile_delete(user, edited_user)

    if action == "password-initialization-mail":
        # ``all`` over a generator stops at the first address that fails.
        success = all(
            send_password_initialization_mail(edited_user, email)
            for email in edited_user.emails
        )
        if success:
            flash(
                _(
                    "A password initialization link has been sent at the user email address. "
                    "It should be received within a few minutes."
                ),
                "success",
            )
        else:
            flash(_("Could not send the password initialization email"), "error")

        return profile_settings_edit(user, edited_user)

    if action == "password-reset-mail":
        success = all(
            send_password_reset_mail(edited_user, email)
            for email in edited_user.emails
        )
        if success:
            flash(
                _(
                    "A password reset link has been sent at the user email address. "
                    "It should be received within a few minutes."
                ),
                "success",
            )
        else:
            flash(_("Could not send the password reset email"), "error")

        return profile_settings_edit(user, edited_user)

    if (
        action == "confirm-lock"
        and BaseBackend.get().has_account_lockability()
        and not edited_user.locked
    ):
        return render_template("modals/lock-account.html", edited_user=edited_user)

    if (
        action == "lock"
        and BaseBackend.get().has_account_lockability()
        and not edited_user.locked
    ):
        flash(_("The account has been locked"), "success")
        edited_user.lock_date = datetime.datetime.now(datetime.timezone.utc)
        edited_user.save()

        return profile_settings_edit(user, edited_user)

    if (
        action == "unlock"
        and BaseBackend.get().has_account_lockability()
        and edited_user.locked
    ):
        flash(_("The account has been unlocked"), "success")
        del edited_user.lock_date
        edited_user.save()

        return profile_settings_edit(user, edited_user)

    abort(400, f"bad form action: {request.form.get('action')}")


def profile_settings_edit(editor, edited_user):
    """Display and handle the settings form (password, groups, login, lock date).

    :param editor: the user performing the edition; their ``read`` and
        ``write`` sets decide which fields are exposed and updated.
    :param edited_user: the user whose settings are edited.
    """
    # Compare the two users: the original compared ``editor.id`` with itself,
    # so the menu item was always "profile".
    menuitem = "profile" if edited_user.id == editor.id else "users"
    fields = editor.read | editor.write

    available_fields = {"password", "groups", "user_name", "lock_date"}
    data = {
        k: getattr(edited_user, k)[0]
        if getattr(edited_user, k) and isinstance(getattr(edited_user, k), list)
        else getattr(edited_user, k) or ""
        for k in fields
        if hasattr(edited_user, k) and k in available_fields
    }

    data["groups"] = [group.id for group in edited_user.groups]

    form = build_profile_form(
        editor.write & available_fields, editor.read & available_fields, edited_user
    )
    form.process(CombinedMultiDict((request.files, request.form)) or None, data=data)

    if (
        request.form
        and request.form.get("action") == "edit-settings"
        or request_is_htmx()
    ):
        if not form.validate():
            flash(_("Profile edition failed."), "error")

        else:
            for attribute in form:
                if attribute.name in available_fields & editor.write:
                    setattr(edited_user, attribute.name, attribute.data)

            # ``.get`` rather than indexing: this branch is also reached by
            # htmx requests, which may not carry an ``action`` field.
            if (
                "password1" in request.form
                and form["password1"].data
                and request.form.get("action") == "edit-settings"
            ):
                edited_user.set_password(form["password1"].data)

            edited_user.save()
            flash(_("Profile updated successfully."), "success")
            return redirect(
                url_for("core.account.profile_settings", edited_user=edited_user)
            )

    return render_template(
        "profile_settings.html",
        form=form,
        menuitem=menuitem,
        edited_user=edited_user,
        self_deletion=edited_user.can_delete_account,
    )


def profile_delete(user, edited_user):
    """Delete ``edited_user``, logging ``user`` out first on self-deletion.

    Redirects to the index page after a self-deletion and to the users list
    otherwise.
    """
    self_deletion = user.id == edited_user.id
    if self_deletion:
        user.logout()

    flash(
        _(
            "The user %(user)s has been sucessfuly deleted",
            user=edited_user.formatted_name[0],
        ),
        "success",
    )
    edited_user.delete()

    if self_deletion:
        return redirect(url_for("core.account.index"))
    return redirect(url_for("core.account.users"))


# NOTE(review): relies on the application-level ``user`` URL converter -- confirm.
@bp.route("/impersonate/<user:puppet>")
@permissions_needed("impersonate_users")
def impersonate(user, puppet):
    """Log the current session in as ``puppet`` and go to the index page."""
    puppet.login()

    flash(
        _("Connection successful. Welcome %(user)s", user=puppet.formatted_name[0]),
        "success",
    )
    return redirect(url_for("core.account.index"))


# NOTE(review): relies on the application-level ``user`` URL converter -- confirm.
@bp.route("/profile/<user:user>/<field>")
def photo(user, field):
    """Serve the first photo of ``user`` as a JPEG, honouring HTTP caching.

    Answers 404 when ``field`` is not ``photo`` (case-insensitive) or when the
    user has no photo, and 304 when the ``If-Modified-Since`` or
    ``If-None-Match`` request headers show the client copy is still current.
    """
    if field.lower() != "photo":
        abort(404)

    if request.if_modified_since and request.if_modified_since >= user.last_modified:
        return "", 304

    etag = build_hash(user.identifier, user.last_modified.isoformat())
    if request.if_none_match and etag in request.if_none_match:
        return "", 304

    photos = getattr(user, field)
    if not photos:
        abort(404)

    stream = io.BytesIO(photos[0])
    return send_file(
        stream, mimetype="image/jpeg", last_modified=user.last_modified, etag=etag
    )