forked from Github-Mirrors/canaille
850 lines
26 KiB
Python
850 lines
26 KiB
Python
import binascii
|
|
import datetime
|
|
import io
|
|
from dataclasses import astuple
|
|
from dataclasses import dataclass
|
|
from importlib import metadata
|
|
from typing import List
|
|
|
|
import wtforms
|
|
from flask import Blueprint
|
|
from flask import abort
|
|
from flask import current_app
|
|
from flask import flash
|
|
from flask import g
|
|
from flask import redirect
|
|
from flask import request
|
|
from flask import send_file
|
|
from flask import session
|
|
from flask import url_for
|
|
from werkzeug.datastructures import CombinedMultiDict
|
|
from werkzeug.datastructures import FileStorage
|
|
|
|
from canaille.app import b64_to_obj
|
|
from canaille.app import build_hash
|
|
from canaille.app import default_fields
|
|
from canaille.app import models
|
|
from canaille.app import obj_to_b64
|
|
from canaille.app.flask import current_user
|
|
from canaille.app.flask import login_user
|
|
from canaille.app.flask import logout_user
|
|
from canaille.app.flask import permissions_needed
|
|
from canaille.app.flask import render_htmx_template
|
|
from canaille.app.flask import request_is_htmx
|
|
from canaille.app.flask import smtp_needed
|
|
from canaille.app.flask import user_needed
|
|
from canaille.app.forms import IDToModel
|
|
from canaille.app.forms import TableForm
|
|
from canaille.app.forms import is_readonly
|
|
from canaille.app.forms import set_readonly
|
|
from canaille.app.forms import set_writable
|
|
from canaille.app.i18n import gettext as _
|
|
from canaille.app.i18n import reload_translations
|
|
from canaille.app.themes import render_template
|
|
from canaille.backends import BaseBackend
|
|
|
|
from ..mails import send_confirmation_email
|
|
from ..mails import send_invitation_mail
|
|
from ..mails import send_password_initialization_mail
|
|
from ..mails import send_password_reset_mail
|
|
from ..mails import send_registration_mail
|
|
from .forms import MINIMUM_PASSWORD_LENGTH
|
|
from .forms import EmailConfirmationForm
|
|
from .forms import InvitationForm
|
|
from .forms import JoinForm
|
|
from .forms import build_profile_form
|
|
|
|
# Blueprint on which every account-related view of this module is registered.
bp = Blueprint("account", __name__)
|
|
|
|
|
|
@bp.route("/")
def index():
    """Redirect the visitor to the most relevant landing page.

    Anonymous visitors go to the login page. Users allowed to edit their own
    profile or to manage users go to their profile edition page. Users who can
    use OIDC (when ``CANAILLE_OIDC`` is configured) go to the consents page.
    Everybody else lands on the about page.
    """
    visitor = current_user()

    if not visitor:
        destination = url_for("core.auth.login")
    elif visitor.can_edit_self or visitor.can_manage_users:
        destination = url_for("core.account.profile_edition", edited_user=visitor)
    elif "CANAILLE_OIDC" in current_app.config and visitor.can_use_oidc:
        destination = url_for("oidc.consents.consents")
    else:
        destination = url_for("core.account.about")

    return redirect(destination)
|
|
|
|
|
|
@bp.route("/join", methods=("GET", "POST"))
def join():
    """Ask for an email address and send a registration link to it.

    The view answers 404 when registration is disabled, redirects to the
    registration form when email confirmation is disabled, and answers 403
    for visitors that are already logged in.

    When the submitted address already belongs to a user, no mail is sent but
    the same success message is displayed as for an unknown address.
    """
    settings = current_app.config["CANAILLE"]
    if not settings["ENABLE_REGISTRATION"]:
        abort(404)

    if not settings["EMAIL_CONFIRMATION"]:
        return redirect(url_for(".registration"))

    if current_user():
        abort(403)

    form = JoinForm(request.form or None)
    if not (request.form and form.validate()):
        return render_template("join.html", form=form)

    address = form.email.data
    if not models.User.query(emails=address):
        # Unknown address: build a signed registration link and mail it.
        now = datetime.datetime.now(datetime.timezone.utc)
        payload = RegistrationPayload(
            creation_date_isoformat=now.isoformat(),
            user_name="",
            user_name_editable=True,
            email=address,
            groups=[],
        )
        registration_url = url_for(
            "core.account.registration",
            data=payload.b64(),
            hash=payload.build_hash(),
            _external=True,
        )

        if not send_registration_mail(address, registration_url):
            flash(
                _(
                    "An error happened while sending your registration mail. "
                    "Please try again in a few minutes. "
                    "If this still happens, please contact the administrators."
                ),
                "error",
            )
            return render_template("join.html", form=form)

    # Reached both when the mail was sent and when the address was already
    # registered, so that the two cases display the same message.
    flash(
        _("You will receive soon an email to continue the registration process."),
        "success",
    )
    return render_template("join.html", form=form)
