feat : Added HOTP authentication and CLI Multi-factor authentication reset

This commit is contained in:
Félix Rohrlich 2024-11-14 09:49:19 +01:00
parent 74e0c8d635
commit b01e8323d8
22 changed files with 451 additions and 70 deletions

View file

@ -175,6 +175,7 @@ def validate(config, validate_remote=False):
validate_keypair(config.get("CANAILLE_OIDC")) validate_keypair(config.get("CANAILLE_OIDC"))
validate_theme(config["CANAILLE"]) validate_theme(config["CANAILLE"])
validate_admin_email(config["CANAILLE"]) validate_admin_email(config["CANAILLE"])
validate_otp_method(config["CANAILLE"])
if not validate_remote: if not validate_remote:
return return
@ -243,3 +244,8 @@ def validate_admin_email(config):
raise ConfigurationException( raise ConfigurationException(
"You must set an administration email if you want to check if users' passwords are compromised." "You must set an administration email if you want to check if users' passwords are compromised."
) )
def validate_otp_method(config):
if config["OTP_METHOD"] not in [None, "TOTP", "HOTP"]:
raise ConfigurationException("Invalid OTP method")

View file

@ -15,8 +15,12 @@ class Features:
return self.app.config["CANAILLE"]["ENABLE_PASSWORD_RECOVERY"] return self.app.config["CANAILLE"]["ENABLE_PASSWORD_RECOVERY"]
@property @property
def has_totp(self): def otp_method(self):
return self.app.config["CANAILLE"]["ENABLE_TOTP"] return self.app.config["CANAILLE"]["OTP_METHOD"]
@property
def has_otp(self):
return bool(self.app.config["CANAILLE"]["OTP_METHOD"])
@property @property
def has_registration(self): def has_registration(self):

View file

@ -37,6 +37,7 @@ class User(canaille.core.models.User, LDAPObject):
"lock_date": "pwdEndTime", "lock_date": "pwdEndTime",
"secret_token": "oathSecret", "secret_token": "oathSecret",
"last_otp_login": "oathLastLogin", "last_otp_login": "oathLastLogin",
"hotp_counter": "oathHOTPCounter",
} }
def match_filter(self, filter): def match_filter(self, filter):
@ -47,8 +48,8 @@ class User(canaille.core.models.User, LDAPObject):
return super().match_filter(filter) return super().match_filter(filter)
def save(self): def save(self):
if current_app.features.has_totp and not self.secret_token: if current_app.features.has_otp and not self.secret_token:
self.generate_otp_token() self.initialize_otp()
group_attr = self.python_attribute_to_ldap("groups") group_attr = self.python_attribute_to_ldap("groups")
if group_attr not in self.changes: if group_attr not in self.changes:

View file

@ -129,10 +129,10 @@ class MemoryBackend(Backend):
def save(self, instance): def save(self, instance):
if ( if (
isinstance(instance, canaille.backends.memory.models.User) isinstance(instance, canaille.backends.memory.models.User)
and current_app.features.has_totp and current_app.features.has_otp
and not instance.secret_token and not instance.secret_token
): ):
instance.generate_otp_token() instance.initialize_otp()
if not instance.id: if not instance.id:
instance.id = str(uuid.uuid4()) instance.id = str(uuid.uuid4())

View file

@ -105,10 +105,11 @@ class User(canaille.core.models.User, Base, SqlAlchemyModel):
TZDateTime(timezone=True), nullable=True TZDateTime(timezone=True), nullable=True
) )
secret_token: Mapped[str] = mapped_column(String, nullable=True, unique=True) secret_token: Mapped[str] = mapped_column(String, nullable=True, unique=True)
hotp_counter: Mapped[int] = mapped_column(Integer, nullable=True)
def save(self): def save(self):
if current_app.features.has_totp and not self.secret_token: if current_app.features.has_otp and not self.secret_token:
self.generate_otp_token() self.initialize_otp()
class Group(canaille.core.models.Group, Base, SqlAlchemyModel): class Group(canaille.core.models.Group, Base, SqlAlchemyModel):

View file

