feat : Added intruder lockout login delay

This commit is contained in:
Félix Rohrlich 2024-12-05 11:42:51 +01:00
parent 57db533a17
commit 80ef1741a7
16 changed files with 325 additions and 8 deletions

View file

@ -14,6 +14,10 @@ class Features:
def has_password_recovery(self):
return self.app.config["CANAILLE"]["ENABLE_PASSWORD_RECOVERY"]
@property
def has_intruder_lockout(self):
return self.app.config["CANAILLE"]["ENABLE_INTRUDER_LOCKOUT"]
@property
def otp_method(self):
return self.app.config["CANAILLE"]["OTP_METHOD"]

View file

@ -1,6 +1,7 @@
import importlib
import os
from contextlib import contextmanager
from math import ceil
from flask import g
@ -195,3 +196,7 @@ def available_backends():
for elt in os.scandir(os.path.dirname(__file__))
if elt.is_dir() and os.path.exists(os.path.join(elt, "backend.py"))
}
def get_lockout_delay_message(current_lockout_delay):
return f"Too much attempts. Please wait for {ceil(current_lockout_delay)} seconds before trying to login again."

View file

@ -15,6 +15,7 @@ from canaille.app import models
from canaille.app.configuration import ConfigurationException
from canaille.app.i18n import gettext as _
from canaille.backends import Backend
from canaille.backends import get_lockout_delay_message
from .utils import listify
from .utils import python_attrs_to_ldap
@ -206,6 +207,11 @@ class LDAPBackend(Backend):
return self.get(User, filter=filter)
def check_user_password(self, user, password):
if current_app.features.has_intruder_lockout:
if current_lockout_delay := user.get_intruder_lockout_delay():
self.save(user)
return (False, get_lockout_delay_message(current_lockout_delay))
conn = ldap.initialize(current_app.config["CANAILLE_LDAP"]["URI"])
conn.set_option(

View file

@ -40,6 +40,7 @@ class User(canaille.core.models.User, LDAPObject):
"hotp_counter": "oathHOTPCounter",
"one_time_password": "oathTokenPIN",
"one_time_password_emission_date": "oathSecretTime",
"password_failure_timestamps": "pwdFailureTime",
}
def match_filter(self, filter):

View file