|
|
|
|
|
|
@bp.route("/about")
def about():
    """Render the about page, exposing the installed ``canaille`` version."""
    return render_template("about.html", version=metadata.version("canaille"))
|
|
|
|
|
|
@bp.route("/users", methods=["GET", "POST"])
@permissions_needed("manage_users")
def users(user):
    """Render the user list table.

    :param user: the user performing the request; the table is built from
        the union of the fields this user can read and write.

    A submitted table form that does not validate results in a 404.
    """
    visible_fields = user.readable_fields | user.writable_fields
    table_form = TableForm(
        models.User,
        fields=visible_fields,
        formdata=request.form,
    )

    was_submitted = request.form
    if was_submitted and not table_form.validate():
        abort(404)

    return render_htmx_template("users.html", menuitem="users", table_form=table_form)
|
|
|
|
|
|
@dataclass
class VerificationPayload:
    """Base data carried by the verification links built in this module.

    The payload is serialized with :meth:`b64` and signed with
    :meth:`build_hash`; both operate on the tuple of all dataclass fields,
    so subclasses only need to declare their extra fields.
    """

    # Creation instant of the payload, as an ISO 8601 string.
    creation_date_isoformat: str

    @property
    def creation_date(self):
        """Return the creation instant parsed as a ``datetime``."""
        return datetime.datetime.fromisoformat(self.creation_date_isoformat)

    def has_expired(self):
        """Tell whether the payload is older than ``INVITATION_EXPIRATION`` seconds."""
        age = datetime.datetime.now(datetime.timezone.utc) - self.creation_date
        lifetime = datetime.timedelta(
            seconds=current_app.config["CANAILLE"]["INVITATION_EXPIRATION"]
        )
        return age > lifetime

    def b64(self):
        """Return the payload fields encoded by ``obj_to_b64``."""
        field_values = astuple(self)
        return obj_to_b64(field_values)

    def build_hash(self):
        """Return the hash of the payload fields computed by the ``build_hash`` helper."""
        field_values = astuple(self)
        # Inside a method body the bare name resolves to the module-level
        # helper, not to this method.
        return build_hash(*field_values)
|
|
|
|
|
|
@dataclass
class EmailConfirmationPayload(VerificationPayload):
    """Payload embedded in the links confirming a new email address."""

    # Identifier of the user the address should be attached to.
    identifier: str
    # Email address awaiting confirmation.
    email: str
|
|
|
|
|
|
@dataclass
class RegistrationPayload(VerificationPayload):
    """Payload embedded in the registration and invitation links."""

    # Login suggested for the future account; may be empty.
    user_name: str
    # Whether the registering user may change the suggested login.
    user_name_editable: bool
    # Email address the link was issued for.
    email: str
    # Identifiers of the groups the new account is pre-assigned to.
    groups: List[str]