@ -62,9 +62,12 @@ SECRET_KEY = "change me before you go in production"
# recovery link by email. This option is true by default. # recovery link by email. This option is true by default.
# ENABLE_PASSWORD_RECOVERY = true # ENABLE_PASSWORD_RECOVERY = true
# If ENABLE_TOTP is true, then users will need to authenticate themselves # If OTP_METHOD is defined, then users will need to authenticate themselves
# using a time one-time password via an authenticator app. This option is false by default. # using a one-time password (OTP) via an authenticator app.
# ENABLE_TOTP = false # Two options are supported : "TOTP" for time one-time password,
# and "HOTP" for HMAC-based one-time password.
# This option is deactivated by default.
# OTP_METHOD = "TOTP"
# The validity duration of registration invitations, in seconds. # The validity duration of registration invitations, in seconds.
# Defaults to 2 days # Defaults to 2 days
@ -119,7 +122,7 @@ SECRET_KEY = "change me before you go in production"
# USER_BASE = "ou=users,dc=mydomain,dc=tld" # USER_BASE = "ou=users,dc=mydomain,dc=tld"
# The object class to use for creating new users # The object class to use for creating new users
# Note : If you plan on using TOTP authentication, use : USER_CLASS = ["inetOrgPerson", "oathTOTPToken"] # Note : If you plan on using TOTP/HOTP authentication, use : USER_CLASS = ["inetOrgPerson", "oathHOTPToken"]
# USER_CLASS = "inetOrgPerson" # USER_CLASS = "inetOrgPerson"
# The attribute to identify an object in the User dn. # The attribute to identify an object in the User dn.

View file

@ -1,7 +1,12 @@
import json
import click import click
from flask.cli import with_appcontext from flask.cli import with_appcontext
from canaille.app import models
from canaille.app.commands import with_backendcontext from canaille.app.commands import with_backendcontext
from canaille.backends import Backend
from canaille.backends.commands import serialize
try: try:
HAS_FAKER = True HAS_FAKER = True
@ -50,3 +55,30 @@ def groups(ctx, nb_users_max):
def register(cli): def register(cli):
if HAS_FAKER: # pragma: no branch if HAS_FAKER: # pragma: no branch
cli.add_command(populate) cli.add_command(populate)
cli.add_command(reset_mfa)
@click.command()
@with_appcontext
@with_backendcontext
@click.argument("identifier")
def reset_mfa(identifier):
"""Reset multi-factor authentication for a user and display the
edited user in JSON format in the standard output.
IDENTIFIER should be a user id or user_name
"""
instance = Backend.instance.get(models.User, identifier)
if not instance:
raise click.ClickException(f"No user with id '{identifier}'")
instance.initialize_otp()
try:
Backend.instance.save(instance)
except Exception as exc: # pragma: no cover
raise click.ClickException(exc) from exc
output = json.dumps(serialize(instance))
click.echo(output)

View file

@ -248,9 +248,11 @@ class CoreSettings(BaseModel):
"""If :py:data:`False`, then users cannot ask for a password recovery link """If :py:data:`False`, then users cannot ask for a password recovery link
by email.""" by email."""
ENABLE_TOTP: bool = False OTP_METHOD: str = None
"""If :py:data:`True`, then users will need to authenticate themselves """If OTP_METHOD is defined, then users will need to authenticate themselves
using a time one-time password via an authenticator app.""" using a one-time password (OTP) via an authenticator app.
If set to :py:data:`TOTP`, the application will use time one-time passwords,
If set to :py:data:`HOTP`, the application will use HMAC-based one-time passwords."""
INVITATION_EXPIRATION: int = 172800 INVITATION_EXPIRATION: int = 172800
"""The validity duration of registration invitations, in seconds. """The validity duration of registration invitations, in seconds.

View file

@ -753,6 +753,19 @@ def profile_settings(user, edited_user):
return profile_settings_edit(user, edited_user) return profile_settings_edit(user, edited_user)
if (
request.form.get("action") == "confirm-reset-mfa"
and current_app.features.has_otp
):
return render_template("modals/reset-mfa.html", edited_user=edited_user)
if request.form.get("action") == "reset-mfa" and current_app.features.has_otp:
flash(_("Multi-factor authentication has been reset"), "success")
edited_user.initialize_otp()
Backend.instance.save(edited_user)
return profile_settings_edit(user, edited_user)
abort(400, f"bad form action: {request.form.get('action')}") abort(400, f"bad form action: {request.form.get('action')}")

View file

