diff --git a/CHANGES.rst b/CHANGES.rst index 51d26ed9..4ae66f4c 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -3,6 +3,9 @@ Added ^^^^^ +- Intruder lockout :issue:`173` +- :attr:`~canaille.core.configuration.CoreSettings.ENABLE_INTRUDER_LOCKOUT` + :issue:`173` - Multi-factor authentication :issue:`47` - :attr:`~canaille.core.configuration.CoreSettings.OTP_METHOD` and :attr:`~canaille.core.configuration.CoreSettings.EMAIL_OTP` and diff --git a/canaille/app/features.py b/canaille/app/features.py index f31e69e0..8ab3b7a1 100644 --- a/canaille/app/features.py +++ b/canaille/app/features.py @@ -14,6 +14,10 @@ class Features: def has_password_recovery(self): return self.app.config["CANAILLE"]["ENABLE_PASSWORD_RECOVERY"] + @property + def has_intruder_lockout(self): + return self.app.config["CANAILLE"]["ENABLE_INTRUDER_LOCKOUT"] + @property def otp_method(self): return self.app.config["CANAILLE"]["OTP_METHOD"] diff --git a/canaille/backends/__init__.py b/canaille/backends/__init__.py index 07cef570..940b28ac 100644 --- a/canaille/backends/__init__.py +++ b/canaille/backends/__init__.py @@ -1,6 +1,7 @@ import importlib import os from contextlib import contextmanager +from math import ceil from flask import g @@ -195,3 +196,7 @@ def available_backends(): for elt in os.scandir(os.path.dirname(__file__)) if elt.is_dir() and os.path.exists(os.path.join(elt, "backend.py")) } + + +def get_lockout_delay_message(current_lockout_delay): + return f"Too much attempts. Please wait for {ceil(current_lockout_delay)} seconds before trying to login again." diff --git a/canaille/backends/ldap/backend.py b/canaille/backends/ldap/backend.py index 5f82a480..f213c0ef 100644 --- a/canaille/backends/ldap/backend.py +++ b/canaille/backends/ldap/backend.py @@ -15,6 +15,7 @@ from canaille.app import models from canaille.app.configuration import ConfigurationException from canaille.app.i18n import gettext as _ from canaille.backends import Backend +from canaille.backends import get_lockout_delay_message from .utils import listify from .utils import python_attrs_to_ldap @@ -206,6 +207,11 @@ class LDAPBackend(Backend): return self.get(User, filter=filter) def check_user_password(self, user, password): + if current_app.features.has_intruder_lockout: + if current_lockout_delay := user.get_intruder_lockout_delay(): + self.save(user) + return (False, get_lockout_delay_message(current_lockout_delay)) + conn = ldap.initialize(current_app.config["CANAILLE_LDAP"]["URI"]) conn.set_option( diff --git a/canaille/backends/ldap/models.py b/canaille/backends/ldap/models.py index c9d4b729..8a04b689 100644 --- a/canaille/backends/ldap/models.py +++ b/canaille/backends/ldap/models.py @@ -40,6 +40,7 @@ class User(canaille.core.models.User, LDAPObject): "hotp_counter": "oathHOTPCounter", "one_time_password": "oathTokenPIN", "one_time_password_emission_date": "oathSecretTime", + "password_failure_timestamps": "pwdFailureTime", } def match_filter(self, filter): diff --git a/canaille/backends/ldap/utils.py b/canaille/backends/ldap/utils.py index 7826fc28..ec534fd3 100644 --- a/canaille/backends/ldap/utils.py +++ b/canaille/backends/ldap/utils.py @@ -37,7 +37,10 @@ def ldap_to_python(value, syntax): # python cannot represent datetimes with year 0 return datetime.datetime.min if value.endswith("Z"): - return datetime.datetime.strptime(value, "%Y%m%d%H%M%SZ").replace( + format_string = ( + "%Y%m%d%H%M%S.%fZ" if "." in value else "%Y%m%d%H%M%SZ" + ) # microseconds + return datetime.datetime.strptime(value, format_string).replace( tzinfo=datetime.timezone.utc ) return datetime.datetime.strptime(value, "%Y%m%d%H%M%S%z") diff --git a/canaille/backends/memory/backend.py b/canaille/backends/memory/backend.py index 04031a1a..853218cb 100644 --- a/canaille/backends/memory/backend.py +++ b/canaille/backends/memory/backend.py @@ -7,6 +7,7 @@ from flask import current_app import canaille.backends.memory.models from canaille.backends import Backend +from canaille.backends import get_lockout_delay_message def listify(value): @@ -66,7 +67,14 @@ class MemoryBackend(Backend): return self.get(User, user_name=login) def check_user_password(self, user, password): + if current_app.features.has_intruder_lockout: + if current_lockout_delay := user.get_intruder_lockout_delay(): + self.save(user) + return (False, get_lockout_delay_message(current_lockout_delay)) + if password != user.password: + if current_app.features.has_intruder_lockout: + self.record_failed_attempt(user) return (False, None) if user.locked: @@ -237,3 +245,9 @@ class MemoryBackend(Backend): # update the id index del self.index(instance.__class__)[instance.id] + + def record_failed_attempt(self, user): + user.password_failure_timestamps += [ + datetime.datetime.now(datetime.timezone.utc) + ] + self.save(user) diff --git a/canaille/backends/sql/backend.py b/canaille/backends/sql/backend.py index 04172b2c..726ecff7 100644 --- a/canaille/backends/sql/backend.py +++ b/canaille/backends/sql/backend.py @@ -1,5 +1,6 @@ import datetime +from flask import current_app from sqlalchemy import create_engine from sqlalchemy import or_ from sqlalchemy import select @@ -7,6 +8,7 @@ from sqlalchemy.orm import Session from sqlalchemy.orm import declarative_base from canaille.backends import Backend +from canaille.backends import get_lockout_delay_message Base = declarative_base() @@ -58,7 +60,14 @@ class SQLBackend(Backend): return self.get(User, user_name=login) def check_user_password(self, user, password): + if current_app.features.has_intruder_lockout: + if current_lockout_delay := user.get_intruder_lockout_delay(): + self.save(user) + return (False, get_lockout_delay_message(current_lockout_delay)) + if password != user.password: + if current_app.features.has_intruder_lockout: + self.record_failed_attempt(user) return (False, None) if user.locked: @@ -143,3 +152,11 @@ class SQLBackend(Backend): # run the instance reload callback again if existing next(reload_callback, None) + + def record_failed_attempt(self, user): + if user.password_failure_timestamps is None: + user.password_failure_timestamps = [] + user._password_failure_timestamps.append( + str(datetime.datetime.now(datetime.timezone.utc)) + ) + self.save(user) diff --git a/canaille/backends/sql/models.py b/canaille/backends/sql/models.py index 1c7e624e..caa1822a 100644 --- a/canaille/backends/sql/models.py +++ b/canaille/backends/sql/models.py @@ -72,11 +72,13 @@ class User(canaille.core.models.User, Base, SqlAlchemyModel): last_modified: Mapped[datetime.datetime] = mapped_column( TZDateTime(timezone=True), nullable=True ) - user_name: Mapped[str] = mapped_column(String, unique=True, nullable=False) password: Mapped[str] = mapped_column( PasswordType(schemes=["pbkdf2_sha512"]), nullable=True ) + _password_failure_timestamps: Mapped[list[str]] = mapped_column( + MutableJson, nullable=True + ) preferred_language: Mapped[str] = mapped_column(String, nullable=True) family_name: Mapped[str] = mapped_column(String, nullable=True) given_name: Mapped[str] = mapped_column(String, nullable=True) @@ -115,6 +117,22 @@ class User(canaille.core.models.User, Base, SqlAlchemyModel): if current_app.features.has_otp and not self.secret_token: self.initialize_otp() + @property + def password_failure_timestamps(self): + if self._password_failure_timestamps: + return [ + datetime.datetime.fromisoformat(d) + for d in self._password_failure_timestamps + ] + return self._password_failure_timestamps + + @password_failure_timestamps.setter + def password_failure_timestamps(self, dates_list): + if dates_list: + self._password_failure_timestamps = [str(d) for d in dates_list] + else: + self._password_failure_timestamps = dates_list + class Group(canaille.core.models.Group, Base, SqlAlchemyModel): __tablename__ = "group" diff --git a/canaille/config.sample.toml b/canaille/config.sample.toml index 51c6f27f..95f8cca2 100644 --- a/canaille/config.sample.toml +++ b/canaille/config.sample.toml @@ -62,6 +62,10 @@ SECRET_KEY = "change me before you go in production" # recovery link by email. This option is true by default. # ENABLE_PASSWORD_RECOVERY = true +# If ENABLE_INTRUDER_LOCKOUT is true, then users will have to wait for an +# increasingly long time between each failed login attempt. This option is false by default. +# ENABLE_INTRUDER_LOCKOUT = false + # If OTP_METHOD is defined, then users will need to authenticate themselves # using a one-time password (OTP) via an authenticator app. # Two options are supported : "TOTP" for time one-time password, diff --git a/canaille/core/configuration.py b/canaille/core/configuration.py index c6d467e7..8ee3960c 100644 --- a/canaille/core/configuration.py +++ b/canaille/core/configuration.py @@ -266,6 +266,10 @@ class CoreSettings(BaseModel): """If :py:data:`False`, then users cannot ask for a password recovery link by email.""" + ENABLE_INTRUDER_LOCKOUT: bool = False + """If :py:data:`True`, then users will have to wait for an increasingly + long time between each failed login attempt.""" + OTP_METHOD: str = None """If OTP_METHOD is defined, then users will need to authenticate themselves using a one-time password (OTP) via an authenticator app. diff --git a/canaille/core/models.py b/canaille/core/models.py index cf2cfccf..b1d23cda 100644 --- a/canaille/core/models.py +++ b/canaille/core/models.py @@ -15,6 +15,10 @@ OTP_DIGITS = 6 OTP_VALIDITY = 600 SEND_NEW_OTP_DELAY = 10 +PASSWORD_MIN_DELAY = 2 +PASSWORD_MAX_DELAY = 600 +PASSWORD_FAILURE_COUNT_INTERVAL = 600 + class User(Model): """User model, based on the `SCIM User schema @@ -43,6 +47,13 @@ class User(Model): and is case insensitive. """ + password_failure_timestamps: list[datetime.datetime] = [] + """This attribute stores the timestamps of the user's failed + authentications. + + It's currently used by the intruder lockout delay system. + """ + password: str | None = None """ This attribute is intended to be used as a means to set, replace, @@ -452,6 +463,29 @@ class User(Model): >= datetime.timedelta(seconds=SEND_NEW_OTP_DELAY) ) + def get_intruder_lockout_delay(self): + if self.password_failure_timestamps: + # discard old attempts + self.password_failure_timestamps = [ + attempt + for attempt in self.password_failure_timestamps + if attempt + > datetime.datetime.now(datetime.timezone.utc) + - datetime.timedelta(seconds=PASSWORD_FAILURE_COUNT_INTERVAL) + ] + if not self.password_failure_timestamps: + return 0 + failed_login_count = len(self.password_failure_timestamps) + # delay is multiplied by 2 each failed attempt, starting at min delay, limited to max delay + calculated_delay = min( + PASSWORD_MIN_DELAY * 2 ** (failed_login_count - 1), PASSWORD_MAX_DELAY + ) + time_since_last_failed_bind = ( + datetime.datetime.now(datetime.timezone.utc) + - self.password_failure_timestamps[-1] + ).total_seconds() + return max(calculated_delay - time_since_last_failed_bind, 0) + class Group(Model): """User model, based on the `SCIM Group schema diff --git a/demo/ldif/ppolicy.ldif b/demo/ldif/ppolicy.ldif index 71f44886..870efc28 100644 --- a/demo/ldif/ppolicy.ldif +++ b/demo/ldif/ppolicy.ldif @@ -9,3 +9,4 @@ pwdMustChange: TRUE pwdLockout: TRUE pwdAllowUserChange: TRUE pwdGraceAuthNLimit: 1 +pwdMaxFailure: 999 diff --git a/doc/features.rst b/doc/features.rst index dd03bc9c..df789e93 100644 --- a/doc/features.rst +++ b/doc/features.rst @@ -185,6 +185,13 @@ In case of lost token, TOTP/HOTP authentication can be reset by users with :attr If a :class:`mail server ` is configured and the :attr:`email one-time password feature ` is enabled, then users will need to authenticate themselves via a one-time password sent to their primary email address. If a :class:`smpp server ` is configured and the :attr:`sms one-time password feature ` is enabled, then users will need to authenticate themselves via a one-time password sent to their primary phone number. +.. _feature_intruder_lockout: + +Intruder lockout +================ + +If the :attr:`intruder lockout feature ` is enabled, then users will have to wait for an increasingly long time between each failed login attempt. + Web interface ************* diff --git a/tests/core/test_intruder_lockout.py b/tests/core/test_intruder_lockout.py new file mode 100644 index 00000000..1c718b9d --- /dev/null +++ b/tests/core/test_intruder_lockout.py @@ -0,0 +1,146 @@ +import datetime +import logging + +import time_machine + +from canaille.backends.ldap.backend import LDAPBackend +from canaille.core.models import PASSWORD_MIN_DELAY + + +def test_intruder_lockout_fail_second_attempt_then_succeed_in_third( + testclient, user, caplog +): + testclient.app.config["CANAILLE"]["ENABLE_INTRUDER_LOCKOUT"] = True + + with testclient.session_transaction() as session: + assert not session.get("user_id") + + # add 100 milliseconds to account for LDAP time + with time_machine.travel( + datetime.datetime.now(datetime.timezone.utc) + + datetime.timedelta(milliseconds=100), + tick=False, + ) as traveller: + res = testclient.get("/login", status=200) + + res.form["login"] = "user" + res = res.form.submit(status=302).follow() + + res.form["password"] = "incorrect horse" + res = res.form.submit(status=200) + + assert ("error", "Login failed, please check your information") in res.flashes + assert ( + "canaille", + logging.SECURITY, + "Failed login attempt for user from unknown IP", + ) in caplog.record_tuples + + res.form["password"] = "correct horse battery staple" + res = res.form.submit(status=200) + + assert ( + "error", + f"Too much attempts. Please wait for {PASSWORD_MIN_DELAY} seconds before trying to login again.", + ) in res.flashes + assert ( + "canaille", + logging.SECURITY, + "Failed login attempt for user from unknown IP", + ) in caplog.record_tuples + + traveller.shift(datetime.timedelta(seconds=PASSWORD_MIN_DELAY)) + + res.form["password"] = "correct horse battery staple" + res = res.form.submit(status=302) + + assert ( + "success", + "Connection successful. Welcome John (johnny) Doe", + ) in res.flashes + assert ( + "canaille", + logging.SECURITY, + "Succeed login attempt for user from unknown IP", + ) in caplog.record_tuples + + +def test_intruder_lockout_two_consecutive_fails(testclient, backend, user, caplog): + testclient.app.config["CANAILLE"]["ENABLE_INTRUDER_LOCKOUT"] = True + + # add 100 milliseconds to account for LDAP time + with time_machine.travel( + datetime.datetime.now(datetime.timezone.utc) + + datetime.timedelta(milliseconds=100), + tick=False, + ) as traveller: + res = testclient.get("/login", status=200) + + res.form["login"] = "user" + res = res.form.submit().follow() + + res.form["password"] = "incorrect horse" + res = res.form.submit(status=200) + + assert ("error", "Login failed, please check your information") in res.flashes + assert ( + "canaille", + logging.SECURITY, + "Failed login attempt for user from unknown IP", + ) in caplog.record_tuples + + res.form["password"] = "correct horse battery staple" + res = res.form.submit(status=200) + + assert ( + "error", + f"Too much attempts. Please wait for {PASSWORD_MIN_DELAY} seconds before trying to login again.", + ) in res.flashes + assert ( + "canaille", + logging.SECURITY, + "Failed login attempt for user from unknown IP", + ) in caplog.record_tuples + + traveller.shift(datetime.timedelta(seconds=PASSWORD_MIN_DELAY)) + ldap_shift = ( + PASSWORD_MIN_DELAY if isinstance(backend, LDAPBackend) else 0 + ) # LDAP doesn't travel in time! + + res.form["password"] = "incorrect horse" + res = res.form.submit(status=200) + + assert ("error", "Login failed, please check your information") in res.flashes + assert ( + "canaille", + logging.SECURITY, + "Failed login attempt for user from unknown IP", + ) in caplog.record_tuples + + res.form["password"] = "correct horse battery staple" + res = res.form.submit(status=200) + + assert ( + "error", + f"Too much attempts. Please wait for {PASSWORD_MIN_DELAY*2 - ldap_shift} seconds before trying to login again.", + ) in res.flashes + assert ( + "canaille", + logging.SECURITY, + "Failed login attempt for user from unknown IP", + ) in caplog.record_tuples + + traveller.shift(datetime.timedelta(seconds=PASSWORD_MIN_DELAY * 2)) + + res.form["password"] = "correct horse battery staple" + res = res.form.submit(status=302) + + assert ( + "success", + "Connection successful. Welcome John (johnny) Doe", + ) in res.flashes + assert ( + "canaille", + logging.SECURITY, + "Succeed login attempt for user from unknown IP", + ) in caplog.record_tuples diff --git a/tests/oidc/test_authorization_code_flow.py b/tests/oidc/test_authorization_code_flow.py index a3fc351d..f1446fb9 100644 --- a/tests/oidc/test_authorization_code_flow.py +++ b/tests/oidc/test_authorization_code_flow.py @@ -10,6 +10,7 @@ from flask import g from werkzeug.security import gen_salt from canaille.app import models +from canaille.core.models import PASSWORD_MIN_DELAY from . import client_credentials @@ -789,3 +790,56 @@ def test_missing_client_id( status=400, ) res.mustcontain("client_id parameter is missing.") + + +def test_logout_login_with_intruder_lockout(testclient, logged_user, client, backend): + testclient.app.config["CANAILLE"]["ENABLE_INTRUDER_LOCKOUT"] = True + + # add 100 milliseconds to account for LDAP time + with time_machine.travel( + datetime.datetime.now(datetime.timezone.utc) + + datetime.timedelta(milliseconds=100), + tick=False, + ) as traveller: + res = testclient.get( + "/oauth/authorize", + params=dict( + response_type="code", + client_id=client.client_id, + scope="openid profile", + nonce="somenonce", + ), + status=200, + ) + + res = res.form.submit(name="answer", value="logout") + res = res.follow() + g.user = None + res = res.follow() + res = res.follow() + + res.form["login"] = logged_user.user_name + res = res.form.submit() + res = res.follow() + + res.form["password"] = "wrong password" + res = res.form.submit(status=200) + assert ("error", "Login failed, please check your information") in res.flashes + + res.form["password"] = "correct horse battery staple" + res = res.form.submit(status=200) + + assert ( + "error", + f"Too much attempts. Please wait for {PASSWORD_MIN_DELAY} seconds before trying to login again.", + ) in res.flashes + + traveller.shift(datetime.timedelta(seconds=PASSWORD_MIN_DELAY)) + + res.form["password"] = "correct horse battery staple" + res = res.form.submit(status=302) + res = res.follow(status=200) + + res = res.form.submit(name="answer", value="accept", status=302) + + assert res.location.startswith(client.redirect_uris[0])