|
|
|
|
|
|
@bp.route("/invite", methods=["GET", "POST"])
@smtp_needed()
@permissions_needed("manage_users")
def user_invitation(user):
    """Build a registration link for a new user, and optionally mail it.

    :param user: the user performing the request.

    Once the invitation form validates, a signed registration URL is built
    from the form content. The URL is mailed only when the submitted
    ``action`` is ``send``; it is passed to the template in either case.
    """
    form = InvitationForm(request.form or None)
    email_sent = None
    registration_url = None
    form_validated = False

    if request.form and form.validate():
        form_validated = True
        payload = RegistrationPayload(
            datetime.datetime.now(datetime.timezone.utc).isoformat(),
            form.user_name.data,
            form.user_name_editable.data,
            form.email.data,
            [group.id for group in form.groups.data],
        )
        registration_url = url_for(
            "core.account.registration",
            data=payload.b64(),
            hash=payload.build_hash(),
            _external=True,
        )

        if request.form["action"] == "send":
            email_sent = send_invitation_mail(form.email.data, registration_url)

    return render_template(
        "invite.html",
        form=form,
        # Every other view of this module passes ``menuitem`` (singular);
        # the previous ``menuitems`` keyword was a typo.
        menuitem="users",
        form_validated=form_validated,
        email_sent=email_sent,
        registration_url=registration_url,
    )
|
|
|
|
|
|
@bp.route("/register", methods=["GET", "POST"])
@bp.route("/register/<data>/<hash>", methods=["GET", "POST"])
def registration(data=None, hash=None):
    """Display and process the account creation form for a new user.

    :param data: optional encoded :class:`RegistrationPayload` coming from a
        registration or invitation link.
    :param hash: signature expected to match ``payload.build_hash()``.

    Without ``data`` the view is only available when ``ENABLE_REGISTRATION``
    is set and ``EMAIL_CONFIRMATION`` is not; otherwise it aborts with 403.
    With ``data`` the link is rejected (flash message and redirect to the
    index) when it cannot be decoded, when its hash does not match, when it
    has expired, or when the account it describes already exists.

    On success the account is created, the user is logged in and redirected
    to the ``redirect-after-login`` session URL or to their profile.
    """
    if not data:
        payload = None
        if (
            not current_app.config["CANAILLE"]["ENABLE_REGISTRATION"]
            or current_app.config["CANAILLE"]["EMAIL_CONFIRMATION"]
        ):
            abort(403)

    else:
        try:
            payload = RegistrationPayload(*b64_to_obj(data))
        except (binascii.Error, TypeError, ValueError):
            # TypeError covers a decoded object that does not match the
            # payload fields. ValueError is assumed to cover the remaining
            # decoding failures of ``b64_to_obj`` (TODO confirm against the
            # helper, which is defined outside this module).
            flash(
                _("The registration link that brought you here was invalid."),
                "error",
            )
            return redirect(url_for("core.account.index"))

        # Authenticate the payload before reading anything from it: until the
        # hash matches, its content is attacker-controlled and must neither
        # be parsed as a date nor used to look users up.
        if hash != payload.build_hash():
            flash(
                _("The registration link that brought you here was invalid."),
                "error",
            )
            return redirect(url_for("core.account.index"))

        if payload.has_expired():
            flash(
                _("The registration link that brought you here has expired."),
                "error",
            )
            return redirect(url_for("core.account.index"))

        if payload.user_name and models.User.get(user_name=payload.user_name):
            flash(
                _("Your account has already been created."),
                "error",
            )
            return redirect(url_for("core.account.index"))

    if current_user():
        flash(
            _("You are already logged in, you cannot create an account."),
            "error",
        )
        return redirect(url_for("core.account.index"))

    if payload:
        # From here on ``data`` holds the initial form values rather than the
        # encoded payload; it stays ``None`` for plain registrations.
        data = {
            "user_name": payload.user_name,
            "emails": [payload.email],
            "groups": [models.Group.get(id=group_id) for group_id in payload.groups],
        }

    has_smtp = "SMTP" in current_app.config["CANAILLE"]
    emails_readonly = current_app.config["CANAILLE"]["EMAIL_CONFIRMATION"] is True or (
        current_app.config["CANAILLE"]["EMAIL_CONFIRMATION"] is None and has_smtp
    )
    readable_fields, writable_fields = default_fields()

    form = build_profile_form(writable_fields, readable_fields)
    if "groups" not in form and payload and payload.groups:
        # The invitation pre-assigns groups: expose them read-only even if
        # the default fields do not include them.
        form["groups"] = wtforms.SelectMultipleField(
            _("Groups"),
            choices=[(group, group.display_name) for group in models.Group.query()],
            coerce=IDToModel("Group"),
        )
        set_readonly(form["groups"])
    form.process(CombinedMultiDict((request.files, request.form)) or None, data=data)

    if is_readonly(form["user_name"]) and (not payload or payload.user_name_editable):
        set_writable(form["user_name"])

    if not is_readonly(form["emails"]) and emails_readonly:
        set_readonly(form["emails"])

    # A password is mandatory at registration time.
    form["password1"].validators = [
        wtforms.validators.DataRequired(),
        wtforms.validators.Length(min=MINIMUM_PASSWORD_LENGTH),
    ]
    form["password2"].validators = [
        wtforms.validators.DataRequired(),
        wtforms.validators.Length(min=MINIMUM_PASSWORD_LENGTH),
    ]
    form["password1"].flags.required = True
    form["password2"].flags.required = True

    if not request.form or form.form_control():
        return render_template(
            "profile_add.html",
            form=form,
            menuitem="users",
            edited_user=None,
            self_deletion=False,
        )

    if not form.validate():
        flash(_("User account creation failed."), "error")
        return render_template(
            "profile_add.html",
            form=form,
            menuitem="users",
            edited_user=None,
            self_deletion=False,
        )

    user = profile_create(current_app, form)
    login_user(user)
    flash(_("Your account has been created successfully."), "success")
    return redirect(
        session.pop(
            "redirect-after-login",
            url_for("core.account.profile_edition", edited_user=user),
        )
    )