@ -107,7 +107,7 @@ def password():
"password.html", form=form, username=session["attempt_login"] "password.html", form=form, username=session["attempt_login"]
) )
if not current_app.features.has_totp: if not current_app.features.has_otp:
current_app.logger.security( current_app.logger.security(
f'Succeed login attempt for {session["attempt_login"]} from {request_ip}' f'Succeed login attempt for {session["attempt_login"]} from {request_ip}'
) )
@ -271,7 +271,7 @@ def reset(user, hash):
@bp.route("/setup-2fa") @bp.route("/setup-2fa")
def setup_two_factor_auth(): def setup_two_factor_auth():
if not current_app.features.has_totp: if not current_app.features.has_otp:
abort(404) abort(404)
if current_user(): if current_user():
@ -287,7 +287,7 @@ def setup_two_factor_auth():
session["attempt_login_with_correct_password"] session["attempt_login_with_correct_password"]
) )
uri = user.get_authentication_setup_uri() uri = user.get_otp_authentication_setup_uri()
base64_qr_image = get_b64encoded_qr_image(uri) base64_qr_image = get_b64encoded_qr_image(uri)
return render_template( return render_template(
"setup-2fa.html", "setup-2fa.html",
@ -299,7 +299,7 @@ def setup_two_factor_auth():
@bp.route("/verify-2fa", methods=["GET", "POST"]) @bp.route("/verify-2fa", methods=["GET", "POST"])
def verify_two_factor_auth(): def verify_two_factor_auth():
if not current_app.features.has_totp: if not current_app.features.has_otp:
abort(404) abort(404)
if current_user(): if current_user():
@ -361,6 +361,6 @@ def verify_two_factor_auth():
) )
request_ip = request.remote_addr or "unknown IP" request_ip = request.remote_addr or "unknown IP"
current_app.logger.security( current_app.logger.security(
f'Failed login attempt (wrong TOTP) for {session["attempt_login_with_correct_password"]} from {request_ip}' f'Failed login attempt (wrong OTP) for {session["attempt_login_with_correct_password"]} from {request_ip}'
) )
return redirect(url_for("core.auth.verify_two_factor_auth")) return redirect(url_for("core.auth.verify_two_factor_auth"))

View file

@ -9,6 +9,8 @@ from flask import current_app
from canaille.backends.models import Model from canaille.backends.models import Model
from canaille.core.configuration import Permission from canaille.core.configuration import Permission
HOTP_LOOK_AHEAD_WINDOW = 10
class User(Model): class User(Model):
"""User model, based on the `SCIM User schema """User model, based on the `SCIM User schema
@ -251,6 +253,10 @@ class User(Model):
"""Unique token generated for each user, used for """Unique token generated for each user, used for
two-factor authentication.""" two-factor authentication."""
hotp_counter: int | None = None
"""HMAC-based One Time Password counter, used for
two-factor authentication."""
_readable_fields = None _readable_fields = None
_writable_fields = None _writable_fields = None
_permissions = None _permissions = None
@ -328,19 +334,51 @@ class User(Model):
self._writable_fields |= set(details["WRITE"]) self._writable_fields |= set(details["WRITE"])
return self._writable_fields return self._writable_fields
def generate_otp_token(self): def initialize_otp(self):
self.secret_token = secrets.token_hex(32) self.secret_token = secrets.token_hex(32)
self.last_otp_login = None
if current_app.features.otp_method == "HOTP":
self.hotp_counter = 1
def generate_otp(self): def generate_otp(self, counter_delta=0):
return otpauth.TOTP(bytes(self.secret_token, "utf-8")).generate() method = current_app.features.otp_method
if method == "TOTP":
totp = otpauth.TOTP(bytes(self.secret_token, "utf-8"))
return totp.string_code(totp.generate())
elif method == "HOTP":
hotp = otpauth.HOTP(bytes(self.secret_token, "utf-8"))
return hotp.string_code(hotp.generate(self.hotp_counter + counter_delta))
def get_authentication_setup_uri(self): def get_otp_authentication_setup_uri(self):
method = current_app.features.otp_method
if method == "TOTP":
return otpauth.TOTP(bytes(self.secret_token, "utf-8")).to_uri( return otpauth.TOTP(bytes(self.secret_token, "utf-8")).to_uri(
label=self.user_name, issuer=current_app.config["CANAILLE"]["NAME"] label=self.user_name, issuer=current_app.config["CANAILLE"]["NAME"]
) )
elif method == "HOTP":
return otpauth.HOTP(bytes(self.secret_token, "utf-8")).to_uri(
label=self.user_name,
issuer=current_app.config["CANAILLE"]["NAME"],
counter=self.hotp_counter,
)
def is_otp_valid(self, user_otp): def is_otp_valid(self, user_otp):
method = current_app.features.otp_method
if method == "TOTP":
return otpauth.TOTP(bytes(self.secret_token, "utf-8")).verify(user_otp) return otpauth.TOTP(bytes(self.secret_token, "utf-8")).verify(user_otp)
elif method == "HOTP":
counter = self.hotp_counter
is_valid = False
# if user token's counter is ahead of canaille's, try to catch up to it
while counter - self.hotp_counter <= HOTP_LOOK_AHEAD_WINDOW:
is_valid = otpauth.HOTP(bytes(self.secret_token, "utf-8")).verify(
user_otp, counter
)
counter += 1
if is_valid:
self.hotp_counter = counter
return True
return False
class Group(Model): class Group(Model):

View file

@ -0,0 +1,32 @@
{% extends theme('base.html') %}
{% block content %}
<div id="modal-reset-mfa" class="ui warning message">
<form method="post" action="{{ request.url }}">
<input type="hidden" name="csrf_token" value="{{ request.form.get("csrf_token") }}">
<div class="ui icon header">
<i class="key icon"></i>
{% trans %}Multi-factor authentication reset{% endtrans %}
</div>
<div class="content">
<p>
{% if user != edited_user %}
{% trans user_name=(edited_user.formatted_name or edited_user.identifier) %}
Are you sure you want to reset multi-factor authentication (MFA) for {{ user_name }} ? The user will have to perform MFA setup again at next login.
{% endtrans %}
{% else %}
{% trans %}
Are you sure you want to reset multi-factor authentication (MFA)? You will have to perform MFA setup again at next login.
{% endtrans %}
{% endif %}
</p>
</div>
<div class="ui center aligned container">
<div class="ui stackable buttons">
<a class="ui cancel button" href="{{ request.url }}">{% trans %}Cancel{% endtrans %}</a>
<button type="submit" name="action" value="reset-mfa" class="ui red approve button">{% trans %}Reset multi-factor authentication{% endtrans %}</button>
</div>
</div>
</form>
</div>
{% endblock %}

View file

@ -140,6 +140,12 @@
<div class="ui right aligned container"> <div class="ui right aligned container">
<div class="ui stackable buttons"> <div class="ui stackable buttons">
{% if features.has_otp and user.can_manage_users %}
<button type="submit" class="ui right floated basic negative button confirm" name="action" value="confirm-reset-mfa" id="reset-mfa" formnovalidate>
{% trans %}Reset multi-factor authentication{% endtrans %}
</button>
{% endif %}
{% if features.has_account_lockability and "lock_date" in user.writable_fields and not edited_user.locked %} {% if features.has_account_lockability and "lock_date" in user.writable_fields and not edited_user.locked %}
<button type="submit" class="ui right floated basic negative button confirm" name="action" value="confirm-lock" id="lock" formnovalidate> <button type="submit" class="ui right floated basic negative button confirm" name="action" value="confirm-lock" id="lock" formnovalidate>
{% trans %}Lock the account{% endtrans %} {% trans %}Lock the account{% endtrans %}

View file

@ -27,7 +27,7 @@ BIND_PW = "admin"
TIMEOUT = 10 TIMEOUT = 10
USER_BASE = "ou=users,dc=mydomain,dc=tld" USER_BASE = "ou=users,dc=mydomain,dc=tld"
GROUP_BASE = "ou=groups,dc=mydomain,dc=tld" GROUP_BASE = "ou=groups,dc=mydomain,dc=tld"
USER_CLASS = ["inetOrgPerson", "oathTOTPToken"] USER_CLASS = ["inetOrgPerson", "oathHOTPToken"]
[CANAILLE.ACL.DEFAULT] [CANAILLE.ACL.DEFAULT]
PERMISSIONS = ["edit_self", "use_oidc"] PERMISSIONS = ["edit_self", "use_oidc"]

View file

@ -179,7 +179,8 @@ If :attr:`password compromission check feature <canaille.core.configuration.Core
Multi-factor authentication Multi-factor authentication
============== ==============
If the :attr:`time one-time password feature <canaille.core.configuration.CoreSettings.ENABLE_TOTP>` is enabled, then users will need to authenticate themselves using a time one-time password via an authenticator app. If the :attr:`one-time password feature <canaille.core.configuration.CoreSettings.OTP_METHOD>` is set, then users will need to authenticate themselves using a one-time password via an authenticator app.
Two options are supported : "TOTP" for time one-time password, and "HOTP" for HMAC-based one-time password.
Web interface Web interface
************* *************

