Merge branch '176-new-password-expiry-policy' into 'main'

Adds password expiry policy feature (issue #176)

Closes #176

See merge request yaal/canaille!203
This commit is contained in:
sebastien yaal 2024-12-20 08:37:59 +00:00
commit 0d1f2d4d8d
18 changed files with 1646 additions and 1439 deletions

View file

@ -20,6 +20,8 @@ Added
- Implement OIDC client_credentials flow. :issue:`207`
- Button in the client admin page to create client tokens.
- Basic SCIM implementation. :issue:`116` :pr:`197`
- Password expiry policy :issue:`176`
- :attr:`~canaille.core.configuration.CoreSettings.PASSWORD_LIFETIME`
Changed
^^^^^^^

View file

@ -5,8 +5,11 @@ from urllib.parse import urlunsplit
from flask import abort
from flask import current_app
from flask import flash
from flask import make_response
from flask import redirect
from flask import request
from flask import url_for
from werkzeug.exceptions import HTTPException
from werkzeug.routing import BaseConverter
@ -15,21 +18,7 @@ from canaille.app.session import current_user
from canaille.app.themes import render_template
def user_needed():
def wrapper(view_function):
@wraps(view_function)
def decorator(*args, **kwargs):
user = current_user()
if not user:
abort(403)
return view_function(*args, user=user, **kwargs)
return decorator
return wrapper
def permissions_needed(*args):
def user_needed(*args):
permissions = set(args)
def wrapper(view_function):
@ -38,6 +27,19 @@ def permissions_needed(*args):
user = current_user()
if not user or not user.can(*permissions):
abort(403)
if user.has_expired_password():
flash(
_("Your password has expired, please choose a new password."),
"info",
)
return redirect(
url_for(
"core.account.reset",
user=user,
)
)
return view_function(*args, user=user, **kwargs)
return decorator

View file

@ -41,6 +41,7 @@ class User(canaille.core.models.User, LDAPObject):
"one_time_password": "oathTokenPIN",
"one_time_password_emission_date": "oathSecretTime",
"password_failure_timestamps": "pwdFailureTime",
"password_last_update": "pwdChangedTime",
}
def match_filter(self, filter):

View file

@ -84,6 +84,10 @@ class MemoryBackend(Backend):
def set_user_password(self, user, password):
user.password = password
user.password_last_update = datetime.datetime.now(
datetime.timezone.utc
).replace(microsecond=0)
self.save(user)
def query(self, model, **kwargs):

View file

@ -77,6 +77,9 @@ class SQLBackend(Backend):
def set_user_password(self, user, password):
user.password = password
user.password_last_update = datetime.datetime.now(
datetime.timezone.utc
).replace(microsecond=0)
self.save(user)
def query(self, model, **kwargs):

View file

@ -76,6 +76,9 @@ class User(canaille.core.models.User, Base, SqlAlchemyModel):
password: Mapped[str] = mapped_column(
PasswordType(schemes=["pbkdf2_sha512"]), nullable=True
)
password_last_update: Mapped[datetime.datetime] = mapped_column(
TZDateTime(timezone=True), nullable=True
)
_password_failure_timestamps: Mapped[list[str]] = mapped_column(
MutableJson, nullable=True
)

View file

@ -120,6 +120,14 @@ SECRET_KEY = "change me before you go in production"
# This url should not be modified.
# API_URL_HIBP = "https://api.pwnedpasswords.com/range/"
# Password validity duration.
# If a value is recorded Canaille will check if user's password is expired.
# Then, the user is forced to change his password when the lifetime of the password is over.
# This value is expressed in `ISO8601 format <https://en.wikipedia.org/wiki/ISO_8601#Durations>`_.
# Example for 60 days: "P60D"
# It is possible to disable this option by entering None.
# PASSWORD_LIFETIME = None
# [CANAILLE_SQL]
# The SQL database connection string
# Details on https://docs.sqlalchemy.org/en/20/core/engines.html

View file

@ -374,3 +374,13 @@ class CoreSettings(BaseModel):
PASSWORD_COMPROMISSION_CHECK_API_URL: str = "https://api.pwnedpasswords.com/range/"
"""Have i been pwned api url for compromission checks."""
PASSWORD_LIFETIME: str | None = None
"""Password validity duration.
If a value is recorded Canaille will check if user's password is expired.
Then, the user is forced to change his password when the lifetime of the password is over.
This value is expressed in `ISO8601 format <https://en.wikipedia.org/wiki/ISO_8601#Durations>`_.
Example for 60 days: "P60D"
It is possible to disable this option by entering None.
"""