|
|
|
|
|
|
@bp.route("/email-confirmation/<data>/<hash>")
def email_confirmation(data, hash):
    """Attach a confirmed email address to a user account.

    :param data: encoded :class:`EmailConfirmationPayload`.
    :param hash: signature expected to match ``payload.build_hash()``.

    The link is rejected with an error flash when it cannot be decoded, when
    its hash does not match, when it has expired, when the targeted user
    does not exist, or when the address is already attached to this or
    another account. Every outcome redirects to the index page.
    """
    try:
        confirmation_obj = EmailConfirmationPayload(*b64_to_obj(data))
    except Exception:
        # Any decoding failure means the link is unusable. ``Exception``
        # replaces a bare ``except`` so that ``SystemExit`` and
        # ``KeyboardInterrupt`` are no longer swallowed.
        flash(
            _("The email confirmation link that brought you here is invalid."),
            "error",
        )
        return redirect(url_for("core.account.index"))

    # Authenticate the payload before reading anything from it: until the
    # hash matches, its content is attacker-controlled.
    if hash != confirmation_obj.build_hash():
        flash(
            _("The email confirmation link that brought you here is invalid."),
            "error",
        )
        return redirect(url_for("core.account.index"))

    if confirmation_obj.has_expired():
        flash(
            _("The email confirmation link that brought you here has expired."),
            "error",
        )
        return redirect(url_for("core.account.index"))

    user = models.User.get(confirmation_obj.identifier)
    if not user:
        flash(
            _("The email confirmation link that brought you here is invalid."),
            "error",
        )
        return redirect(url_for("core.account.index"))

    if confirmation_obj.email in user.emails:
        flash(
            _("This address email have already been confirmed."),
            "error",
        )
        return redirect(url_for("core.account.index"))

    if models.User.query(emails=confirmation_obj.email):
        flash(
            _("This address email is already associated with another account."),
            "error",
        )
        return redirect(url_for("core.account.index"))

    user.emails = user.emails + [confirmation_obj.email]
    user.save()
    flash(_("Your email address have been confirmed."), "success")
    return redirect(url_for("core.account.index"))
|
|
|
|
|
|
@bp.route("/profile", methods=("GET", "POST"))
@permissions_needed("manage_users")
def profile_creation(user):
    """Let a user manager create a new account.

    :param user: the user performing the request; the form is built from
        the fields this user can read and write.

    Every read-only field of the form is made writable before it is shown.
    """
    form = build_profile_form(user.writable_fields, user.readable_fields)
    form.process(CombinedMultiDict((request.files, request.form)) or None)

    for locked_field in filter(is_readonly, form):
        set_writable(locked_field)

    page_context = {
        "form": form,
        "menuitem": "users",
        "edited_user": None,
        "self_deletion": False,
    }

    if not request.form or form.form_control():
        return render_template("profile_add.html", **page_context)

    if not form.validate():
        flash(_("User account creation failed."), "error")
        return render_template("profile_add.html", **page_context)

    created_user = profile_create(current_app, form)
    flash(_("User account creation succeed."), "success")
    return redirect(url_for("core.account.profile_edition", edited_user=created_user))