View file

@ -49,14 +49,14 @@ It is used when the ``CANAILLE_LDAP`` configuration parameter is defined. For in
GROUP_BASE = "ou=groups,dc=mydomain,dc=tld" GROUP_BASE = "ou=groups,dc=mydomain,dc=tld"
If you want to use TOTP authentication, you will need to add the ``oathTOTPToken`` class to the user : If you want to use TOTP/HOTP authentication, you will need to add the ``oathHOTPToken`` class to the user :
.. code-block:: toml .. code-block:: toml
USER_CLASS = ["inetOrgPerson", "oathTOTPToken"] USER_CLASS = ["inetOrgPerson", "oathHOTPToken"]
You can find more details on the LDAP configuration in the :class:`dedicated section <canaille.backends.ldap.configuration.LDAPSettings>`. You can find more details on the LDAP configuration in the :class:`dedicated section <canaille.backends.ldap.configuration.LDAPSettings>`.
.. note :: .. note ::
Currently, only the ``inetOrgPerson``, ``oathTOTPToken`` and ``groupOfNames`` schemas have been tested. Currently, only the ``inetOrgPerson``, ``oathHOTPToken`` and ``groupOfNames`` schemas have been tested.
If you want to use different schemas or LDAP servers, adaptations may be needed. If you want to use different schemas or LDAP servers, adaptations may be needed.
Patches are welcome. Patches are welcome.