View file

@ -24,7 +24,6 @@ from canaille.app import build_hash
from canaille.app import default_fields
from canaille.app import models
from canaille.app import obj_to_b64
from canaille.app.flask import permissions_needed
from canaille.app.flask import render_htmx_template
from canaille.app.flask import request_is_htmx
from canaille.app.flask import smtp_needed
@ -53,6 +52,7 @@ from ..mails import send_registration_mail
from .forms import EmailConfirmationForm
from .forms import InvitationForm
from .forms import JoinForm
from .forms import PasswordResetForm
from .forms import build_profile_form
bp = Blueprint("account", __name__)
@ -140,7 +140,7 @@ def about():
@bp.route("/users", methods=["GET", "POST"])
@permissions_needed("manage_users")
@user_needed("manage_users")
def users(user):
table_form = TableForm(
models.User,
@ -195,7 +195,7 @@ class RegistrationPayload(VerificationPayload):
@bp.route("/invite", methods=["GET", "POST"])
@smtp_needed()
@permissions_needed("manage_users")
@user_needed("manage_users")
def user_invitation(user):
form = InvitationForm(request.form or None)
email_sent = None
@ -406,7 +406,7 @@ def email_confirmation(data, hash):
@bp.route("/profile", methods=("GET", "POST"))
@permissions_needed("manage_users")
@user_needed("manage_users")
def profile_creation(user):
form = build_profile_form(user.writable_fields, user.readable_fields)
form.process(CombinedMultiDict((request.files, request.form)) or None)
@ -847,7 +847,7 @@ def profile_delete(user, edited_user):
@bp.route("/impersonate/<user:puppet>")
@permissions_needed("impersonate_users")
@user_needed("impersonate_users")
def impersonate(user, puppet):
if puppet.locked:
abort(403, _("Locked users cannot be impersonated."))
@ -881,3 +881,23 @@ def photo(user, field):
return send_file(
stream, mimetype="image/jpeg", last_modified=user.last_modified, etag=etag
)
@bp.route("/reset/<user:user>", methods=["GET", "POST"])
def reset(user):
form = PasswordResetForm(request.form)
if user != current_user() or not user.has_expired_password():
abort(403)
if request.form and form.validate():
Backend.instance.set_user_password(user, form.password.data)
login_user(user)
flash(_("Your password has been updated successfully"), "success")
return redirect(
session.pop(
"redirect-after-login",
url_for("core.account.profile_edition", edited_user=user),
)
)
return render_template("core/reset-password.html", form=form, user=user, hash=None)

View file

@ -7,7 +7,7 @@ from wtforms import StringField
from wtforms.validators import DataRequired
from canaille.app import obj_to_b64
from canaille.app.flask import permissions_needed
from canaille.app.flask import user_needed
from canaille.app.forms import Form
from canaille.app.forms import email_validator
from canaille.app.i18n import gettext as _
@ -34,7 +34,7 @@ class MailTestForm(Form):
@bp.route("/mail", methods=["GET", "POST"])
@permissions_needed("manage_oidc")
@user_needed("manage_oidc")
def mail_index(user):
form = MailTestForm(request.form or None)
if request.form and form.validate():
@ -47,7 +47,7 @@ def mail_index(user):
@bp.route("/mail/test.html")
@permissions_needed("manage_oidc")
@user_needed("manage_oidc")
def test_html(user):
base_url = url_for("core.account.index", _external=True)
return render_template(
@ -62,7 +62,7 @@ def test_html(user):
@bp.route("/mail/test.txt")
@permissions_needed("manage_oidc")
@user_needed("manage_oidc")
def test_txt(user):
base_url = url_for("core.account.index", _external=True)
return render_template(
@ -73,7 +73,7 @@ def test_txt(user):
@bp.route("/mail/password-init.html")
@permissions_needed("manage_oidc")
@user_needed("manage_oidc")
def password_init_html(user):
base_url = url_for("core.account.index", _external=True)
reset_url = url_for(
@ -99,7 +99,7 @@ def password_init_html(user):
@bp.route("/mail/password-init.txt")
@permissions_needed("manage_oidc")
@user_needed("manage_oidc")
def password_init_txt(user):
base_url = url_for("core.account.index", _external=True)
reset_url = url_for(
@ -118,7 +118,7 @@ def password_init_txt(user):
@bp.route("/mail/reset.html")
@permissions_needed("manage_oidc")
@user_needed("manage_oidc")
def password_reset_html(user):
base_url = url_for("core.account.index", _external=True)
reset_url = url_for(
@ -144,7 +144,7 @@ def password_reset_html(user):
@bp.route("/mail/reset.txt")
@permissions_needed("manage_oidc")
@user_needed("manage_oidc")
def password_reset_txt(user):
base_url = url_for("core.account.index", _external=True)
reset_url = url_for(
@ -163,7 +163,7 @@ def password_reset_txt(user):
@bp.route("/mail/<identifier>/<email>/invitation.html")
@permissions_needed("manage_oidc")
@user_needed("manage_oidc")
def invitation_html(user, identifier, email):
base_url = url_for("core.account.index", _external=True)
registration_url = url_for(
@ -186,7 +186,7 @@ def invitation_html(user, identifier, email):
@bp.route("/mail/<identifier>/<email>/invitation.txt")
@permissions_needed("manage_oidc")
@user_needed("manage_oidc")
def invitation_txt(user, identifier, email):
base_url = url_for("core.account.index", _external=True)
registration_url = url_for(
@ -205,7 +205,7 @@ def invitation_txt(user, identifier, email):
@bp.route("/mail/<identifier>/<email>/email-confirmation.html")
@permissions_needed("manage_oidc")
@user_needed("manage_oidc")
def email_confirmation_html(user, identifier, email):
base_url = url_for("core.account.index", _external=True)
email_confirmation_url = url_for(
@ -228,7 +228,7 @@ def email_confirmation_html(user, identifier, email):
@bp.route("/mail/<identifier>/<email>/email-confirmation.txt")
@permissions_needed("manage_oidc")
@user_needed("manage_oidc")
def email_confirmation_txt(user, identifier, email):
base_url = url_for("core.account.index", _external=True)
email_confirmation_url = url_for(
@ -247,7 +247,7 @@ def email_confirmation_txt(user, identifier, email):
@bp.route("/mail/<email>/registration.html")
@permissions_needed("manage_oidc")
@user_needed("manage_oidc")
def registration_html(user, email):
base_url = url_for("core.account.index", _external=True)
registration_url = url_for(
@ -270,7 +270,7 @@ def registration_html(user, email):
@bp.route("/mail/<email>/registration.txt")
@permissions_needed("manage_oidc")
@user_needed("manage_oidc")
def registration_txt(user, email):
base_url = url_for("core.account.index", _external=True)
registration_url = url_for(
@ -289,7 +289,7 @@ def registration_txt(user, email):
@bp.route("/mail/compromised_password_check_failure.html")
@permissions_needed("manage_oidc")
@user_needed("manage_oidc")
def compromised_password_check_failure_html(user):
base_url = url_for("core.account.index", _external=True)
user_name = "<USER NAME>"
@ -313,7 +313,7 @@ def compromised_password_check_failure_html(user):
@bp.route("/mail/compromised_password_check_failure.txt")
@permissions_needed("manage_oidc")
@user_needed("manage_oidc")
def compromised_password_check_failure_txt(user):
base_url = url_for("core.account.index", _external=True)
user_name = "<USER NAME>"
@ -333,7 +333,7 @@ def compromised_password_check_failure_txt(user):
@bp.route("/mail/email_otp.html")
@permissions_needed("manage_oidc")
@user_needed("manage_oidc")
def email_otp_html(user):
base_url = url_for("core.account.index", _external=True)
otp = "000000"
@ -351,7 +351,7 @@ def email_otp_html(user):
@bp.route("/mail/email_otp.txt")
@permissions_needed("manage_oidc")
@user_needed("manage_oidc")
def email_otp_txt(user):
base_url = url_for("core.account.index", _external=True)
otp = "000000"

View file

@ -6,8 +6,8 @@ from flask import request
from flask import url_for
from canaille.app import models
from canaille.app.flask import permissions_needed
from canaille.app.flask import render_htmx_template
from canaille.app.flask import user_needed
from canaille.app.forms import TableForm
from canaille.app.i18n import gettext as _
from canaille.app.themes import render_template
@ -21,7 +21,7 @@ bp = Blueprint("groups", __name__, url_prefix="/groups")
@bp.route("/", methods=["GET", "POST"])
@permissions_needed("manage_groups")
@user_needed("manage_groups")
def groups(user):
table_form = TableForm(models.Group, formdata=request.form)
if request.form and request.form.get("page") and not table_form.validate():
@ -33,7 +33,7 @@ def groups(user):
@bp.route("/add", methods=("GET", "POST"))
@permissions_needed("manage_groups")
@user_needed("manage_groups")
def create_group(user):
form = CreateGroupForm(request.form or None)
@ -61,7 +61,7 @@ def create_group(user):
@bp.route("/<group:group>", methods=("GET", "POST"))
@permissions_needed("manage_groups")
@user_needed("manage_groups")
def group(user, group):
if (
request.method == "GET"

View file

@ -4,6 +4,7 @@ from typing import Annotated
from typing import ClassVar
from flask import current_app
from pydantic import TypeAdapter
from canaille.backends.models import Model
from canaille.core.configuration import Permission
@ -99,6 +100,11 @@ class User(Model):
"never").
"""
password_last_update: datetime.datetime | None = None
"""Specifies the last time the entry's password was changed.
By default, the date of creation of the password is retained.
"""
preferred_language: str | None = None
"""Indicates the user's preferred written or spoken languages and is
generally used for selecting a localized user interface.
@ -486,6 +492,23 @@ class User(Model):
).total_seconds()
return max(calculated_delay - time_since_last_failed_bind, 0)
def has_expired_password(self):
last_update = self.password_last_update or datetime.datetime.now(
datetime.timezone.utc
)
if current_app.config["CANAILLE"]["PASSWORD_LIFETIME"] is None:
password_expiration = None
else:
password_expiration = TypeAdapter(datetime.timedelta).validate_python(
current_app.config["CANAILLE"]["PASSWORD_LIFETIME"]
)
return (
password_expiration is not None
and last_update + password_expiration
< datetime.datetime.now(datetime.timezone.utc)
)
class Group(Model):
"""User model, based on the `SCIM Group schema

View file

@ -3,8 +3,8 @@ from flask import abort
from flask import request
from canaille.app import models
from canaille.app.flask import permissions_needed
from canaille.app.flask import render_htmx_template
from canaille.app.flask import user_needed
from canaille.app.forms import TableForm
from canaille.app.themes import render_template
@ -12,7 +12,7 @@ bp = Blueprint("authorizations", __name__, url_prefix="/admin/authorization")
@bp.route("/", methods=["GET", "POST"])
@permissions_needed("manage_oidc")
@user_needed("manage_oidc")
def index(user):
table_form = TableForm(models.AuthorizationCode, formdata=request.form)
if request.form and request.form.get("page") and not table_form.validate():
@ -26,7 +26,7 @@ def index(user):
@bp.route("/<authorizationcode:authorization>", methods=["GET", "POST"])
@permissions_needed("manage_oidc")
@user_needed("manage_oidc")
def view(user, authorization):
return render_template(
"oidc/authorization_view.html",

View file

@ -10,8 +10,8 @@ from flask import url_for
from werkzeug.security import gen_salt
from canaille.app import models
from canaille.app.flask import permissions_needed
from canaille.app.flask import render_htmx_template
from canaille.app.flask import user_needed
from canaille.app.forms import TableForm
from canaille.app.i18n import gettext as _
from canaille.app.themes import render_template
@ -23,7 +23,7 @@ bp = Blueprint("clients", __name__, url_prefix="/admin/client")
@bp.route("/", methods=["GET", "POST"])
@permissions_needed("manage_oidc")
@user_needed("manage_oidc")
def index(user):
table_form = TableForm(models.Client, formdata=request.form)
if request.form and request.form.get("page") and not table_form.validate():
@ -35,7 +35,7 @@ def index(user):
@bp.route("/add", methods=["GET", "POST"])
@permissions_needed("manage_oidc")
@user_needed("manage_oidc")
def add(user):
form = ClientAddForm(request.form or None)
@ -87,7 +87,7 @@ def add(user):
@bp.route("/edit/<client:client>", methods=["GET", "POST"])
@permissions_needed("manage_oidc")
@user_needed("manage_oidc")
def edit(user, client):
if request.form.get("action") == "confirm-delete":
return render_template("oidc/modals/delete-client.html", client=client)

View file

@ -7,8 +7,8 @@ from flask import flash
from flask import request
from canaille.app import models
from canaille.app.flask import permissions_needed
from canaille.app.flask import render_htmx_template
from canaille.app.flask import user_needed
from canaille.app.forms import TableForm
from canaille.app.i18n import gettext as _
from canaille.app.themes import render_template
@ -20,7 +20,7 @@ bp = Blueprint("tokens", __name__, url_prefix="/admin/token")
@bp.route("/", methods=["GET", "POST"])
@permissions_needed("manage_oidc")
@user_needed("manage_oidc")
def index(user):
table_form = TableForm(models.Token, formdata=request.form)
if request.form and request.form.get("page") and not table_form.validate():
@ -32,7 +32,7 @@ def index(user):
@bp.route("/<token:token>", methods=["GET", "POST"])
@permissions_needed("manage_oidc")
@user_needed("manage_oidc")
def view(user, token):
form = TokenRevokationForm(request.form or None)

View file

@ -28,5 +28,6 @@ Displays a password reset form.
<div class="ui attached clearing segment">
{{ fui.render_form(form, _("Password reset")) }}
</div>
</div>
{% endblock %}

File diff suppressed because it is too large Load diff

View file

@ -1,5 +1,8 @@
import datetime
from unittest import mock
import pytest
import time_machine
from flask import g
from canaille.app import models
@ -196,3 +199,126 @@ def test_account_locked_during_session(testclient, logged_user, backend):
logged_user.lock_date = datetime.datetime.now(datetime.timezone.utc)
backend.save(logged_user)
testclient.get("/profile/user/settings", status=403)
def test_expired_password_redirection_and_register_new_password_for_memory_and_sql(
testclient,
logged_user,
user,
backend,
admin,
):
"""time_machine does not work with ldap."""
if "ldap" in backend.__class__.__module__:
pytest.skip()
testclient.app.config["WTF_CSRF_ENABLED"] = False
backend.reload(logged_user)
res = testclient.get("/profile/user/settings", status=200)
res.form["password1"] = "123456789"
res.form["password2"] = "123456789"
with time_machine.travel("2020-01-01 01:00:00+00:00", tick=False) as traveller:
res = res.form.submit(name="action", value="edit-settings")
testclient.app.config["CANAILLE"]["PASSWORD_LIFETIME"] = "P5D"
traveller.shift(datetime.timedelta(days=5, minutes=1))
backend.reload(g.user)
res = testclient.get("/profile/user/settings")
testclient.get("/reset/admin", status=403)
assert (
"info",
"Your password has expired, please choose a new password.",
) in res.flashes
assert res.location == "/reset/user"
backend.reload(logged_user)
res = testclient.get("/reset/user")
res.form["password"] = "foobarbaz"
res.form["confirmation"] = "foobarbaz"
res = res.form.submit()
assert ("success", "Your password has been updated successfully") in res.flashes
@mock.patch("canaille.core.models.User.has_expired_password")
def test_expired_password_redirection_and_register_new_password_for_ldap_sql_and_memory(
has_expired,
testclient,
logged_user,
user,
backend,
admin,
):
"""time_machine does not work with ldap."""
has_expired.return_value = False
assert user.password_last_update is None
res = testclient.get("/profile/user/settings", status=200)
res.form["password1"] = "123456789"
res.form["password2"] = "123456789"
res = res.form.submit(name="action", value="edit-settings")
backend.reload(logged_user)
assert user.password_last_update is not None
has_expired.return_value = True
res = testclient.get("/profile/user/settings")
testclient.get("/reset/admin", status=403)
assert (
"info",
"Your password has expired, please choose a new password.",
) in res.flashes
assert res.location == "/reset/user"
backend.reload(logged_user)
backend.reload(g.user)
backend.reload(user)
res = testclient.get("/reset/user")
res.form["password"] = "foobarbaz"
res.form["confirmation"] = "foobarbaz"
res = res.form.submit()
assert ("success", "Your password has been updated successfully") in res.flashes
def test_not_expired_password_or_wrong_user_redirection(
testclient, logged_user, user, backend, admin
):
assert user.password_last_update is None
res = testclient.get("/profile/user/settings", status=200)
res.form["password1"] = "123456789"
res.form["password2"] = "123456789"
res = res.form.submit(name="action", value="edit-settings")
backend.reload(logged_user)
assert user.password_last_update is not None
def test_two_redirections(password_lifetime):
testclient.app.config["CANAILLE"]["PASSWORD_LIFETIME"] = password_lifetime
testclient.get("/reset/user", status=403)
testclient.get("/reset/admin", status=403)
test_two_redirections(None)
testclient.app.config["CANAILLE"]["PASSWORD_LIFETIME"] = "PT0S"
res = testclient.get("/profile/user/settings")
assert (
"info",
"Your password has expired, please choose a new password.",
) in res.flashes
assert res.location == "/reset/user"
testclient.app.config["CANAILLE"]["PASSWORD_LIFETIME"] = "P1D"
res = testclient.get("/profile/user/settings")
res.form["password1"] = "123456789"
res.form["password2"] = "123456789"
res = res.form.submit(name="action", value="edit-settings")
test_two_redirections("P1D")
def test_expired_password_needed_without_current_user(testclient, user):
testclient.get("/reset/user", status=403)