|
|
|
|
|
|
def profile_create(current_app, form):
    """Create, save and return a user from a validated profile form.

    :param current_app: unused by this function; kept for its callers.
    :param form: the profile form holding the new user's attributes.
    :return: the saved user.
    """
    user = models.User()

    for form_field in form:
        if form_field.name not in models.User.attributes:
            continue

        value = form_field.data
        if isinstance(value, FileStorage):
            # Uploaded files are stored as their raw content.
            value = value.stream.read()
        setattr(user, form_field.name, value)

    if "photo" in form and form["photo_delete"].data:
        user.photo = None

    first = user.given_name or ""
    last = user.family_name or ""
    user.formatted_name = f"{first} {last}".strip()
    user.save()

    # The user is saved once before the password is set, then again after.
    password = form["password1"].data
    if password:
        BaseBackend.get().set_user_password(user, form["password1"].data)
        user.save()

    return user
|
|
|
|
|
|
def profile_edition_main_form(user, edited_user, emails_readonly):
    """Build the main profile edition form for ``edited_user``.

    :param user: the user performing the edition; their readable and
        writable fields restrict what the form contains.
    :param edited_user: the user whose profile is edited.
    :param emails_readonly: when true, the ``emails`` field is left out of
        this form.
    :return: the processed profile form.
    """
    available_fields = {
        "formatted_name",
        "title",
        "given_name",
        "family_name",
        "display_name",
        "emails",
        "phone_numbers",
        "formatted_address",
        "street",
        "postal_code",
        "locality",
        "region",
        "photo",
        "photo_delete",
        "employee_number",
        "department",
        "profile_url",
        "preferred_language",
        "organization",
    }
    if emails_readonly:
        available_fields -= {"emails"}

    readable_fields = user.readable_fields & available_fields
    writable_fields = user.writable_fields & available_fields

    # Initial values: only attributes the edited user has, and that are set.
    data = {}
    for field_name in writable_fields | readable_fields:
        current_value = getattr(edited_user, field_name, None)
        if current_value:
            data[field_name] = current_value

    submitted = CombinedMultiDict((request.files, request.form))
    profile_form = build_profile_form(writable_fields, readable_fields)
    profile_form.process(submitted or None, data=data)

    profile_form.user = edited_user
    profile_form.render_field_macro_file = "partial/profile_field.html"
    profile_form.render_field_extra_context = {
        "user": user,
        "edited_user": edited_user,
    }
    return profile_form
|
|
|
|
|
|
def profile_edition_main_form_validation(user, edited_user, profile_form):
    """Copy the validated profile form values onto ``edited_user`` and save.

    :param user: the user performing the edition; only the fields they can
        write are copied.
    :param edited_user: the user whose profile is edited.
    :param profile_form: the validated main profile form.
    """
    for form_field in profile_form:
        name = form_field.name
        if name not in edited_user.attributes or name not in user.writable_fields:
            continue

        if isinstance(form_field, wtforms.FieldList):
            # too bad wtforms cannot sanitize the list itself
            new_value = list(filter(None, form_field.data)) or None
        elif isinstance(form_field.data, FileStorage):
            new_value = form_field.data.stream.read()
        else:
            new_value = form_field.data

        setattr(edited_user, name, new_value)

    if "photo" in profile_form and profile_form["photo_delete"].data:
        edited_user.photo = None

    if "preferred_language" in request.form:
        # Refresh the babel cache in case the lang is updated
        reload_translations()

        # "auto" is stored as no preference at all.
        if profile_form["preferred_language"].data == "auto":
            edited_user.preferred_language = None

    edited_user.save()
    g.user.reload()
|
|
|
|
|
|
def profile_edition_emails_form(user, edited_user, has_smtp):
    """Build the form used to manage the email addresses of ``edited_user``.

    :param user: the user performing the edition (unused here).
    :param edited_user: the user whose addresses pre-fill ``old_emails``.
    :param has_smtp: stored on the form as ``add_email_button``.
    :return: the email confirmation form.
    """
    initial_values = {"old_emails": edited_user.emails}
    emails_form = EmailConfirmationForm(request.form or None, data=initial_values)
    emails_form.add_email_button = has_smtp
    return emails_form