View file

@ -0,0 +1,55 @@
import json
from unittest import mock
import pytest
from canaille.commands import cli
@pytest.mark.parametrize("otp_method", ["TOTP", "HOTP"])
def test_reset_mfa_by_id(testclient, backend, user_otp, otp_method):
"""Reset multi-factor authentication for a user by its id."""
testclient.app.config["CANAILLE"]["OTP_METHOD"] = otp_method
old_token = user_otp.secret_token
assert old_token is not None
assert user_otp.last_otp_login is not None
runner = testclient.app.test_cli_runner()
res = runner.invoke(
cli,
[
"reset-mfa",
user_otp.id,
],
)
assert res.exit_code == 0, res.stdout
assert json.loads(res.stdout) == {
"created": mock.ANY,
"display_name": "Johnny",
"emails": [
"john@doe.com",
],
"family_name": "Doe",
"formatted_address": "1235, somewhere",
"formatted_name": "John (johnny) Doe",
"given_name": "John",
"id": user_otp.id,
"last_modified": mock.ANY,
"password": "***",
"phone_numbers": [
"555-000-000",
],
"preferred_language": "en",
"profile_url": "https://john.example",
"user_name": "user",
"hotp_counter": 1,
"secret_token": mock.ANY,
}
backend.reload(user_otp)
assert user_otp.secret_token is not None
assert user_otp.secret_token != old_token
assert user_otp.last_otp_login is None
if otp_method == "HOTP":
assert user_otp.hotp_counter == 1

View file

@ -229,3 +229,19 @@ def test_enable_password_compromission_check_with_and_without_admin_email(
config_obj = settings_factory(configuration) config_obj = settings_factory(configuration)
config_dict = config_obj.model_dump() config_dict = config_obj.model_dump()
validate(config_dict, validate_remote=False) validate(config_dict, validate_remote=False)
def test_invalid_otp_option(configuration, backend):
config_obj = settings_factory(configuration)
config_dict = config_obj.model_dump()
validate(config_dict, validate_remote=False)
with pytest.raises(
ConfigurationException,
match=r"Invalid OTP method",
):
configuration["CANAILLE"]["OTP_METHOD"] = "invalid"
config_obj = settings_factory(configuration)
config_dict = config_obj.model_dump()
validate(config_dict, validate_remote=False)

View file

@ -39,7 +39,7 @@ def ldap_configuration(configuration, slapd_server):
"USER_FILTER": "(uid={{ login }})", "USER_FILTER": "(uid={{ login }})",
"GROUP_BASE": "ou=groups", "GROUP_BASE": "ou=groups",
"TIMEOUT": 0.1, "TIMEOUT": 0.1,
"USER_CLASS": ["inetOrgPerson", "oathTOTPToken"], "USER_CLASS": ["inetOrgPerson", "oathHOTPToken"],
} }
yield configuration yield configuration
del configuration["CANAILLE_LDAP"] del configuration["CANAILLE_LDAP"]

View file

@ -201,10 +201,11 @@ def user(app, backend):
@pytest.fixture @pytest.fixture
def user_totp(app, user, backend): def user_otp(app, user, backend):
user.secret_token = ( user.secret_token = (
"fefe9b106b8a033d3fcb4de16ac06b2cae71c7d95a41b158c30380d1bc35b2ba" "fefe9b106b8a033d3fcb4de16ac06b2cae71c7d95a41b158c30380d1bc35b2ba"
) )
user.hotp_counter = 1
user.last_otp_login = datetime.datetime(2020, 1, 1) user.last_otp_login = datetime.datetime(2020, 1, 1)
backend.save(user) backend.save(user)
yield user yield user
@ -246,10 +247,10 @@ def logged_user(user, testclient):
@pytest.fixture @pytest.fixture
def logged_user_totp(user_totp, testclient): def logged_user_otp(user_otp, testclient):
with testclient.session_transaction() as sess: with testclient.session_transaction() as sess:
sess["user_id"] = [user_totp.id] sess["user_id"] = [user_otp.id]
return user_totp return user_otp
@pytest.fixture @pytest.fixture

View file

