canaille-globuzma/canaille/account.py

577 lines
17 KiB
Python
Raw Normal View History

import io
2022-01-01 10:56:48 +00:00
from dataclasses import astuple
from dataclasses import dataclass
from datetime import datetime
from datetime import timedelta
from typing import List
2021-12-20 22:57:27 +00:00
2020-11-13 09:45:01 +00:00
import pkg_resources
2021-12-01 11:19:28 +00:00
import wtforms
2021-12-20 22:57:27 +00:00
from flask import abort
from flask import Blueprint
from flask import current_app
from flask import flash
from flask import redirect
from flask import request
from flask import send_file
from flask import session
from flask import url_for
2020-10-22 15:37:01 +00:00
from flask_babel import gettext as _
from flask_themer import render_template
2021-12-20 22:57:27 +00:00
from werkzeug.datastructures import CombinedMultiDict
from werkzeug.datastructures import FileStorage
from .apputils import b64_to_obj
from .apputils import default_fields
from .apputils import login_placeholder
from .apputils import obj_to_b64
from .apputils import profile_hash
from .flaskutils import current_user
from .flaskutils import permissions_needed
from .flaskutils import smtp_needed
from .flaskutils import user_needed
from .forms import ForgottenPasswordForm
from .forms import InvitationForm
from .forms import LoginForm
from .forms import PasswordForm
from .forms import PasswordResetForm
from .forms import profile_form
from .mails import send_invitation_mail
from .mails import send_password_initialization_mail
from .mails import send_password_reset_mail
from .models import Group
2020-08-19 14:20:57 +00:00
from .models import User
2020-08-14 11:18:08 +00:00
2021-05-24 15:43:15 +00:00
bp = Blueprint("account", __name__)
2020-08-14 11:18:08 +00:00
2020-08-19 14:20:57 +00:00
@bp.route("/")
def index():
if not current_user():
2021-05-24 15:43:15 +00:00
return redirect(url_for("account.login"))
return redirect(url_for("account.profile_edition", username=current_user().uid[0]))
2020-08-19 14:20:57 +00:00
2020-11-13 09:45:01 +00:00
@bp.route("/about")
def about():
2020-11-16 14:45:02 +00:00
try:
version = pkg_resources.get_distribution("canaille").version
except pkg_resources.DistributionNotFound:
version = "git"
2020-11-13 09:45:01 +00:00
return render_template("about.html", version=version)
2020-08-19 14:20:57 +00:00
@bp.route("/login", methods=("GET", "POST"))
def login():
if current_user():
2021-12-08 17:06:50 +00:00
return redirect(
url_for("account.profile_edition", username=current_user().uid[0])
)
2020-08-19 14:20:57 +00:00
form = LoginForm(request.form or None)
form["login"].render_kw["placeholder"] = login_placeholder()
2020-08-14 13:26:14 +00:00
2020-08-19 14:20:57 +00:00
if request.form:
user = User.get(form.login.data)
if user and not user.has_password():
2021-05-24 15:43:15 +00:00
return redirect(url_for("account.firstlogin", uid=user.uid[0]))
2021-01-23 21:30:43 +00:00
if not form.validate():
User.logout()
flash(_("Login failed, please check your information"), "error")
return render_template("login.html", form=form)
session["attempt_login"] = form.login.data
2021-05-24 15:43:15 +00:00
return redirect(url_for("account.password"))
2021-01-23 21:30:43 +00:00
return render_template("login.html", form=form)
@bp.route("/password", methods=("GET", "POST"))
def password():
if "attempt_login" not in session:
2021-05-24 15:43:15 +00:00
return redirect(url_for("account.login"))
2021-01-23 21:30:43 +00:00
form = PasswordForm(request.form or None)
if request.form:
user = User.get(session["attempt_login"])
if user and not user.has_password():
2021-05-24 15:43:15 +00:00
return redirect(url_for("account.firstlogin", uid=user.uid[0]))
2021-01-23 21:30:43 +00:00
2020-08-21 08:23:39 +00:00
if not form.validate() or not User.authenticate(
2021-01-23 21:30:43 +00:00
session["attempt_login"], form.password.data, True
2020-08-21 08:23:39 +00:00
):
2020-12-29 08:31:46 +00:00
User.logout()
2020-10-22 15:37:01 +00:00
flash(_("Login failed, please check your information"), "error")
2021-01-23 21:30:43 +00:00
return render_template(
"password.html", form=form, username=session["attempt_login"]
)
2020-08-17 07:45:35 +00:00
2021-01-23 21:30:43 +00:00
del session["attempt_login"]
2020-11-01 10:33:56 +00:00
flash(_("Connection successful. Welcome %(user)s", user=user.name), "success")
2021-05-24 15:43:15 +00:00
return redirect(url_for("account.index"))
2020-08-17 07:45:35 +00:00
2021-01-23 21:30:43 +00:00
return render_template(
"password.html", form=form, username=session["attempt_login"]
)
2020-08-14 11:18:08 +00:00
2020-08-16 17:39:14 +00:00
@bp.route("/logout")
2020-08-14 13:26:14 +00:00
def logout():
2020-10-31 17:22:24 +00:00
user = current_user()
if user:
flash(
2020-11-01 10:33:56 +00:00
_("You have been disconnected. See you next time %(user)s", user=user.name),
"success",
2020-10-31 17:22:24 +00:00
)
user.logout()
2020-08-16 17:39:14 +00:00
return redirect("/")
2020-10-20 09:44:45 +00:00
@bp.route("/firstlogin/<uid>", methods=("GET", "POST"))
def firstlogin(uid):
user = User.get(uid)
user and not user.has_password() or abort(404)
form = ForgottenPasswordForm(request.form or None, data={"login": uid})
if not request.form:
return render_template("firstlogin.html", form=form, uid=uid)
if not form.validate():
flash(_("Could not send the password initialization link."), "error")
return render_template("firstlogin.html", form=form, uid=uid)
if send_password_initialization_mail(user):
flash(
_(
"A password initialization link has been sent at your email address. You should receive it within 10 minutes."
),
"success",
)
else:
flash(_("Could not send the password initialization email"), "error")
return render_template("firstlogin.html", form=form, uid=uid)
2020-11-01 10:33:56 +00:00
@bp.route("/users")
2021-12-02 17:23:14 +00:00
@permissions_needed("manage_users")
2020-11-01 10:33:56 +00:00
def users(user):
2021-12-03 17:37:25 +00:00
users = User.filter(
objectClass=current_app.config["LDAP"].get(
"USER_CLASS", User.DEFAULT_OBJECT_CLASS
)
)
2020-11-01 10:33:56 +00:00
return render_template("users.html", users=users, menuitem="users")
2022-01-01 10:56:48 +00:00
@dataclass
class Invitation:
creation_date_isoformat: str
uid: str
2022-01-01 17:41:04 +00:00
uid_editable: bool
2022-01-01 10:56:48 +00:00
mail: str
groups: List[str]
@property
def creation_date(self):
return datetime.fromisoformat(self.creation_date_isoformat)
def has_expired(self):
DEFAULT_INVITATION_DURATION = 2 * 24 * 60 * 60
return datetime.now() - self.creation_date > timedelta(
seconds=current_app.config.get(
"INVITATION_EXPIRATION", DEFAULT_INVITATION_DURATION
)
)
def b64(self):
return obj_to_b64(astuple(self))
def profile_hash(self):
return profile_hash(*astuple(self))
2021-12-01 11:19:28 +00:00
@bp.route("/invite", methods=["GET", "POST"])
@smtp_needed()
2021-12-02 17:23:14 +00:00
@permissions_needed("manage_users")
2021-12-01 11:19:28 +00:00
def user_invitation(user):
form = InvitationForm(request.form or None)
mail_sent = None
2021-12-01 11:19:28 +00:00
registration_url = None
form_validated = False
2021-12-01 11:19:28 +00:00
if request.form and form.validate():
form_validated = True
2022-01-01 10:56:48 +00:00
invitation = Invitation(
datetime.now().isoformat(),
form.uid.data,
2022-01-01 17:41:04 +00:00
form.uid_editable.data,
2022-01-01 10:56:48 +00:00
form.mail.data,
form.groups.data,
)
2021-12-01 11:19:28 +00:00
registration_url = url_for(
"account.registration",
2022-01-01 10:56:48 +00:00
data=invitation.b64(),
hash=invitation.profile_hash(),
2021-12-01 11:19:28 +00:00
_external=True,
)
if request.form["action"] == "send":
mail_sent = send_invitation_mail(form.mail.data, registration_url)
2021-12-01 11:19:28 +00:00
return render_template(
"invite.html",
form=form,
menuitems="users",
form_validated=form_validated,
mail_sent=mail_sent,
2021-12-01 11:19:28 +00:00
registration_url=registration_url,
)
2020-11-01 10:33:56 +00:00
@bp.route("/profile", methods=("GET", "POST"))
2021-12-02 17:23:14 +00:00
@permissions_needed("manage_users")
2020-11-01 10:33:56 +00:00
def profile_creation(user):
2021-12-02 17:23:14 +00:00
form = profile_form(user.write, user.read)
2020-12-31 17:16:35 +00:00
form.process(CombinedMultiDict((request.files, request.form)) or None)
2020-11-01 10:33:56 +00:00
2021-12-08 09:11:15 +00:00
for field in form:
if field.render_kw and "readonly" in field.render_kw:
del field.render_kw["readonly"]
2020-11-01 10:33:56 +00:00
if request.form:
if not form.validate():
2021-12-01 10:50:49 +00:00
flash(_("User account creation failed."), "error")
2020-11-01 10:33:56 +00:00
else:
2021-12-01 11:19:28 +00:00
user = profile_create(current_app, form)
return redirect(url_for("account.profile_edition", username=user.uid[0]))
return render_template(
"profile.html",
form=form,
menuitem="users",
edited_user=None,
self_deletion=False,
)
@bp.route("/register/<data>/<hash>", methods=["GET", "POST"])
def registration(data, hash):
try:
2022-01-01 10:56:48 +00:00
invitation = Invitation(*b64_to_obj(data))
2021-12-01 11:19:28 +00:00
except:
flash(
_("The invitation link that brought you here was invalid."),
"error",
)
return redirect(url_for("account.index"))
2022-01-01 10:56:48 +00:00
if invitation.has_expired():
flash(
_("The invitation link that brought you here has expired."),
"error",
)
return redirect(url_for("account.index"))
if User.get(invitation.uid):
2021-12-01 11:19:28 +00:00
flash(
_("Your account has already been created."),
"error",
)
return redirect(url_for("account.index"))
2020-12-31 17:16:35 +00:00
2021-12-01 11:19:28 +00:00
if current_user():
flash(
_("You are already logged in, you cannot create an account."),
"error",
)
2021-12-01 11:19:28 +00:00
return redirect(url_for("account.index"))
2020-11-01 10:33:56 +00:00
2022-01-01 10:56:48 +00:00
if hash != invitation.profile_hash():
2021-12-01 11:19:28 +00:00
flash(
_("The invitation link that brought you here was invalid."),
"error",
)
return redirect(url_for("account.index"))
2020-11-01 10:33:56 +00:00
2022-01-01 10:56:48 +00:00
data = {"uid": invitation.uid, "mail": invitation.mail, "groups": invitation.groups}
2020-11-01 10:33:56 +00:00
2021-12-08 09:00:36 +00:00
readable_fields, writable_fields = default_fields()
2021-12-02 17:23:14 +00:00
form = profile_form(writable_fields, readable_fields)
2021-12-01 11:19:28 +00:00
form.process(CombinedMultiDict((request.files, request.form)) or None, data=data)
2022-01-01 17:41:04 +00:00
if "readonly" in form["uid"].render_kw and invitation.uid_editable:
del form["uid"].render_kw["readonly"]
2021-12-01 11:19:28 +00:00
form["password1"].validators = [
wtforms.validators.DataRequired(),
wtforms.validators.Length(min=8),
]
form["password2"].validators = [
wtforms.validators.DataRequired(),
wtforms.validators.Length(min=8),
]
form["password1"].flags.required = True
form["password2"].flags.required = True
if request.form:
if not form.validate():
flash(_("User account creation failed."), "error")
else:
user = profile_create(current_app, form)
user.login()
flash(_("You account has been created successfuly."), "success")
2021-05-24 15:43:15 +00:00
return redirect(url_for("account.profile_edition", username=user.uid[0]))
2020-11-01 10:33:56 +00:00
return render_template(
"profile.html",
form=form,
menuitem="users",
edited_user=None,
self_deletion=False,
2020-11-01 10:33:56 +00:00
)
2021-12-01 11:19:28 +00:00
def profile_create(current_app, form):
2021-12-03 17:37:25 +00:00
user = User(
objectClass=current_app.config["LDAP"].get(
"USER_CLASS", User.DEFAULT_OBJECT_CLASS
)
)
2021-12-01 11:19:28 +00:00
for attribute in form:
if attribute.name in user.may + user.must:
if isinstance(attribute.data, FileStorage):
data = attribute.data.stream.read()
else:
data = attribute.data
2021-12-08 14:01:35 +00:00
if user.ldap_object_attributes()[attribute.name].single_value:
2021-12-01 11:19:28 +00:00
user[attribute.name] = data
else:
user[attribute.name] = [data]
2021-12-08 17:06:50 +00:00
if "jpegPhoto" in form and form["jpegPhoto_delete"].data:
user["jpegPhoto"] = None
user.cn = [f"{user.givenName[0]} {user.sn[0]}"]
user.save()
if "groups" in form:
groups = [Group.get(group_dn) for group_dn in form["groups"].data]
for group in groups:
group.add_member(user)
2021-12-01 11:19:28 +00:00
if not form["password1"].data or user.set_password(form["password1"].data):
flash(_("User account creation succeed."), "success")
user.save()
2021-12-01 11:19:28 +00:00
return user
2020-11-01 10:33:56 +00:00
@bp.route("/profile/<username>", methods=("GET", "POST"))
2020-10-20 09:44:45 +00:00
@user_needed()
2020-11-01 10:33:56 +00:00
def profile_edition(user, username):
2021-12-02 17:23:14 +00:00
user.can_manage_users or username == user.uid[0] or abort(403)
2020-11-01 10:33:56 +00:00
if request.method == "GET" or request.form.get("action") == "edit":
return profile_edit(user, username)
if request.form.get("action") == "delete":
return profile_delete(user, username)
if request.form.get("action") == "password-initialization-mail":
2021-01-13 09:09:41 +00:00
user = User.get(username) or abort(404)
if send_password_initialization_mail(user):
flash(
_(
"A password initialization link has been sent at the user email address. It should be received within 10 minutes."
),
"success",
)
else:
flash(_("Could not send the password initialization email"), "error")
return profile_edit(user, username)
2021-01-22 17:26:53 +00:00
if request.form.get("action") == "password-reset-mail":
user = User.get(username) or abort(404)
if send_password_reset_mail(user):
flash(
_(
"A password reset link has been sent at the user email address. It should be received within 10 minutes."
),
"success",
)
else:
flash(_("Could not send the password reset email"), "error")
return profile_edit(user, username)
2020-11-01 10:33:56 +00:00
abort(400)
2020-10-31 16:41:24 +00:00
2020-11-01 10:33:56 +00:00
2021-06-03 13:00:11 +00:00
def profile_edit(editor, username):
menuitem = "profile" if username == editor.uid[0] else "users"
2021-12-02 17:23:14 +00:00
fields = editor.read | editor.write
2021-06-03 13:00:11 +00:00
if username != editor.uid[0]:
2020-11-01 10:33:56 +00:00
user = User.get(username) or abort(404)
2021-06-03 13:00:11 +00:00
else:
user = editor
2020-11-01 10:33:56 +00:00
2020-10-20 09:44:45 +00:00
data = {
k: getattr(user, k)[0]
if getattr(user, k) and isinstance(getattr(user, k), list)
else getattr(user, k) or ""
for k in fields
if hasattr(user, k)
2020-10-20 09:44:45 +00:00
}
2021-06-03 13:00:11 +00:00
if "groups" in fields:
data["groups"] = [g.dn for g in user.groups]
2021-12-02 17:23:14 +00:00
form = profile_form(editor.write, editor.read)
2020-12-31 17:16:35 +00:00
form.process(CombinedMultiDict((request.files, request.form)) or None, data=data)
2020-10-21 08:26:31 +00:00
2020-10-20 09:44:45 +00:00
if request.form:
if not form.validate():
2020-10-22 15:37:01 +00:00
flash(_("Profile edition failed."), "error")
2020-10-20 09:44:45 +00:00
else:
for attribute in form:
2021-06-03 13:00:11 +00:00
if (
attribute.name in user.may + user.must
2021-12-02 17:23:14 +00:00
and attribute.name in editor.write
2021-06-03 13:00:11 +00:00
):
2020-12-31 17:16:35 +00:00
if isinstance(attribute.data, FileStorage):
data = attribute.data.stream.read()
else:
data = attribute.data
2021-12-08 14:01:35 +00:00
if user.ldap_object_attributes()[attribute.name].single_value:
2020-12-31 17:16:35 +00:00
user[attribute.name] = data
else:
2020-12-31 17:16:35 +00:00
user[attribute.name] = [data]
2021-12-02 17:23:14 +00:00
elif attribute.name == "groups" and "groups" in editor.write:
2021-06-03 13:00:11 +00:00
user.set_groups(attribute.data)
2020-10-20 09:44:45 +00:00
2021-12-08 17:06:50 +00:00
if "jpegPhoto" in form and form["jpegPhoto_delete"].data:
user["jpegPhoto"] = None
if (
2021-12-02 17:23:14 +00:00
"password1" not in request.form
or not form["password1"].data
or user.set_password(form["password1"].data)
) and request.form["action"] == "edit":
2020-10-22 15:37:01 +00:00
flash(_("Profile updated successfuly."), "success")
2021-12-02 17:23:14 +00:00
2020-10-20 09:44:45 +00:00
user.save()
2020-11-01 10:33:56 +00:00
return render_template(
"profile.html",
form=form,
menuitem=menuitem,
edited_user=user,
2021-12-02 17:23:14 +00:00
self_deletion=user.can_delete_account,
2020-11-01 10:33:56 +00:00
)
def profile_delete(user, username):
self_deletion = username == user.uid[0]
if self_deletion:
user.logout()
else:
user = User.get(username) or abort(404)
flash(_("The user %(user)s has been sucessfuly deleted", user=user.name), "success")
user.delete()
if self_deletion:
2021-05-24 15:43:15 +00:00
return redirect(url_for("account.index"))
return redirect(url_for("account.users"))
2020-10-22 15:37:01 +00:00
2021-12-01 10:47:11 +00:00
@bp.route("/impersonate/<username>")
2021-12-02 17:23:14 +00:00
@permissions_needed("impersonate_users")
2021-12-01 10:47:11 +00:00
def impersonate(user, username):
u = User.get(username) or abort(404)
u.login()
return redirect(url_for("account.index"))
2020-10-22 15:37:01 +00:00
@bp.route("/reset", methods=["GET", "POST"])
@smtp_needed()
2020-10-22 15:37:01 +00:00
def forgotten():
form = ForgottenPasswordForm(request.form)
if not request.form:
return render_template("forgotten-password.html", form=form)
if not form.validate():
flash(_("Could not send the password reset link."), "error")
return render_template("forgotten-password.html", form=form)
user = User.get(form.login.data)
if not user:
flash(
2020-11-23 16:03:03 +00:00
_(
"A password reset link has been sent at your email address. You should receive it within 10 minutes."
),
"success",
2020-10-22 15:37:01 +00:00
)
return render_template("forgotten-password.html", form=form)
2021-01-22 17:26:53 +00:00
success = send_password_reset_mail(user)
2020-10-22 15:37:01 +00:00
if success:
flash(
2020-11-23 16:03:03 +00:00
_(
"A password reset link has been sent at your email address. You should receive it within 10 minutes."
),
"success",
2020-10-22 15:37:01 +00:00
)
else:
flash(_("Could not reset your password"), "error")
2020-10-22 15:37:01 +00:00
return render_template("forgotten-password.html", form=form)
@bp.route("/reset/<uid>/<hash>", methods=["GET", "POST"])
def reset(uid, hash):
form = PasswordResetForm(request.form)
user = User.get(uid)
if not user or hash != profile_hash(
user.uid[0], user.mail[0], user.userPassword[0] if user.has_password() else ""
):
2020-10-22 15:37:01 +00:00
flash(
2021-10-29 12:20:06 +00:00
_("The password reset link that brought you here was invalid."),
"error",
2020-10-22 15:37:01 +00:00
)
2021-05-24 15:43:15 +00:00
return redirect(url_for("account.index"))
2020-10-22 15:37:01 +00:00
if request.form and form.validate():
user.set_password(form.password.data)
user.login()
flash(_("Your password has been updated successfuly"), "success")
2021-05-24 15:43:15 +00:00
return redirect(url_for("account.profile_edition", username=uid))
2020-10-22 15:37:01 +00:00
return render_template("reset-password.html", form=form, uid=uid, hash=hash)
@bp.route("/profile/<uid>/<field>")
def photo(uid, field):
if field.lower() != "jpegphoto":
abort(404)
user = User.get(uid)
photo = getattr(user, field)[0]
stream = io.BytesIO(photo)
2021-12-20 22:57:27 +00:00
return send_file(stream, mimetype="image/jpeg")