|
|
|
|
|
|
def profile_edition_add_email(user, edited_user, emails_form):
    """Send a confirmation link for the new address typed in ``emails_form``.

    :param user: the user performing the edition (unused here).
    :param edited_user: the user the address would be attached to.
    :param emails_form: the validated form holding ``new_email``.
    :return: whatever ``send_confirmation_email`` returns.
    """
    new_address = emails_form.new_email.data
    issued_at = datetime.datetime.now(datetime.timezone.utc)
    payload = EmailConfirmationPayload(
        issued_at.isoformat(),
        edited_user.identifier,
        new_address,
    )

    email_confirmation_url = url_for(
        "core.account.email_confirmation",
        data=payload.b64(),
        hash=payload.build_hash(),
        _external=True,
    )
    current_app.logger.debug(
        f"Attempt to send a verification mail with link: {email_confirmation_url}"
    )
    return send_confirmation_email(emails_form.new_email.data, email_confirmation_url)
|
|
|
|
|
|
def profile_edition_remove_email(user, edited_user, email):
    """Remove ``email`` from the addresses of ``edited_user``.

    :param user: the user performing the edition (unused here).
    :param edited_user: the user losing the address.
    :param email: the address to remove.
    :return: ``False`` when the address is unknown or is the user's only
        one (nothing is changed), ``True`` once it is removed and saved.
    """
    known_addresses = edited_user.emails
    if email not in known_addresses or len(known_addresses) == 1:
        return False

    edited_user.emails = [address for address in known_addresses if address != email]
    edited_user.save()
    return True
|
|
|
|
|
|
@bp.route("/profile/<user:edited_user>", methods=("GET", "POST"))
@user_needed()
def profile_edition(user, edited_user):
    """Display and process the profile edition page of ``edited_user``.

    :param user: the user performing the request.
    :param edited_user: the user whose profile is edited.

    The page is a 404 unless ``user`` can manage users, or can edit their
    own profile and ``edited_user`` is themselves. Depending on the
    submitted form the view updates the profile, sends an email
    confirmation link or removes an email address; any other submission is
    answered with a 400.
    """
    if not user.can_manage_users and not (
        user.can_edit_self and edited_user.id == user.id
    ):
        abort(404)

    menuitem = "profile" if edited_user.id == user.id else "users"
    has_smtp = "SMTP" in current_app.config["CANAILLE"]
    has_email_confirmation = current_app.config["CANAILLE"][
        "EMAIL_CONFIRMATION"
    ] is True or (
        current_app.config["CANAILLE"]["EMAIL_CONFIRMATION"] is None and has_smtp
    )
    emails_readonly = has_email_confirmation and not user.can_manage_users

    profile_form = profile_edition_main_form(user, edited_user, emails_readonly)
    # The dedicated emails form only exists when addresses cannot be edited
    # directly in the main form.
    emails_form = (
        profile_edition_emails_form(user, edited_user, has_smtp)
        if emails_readonly
        else None
    )

    render_context = {
        "menuitem": menuitem,
        "edited_user": edited_user,
        "profile_form": profile_form,
        "emails_form": emails_form,
    }

    if not request.form or profile_form.form_control():
        return render_template("profile_edit.html", **render_context)

    if request_is_htmx() or request.form.get("action") == "edit-profile":
        if not profile_form.validate():
            flash(_("Profile edition failed."), "error")
            return render_template("profile_edit.html", **render_context)

        profile_edition_main_form_validation(user, edited_user, profile_form)
        flash(_("Profile updated successfully."), "success")
        return redirect(
            url_for("core.account.profile_edition", edited_user=edited_user)
        )

    if request.form.get("action") == "add_email":
        if emails_form is None:
            # No emails form was built for this request, so the action is
            # not available: answer 400 instead of failing on ``None``.
            abort(400, f"bad form action: {request.form.get('action')}")

        if not emails_form.validate():
            flash(_("Email addition failed."), "error")
            return render_template("profile_edit.html", **render_context)

        if profile_edition_add_email(user, edited_user, emails_form):
            flash(
                _(
                    "An email has been sent to the email address. "
                    "Please check your inbox and click on the verification link it contains"
                ),
                "success",
            )
        else:
            flash(_("Could not send the verification email"), "error")

        return redirect(
            url_for("core.account.profile_edition", edited_user=edited_user)
        )

    if request.form.get("email_remove"):
        if not profile_edition_remove_email(
            user, edited_user, request.form.get("email_remove")
        ):
            flash(_("Email deletion failed."), "error")
            return render_template("profile_edit.html", **render_context)

        flash(_("The email have been successfully deleted."), "success")
        return redirect(
            url_for("core.account.profile_edition", edited_user=edited_user)
        )

    abort(400, f"bad form action: {request.form.get('action')}")