@ -1,20 +1,23 @@
import datetime import datetime
import logging import logging
import pytest
import time_machine import time_machine
from canaille.app import models from canaille.app import models
from canaille.core.models import HOTP_LOOK_AHEAD_WINDOW
def test_totp_disabled(testclient): def test_otp_disabled(testclient):
testclient.app.config["CANAILLE"]["ENABLE_TOTP"] = False testclient.app.config["CANAILLE"]["OTP_METHOD"] = None
testclient.get("/setup-2fa", status=404) testclient.get("/setup-2fa", status=404)
testclient.get("/verify-2fa", status=404) testclient.get("/verify-2fa", status=404)
def test_signin_and_out_with_totp(testclient, user_totp, caplog): @pytest.mark.parametrize("otp_method", ["TOTP", "HOTP"])
testclient.app.config["CANAILLE"]["ENABLE_TOTP"] = True def test_signin_and_out_with_otp(testclient, user_otp, caplog, otp_method):
testclient.app.config["CANAILLE"]["OTP_METHOD"] = otp_method
with testclient.session_transaction() as session: with testclient.session_transaction() as session:
assert not session.get("user_id") assert not session.get("user_id")
@ -37,7 +40,7 @@ def test_signin_and_out_with_totp(testclient, user_totp, caplog):
assert "user" == session.get("attempt_login_with_correct_password") assert "user" == session.get("attempt_login_with_correct_password")
res = testclient.get("/verify-2fa") res = testclient.get("/verify-2fa")
res.form["otp"] = user_totp.generate_otp() res.form["otp"] = user_otp.generate_otp()
res = res.form.submit(status=302) res = res.form.submit(status=302)
assert ( assert (
@ -53,7 +56,7 @@ def test_signin_and_out_with_totp(testclient, user_totp, caplog):
res = res.follow(status=200) res = res.follow(status=200)
with testclient.session_transaction() as session: with testclient.session_transaction() as session:
assert [user_totp.id] == session.get("user_id") assert [user_otp.id] == session.get("user_id")
assert "attempt_login" not in session assert "attempt_login" not in session
assert "attempt_login_with_correct_password" not in session assert "attempt_login_with_correct_password" not in session
@ -73,8 +76,9 @@ def test_signin_and_out_with_totp(testclient, user_totp, caplog):
res = res.follow(status=200) res = res.follow(status=200)
def test_signin_wrong_totp(testclient, user_totp, caplog): @pytest.mark.parametrize("otp_method", ["TOTP", "HOTP"])
testclient.app.config["CANAILLE"]["ENABLE_TOTP"] = True def test_signin_wrong_otp(testclient, user_otp, caplog, otp_method):
testclient.app.config["CANAILLE"]["OTP_METHOD"] = otp_method
with testclient.session_transaction() as session: with testclient.session_transaction() as session:
assert not session.get("user_id") assert not session.get("user_id")
@ -99,12 +103,12 @@ def test_signin_wrong_totp(testclient, user_totp, caplog):
assert ( assert (
"canaille", "canaille",
logging.SECURITY, logging.SECURITY,
"Failed login attempt (wrong TOTP) for user from unknown IP", "Failed login attempt (wrong OTP) for user from unknown IP",
) in caplog.record_tuples ) in caplog.record_tuples
def test_signin_expired_totp(testclient, user_totp, caplog): def test_signin_expired_totp(testclient, user_otp, caplog):
testclient.app.config["CANAILLE"]["ENABLE_TOTP"] = True testclient.app.config["CANAILLE"]["OTP_METHOD"] = "TOTP"
with time_machine.travel("2020-01-01 01:00:00+00:00", tick=False) as traveller: with time_machine.travel("2020-01-01 01:00:00+00:00", tick=False) as traveller:
with testclient.session_transaction() as session: with testclient.session_transaction() as session:
@ -120,7 +124,7 @@ def test_signin_expired_totp(testclient, user_totp, caplog):
res = res.form.submit(status=302) res = res.form.submit(status=302)
res = res.follow(status=200) res = res.follow(status=200)
res.form["otp"] = user_totp.generate_otp() res.form["otp"] = user_otp.generate_otp()
traveller.shift(datetime.timedelta(seconds=30)) traveller.shift(datetime.timedelta(seconds=30))
res = res.form.submit() res = res.form.submit()
@ -131,30 +135,33 @@ def test_signin_expired_totp(testclient, user_totp, caplog):
assert ( assert (
"canaille", "canaille",
logging.SECURITY, logging.SECURITY,
"Failed login attempt (wrong TOTP) for user from unknown IP", "Failed login attempt (wrong OTP) for user from unknown IP",
) in caplog.record_tuples ) in caplog.record_tuples
def test_setup_totp(testclient, backend, caplog): @pytest.mark.parametrize("otp_method", ["TOTP", "HOTP"])
testclient.app.config["CANAILLE"]["ENABLE_TOTP"] = True def test_new_user_setup_otp(testclient, backend, caplog, otp_method):
testclient.app.config["CANAILLE"]["OTP_METHOD"] = otp_method
u = models.User( u = models.User(
formatted_name="Totp User", formatted_name="Otp User",
family_name="Totp", family_name="Otp",
user_name="totp", user_name="otp",
emails=["john@doe.com"], emails=["john@doe.com"],
password="correct horse battery staple", password="correct horse battery staple",
) )
backend.save(u) backend.save(u)
assert u.secret_token is not None assert u.secret_token is not None
if otp_method == "HOTP":
assert u.hotp_counter == 1
with testclient.session_transaction() as session: with testclient.session_transaction() as session:
assert not session.get("user_id") assert not session.get("user_id")
res = testclient.get("/login", status=200) res = testclient.get("/login", status=200)
res.form["login"] = "totp" res.form["login"] = "otp"
res = res.form.submit(status=302) res = res.form.submit(status=302)
res = res.follow(status=200) res = res.follow(status=200)
@ -175,16 +182,15 @@ def test_setup_totp(testclient, backend, caplog):
assert ( assert (
"success", "success",
"Two-factor authentication setup successful. Welcome Totp User", "Two-factor authentication setup successful. Welcome Otp User",
) in res.flashes ) in res.flashes
assert ( assert (
"canaille", "canaille",
logging.SECURITY, logging.SECURITY,
"Succeed login attempt for totp from unknown IP", "Succeed login attempt for otp from unknown IP",
) in caplog.record_tuples ) in caplog.record_tuples
res = res.follow(status=302) res = res.follow(status=302)
res = res.follow(status=200) res = res.follow(status=200)
with testclient.session_transaction() as session: with testclient.session_transaction() as session:
assert [u.id] == session.get("user_id") assert [u.id] == session.get("user_id")
assert "attempt_login" not in session assert "attempt_login" not in session
@ -192,11 +198,14 @@ def test_setup_totp(testclient, backend, caplog):
res = testclient.get("/login", status=302) res = testclient.get("/login", status=302)
backend.delete(u)
def test_verify_totp_page_without_signin_in_redirects_to_login_page(
testclient, user_totp @pytest.mark.parametrize("otp_method", ["TOTP", "HOTP"])
def test_verify_otp_page_without_signin_in_redirects_to_login_page(
testclient, user_otp, otp_method
): ):
testclient.app.config["CANAILLE"]["ENABLE_TOTP"] = True testclient.app.config["CANAILLE"]["OTP_METHOD"] = otp_method
res = testclient.get("/verify-2fa", status=302) res = testclient.get("/verify-2fa", status=302)
assert res.location == "/login" assert res.location == "/login"
@ -205,10 +214,11 @@ def test_verify_totp_page_without_signin_in_redirects_to_login_page(
] ]
def test_setup_totp_page_without_signin_in_redirects_to_login_page( @pytest.mark.parametrize("otp_method", ["TOTP", "HOTP"])
testclient, user_totp def test_setup_otp_page_without_signin_in_redirects_to_login_page(
testclient, user_otp, otp_method
): ):
testclient.app.config["CANAILLE"]["ENABLE_TOTP"] = True testclient.app.config["CANAILLE"]["OTP_METHOD"] = otp_method
res = testclient.get("/setup-2fa", status=302) res = testclient.get("/setup-2fa", status=302)
assert res.location == "/login" assert res.location == "/login"
@ -217,15 +227,152 @@ def test_setup_totp_page_without_signin_in_redirects_to_login_page(
] ]
def test_verify_totp_page_already_logged_in(testclient, logged_user_totp): @pytest.mark.parametrize("otp_method", ["TOTP", "HOTP"])
testclient.app.config["CANAILLE"]["ENABLE_TOTP"] = True def test_verify_otp_page_already_logged_in(testclient, logged_user_otp, otp_method):
testclient.app.config["CANAILLE"]["OTP_METHOD"] = otp_method
res = testclient.get("/verify-2fa", status=302) res = testclient.get("/verify-2fa", status=302)
assert res.location == "/profile/user" assert res.location == "/profile/user"
def test_setup_totp_page_already_logged_in(testclient, logged_user_totp): def test_signin_multiple_attempts_doesnt_desynchronize_hotp(
testclient.app.config["CANAILLE"]["ENABLE_TOTP"] = True testclient, user_otp, caplog
):
testclient.app.config["CANAILLE"]["OTP_METHOD"] = "HOTP"
with testclient.session_transaction() as session:
assert not session.get("user_id")
res = testclient.get("/login", status=200)
res.form["login"] = "user"
res = res.form.submit(status=302)
res = res.follow(status=200)
with testclient.session_transaction() as session:
assert "user" == session.get("attempt_login")
res.form["password"] = "correct horse battery staple"
res = res.form.submit(status=302)
res = res.follow(status=200)
with testclient.session_transaction() as session:
assert "attempt_login" not in session
assert "user" == session.get("attempt_login_with_correct_password")
res = testclient.get("/verify-2fa")
for _x in range(3):
res.form["otp"] = "111111"
res = res.form.submit(status=302).follow()
res.form["otp"] = user_otp.generate_otp()
res = res.form.submit(status=302)
assert (
"success",
"Connection successful. Welcome John (johnny) Doe",
) in res.flashes
assert (
"canaille",
logging.SECURITY,
"Succeed login attempt for user from unknown IP",
) in caplog.record_tuples
res = res.follow(status=302)
res = res.follow(status=200)
@pytest.mark.parametrize("otp_method", ["TOTP", "HOTP"])
def test_setup_otp_page_already_logged_in(testclient, logged_user_otp, otp_method):
testclient.app.config["CANAILLE"]["OTP_METHOD"] = otp_method
res = testclient.get("/setup-2fa", status=302) res = testclient.get("/setup-2fa", status=302)
assert res.location == "/profile/user" assert res.location == "/profile/user"
def test_signin_inside_hotp_look_ahead_window(testclient, backend, user_otp, caplog):
testclient.app.config["CANAILLE"]["OTP_METHOD"] = "HOTP"
with testclient.session_transaction() as session:
assert not session.get("user_id")
assert user_otp.hotp_counter == 1
res = testclient.get("/login", status=200)
res.form["login"] = "user"
res = res.form.submit(status=302)
res = res.follow(status=200)
with testclient.session_transaction() as session:
assert "user" == session.get("attempt_login")
res.form["password"] = "correct horse battery staple"
res = res.form.submit(status=302)
res = res.follow(status=200)
with testclient.session_transaction() as session:
assert "attempt_login" not in session
assert "user" == session.get("attempt_login_with_correct_password")
res = testclient.get("/verify-2fa")
res.form["otp"] = user_otp.generate_otp(HOTP_LOOK_AHEAD_WINDOW)
res = res.form.submit(status=302)
assert (
"success",
"Connection successful. Welcome John (johnny) Doe",
) in res.flashes
assert (
"canaille",
logging.SECURITY,
"Succeed login attempt for user from unknown IP",
) in caplog.record_tuples
res = res.follow(status=302)
res = res.follow(status=200)
user = backend.get(models.User, id=user_otp.id)
assert user.hotp_counter == HOTP_LOOK_AHEAD_WINDOW + 2
def test_signin_outside_hotp_look_ahead_window(testclient, backend, user_otp, caplog):
testclient.app.config["CANAILLE"]["OTP_METHOD"] = "HOTP"
with testclient.session_transaction() as session:
assert not session.get("user_id")
assert user_otp.hotp_counter == 1
res = testclient.get("/login", status=200)
res.form["login"] = "user"
res = res.form.submit(status=302)
res = res.follow(status=200)
with testclient.session_transaction() as session:
assert "user" == session.get("attempt_login")
res.form["password"] = "correct horse battery staple"
res = res.form.submit(status=302)
res = res.follow(status=200)
with testclient.session_transaction() as session:
assert "attempt_login" not in session
assert "user" == session.get("attempt_login_with_correct_password")
res = testclient.get("/verify-2fa")
res.form["otp"] = user_otp.generate_otp(HOTP_LOOK_AHEAD_WINDOW + 1)
res = res.form.submit(status=302)
assert (
"error",
"The one-time password you entered is invalid. Please try again",
) in res.flashes
assert (
"canaille",
logging.SECURITY,
"Failed login attempt (wrong OTP) for user from unknown IP",
) in caplog.record_tuples
user = backend.get(models.User, id=user_otp.id)
assert user.hotp_counter == 1

