diff --git a/CHANGES.rst b/CHANGES.rst index 4ae66f4c..5117da27 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -20,6 +20,8 @@ Added - Implement OIDC client_credentials flow. :issue:`207` - Button in the client admin page to create client tokens. - Basic SCIM implementation. :issue:`116` :pr:`197` +- Password expiry policy :issue:`176` +- :attr:`~canaille.core.configuration.CoreSettings.PASSWORD_LIFETIME` Changed ^^^^^^^ diff --git a/canaille/app/flask.py b/canaille/app/flask.py index 73597653..8281bd64 100644 --- a/canaille/app/flask.py +++ b/canaille/app/flask.py @@ -5,8 +5,11 @@ from urllib.parse import urlunsplit from flask import abort from flask import current_app +from flask import flash from flask import make_response +from flask import redirect from flask import request +from flask import url_for from werkzeug.exceptions import HTTPException from werkzeug.routing import BaseConverter @@ -15,21 +18,7 @@ from canaille.app.session import current_user from canaille.app.themes import render_template -def user_needed(): - def wrapper(view_function): - @wraps(view_function) - def decorator(*args, **kwargs): - user = current_user() - if not user: - abort(403) - return view_function(*args, user=user, **kwargs) - - return decorator - - return wrapper - - -def permissions_needed(*args): +def user_needed(*args): permissions = set(args) def wrapper(view_function): @@ -38,6 +27,19 @@ def permissions_needed(*args): user = current_user() if not user or not user.can(*permissions): abort(403) + + if user.has_expired_password(): + flash( + _("Your password has expired, please choose a new password."), + "info", + ) + return redirect( + url_for( + "core.account.reset", + user=user, + ) + ) + return view_function(*args, user=user, **kwargs) return decorator diff --git a/canaille/backends/ldap/models.py b/canaille/backends/ldap/models.py index 8a04b689..dc78bf82 100644 --- a/canaille/backends/ldap/models.py +++ b/canaille/backends/ldap/models.py @@ -41,6 +41,7 @@ class User(canaille.core.models.User, LDAPObject): "one_time_password": "oathTokenPIN", "one_time_password_emission_date": "oathSecretTime", "password_failure_timestamps": "pwdFailureTime", + "password_last_update": "pwdChangedTime", } def match_filter(self, filter): diff --git a/canaille/backends/memory/backend.py b/canaille/backends/memory/backend.py index 853218cb..46c2fc5a 100644 --- a/canaille/backends/memory/backend.py +++ b/canaille/backends/memory/backend.py @@ -84,6 +84,10 @@ class MemoryBackend(Backend): def set_user_password(self, user, password): user.password = password + user.password_last_update = datetime.datetime.now( + datetime.timezone.utc + ).replace(microsecond=0) + self.save(user) def query(self, model, **kwargs): diff --git a/canaille/backends/sql/backend.py b/canaille/backends/sql/backend.py index 726ecff7..9bf23d62 100644 --- a/canaille/backends/sql/backend.py +++ b/canaille/backends/sql/backend.py @@ -77,6 +77,9 @@ class SQLBackend(Backend): def set_user_password(self, user, password): user.password = password + user.password_last_update = datetime.datetime.now( + datetime.timezone.utc + ).replace(microsecond=0) self.save(user) def query(self, model, **kwargs): diff --git a/canaille/backends/sql/models.py b/canaille/backends/sql/models.py index caa1822a..a59a44d9 100644 --- a/canaille/backends/sql/models.py +++ b/canaille/backends/sql/models.py @@ -76,6 +76,9 @@ class User(canaille.core.models.User, Base, SqlAlchemyModel): password: Mapped[str] = mapped_column( PasswordType(schemes=["pbkdf2_sha512"]), nullable=True ) + password_last_update: Mapped[datetime.datetime] = mapped_column( + TZDateTime(timezone=True), nullable=True + ) _password_failure_timestamps: Mapped[list[str]] = mapped_column( MutableJson, nullable=True ) diff --git a/canaille/config.sample.toml b/canaille/config.sample.toml index 95f8cca2..2898a30b 100644 --- a/canaille/config.sample.toml +++ b/canaille/config.sample.toml @@ -120,6 +120,14 @@ SECRET_KEY = "change me before you go in production" # This url should not be modified. # API_URL_HIBP = "https://api.pwnedpasswords.com/range/" +# Password validity duration. +# If a value is recorded Canaille will check if user's password is expired. +# Then, the user is forced to change his password when the lifetime of the password is over. +# This value is expressed in `ISO8601 format `_. +# Example for 60 days: "P60D" +# It is possible to disable this option by entering None. +# PASSWORD_LIFETIME = None + # [CANAILLE_SQL] # The SQL database connection string # Details on https://docs.sqlalchemy.org/en/20/core/engines.html diff --git a/canaille/core/configuration.py b/canaille/core/configuration.py index 8ee3960c..cc9472c3 100644 --- a/canaille/core/configuration.py +++ b/canaille/core/configuration.py @@ -374,3 +374,13 @@ class CoreSettings(BaseModel): PASSWORD_COMPROMISSION_CHECK_API_URL: str = "https://api.pwnedpasswords.com/range/" """Have i been pwned api url for compromission checks.""" + + PASSWORD_LIFETIME: str | None = None + """Password validity duration. + + If a value is recorded Canaille will check if user's password is expired. + Then, the user is forced to change his password when the lifetime of the password is over. + This value is expressed in `ISO8601 format `_. + Example for 60 days: "P60D" + It is possible to disable this option by entering None. + """ diff --git a/canaille/core/endpoints/account.py b/canaille/core/endpoints/account.py index df114187..81097891 100644 --- a/canaille/core/endpoints/account.py +++ b/canaille/core/endpoints/account.py @@ -24,7 +24,6 @@ from canaille.app import build_hash from canaille.app import default_fields from canaille.app import models from canaille.app import obj_to_b64 -from canaille.app.flask import permissions_needed from canaille.app.flask import render_htmx_template from canaille.app.flask import request_is_htmx from canaille.app.flask import smtp_needed @@ -53,6 +52,7 @@ from ..mails import send_registration_mail from .forms import EmailConfirmationForm from .forms import InvitationForm from .forms import JoinForm +from .forms import PasswordResetForm from .forms import build_profile_form bp = Blueprint("account", __name__) @@ -140,7 +140,7 @@ def about(): @bp.route("/users", methods=["GET", "POST"]) -@permissions_needed("manage_users") +@user_needed("manage_users") def users(user): table_form = TableForm( models.User, @@ -195,7 +195,7 @@ class RegistrationPayload(VerificationPayload): @bp.route("/invite", methods=["GET", "POST"]) @smtp_needed() -@permissions_needed("manage_users") +@user_needed("manage_users") def user_invitation(user): form = InvitationForm(request.form or None) email_sent = None @@ -406,7 +406,7 @@ def email_confirmation(data, hash): @bp.route("/profile", methods=("GET", "POST")) -@permissions_needed("manage_users") +@user_needed("manage_users") def profile_creation(user): form = build_profile_form(user.writable_fields, user.readable_fields) form.process(CombinedMultiDict((request.files, request.form)) or None) @@ -847,7 +847,7 @@ def profile_delete(user, edited_user): @bp.route("/impersonate/") -@permissions_needed("impersonate_users") +@user_needed("impersonate_users") def impersonate(user, puppet): if puppet.locked: abort(403, _("Locked users cannot be impersonated.")) @@ -881,3 +881,23 @@ def photo(user, field): return send_file( stream, mimetype="image/jpeg", last_modified=user.last_modified, etag=etag ) + + +@bp.route("/reset/", methods=["GET", "POST"]) +def reset(user): + form = PasswordResetForm(request.form) + if user != current_user() or not user.has_expired_password(): + abort(403) + + if request.form and form.validate(): + Backend.instance.set_user_password(user, form.password.data) + login_user(user) + flash(_("Your password has been updated successfully"), "success") + return redirect( + session.pop( + "redirect-after-login", + url_for("core.account.profile_edition", edited_user=user), + ) + ) + + return render_template("reset-password.html", form=form, hash=None, user=user) diff --git a/canaille/core/endpoints/admin.py b/canaille/core/endpoints/admin.py index 8a23ea82..a802e248 100644 --- a/canaille/core/endpoints/admin.py +++ b/canaille/core/endpoints/admin.py @@ -7,7 +7,7 @@ from wtforms import StringField from wtforms.validators import DataRequired from canaille.app import obj_to_b64 -from canaille.app.flask import permissions_needed +from canaille.app.flask import user_needed from canaille.app.forms import Form from canaille.app.forms import email_validator from canaille.app.i18n import gettext as _ @@ -34,7 +34,7 @@ class MailTestForm(Form): @bp.route("/mail", methods=["GET", "POST"]) -@permissions_needed("manage_oidc") +@user_needed("manage_oidc") def mail_index(user): form = MailTestForm(request.form or None) if request.form and form.validate(): @@ -47,7 +47,7 @@ def mail_index(user): @bp.route("/mail/test.html") -@permissions_needed("manage_oidc") +@user_needed("manage_oidc") def test_html(user): base_url = url_for("core.account.index", _external=True) return render_template( @@ -62,7 +62,7 @@ def test_html(user): @bp.route("/mail/test.txt") -@permissions_needed("manage_oidc") +@user_needed("manage_oidc") def test_txt(user): base_url = url_for("core.account.index", _external=True) return render_template( @@ -73,7 +73,7 @@ def test_txt(user): @bp.route("/mail/password-init.html") -@permissions_needed("manage_oidc") +@user_needed("manage_oidc") def password_init_html(user): base_url = url_for("core.account.index", _external=True) reset_url = url_for( @@ -99,7 +99,7 @@ def password_init_html(user): @bp.route("/mail/password-init.txt") -@permissions_needed("manage_oidc") +@user_needed("manage_oidc") def password_init_txt(user): base_url = url_for("core.account.index", _external=True) reset_url = url_for( @@ -118,7 +118,7 @@ def password_init_txt(user): @bp.route("/mail/reset.html") -@permissions_needed("manage_oidc") +@user_needed("manage_oidc") def password_reset_html(user): base_url = url_for("core.account.index", _external=True) reset_url = url_for( @@ -144,7 +144,7 @@ def password_reset_html(user): @bp.route("/mail/reset.txt") -@permissions_needed("manage_oidc") +@user_needed("manage_oidc") def password_reset_txt(user): base_url = url_for("core.account.index", _external=True) reset_url = url_for( @@ -163,7 +163,7 @@ def password_reset_txt(user): @bp.route("/mail///invitation.html") -@permissions_needed("manage_oidc") +@user_needed("manage_oidc") def invitation_html(user, identifier, email): base_url = url_for("core.account.index", _external=True) registration_url = url_for( @@ -186,7 +186,7 @@ def invitation_html(user, identifier, email): @bp.route("/mail///invitation.txt") -@permissions_needed("manage_oidc") +@user_needed("manage_oidc") def invitation_txt(user, identifier, email): base_url = url_for("core.account.index", _external=True) registration_url = url_for( @@ -205,7 +205,7 @@ def invitation_txt(user, identifier, email): @bp.route("/mail///email-confirmation.html") -@permissions_needed("manage_oidc") +@user_needed("manage_oidc") def email_confirmation_html(user, identifier, email): base_url = url_for("core.account.index", _external=True) email_confirmation_url = url_for( @@ -228,7 +228,7 @@ def email_confirmation_html(user, identifier, email): @bp.route("/mail///email-confirmation.txt") -@permissions_needed("manage_oidc") +@user_needed("manage_oidc") def email_confirmation_txt(user, identifier, email): base_url = url_for("core.account.index", _external=True) email_confirmation_url = url_for( @@ -247,7 +247,7 @@ def email_confirmation_txt(user, identifier, email): @bp.route("/mail//registration.html") -@permissions_needed("manage_oidc") +@user_needed("manage_oidc") def registration_html(user, email): base_url = url_for("core.account.index", _external=True) registration_url = url_for( @@ -270,7 +270,7 @@ def registration_html(user, email): @bp.route("/mail//registration.txt") -@permissions_needed("manage_oidc") +@user_needed("manage_oidc") def registration_txt(user, email): base_url = url_for("core.account.index", _external=True) registration_url = url_for( @@ -289,7 +289,7 @@ def registration_txt(user, email): @bp.route("/mail/compromised_password_check_failure.html") -@permissions_needed("manage_oidc") +@user_needed("manage_oidc") def compromised_password_check_failure_html(user): base_url = url_for("core.account.index", _external=True) user_name = "" @@ -313,7 +313,7 @@ def compromised_password_check_failure_html(user): @bp.route("/mail/compromised_password_check_failure.txt") -@permissions_needed("manage_oidc") +@user_needed("manage_oidc") def compromised_password_check_failure_txt(user): base_url = url_for("core.account.index", _external=True) user_name = "" @@ -333,7 +333,7 @@ def compromised_password_check_failure_txt(user): @bp.route("/mail/email_otp.html") -@permissions_needed("manage_oidc") +@user_needed("manage_oidc") def email_otp_html(user): base_url = url_for("core.account.index", _external=True) otp = "000000" @@ -351,7 +351,7 @@ def email_otp_html(user): @bp.route("/mail/email_otp.txt") -@permissions_needed("manage_oidc") +@user_needed("manage_oidc") def email_otp_txt(user): base_url = url_for("core.account.index", _external=True) otp = "000000" diff --git a/canaille/core/endpoints/groups.py b/canaille/core/endpoints/groups.py index 19efaf63..939c09db 100644 --- a/canaille/core/endpoints/groups.py +++ b/canaille/core/endpoints/groups.py @@ -6,8 +6,8 @@ from flask import request from flask import url_for from canaille.app import models -from canaille.app.flask import permissions_needed from canaille.app.flask import render_htmx_template +from canaille.app.flask import user_needed from canaille.app.forms import TableForm from canaille.app.i18n import gettext as _ from canaille.app.themes import render_template @@ -21,7 +21,7 @@ bp = Blueprint("groups", __name__, url_prefix="/groups") @bp.route("/", methods=["GET", "POST"]) -@permissions_needed("manage_groups") +@user_needed("manage_groups") def groups(user): table_form = TableForm(models.Group, formdata=request.form) if request.form and request.form.get("page") and not table_form.validate(): @@ -33,7 +33,7 @@ def groups(user): @bp.route("/add", methods=("GET", "POST")) -@permissions_needed("manage_groups") +@user_needed("manage_groups") def create_group(user): form = CreateGroupForm(request.form or None) @@ -61,7 +61,7 @@ def create_group(user): @bp.route("/", methods=("GET", "POST")) -@permissions_needed("manage_groups") +@user_needed("manage_groups") def group(user, group): if ( request.method == "GET" diff --git a/canaille/core/models.py b/canaille/core/models.py index b1d23cda..98ba9232 100644 --- a/canaille/core/models.py +++ b/canaille/core/models.py @@ -4,6 +4,7 @@ from typing import Annotated from typing import ClassVar from flask import current_app +from pydantic import TypeAdapter from canaille.backends.models import Model from canaille.core.configuration import Permission @@ -99,6 +100,11 @@ class User(Model): "never"). """ + password_last_update: datetime.datetime | None = None + """Specifies the last time the entry's password was changed. + By default, the date of creation of the password is retained. + """ + preferred_language: str | None = None """Indicates the user's preferred written or spoken languages and is generally used for selecting a localized user interface. @@ -486,6 +492,23 @@ class User(Model): ).total_seconds() return max(calculated_delay - time_since_last_failed_bind, 0) + def has_expired_password(self): + last_update = self.password_last_update or datetime.datetime.now( + datetime.timezone.utc + ) + if current_app.config["CANAILLE"]["PASSWORD_LIFETIME"] is None: + password_expiration = None + else: + password_expiration = TypeAdapter(datetime.timedelta).validate_python( + current_app.config["CANAILLE"]["PASSWORD_LIFETIME"] + ) + + return ( + password_expiration is not None + and last_update + password_expiration + < datetime.datetime.now(datetime.timezone.utc) + ) + class Group(Model): """User model, based on the `SCIM Group schema diff --git a/canaille/oidc/endpoints/authorizations.py b/canaille/oidc/endpoints/authorizations.py index c4cbce20..08a08667 100644 --- a/canaille/oidc/endpoints/authorizations.py +++ b/canaille/oidc/endpoints/authorizations.py @@ -3,8 +3,8 @@ from flask import abort from flask import request from canaille.app import models -from canaille.app.flask import permissions_needed from canaille.app.flask import render_htmx_template +from canaille.app.flask import user_needed from canaille.app.forms import TableForm from canaille.app.themes import render_template @@ -12,7 +12,7 @@ bp = Blueprint("authorizations", __name__, url_prefix="/admin/authorization") @bp.route("/", methods=["GET", "POST"]) -@permissions_needed("manage_oidc") +@user_needed("manage_oidc") def index(user): table_form = TableForm(models.AuthorizationCode, formdata=request.form) if request.form and request.form.get("page") and not table_form.validate(): @@ -26,7 +26,7 @@ def index(user): @bp.route("/", methods=["GET", "POST"]) -@permissions_needed("manage_oidc") +@user_needed("manage_oidc") def view(user, authorization): return render_template( "oidc/authorization_view.html", diff --git a/canaille/oidc/endpoints/clients.py b/canaille/oidc/endpoints/clients.py index 13ae71ce..8b0a9326 100644 --- a/canaille/oidc/endpoints/clients.py +++ b/canaille/oidc/endpoints/clients.py @@ -10,8 +10,8 @@ from flask import url_for from werkzeug.security import gen_salt from canaille.app import models -from canaille.app.flask import permissions_needed from canaille.app.flask import render_htmx_template +from canaille.app.flask import user_needed from canaille.app.forms import TableForm from canaille.app.i18n import gettext as _ from canaille.app.themes import render_template @@ -23,7 +23,7 @@ bp = Blueprint("clients", __name__, url_prefix="/admin/client") @bp.route("/", methods=["GET", "POST"]) -@permissions_needed("manage_oidc") +@user_needed("manage_oidc") def index(user): table_form = TableForm(models.Client, formdata=request.form) if request.form and request.form.get("page") and not table_form.validate(): @@ -35,7 +35,7 @@ def index(user): @bp.route("/add", methods=["GET", "POST"]) -@permissions_needed("manage_oidc") +@user_needed("manage_oidc") def add(user): form = ClientAddForm(request.form or None) @@ -87,7 +87,7 @@ def add(user): @bp.route("/edit/", methods=["GET", "POST"]) -@permissions_needed("manage_oidc") +@user_needed("manage_oidc") def edit(user, client): if request.form.get("action") == "confirm-delete": return render_template("oidc/modals/delete-client.html", client=client) diff --git a/canaille/oidc/endpoints/tokens.py b/canaille/oidc/endpoints/tokens.py index a91e5481..afec9e29 100644 --- a/canaille/oidc/endpoints/tokens.py +++ b/canaille/oidc/endpoints/tokens.py @@ -7,8 +7,8 @@ from flask import flash from flask import request from canaille.app import models -from canaille.app.flask import permissions_needed from canaille.app.flask import render_htmx_template +from canaille.app.flask import user_needed from canaille.app.forms import TableForm from canaille.app.i18n import gettext as _ from canaille.app.themes import render_template @@ -20,7 +20,7 @@ bp = Blueprint("tokens", __name__, url_prefix="/admin/token") @bp.route("/", methods=["GET", "POST"]) -@permissions_needed("manage_oidc") +@user_needed("manage_oidc") def index(user): table_form = TableForm(models.Token, formdata=request.form) if request.form and request.form.get("page") and not table_form.validate(): @@ -32,7 +32,7 @@ def index(user): @bp.route("/", methods=["GET", "POST"]) -@permissions_needed("manage_oidc") +@user_needed("manage_oidc") def view(user, token): form = TokenRevokationForm(request.form or None) diff --git a/canaille/templates/core/reset-password.html b/canaille/templates/core/reset-password.html index 4a0c721e..55a0abbb 100644 --- a/canaille/templates/core/reset-password.html +++ b/canaille/templates/core/reset-password.html @@ -25,8 +25,15 @@ Displays a password reset form. {% endblock %} -
- {{ fui.render_form(form, _("Password reset")) }} -
+ {% if hash == None %} +
+ {{ fui.render_form(form, _("Password reset"), action=url_for("core.account.reset", user=user)) }} +
+ {% else %} +
+ {{ fui.render_form(form, _("Password reset"), action=url_for("core.auth.reset", user=user, hash=hash)) }} +
+ {% endif %} + {% endblock %} diff --git a/tests/core/test_account.py b/tests/core/test_account.py index 61d57cd4..97b3b0df 100644 --- a/tests/core/test_account.py +++ b/tests/core/test_account.py @@ -1,5 +1,8 @@ import datetime +from unittest import mock +import pytest +import time_machine from flask import g from canaille.app import models @@ -196,3 +199,126 @@ def test_account_locked_during_session(testclient, logged_user, backend): logged_user.lock_date = datetime.datetime.now(datetime.timezone.utc) backend.save(logged_user) testclient.get("/profile/user/settings", status=403) + + +def test_expired_password_redirection_and_register_new_password_for_memory_and_sql( + testclient, + logged_user, + user, + backend, + admin, +): + """time_machine does not work with ldap.""" + if "ldap" in backend.__class__.__module__: + pytest.skip() + + testclient.app.config["WTF_CSRF_ENABLED"] = False + backend.reload(logged_user) + res = testclient.get("/profile/user/settings", status=200) + res.form["password1"] = "123456789" + res.form["password2"] = "123456789" + + with time_machine.travel("2020-01-01 01:00:00+00:00", tick=False) as traveller: + res = res.form.submit(name="action", value="edit-settings") + + testclient.app.config["CANAILLE"]["PASSWORD_LIFETIME"] = "P5D" + + traveller.shift(datetime.timedelta(days=5, minutes=1)) + + backend.reload(g.user) + + res = testclient.get("/profile/user/settings") + + testclient.get("/reset/admin", status=403) + + assert ( + "info", + "Your password has expired, please choose a new password.", + ) in res.flashes + assert res.location == "/reset/user" + + backend.reload(logged_user) + res = testclient.get("/reset/user") + + res.form["password"] = "foobarbaz" + res.form["confirmation"] = "foobarbaz" + res = res.form.submit() + assert ("success", "Your password has been updated successfully") in res.flashes + + +@mock.patch("canaille.core.models.User.has_expired_password") +def test_expired_password_redirection_and_register_new_password_for_ldap_sql_and_memory( + has_expired, + testclient, + logged_user, + user, + backend, + admin, +): + """time_machine does not work with ldap.""" + has_expired.return_value = False + assert user.password_last_update is None + res = testclient.get("/profile/user/settings", status=200) + res.form["password1"] = "123456789" + res.form["password2"] = "123456789" + res = res.form.submit(name="action", value="edit-settings") + backend.reload(logged_user) + assert user.password_last_update is not None + + has_expired.return_value = True + res = testclient.get("/profile/user/settings") + testclient.get("/reset/admin", status=403) + assert ( + "info", + "Your password has expired, please choose a new password.", + ) in res.flashes + assert res.location == "/reset/user" + backend.reload(logged_user) + backend.reload(g.user) + backend.reload(user) + + res = testclient.get("/reset/user") + + res.form["password"] = "foobarbaz" + res.form["confirmation"] = "foobarbaz" + res = res.form.submit() + assert ("success", "Your password has been updated successfully") in res.flashes + + +def test_not_expired_password_or_wrong_user_redirection( + testclient, logged_user, user, backend, admin +): + assert user.password_last_update is None + res = testclient.get("/profile/user/settings", status=200) + res.form["password1"] = "123456789" + res.form["password2"] = "123456789" + res = res.form.submit(name="action", value="edit-settings") + backend.reload(logged_user) + assert user.password_last_update is not None + + def test_two_redirections(password_lifetime): + testclient.app.config["CANAILLE"]["PASSWORD_LIFETIME"] = password_lifetime + testclient.get("/reset/user", status=403) + testclient.get("/reset/admin", status=403) + + test_two_redirections(None) + + testclient.app.config["CANAILLE"]["PASSWORD_LIFETIME"] = "PT0S" + res = testclient.get("/profile/user/settings") + assert ( + "info", + "Your password has expired, please choose a new password.", + ) in res.flashes + assert res.location == "/reset/user" + + testclient.app.config["CANAILLE"]["PASSWORD_LIFETIME"] = "P1D" + res = testclient.get("/profile/user/settings") + res.form["password1"] = "123456789" + res.form["password2"] = "123456789" + res = res.form.submit(name="action", value="edit-settings") + + test_two_redirections("P1D") + + +def test_expired_password_needed_without_current_user(testclient, user): + testclient.get("/reset/user", status=403)