|
|
|
|
|
|
@bp.route("/profile/<user:edited_user>/settings", methods=("GET", "POST"))
@user_needed()
def profile_settings(user, edited_user):
    """Dispatch the actions of the account settings page of ``edited_user``.

    :param user: the user performing the request.
    :param edited_user: the user whose settings are edited.

    The page is a 404 unless ``user`` can manage users, or can edit their
    own profile and ``edited_user`` is themselves. GET requests, htmx
    requests and the ``edit-settings`` action are delegated to
    :func:`profile_settings_edit`. The remaining actions handle account
    deletion, password mails and account locking; an action that matches
    none of them is answered with a 400.
    """
    is_own_account = edited_user.id == user.id
    if not user.can_manage_users and not (user.can_edit_self and is_own_account):
        abort(404)

    action = request.form.get("action")

    if request.method == "GET" or action == "edit-settings" or request_is_htmx():
        return profile_settings_edit(user, edited_user)

    if action == "confirm-delete":
        return render_template("modals/delete-account.html", edited_user=edited_user)

    if action == "delete":
        return profile_delete(user, edited_user)

    if action == "password-initialization-mail":
        # A list (not a generator) so that a mail is attempted for every
        # address even after a failure.
        statuses = [
            send_password_initialization_mail(edited_user, address)
            for address in edited_user.emails
        ]
        if all(statuses):
            flash(
                _(
                    "A password initialization link has been sent at the user email address. "
                    "It should be received within a few minutes."
                ),
                "success",
            )
        else:
            flash(_("Could not send the password initialization email"), "error")

        return profile_settings_edit(user, edited_user)

    if action == "password-reset-mail":
        statuses = [
            send_password_reset_mail(edited_user, address)
            for address in edited_user.emails
        ]
        if all(statuses):
            flash(
                _(
                    "A password reset link has been sent at the user email address. "
                    "It should be received within a few minutes."
                ),
                "success",
            )
        else:
            flash(_("Could not send the password reset email"), "error")

        return profile_settings_edit(user, edited_user)

    # The lock-related actions only apply when the backend supports locking
    # and the account is in the matching state; otherwise they fall through
    # to the final 400.
    if (
        action == "confirm-lock"
        and BaseBackend.get().has_account_lockability()
        and not edited_user.locked
    ):
        return render_template("modals/lock-account.html", edited_user=edited_user)

    if (
        action == "lock"
        and BaseBackend.get().has_account_lockability()
        and not edited_user.locked
    ):
        flash(_("The account has been locked"), "success")
        edited_user.lock_date = datetime.datetime.now(datetime.timezone.utc)
        edited_user.save()
        return profile_settings_edit(user, edited_user)

    if (
        action == "unlock"
        and BaseBackend.get().has_account_lockability()
        and edited_user.locked
    ):
        flash(_("The account has been unlocked"), "success")
        edited_user.lock_date = None
        edited_user.save()
        return profile_settings_edit(user, edited_user)

    abort(400, f"bad form action: {request.form.get('action')}")