View file

@ -2,6 +2,7 @@ import datetime
import logging import logging
from unittest import mock from unittest import mock
import pytest
from flask import current_app from flask import current_app
from flask import g from flask import g
@ -733,3 +734,25 @@ def test_edition_invalid_group(testclient, logged_admin, user, foo_group):
res = res.form.submit(name="action", value="edit-settings") res = res.form.submit(name="action", value="edit-settings")
assert res.flashes == [("error", "Profile edition failed.")] assert res.flashes == [("error", "Profile edition failed.")]
res.mustcontain("Invalid choice(s): one or more data inputs could not be coerced.") res.mustcontain("Invalid choice(s): one or more data inputs could not be coerced.")
@pytest.mark.parametrize("otp_method", ["TOTP", "HOTP"])
def test_account_reset_mfa(testclient, backend, logged_admin, user_otp, otp_method):
testclient.app.config["CANAILLE"]["OTP_METHOD"] = otp_method
old_token = user_otp.secret_token
assert old_token is not None
assert user_otp.last_otp_login is not None
res = testclient.get("/profile/user/settings")
res.mustcontain("Reset multi-factor authentication")
res = res.form.submit(name="action", value="confirm-reset-mfa")
res = res.form.submit(name="action", value="reset-mfa")
user = backend.get(models.User, id=user_otp.id)
assert user.secret_token is not None
assert user.secret_token != old_token
assert user.last_otp_login is None
if otp_method == "HOTP":
assert user.hotp_counter == 1
res.mustcontain("Multi-factor authentication has been reset")