@ -37,7 +37,10 @@ def ldap_to_python(value, syntax):
# python cannot represent datetimes with year 0
return datetime.datetime.min
if value.endswith("Z"):
return datetime.datetime.strptime(value, "%Y%m%d%H%M%SZ").replace(
format_string = (
"%Y%m%d%H%M%S.%fZ" if "." in value else "%Y%m%d%H%M%SZ"
) # microseconds
return datetime.datetime.strptime(value, format_string).replace(
tzinfo=datetime.timezone.utc
)
return datetime.datetime.strptime(value, "%Y%m%d%H%M%S%z")

View file

@ -7,6 +7,7 @@ from flask import current_app
import canaille.backends.memory.models
from canaille.backends import Backend
from canaille.backends import get_lockout_delay_message
def listify(value):
@ -63,7 +64,14 @@ class MemoryBackend(Backend):
return self.get(User, user_name=login)
def check_user_password(self, user, password):
if current_app.features.has_intruder_lockout:
if current_lockout_delay := user.get_intruder_lockout_delay():
self.save(user)
return (False, get_lockout_delay_message(current_lockout_delay))
if password != user.password:
if current_app.features.has_intruder_lockout:
self.record_failed_attempt(user)
return (False, None)
if user.locked:
@ -234,3 +242,9 @@ class MemoryBackend(Backend):
# update the id index
del self.index(instance.__class__)[instance.id]
def record_failed_attempt(self, user):
user.password_failure_timestamps += [
datetime.datetime.now(datetime.timezone.utc)
]
self.save(user)

View file

@ -1,5 +1,6 @@
import datetime
from flask import current_app
from sqlalchemy import create_engine
from sqlalchemy import or_
from sqlalchemy import select
@ -7,6 +8,7 @@ from sqlalchemy.orm import Session
from sqlalchemy.orm import declarative_base
from canaille.backends import Backend
from canaille.backends import get_lockout_delay_message
Base = declarative_base()
@ -58,7 +60,14 @@ class SQLBackend(Backend):
return self.get(User, user_name=login)
def check_user_password(self, user, password):
if current_app.features.has_intruder_lockout:
if current_lockout_delay := user.get_intruder_lockout_delay():
self.save(user)
return (False, get_lockout_delay_message(current_lockout_delay))
if password != user.password:
if current_app.features.has_intruder_lockout:
self.record_failed_attempt(user)
return (False, None)
if user.locked:
@ -143,3 +152,11 @@ class SQLBackend(Backend):
# run the instance reload callback again if existing
next(reload_callback, None)
def record_failed_attempt(self, user):
if user.password_failure_timestamps is None:
user.password_failure_timestamps = []
user._password_failure_timestamps.append(
str(datetime.datetime.now(datetime.timezone.utc))
)
self.save(user)

View file

@ -72,11 +72,13 @@ class User(canaille.core.models.User, Base, SqlAlchemyModel):
last_modified: Mapped[datetime.datetime] = mapped_column(
TZDateTime(timezone=True), nullable=True
)
user_name: Mapped[str] = mapped_column(String, unique=True, nullable=False)
password: Mapped[str] = mapped_column(
PasswordType(schemes=["pbkdf2_sha512"]), nullable=True
)
_password_failure_timestamps: Mapped[list[str]] = mapped_column(
MutableJson, nullable=True
)
preferred_language: Mapped[str] = mapped_column(String, nullable=True)
family_name: Mapped[str] = mapped_column(String, nullable=True)
given_name: Mapped[str] = mapped_column(String, nullable=True)
@ -115,6 +117,22 @@ class User(canaille.core.models.User, Base, SqlAlchemyModel):
if current_app.features.has_otp and not self.secret_token:
self.initialize_otp()
@property
def password_failure_timestamps(self):
if self._password_failure_timestamps:
return [
datetime.datetime.fromisoformat(d)
for d in self._password_failure_timestamps
]
return self._password_failure_timestamps
@password_failure_timestamps.setter
def password_failure_timestamps(self, dates_list):
if dates_list:
self._password_failure_timestamps = [str(d) for d in dates_list]
else:
self._password_failure_timestamps = dates_list
class Group(canaille.core.models.Group, Base, SqlAlchemyModel):
__tablename__ = "group"

View file

@ -62,6 +62,10 @@ SECRET_KEY = "change me before you go in production"
# recovery link by email. This option is true by default.
# ENABLE_PASSWORD_RECOVERY = true
# If ENABLE_INTRUDER_LOCKOUT is true, then users will have to wait for an
# increasingly long time between each failed login attempt. This option is false by default.
# ENABLE_INTRUDER_LOCKOUT = false
# If OTP_METHOD is defined, then users will need to authenticate themselves
# using a one-time password (OTP) via an authenticator app.
# Two options are supported : "TOTP" for time one-time password,

View file

@ -266,6 +266,10 @@ class CoreSettings(BaseModel):
"""If :py:data:`False`, then users cannot ask for a password recovery link
by email."""
ENABLE_INTRUDER_LOCKOUT: bool = False
"""If :py:data:`True`, then users will have to wait for an increasingly
long time between each failed login attempt."""
OTP_METHOD: str = None
"""If OTP_METHOD is defined, then users will need to authenticate themselves
using a one-time password (OTP) via an authenticator app.

View file

@ -15,6 +15,10 @@ OTP_DIGITS = 6
OTP_VALIDITY = 600
SEND_NEW_OTP_DELAY = 10
PASSWORD_MIN_DELAY = 2
PASSWORD_MAX_DELAY = 600
PASSWORD_FAILURE_COUNT_INTERVAL = 600
class User(Model):
"""User model, based on the `SCIM User schema
@ -43,6 +47,13 @@ class User(Model):
and is case insensitive.
"""
password_failure_timestamps: list[datetime.datetime] = []
"""This attribute stores the timestamps of the user's failed
authentications.
It's currently used by the intruder lockout delay system.
"""
password: str | None = None
"""
This attribute is intended to be used as a means to set, replace,
@ -452,6 +463,29 @@ class User(Model):
>= datetime.timedelta(seconds=SEND_NEW_OTP_DELAY)
)
def get_intruder_lockout_delay(self):
if self.password_failure_timestamps:
# discard old attempts
self.password_failure_timestamps = [
attempt
for attempt in self.password_failure_timestamps
if attempt
> datetime.datetime.now(datetime.timezone.utc)
- datetime.timedelta(seconds=PASSWORD_FAILURE_COUNT_INTERVAL)
]
if not self.password_failure_timestamps:
return 0
failed_login_count = len(self.password_failure_timestamps)
# delay is multiplied by 2 each failed attempt, starting at min delay, limited to max delay
calculated_delay = min(
PASSWORD_MIN_DELAY * 2 ** (failed_login_count - 1), PASSWORD_MAX_DELAY
)
time_since_last_failed_bind = (
datetime.datetime.now(datetime.timezone.utc)
- self.password_failure_timestamps[-1]
).total_seconds()
return max(calculated_delay - time_since_last_failed_bind, 0)
class Group(Model):
"""User model, based on the `SCIM Group schema

View file

@ -9,3 +9,4 @@ pwdMustChange: TRUE
pwdLockout: TRUE
pwdAllowUserChange: TRUE
pwdGraceAuthNLimit: 1
pwdMaxFailure: 999

View file

@ -185,6 +185,13 @@ In case of lost token, TOTP/HOTP authentication can be reset by users with :attr
If a :class:`mail server <canaille.core.configuration.SMTPSettings>` is configured and the :attr:`email one-time password feature <canaille.core.configuration.CoreSettings.EMAIL_OTP>` is enabled, then users will need to authenticate themselves via a one-time password sent to their primary email address.
If a :class:`smpp server <canaille.core.configuration.SMPPSettings>` is configured and the :attr:`sms one-time password feature <canaille.core.configuration.CoreSettings.SMS_OTP>` is enabled, then users will need to authenticate themselves via a one-time password sent to their primary phone number.
.. _feature_intruder_lockout:
Intruder lockout
================
If the :attr:`intruder lockout feature <canaille.core.configuration.CoreSettings.ENABLE_INTRUDER_LOCKOUT>` is enabled, then users will have to wait for an increasingly long time between each failed login attempt.
Web interface
*************

View file

@ -117,12 +117,11 @@ dev = [
"time-machine >= 2.14.1",
"toml >= 0.10.0",
"tox-uv >= 1.16.0",
# Babel 2.14 does not directly depend on setuptools
# https://github.com/python-babel/babel/blob/40e60a1f6cf178d9f57fcc14f157ea1b2ab77361/CHANGES.rst?plain=1#L22-L24
# and neither python 3.12 due to PEP 632
# https://peps.python.org/pep-0632/
"setuptools >= 50.0.0; python_version>='3.12'"
# Babel 2.14 does not directly depend on setuptools
# https://github.com/python-babel/babel/blob/40e60a1f6cf178d9f57fcc14f157ea1b2ab77361/CHANGES.rst?plain=1#L22-L24
# and neither python 3.12 due to PEP 632
# https://peps.python.org/pep-0632/
"setuptools >= 50.0.0; python_version>='3.12'",
]
doc = [
"autodoc-pydantic >= 2.0.1",

View file

@ -0,0 +1,146 @@
import datetime
import logging
import time_machine
from canaille.backends.ldap.backend import LDAPBackend
from canaille.core.models import PASSWORD_MIN_DELAY
def test_intruder_lockout_fail_second_attempt_then_succeed_in_third(
testclient, user, caplog
):
testclient.app.config["CANAILLE"]["ENABLE_INTRUDER_LOCKOUT"] = True
with testclient.session_transaction() as session:
assert not session.get("user_id")
# add 100 milliseconds to account for LDAP time
with time_machine.travel(
datetime.datetime.now(datetime.timezone.utc)
+ datetime.timedelta(milliseconds=100),
tick=False,
) as traveller:
res = testclient.get("/login", status=200)
res.form["login"] = "user"
res = res.form.submit(status=302).follow()
res.form["password"] = "incorrect horse"
res = res.form.submit(status=200)
assert ("error", "Login failed, please check your information") in res.flashes
assert (
"canaille",
logging.SECURITY,
"Failed login attempt for user from unknown IP",
) in caplog.record_tuples
res.form["password"] = "correct horse battery staple"
res = res.form.submit(status=200)
assert (
"error",
f"Too much attempts. Please wait for {PASSWORD_MIN_DELAY} seconds before trying to login again.",
) in res.flashes
assert (
"canaille",
logging.SECURITY,
"Failed login attempt for user from unknown IP",
) in caplog.record_tuples
traveller.shift(datetime.timedelta(seconds=PASSWORD_MIN_DELAY))
res.form["password"] = "correct horse battery staple"
res = res.form.submit(status=302)
assert (
"success",
"Connection successful. Welcome John (johnny) Doe",
) in res.flashes
assert (
"canaille",
logging.SECURITY,
"Succeed login attempt for user from unknown IP",
) in caplog.record_tuples
def test_intruder_lockout_two_consecutive_fails(testclient, backend, user, caplog):
testclient.app.config["CANAILLE"]["ENABLE_INTRUDER_LOCKOUT"] = True
# add 100 milliseconds to account for LDAP time
with time_machine.travel(
datetime.datetime.now(datetime.timezone.utc)
+ datetime.timedelta(milliseconds=100),
tick=False,
) as traveller:
res = testclient.get("/login", status=200)
res.form["login"] = "user"
res = res.form.submit().follow()
res.form["password"] = "incorrect horse"
res = res.form.submit(status=200)
assert ("error", "Login failed, please check your information") in res.flashes
assert (
"canaille",
logging.SECURITY,
"Failed login attempt for user from unknown IP",
) in caplog.record_tuples
res.form["password"] = "correct horse battery staple"
res = res.form.submit(status=200)
assert (
"error",
f"Too much attempts. Please wait for {PASSWORD_MIN_DELAY} seconds before trying to login again.",
) in res.flashes
assert (
"canaille",
logging.SECURITY,
"Failed login attempt for user from unknown IP",
) in caplog.record_tuples
traveller.shift(datetime.timedelta(seconds=PASSWORD_MIN_DELAY))
ldap_shift = (
PASSWORD_MIN_DELAY if isinstance(backend, LDAPBackend) else 0
) # LDAP doesn't travel in time!
res.form["password"] = "incorrect horse"
res = res.form.submit(status=200)
assert ("error", "Login failed, please check your information") in res.flashes
assert (
"canaille",
logging.SECURITY,
"Failed login attempt for user from unknown IP",
) in caplog.record_tuples
res.form["password"] = "correct horse battery staple"
res = res.form.submit(status=200)
assert (
"error",
f"Too much attempts. Please wait for {PASSWORD_MIN_DELAY*2 - ldap_shift} seconds before trying to login again.",
) in res.flashes
assert (
"canaille",
logging.SECURITY,
"Failed login attempt for user from unknown IP",
) in caplog.record_tuples
traveller.shift(datetime.timedelta(seconds=PASSWORD_MIN_DELAY * 2))
res.form["password"] = "correct horse battery staple"
res = res.form.submit(status=302)
assert (
"success",
"Connection successful. Welcome John (johnny) Doe",
) in res.flashes
assert (
"canaille",
logging.SECURITY,
"Succeed login attempt for user from unknown IP",
) in caplog.record_tuples

View file

@ -10,6 +10,7 @@ from flask import g
from werkzeug.security import gen_salt
from canaille.app import models
from canaille.core.models import PASSWORD_MIN_DELAY
from . import client_credentials
@ -789,3 +790,56 @@ def test_missing_client_id(
status=400,
)
res.mustcontain("client_id parameter is missing.")
def test_logout_login_with_intruder_lockout(testclient, logged_user, client, backend):
testclient.app.config["CANAILLE"]["ENABLE_INTRUDER_LOCKOUT"] = True
# add 100 milliseconds to account for LDAP time
with time_machine.travel(
datetime.datetime.now(datetime.timezone.utc)
+ datetime.timedelta(milliseconds=100),
tick=False,
) as traveller:
res = testclient.get(
"/oauth/authorize",
params=dict(
response_type="code",
client_id=client.client_id,
scope="openid profile",
nonce="somenonce",
),
status=200,
)
res = res.form.submit(name="answer", value="logout")
res = res.follow()
g.user = None
res = res.follow()
res = res.follow()
res.form["login"] = logged_user.user_name
res = res.form.submit()
res = res.follow()
res.form["password"] = "wrong password"
res = res.form.submit(status=200)
assert ("error", "Login failed, please check your information") in res.flashes
res.form["password"] = "correct horse battery staple"
res = res.form.submit(status=200)
assert (
"error",
f"Too much attempts. Please wait for {PASSWORD_MIN_DELAY} seconds before trying to login again.",
) in res.flashes
traveller.shift(datetime.timedelta(seconds=PASSWORD_MIN_DELAY))
res.form["password"] = "correct horse battery staple"
res = res.form.submit(status=302)
res = res.follow(status=200)
res = res.form.submit(name="answer", value="accept", status=302)
assert res.location.startswith(client.redirect_uris[0])