|
|
|
|
|
|
def profile_settings_edit(editor, edited_user):
    """Display and process the settings form of ``edited_user``.

    :param editor: the user performing the edition; their readable and
        writable fields restrict what the form contains and what is saved.
    :param edited_user: the user whose settings are edited.
    :return: a redirect to the settings page after a successful update,
        the rendered settings page otherwise.
    """
    # Highlight "profile" only when users edit their own account. The
    # previous code compared ``editor.id`` with itself, so it always
    # selected "profile".
    menuitem = "profile" if edited_user.id == editor.id else "users"
    fields = editor.readable_fields | editor.writable_fields

    available_fields = {"password", "groups", "user_name", "lock_date"}

    # Initial form values: list attributes contribute their first item,
    # empty values become "".
    data = {}
    for name in fields:
        if name not in available_fields or not hasattr(edited_user, name):
            continue
        value = getattr(edited_user, name)
        if value and isinstance(value, list):
            data[name] = value[0]
        else:
            data[name] = value or ""

    data["groups"] = [group.id for group in edited_user.groups]

    form = build_profile_form(
        editor.writable_fields & available_fields,
        editor.readable_fields & available_fields,
        edited_user,
    )
    form.process(CombinedMultiDict((request.files, request.form)) or None, data=data)

    if (
        request.form
        and request.form.get("action") == "edit-settings"
        or request_is_htmx()
    ):
        if not form.validate():
            flash(_("Profile edition failed."), "error")

        else:
            editable_fields = available_fields & editor.writable_fields
            for attribute in form:
                if attribute.name in editable_fields:
                    setattr(edited_user, attribute.name, attribute.data)

            if (
                "password1" in request.form
                and form["password1"].data
                # ``.get`` because htmx requests may carry no ``action``.
                and request.form.get("action") == "edit-settings"
            ):
                BaseBackend.get().set_user_password(edited_user, form["password1"].data)

            edited_user.save()
            flash(_("Profile updated successfully."), "success")
            return redirect(
                url_for("core.account.profile_settings", edited_user=edited_user)
            )

    return render_template(
        "profile_settings.html",
        form=form,
        menuitem=menuitem,
        edited_user=edited_user,
        self_deletion=edited_user.can_delete_account,
    )
|
|
|
|
|
|
def profile_delete(user, edited_user):
    """Delete ``edited_user`` and redirect to a page that still exists.

    :param user: the user performing the deletion.
    :param edited_user: the user being deleted.
    :return: a redirect to the index when users delete their own account
        (they are logged out first), to the user list otherwise.
    """
    deleting_self = user.id == edited_user.id
    if deleting_self:
        logout_user()

    # The message is built before the deletion, while the name is readable.
    confirmation = _(
        "The user %(user)s has been sucessfuly deleted",
        user=edited_user.formatted_name,
    )
    flash(confirmation, "success")
    edited_user.delete()

    endpoint = "core.account.index" if deleting_self else "core.account.users"
    return redirect(url_for(endpoint))
|
|
|
|
|
|
@bp.route("/impersonate/<user:puppet>")
@permissions_needed("impersonate_users")
def impersonate(user, puppet):
    """Log the requester in as ``puppet`` and redirect to the index.

    :param user: the user performing the request.
    :param puppet: the user to be logged in as.
    """
    login_user(puppet)

    welcome = _("Connection successful. Welcome %(user)s", user=puppet.formatted_name)
    flash(welcome, "success")

    return redirect(url_for("core.account.index"))
|
|
|
|
|
|
@bp.route("/profile/<user:user>/<field>")
def photo(user, field):
    """Serve the photo of ``user`` as a JPEG, honouring HTTP cache headers.

    :param user: the user whose photo is requested.
    :param field: attribute name from the URL; anything other than
        ``photo`` (case-insensitively) is a 404.

    An empty 304 response is returned when ``If-Modified-Since`` is not
    older than the user's last modification, or when ``If-None-Match``
    contains the computed etag. A user without photo is a 404.
    """
    if field.lower() != "photo":
        abort(404)

    not_modified = ("", 304)

    since = request.if_modified_since
    if since and since >= user.last_modified:
        return not_modified

    etag = build_hash(user.identifier, user.last_modified.isoformat())
    known_etags = request.if_none_match
    if known_etags and etag in known_etags:
        return not_modified

    picture = getattr(user, field)
    if not picture:
        abort(404)

    return send_file(
        io.BytesIO(picture),
        mimetype="image/jpeg",
        last_modified=user.last_modified,
        etag=etag,
    )
|