diff --git a/.gitignore b/.gitignore index 5b989a27..943acb5e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ .env *.sqlite +*.sqlite-journal *.pyc *.mo *.prof diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index beea5554..51b1922e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -15,10 +15,10 @@ repos: - id: end-of-file-fixer exclude: "\\.svg$|\\.map$|\\.min\\.css$|\\.min\\.js$|\\.po$|\\.pot$" - id: check-toml - - repo: https://github.com/PyCQA/docformatter - rev: v1.7.5 - hooks: - - id: docformatter + # - repo: https://github.com/PyCQA/docformatter + # rev: v1.7.5 + # hooks: + # - id: docformatter - repo: https://github.com/rtts/djhtml rev: 3.0.7 hooks: diff --git a/CHANGES.rst b/CHANGES.rst index 21b944c9..9dc13e12 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -3,6 +3,12 @@ Added ^^^^^ +- Multi-factor authentication :issue:`47` +- :attr:`~canaille.core.configuration.CoreSettings.OTP_METHOD` and + :attr:`~canaille.core.configuration.CoreSettings.EMAIL_OTP` and + :attr:`~canaille.core.configuration.CoreSettings.SMS_OTP` and + :attr:`~canaille.core.configuration.CoreSettings.SMPP` + :issue:`47` - Password compromission check :issue:`179` - :attr:`~canaille.core.configuration.CoreSettings.ADMIN_EMAIL` and :attr:`~canaille.core.configuration.CoreSettings.ENABLE_PASSWORD_COMPROMISSION_CHECK` and diff --git a/canaille/app/__init__.py b/canaille/app/__init__.py index dc1a3c32..f94ecf9d 100644 --- a/canaille/app/__init__.py +++ b/canaille/app/__init__.py @@ -2,6 +2,8 @@ import base64 import hashlib import json import re +from base64 import b64encode +from io import BytesIO from flask import current_app from flask import request @@ -66,3 +68,29 @@ class classproperty: def __get__(self, obj, owner): return self.f(owner) + + +def get_b64encoded_qr_image(data): + try: + import qrcode + except ImportError: + return None + + qr = qrcode.QRCode(version=1, box_size=10, border=5) + qr.add_data(data) + qr.make(fit=True) + img = qr.make_image(fill_color="black", back_color="white") + buffered = BytesIO() + img.save(buffered) + return b64encode(buffered.getvalue()).decode("utf-8") + + +def mask_email(email): + atpos = email.find("@") + if atpos > 0: + return email[0] + "#####" + email[atpos - 1 :] + return None + + +def mask_phone(phone): + return phone[0:3] + "#####" + phone[-2:] diff --git a/canaille/app/configuration.py b/canaille/app/configuration.py index 961239ed..48ccfada 100644 --- a/canaille/app/configuration.py +++ b/canaille/app/configuration.py @@ -1,3 +1,4 @@ +import importlib.util import os import smtplib import socket @@ -175,7 +176,7 @@ def validate(config, validate_remote=False): validate_keypair(config.get("CANAILLE_OIDC")) validate_theme(config["CANAILLE"]) validate_admin_email(config["CANAILLE"]) - + validate_otp_config(config["CANAILLE"]) if not validate_remote: return @@ -184,6 +185,8 @@ def validate(config, validate_remote=False): Backend.instance.validate(config) if smtp_config := config["CANAILLE"]["SMTP"]: validate_smtp_configuration(smtp_config) + if smpp_config := config["CANAILLE"]["SMPP"]: + validate_smpp_configuration(smpp_config) def validate_keypair(config): @@ -231,6 +234,31 @@ def validate_smtp_configuration(config): raise ConfigurationException(exc) from exc +def validate_smpp_configuration(config): + try: + import smpplib + except ImportError as exc: + raise ConfigurationException( + "You have configured a SMPP server but the 'sms' extra is not installed." + ) from exc + + host = config["HOST"] + port = config["PORT"] + try: + with smpplib.client.Client(host, port, allow_unknown_opt_params=True) as client: + client.connect() + if config["LOGIN"]: + client.bind_transmitter( + system_id=config["LOGIN"], password=config["PASSWORD"] + ) + except smpplib.exceptions.ConnectionError as exc: + raise ConfigurationException( + f"Could not connect to the SMPP server '{host}' on port '{port}'" + ) from exc + except smpplib.exceptions.UnknownCommandError as exc: # pragma: no cover + raise ConfigurationException(exc) from exc + + def validate_theme(config): if not os.path.exists(config["THEME"]) and not os.path.exists( os.path.join(ROOT, "themes", config["THEME"]) @@ -243,3 +271,25 @@ def validate_admin_email(config): raise ConfigurationException( "You must set an administration email if you want to check if users' passwords are compromised." ) + + +def validate_otp_config(config): + if ( + config["OTP_METHOD"] or config["EMAIL_OTP"] or config["SMS_OTP"] + ) and not importlib.util.find_spec("otpauth"): # pragma: no cover + raise ConfigurationException( + "You are trying to use OTP but the 'otp' extra is not installed." + ) + + if config["OTP_METHOD"] not in [None, "TOTP", "HOTP"]: + raise ConfigurationException("Invalid OTP method") + + if config["EMAIL_OTP"] and not config["SMTP"]: + raise ConfigurationException( + "Cannot activate email one-time password authentication without SMTP" + ) + + if config["SMS_OTP"] and not config["SMPP"]: + raise ConfigurationException( + "Cannot activate sms one-time password authentication without SMPP" + ) diff --git a/canaille/app/features.py b/canaille/app/features.py index 858ceeb9..f31e69e0 100644 --- a/canaille/app/features.py +++ b/canaille/app/features.py @@ -14,6 +14,22 @@ class Features: def has_password_recovery(self): return self.app.config["CANAILLE"]["ENABLE_PASSWORD_RECOVERY"] + @property + def otp_method(self): + return self.app.config["CANAILLE"]["OTP_METHOD"] + + @property + def has_otp(self): + return bool(self.app.config["CANAILLE"]["OTP_METHOD"]) + + @property + def has_email_otp(self): + return bool(self.app.config["CANAILLE"]["EMAIL_OTP"]) + + @property + def has_sms_otp(self): + return self.app.config["CANAILLE"]["SMS_OTP"] + @property def has_registration(self): return self.app.config["CANAILLE"]["ENABLE_REGISTRATION"] diff --git a/canaille/app/sms.py b/canaille/app/sms.py new file mode 100644 index 00000000..be94c7f6 --- /dev/null +++ b/canaille/app/sms.py @@ -0,0 +1,51 @@ +from flask import current_app + + +def send_sms(recipient, sender, text): + try: + import smpplib.client + import smpplib.consts + import smpplib.gsm + except ImportError as exc: + raise RuntimeError( + "You are trying to send a sms but the 'sms' extra is not installed." + ) from exc + + port = current_app.config["CANAILLE"]["SMPP"]["PORT"] + host = current_app.config["CANAILLE"]["SMPP"]["HOST"] + login = current_app.config["CANAILLE"]["SMPP"]["LOGIN"] + password = current_app.config["CANAILLE"]["SMPP"]["PASSWORD"] + + try: + client = smpplib.client.Client(host, port, allow_unknown_opt_params=True) + client.connect() + try: + client.bind_transmitter(system_id=login, password=password) + pdu = client.send_message( + source_addr_ton=smpplib.consts.SMPP_TON_INTL, + source_addr=sender, + dest_addr_ton=smpplib.consts.SMPP_TON_INTL, + destination_addr=recipient, + short_message=bytes(text, "utf-8"), + ) + current_app.logger.debug(pdu.generate()) + finally: + if client.state in [ + smpplib.consts.SMPP_CLIENT_STATE_BOUND_TX + ]: # pragma: no cover + # if bound to transmitter + try: + client.unbind() + except smpplib.exceptions.UnknownCommandError: + try: + client.unbind() + except smpplib.exceptions.PDUError: + pass + except Exception as exc: + current_app.logger.warning(f"Could not send sms: {exc}") + return False + finally: + if client: # pragma: no branch + client.disconnect() + + return True diff --git a/canaille/backends/commands.py b/canaille/backends/commands.py index 84fad9e1..aea6b3f1 100644 --- a/canaille/backends/commands.py +++ b/canaille/backends/commands.py @@ -4,9 +4,11 @@ import json import typing import click +from flask import current_app from flask.cli import AppGroup from flask.cli import with_appcontext +from canaille.app import models from canaille.app.commands import with_backendcontext from canaille.app.models import MODELS from canaille.backends import Backend @@ -77,6 +79,8 @@ def register(cli): @cli.command(cls=ModelCommand, factory=factory, name=name, help=command_help) def factory_command(): ... + cli.add_command(reset_otp) + def serialize(instance): """Quick and dirty serialization method. @@ -281,3 +285,32 @@ def delete_factory(model): raise click.ClickException(exc) from exc return command + + +@click.command() +@with_appcontext +@with_backendcontext +@click.argument("identifier") +def reset_otp(identifier): + """Reset one-time password authentication for a user and display the + edited user in JSON format in the standard output. + + IDENTIFIER should be a user id or user_name + """ + + user = Backend.instance.get(models.User, identifier) + if not user: + raise click.ClickException(f"No user with id '{identifier}'") + + user.initialize_otp() + current_app.logger.security( + f"Reset one-time password authentication from CLI for {user.user_name}" + ) + + try: + Backend.instance.save(user) + except Exception as exc: # pragma: no cover + raise click.ClickException(exc) from exc + + output = json.dumps(serialize(user)) + click.echo(output) diff --git a/canaille/backends/ldap/configuration.py b/canaille/backends/ldap/configuration.py index 9dfaadf5..4dc36148 100644 --- a/canaille/backends/ldap/configuration.py +++ b/canaille/backends/ldap/configuration.py @@ -28,7 +28,7 @@ class LDAPSettings(BaseModel): For instance `ou=users,dc=mydomain,dc=tld`. """ - USER_CLASS: str = "inetOrgPerson" + USER_CLASS: list[str] = ["inetOrgPerson"] """The object class to use for creating new users.""" USER_RDN: str = "uid" diff --git a/canaille/backends/ldap/models.py b/canaille/backends/ldap/models.py index 0f0a3348..c9d4b729 100644 --- a/canaille/backends/ldap/models.py +++ b/canaille/backends/ldap/models.py @@ -1,4 +1,5 @@ import ldap.filter +from flask import current_app import canaille.core.models import canaille.oidc.models @@ -34,6 +35,11 @@ class User(canaille.core.models.User, LDAPObject): "organization": "o", "groups": "memberOf", "lock_date": "pwdEndTime", + "secret_token": "oathSecret", + "last_otp_login": "oathLastLogin", + "hotp_counter": "oathHOTPCounter", + "one_time_password": "oathTokenPIN", + "one_time_password_emission_date": "oathSecretTime", } def match_filter(self, filter): @@ -44,6 +50,9 @@ class User(canaille.core.models.User, LDAPObject): return super().match_filter(filter) def save(self): + if current_app.features.has_otp and not self.secret_token: + self.initialize_otp() + group_attr = self.python_attribute_to_ldap("groups") if group_attr not in self.changes: return diff --git a/canaille/backends/memory/backend.py b/canaille/backends/memory/backend.py index 36bf6bc4..ca0df63c 100644 --- a/canaille/backends/memory/backend.py +++ b/canaille/backends/memory/backend.py @@ -3,6 +3,9 @@ import datetime import uuid from typing import Any +from flask import current_app + +import canaille.backends.memory.models from canaille.backends import Backend @@ -124,6 +127,13 @@ class MemoryBackend(Backend): return results[0] if results else None def save(self, instance): + if ( + isinstance(instance, canaille.backends.memory.models.User) + and current_app.features.has_otp + and not instance.secret_token + ): + instance.initialize_otp() + if not instance.id: instance.id = str(uuid.uuid4()) diff --git a/canaille/backends/sql/backend.py b/canaille/backends/sql/backend.py index 69369bc1..04172b2c 100644 --- a/canaille/backends/sql/backend.py +++ b/canaille/backends/sql/backend.py @@ -110,6 +110,10 @@ class SQLBackend(Backend): ).scalar_one_or_none() def save(self, instance): + # run the instance save callback if existing + if hasattr(instance, "save"): + instance.save() + instance.last_modified = datetime.datetime.now(datetime.timezone.utc).replace( microsecond=0 ) diff --git a/canaille/backends/sql/models.py b/canaille/backends/sql/models.py index fc09a021..1c7e624e 100644 --- a/canaille/backends/sql/models.py +++ b/canaille/backends/sql/models.py @@ -2,6 +2,7 @@ import datetime import typing import uuid +from flask import current_app from sqlalchemy import Boolean from sqlalchemy import Column from sqlalchemy import ForeignKey @@ -100,6 +101,19 @@ class User(canaille.core.models.User, Base, SqlAlchemyModel): lock_date: Mapped[datetime.datetime] = mapped_column( TZDateTime(timezone=True), nullable=True ) + last_otp_login: Mapped[datetime.datetime] = mapped_column( + TZDateTime(timezone=True), nullable=True + ) + secret_token: Mapped[str] = mapped_column(String, nullable=True, unique=True) + hotp_counter: Mapped[int] = mapped_column(Integer, nullable=True) + one_time_password: Mapped[str] = mapped_column(String, nullable=True) + one_time_password_emission_date: Mapped[datetime.datetime] = mapped_column( + TZDateTime(timezone=True), nullable=True + ) + + def save(self): + if current_app.features.has_otp and not self.secret_token: + self.initialize_otp() class Group(canaille.core.models.Group, Base, SqlAlchemyModel): diff --git a/canaille/config.sample.toml b/canaille/config.sample.toml index bca3d6a5..51c6f27f 100644 --- a/canaille/config.sample.toml +++ b/canaille/config.sample.toml @@ -62,6 +62,23 @@ SECRET_KEY = "change me before you go in production" # recovery link by email. This option is true by default. # ENABLE_PASSWORD_RECOVERY = true +# If OTP_METHOD is defined, then users will need to authenticate themselves +# using a one-time password (OTP) via an authenticator app. +# Two options are supported : "TOTP" for time one-time password, +# and "HOTP" for HMAC-based one-time password. +# This option is deactivated by default. +# OTP_METHOD = "TOTP" + +# If EMAIL_OTP is true, then users will need to authenticate themselves +# via a one-time password sent to their primary email address. +# This option is false by default. +# EMAIL_OTP = false + +# If SMS_OTP is true, then users will need to authenticate themselves +# via a one-time password sent to their primary phone number. +# This option is false by default. +# SMS_OTP = false + # The validity duration of registration invitations, in seconds. # Defaults to 2 days # INVITATION_EXPIRATION = 172800 @@ -115,6 +132,7 @@ SECRET_KEY = "change me before you go in production" # USER_BASE = "ou=users,dc=mydomain,dc=tld" # The object class to use for creating new users +# Note : If you plan on using TOTP/HOTP authentication, use : USER_CLASS = ["inetOrgPerson", "oathHOTPToken"] # USER_CLASS = "inetOrgPerson" # The attribute to identify an object in the User dn. @@ -273,3 +291,11 @@ WRITE = [ # LOGIN = "" # PASSWORD = "" # FROM_ADDR = "admin@mydomain.example" + +# The SMPP server options. If not set, sms related features such as +# sms one-time passwords will be disabled. +[CANAILLE.SMPP] +# HOST = "localhost" +# PORT = 2775 +# LOGIN = "" +# PASSWORD = "" diff --git a/canaille/core/configuration.py b/canaille/core/configuration.py index ed7da812..64e21137 100644 --- a/canaille/core/configuration.py +++ b/canaille/core/configuration.py @@ -39,6 +39,24 @@ class SMTPSettings(BaseModel): """ +class SMPPSettings(BaseModel): + """The SMPP configuration. Belong in the ``CANAILLE.SMPP`` namespace. If + not set, sms related features such as sms one-time passwords will be disabled. + """ + + HOST: str | None = "localhost" + """The SMPP host.""" + + PORT: int | None = 2775 + """The SMPP port. Use 8775 for SMPP over TLS (recommended).""" + + LOGIN: str | None = None + """The SMPP login.""" + + PASSWORD: str | None = None + """The SMPP password.""" + + class Permission(str, Enum): """The permissions that can be assigned to users. @@ -248,6 +266,20 @@ class CoreSettings(BaseModel): """If :py:data:`False`, then users cannot ask for a password recovery link by email.""" + OTP_METHOD: str = None + """If OTP_METHOD is defined, then users will need to authenticate themselves + using a one-time password (OTP) via an authenticator app. + If set to :py:data:`TOTP`, the application will use time one-time passwords, + If set to :py:data:`HOTP`, the application will use HMAC-based one-time passwords.""" + + EMAIL_OTP: bool = False + """If :py:data:`True`, then users will need to authenticate themselves + via a one-time password sent to their primary email address.""" + + SMS_OTP: bool = False + """If :py:data:`True`, then users will need to authenticate themselves + via a one-time password sent to their primary phone number.""" + INVITATION_EXPIRATION: int = 172800 """The validity duration of registration invitations, in seconds. @@ -286,6 +318,13 @@ class CoreSettings(BaseModel): enabled. """ + SMPP: SMPPSettings | None = None + """The settings related to SMPP configuration. + + If unset, sms-related features like sms one-time passwords won't be + enabled. + """ + ACL: dict[str, ACLSettings] | None = {"DEFAULT": ACLSettings()} """Mapping of permission groups. See :class:`ACLSettings` for more details. diff --git a/canaille/core/endpoints/account.py b/canaille/core/endpoints/account.py index f2f0e75f..c0dc9fa0 100644 --- a/canaille/core/endpoints/account.py +++ b/canaille/core/endpoints/account.py @@ -274,7 +274,8 @@ def registration(data=None, hash=None): ) return redirect(url_for("core.account.index")) - if current_user(): + user = current_user() + if user: flash( _("You are already logged in, you cannot create an account."), "error", @@ -752,6 +753,23 @@ def profile_settings(user, edited_user): return profile_settings_edit(user, edited_user) + if ( + request.form.get("action") == "confirm-reset-otp" + and current_app.features.has_otp + ): + return render_template("modals/reset-otp.html", edited_user=edited_user) + + if request.form.get("action") == "reset-otp" and current_app.features.has_otp: + flash(_("One-time password authentication has been reset"), "success") + request_ip = request.remote_addr or "unknown IP" + current_app.logger.security( + f"Reset one-time password authentication for {edited_user.user_name} by {user.user_name} from {request_ip}" + ) + edited_user.initialize_otp() + Backend.instance.save(edited_user) + + return profile_settings_edit(user, edited_user) + abort(400, f"bad form action: {request.form.get('action')}") diff --git a/canaille/core/endpoints/admin.py b/canaille/core/endpoints/admin.py index 4d06fa16..c3f65d15 100644 --- a/canaille/core/endpoints/admin.py +++ b/canaille/core/endpoints/admin.py @@ -330,3 +330,35 @@ def compromised_password_check_failure_txt(user): hashed_password=hashed_password, user_email=user_email, ) + + +@bp.route("/mail/email_otp.html") +@permissions_needed("manage_oidc") +def email_otp_html(user): + base_url = url_for("core.account.index", _external=True) + otp = "000000" + + return render_template( + "mails/email_otp.html", + site_name=current_app.config["CANAILLE"]["NAME"], + site_url=base_url, + otp=otp, + logo=current_app.config["CANAILLE"]["LOGO"], + title=_("One-time password authentication on {website_name}").format( + website_name=current_app.config["CANAILLE"]["NAME"] + ), + ) + + +@bp.route("/mail/email_otp.txt") +@permissions_needed("manage_oidc") +def email_otp_txt(user): + base_url = url_for("core.account.index", _external=True) + otp = "000000" + + return render_template( + "mails/email_otp.txt", + site_name=current_app.config["CANAILLE"]["NAME"], + site_url=base_url, + otp=otp, + ) diff --git a/canaille/core/endpoints/auth.py b/canaille/core/endpoints/auth.py index d649605a..f4128328 100644 --- a/canaille/core/endpoints/auth.py +++ b/canaille/core/endpoints/auth.py @@ -1,3 +1,5 @@ +import datetime + from flask import Blueprint from flask import abort from flask import current_app @@ -8,6 +10,9 @@ from flask import session from flask import url_for from canaille.app import build_hash +from canaille.app import get_b64encoded_qr_image +from canaille.app import mask_email +from canaille.app import mask_phone from canaille.app.flask import smtp_needed from canaille.app.i18n import gettext as _ from canaille.app.session import current_user @@ -15,6 +20,8 @@ from canaille.app.session import login_user from canaille.app.session import logout_user from canaille.app.themes import render_template from canaille.backends import Backend +from canaille.core.endpoints.forms import TwoFactorForm +from canaille.core.models import SEND_NEW_OTP_DELAY from ..mails import send_password_initialization_mail from ..mails import send_password_reset_mail @@ -103,16 +110,33 @@ def password(): "password.html", form=form, username=session["attempt_login"] ) - current_app.logger.security( - f'Succeed login attempt for {session["attempt_login"]} from {request_ip}' - ) - del session["attempt_login"] - login_user(user) - flash( - _("Connection successful. Welcome %(user)s", user=user.formatted_name), - "success", - ) - return redirect(session.pop("redirect-after-login", url_for("core.account.index"))) + otp_methods = [] + if current_app.features.has_otp: + otp_methods.append(current_app.features.otp_method) # TOTP or HOTP + if current_app.features.has_email_otp: + otp_methods.append("EMAIL_OTP") + if current_app.features.has_sms_otp: + otp_methods.append("SMS_OTP") + + if otp_methods: + session["remaining_otp_methods"] = otp_methods + session["attempt_login_with_correct_password"] = session.pop("attempt_login") + return redirect_to_verify_2fa( + user, otp_methods[0], request_ip, url_for("core.auth.password") + ) + else: + current_app.logger.security( + f'Succeed login attempt for {session["attempt_login"]} from {request_ip}' + ) + del session["attempt_login"] + login_user(user) + flash( + _("Connection successful. Welcome %(user)s", user=user.formatted_name), + "success", + ) + return redirect( + session.pop("redirect-after-login", url_for("core.account.index")) + ) @bp.route("/logout") @@ -251,3 +275,253 @@ def reset(user, hash): ) return render_template("reset-password.html", form=form, user=user, hash=hash) + + +@bp.route("/setup-2fa") +def setup_two_factor_auth(): + if not current_app.features.has_otp: + abort(404) + + if current_user(): + return redirect( + url_for("core.account.profile_edition", edited_user=current_user()) + ) + + if "attempt_login_with_correct_password" not in session: + flash(_("Cannot remember the login you attempted to sign in with"), "warning") + return redirect(url_for("core.auth.login")) + + user = Backend.instance.get_user_from_login( + session["attempt_login_with_correct_password"] + ) + + uri = user.get_otp_authentication_setup_uri() + base64_qr_image = get_b64encoded_qr_image(uri) + return render_template( + "setup-2fa.html", + secret=user.secret_token, + qr_image=base64_qr_image, + username=user.user_name, + ) + + +@bp.route("/verify-2fa", methods=["GET", "POST"]) +def verify_two_factor_auth(): + if current_user(): + return redirect( + url_for("core.account.profile_edition", edited_user=current_user()) + ) + + if ( + "attempt_login_with_correct_password" not in session + or "remaining_otp_methods" not in session + or not session["remaining_otp_methods"] + ): + flash(_("Cannot remember the login you attempted to sign in with"), "warning") + return redirect(url_for("core.auth.login")) + + current_otp_method = session["remaining_otp_methods"][0] + if ( + (current_otp_method in ["TOTP", "HOTP"] and not current_app.features.has_otp) + or ( + current_otp_method == "EMAIL_OTP" and not current_app.features.has_email_otp + ) + or (current_otp_method == "SMS_OTP" and not current_app.features.has_sms_otp) + ): + abort(404) + + form = TwoFactorForm(request.form or None) + form.render_field_macro_file = "partial/login_field.html" + + if not request.form or form.form_control(): + return render_template( + "verify-2fa.html", + form=form, + username=session["attempt_login_with_correct_password"], + method=current_otp_method, + ) + + user = Backend.instance.get_user_from_login( + session["attempt_login_with_correct_password"] + ) + + if form.validate() and user.is_otp_valid(form.otp.data, current_otp_method): + session["remaining_otp_methods"].pop(0) + request_ip = request.remote_addr or "unknown IP" + if session["remaining_otp_methods"]: + return redirect_to_verify_2fa( + user, + session["remaining_otp_methods"][0], + request_ip, + url_for("core.auth.verify_two_factor_auth"), + ) + else: + user.last_otp_login = datetime.datetime.now(datetime.timezone.utc) + Backend.instance.save(user) + current_app.logger.security( + f'Succeed login attempt for {session["attempt_login_with_correct_password"]} from {request_ip}' + ) + del session["attempt_login_with_correct_password"] + login_user(user) + flash( + _( + "Connection successful. Welcome %(user)s", + user=user.formatted_name, + ), + "success", + ) + return redirect( + session.pop("redirect-after-login", url_for("core.account.index")) + ) + else: + flash( + "The one-time password you entered is invalid. Please try again", + "error", + ) + request_ip = request.remote_addr or "unknown IP" + current_app.logger.security( + f'Failed login attempt (wrong OTP) for {session["attempt_login_with_correct_password"]} from {request_ip}' + ) + return redirect(url_for("core.auth.verify_two_factor_auth")) + + +@bp.route("/send-mail-otp", methods=["POST"]) +def send_mail_otp(): + if not current_app.features.has_email_otp: + abort(404) + + if current_user(): + return redirect( + url_for("core.account.profile_edition", edited_user=current_user()) + ) + + if "attempt_login_with_correct_password" not in session: + flash(_("Cannot remember the login you attempted to sign in with"), "warning") + return redirect(url_for("core.auth.login")) + + user = Backend.instance.get_user_from_login( + session["attempt_login_with_correct_password"] + ) + + if user.can_send_new_otp(): + if user.generate_and_send_otp_mail(): + Backend.instance.save(user) + request_ip = request.remote_addr or "unknown IP" + current_app.logger.security( + f'Sent one-time password for {session["attempt_login_with_correct_password"]} to {user.emails[0]} from {request_ip}' + ) + flash( + "Code successfully sent!", + "success", + ) + else: + flash("Error while sending one-time password. Please try again.", "danger") + else: + flash( + f"Too many attempts. Please try again in {SEND_NEW_OTP_DELAY} seconds.", + "danger", + ) + + return redirect(url_for("core.auth.verify_two_factor_auth")) + + +@bp.route("/send-sms-otp", methods=["POST"]) +def send_sms_otp(): + if not current_app.features.has_sms_otp: + abort(404) + + if current_user(): + return redirect( + url_for("core.account.profile_edition", edited_user=current_user()) + ) + + if "attempt_login_with_correct_password" not in session: + flash(_("Cannot remember the login you attempted to sign in with"), "warning") + return redirect(url_for("core.auth.login")) + + user = Backend.instance.get_user_from_login( + session["attempt_login_with_correct_password"] + ) + + if user.can_send_new_otp(): + if user.generate_and_send_otp_sms(): + Backend.instance.save(user) + request_ip = request.remote_addr or "unknown IP" + current_app.logger.security( + f'Sent one-time password for {session["attempt_login_with_correct_password"]} to {user.phone_numbers[0]} from {request_ip}' + ) + flash( + "Code successfully sent!", + "success", + ) + else: + flash("Error while sending one-time password. Please try again.", "danger") + else: + flash( + f"Too many attempts. Please try again in {SEND_NEW_OTP_DELAY} seconds.", + "danger", + ) + + return redirect(url_for("core.auth.verify_two_factor_auth")) + + +def redirect_to_verify_2fa(user, otp_method, request_ip, fail_redirect_url): + if otp_method in ["HOTP", "TOTP"]: + if not user.last_otp_login: + flash( + "You have not enabled Two-Factor Authentication. Please enable it first to login.", + "info", + ) + return redirect(url_for("core.auth.setup_two_factor_auth")) + flash( + "Please enter the one-time password from your authenticator app.", + "info", + ) + return redirect(url_for("core.auth.verify_two_factor_auth")) + elif otp_method == "EMAIL_OTP": + if user.can_send_new_otp(): + if user.generate_and_send_otp_mail(): + Backend.instance.save(user) + flash( + f"A one-time password has been sent to your email address {mask_email(user.emails[0])}. Please enter it below to login.", + "info", + ) + current_app.logger.security( + f'Sent one-time password for {session["attempt_login_with_correct_password"]} to {user.emails[0]} from {request_ip}' + ) + return redirect(url_for("core.auth.verify_two_factor_auth")) + else: + flash( + "Error while sending one-time password. Please try again.", "danger" + ) + return redirect(fail_redirect_url) + else: + flash( + f"Too many attempts. Please try again in {SEND_NEW_OTP_DELAY} seconds.", + "danger", + ) + return redirect(fail_redirect_url) + else: # sms + if user.can_send_new_otp(): + if user.generate_and_send_otp_sms(): + Backend.instance.save(user) + flash( + f"A one-time password has been sent to your phone number {mask_phone(user.phone_numbers[0])}. Please enter it below to login.", + "info", + ) + current_app.logger.security( + f'Sent one-time password for {session["attempt_login_with_correct_password"]} to {user.phone_numbers[0]} from {request_ip}' + ) + return redirect(url_for("core.auth.verify_two_factor_auth")) + else: + flash( + "Error while sending one-time password. Please try again.", + "danger", + ) + return redirect(fail_redirect_url) + else: + flash( + f"Too many attempts. Please try again in {SEND_NEW_OTP_DELAY} seconds.", + "danger", + ) + return redirect(fail_redirect_url) diff --git a/canaille/core/endpoints/forms.py b/canaille/core/endpoints/forms.py index 43dacdab..5a019ee9 100644 --- a/canaille/core/endpoints/forms.py +++ b/canaille/core/endpoints/forms.py @@ -22,6 +22,7 @@ from canaille.app.i18n import gettext from canaille.app.i18n import lazy_gettext as _ from canaille.app.i18n import native_language_name_from_code from canaille.backends import Backend +from canaille.core.models import OTP_DIGITS def unique_user_name(form, field): @@ -480,3 +481,18 @@ class EmailConfirmationForm(Form): "autocorrect": "off", }, ) + + +class TwoFactorForm(Form): + otp = wtforms.StringField( + _("One-time password"), + validators=[ + wtforms.validators.DataRequired(), + wtforms.validators.Length(min=OTP_DIGITS, max=OTP_DIGITS), + ], + render_kw={ + "placeholder": _("123456"), + "spellcheck": "false", + "autocorrect": "off", + }, + ) diff --git a/canaille/core/mails.py b/canaille/core/mails.py index 0859314c..db9e28dc 100644 --- a/canaille/core/mails.py +++ b/canaille/core/mails.py @@ -249,3 +249,34 @@ def send_compromised_password_check_failure_mail( html=html_body, attachments=[(logo_cid, logo_filename, logo_raw)] if logo_filename else None, ) + + +def send_one_time_password_mail(mail, otp): + base_url = url_for("core.account.index", _external=True) + logo_cid, logo_filename, logo_raw = logo() + + subject = _("One-time password authentication on {website_name}").format( + website_name=current_app.config["CANAILLE"]["NAME"] + ) + text_body = render_template( + "mails/email_otp.txt", + site_name=current_app.config["CANAILLE"]["NAME"], + site_url=base_url, + otp=otp, + ) + html_body = render_template( + "mails/email_otp.html", + site_name=current_app.config["CANAILLE"]["NAME"], + site_url=base_url, + otp=otp, + logo=f"cid:{logo_cid[1:-1]}" if logo_cid else None, + title=subject, + ) + + return send_email( + subject=subject, + recipient=mail, + text=text_body, + html=html_body, + attachments=[(logo_cid, logo_filename, logo_raw)] if logo_filename else None, + ) diff --git a/canaille/core/models.py b/canaille/core/models.py index a532c76c..b5b543a8 100644 --- a/canaille/core/models.py +++ b/canaille/core/models.py @@ -1,4 +1,5 @@ import datetime +import secrets from typing import Annotated from typing import ClassVar @@ -6,6 +7,13 @@ from flask import current_app from canaille.backends.models import Model from canaille.core.configuration import Permission +from canaille.core.mails import send_one_time_password_mail +from canaille.core.sms import send_one_time_password_sms + +HOTP_LOOK_AHEAD_WINDOW = 10 +OTP_DIGITS = 6 +OTP_VALIDITY = 600 +SEND_NEW_OTP_DELAY = 10 class User(Model): @@ -241,6 +249,24 @@ class User(Model): lock_date: datetime.datetime | None = None """A DateTime indicating when the resource was locked.""" + last_otp_login: datetime.datetime | None = None + """A DateTime indicating when the user last logged in with a one-time password. + This attribute is currently used to check whether the user has activated one-time password authentication or not.""" + + secret_token: str | None = None + """Unique token generated for each user, used for + two-factor authentication.""" + + hotp_counter: int | None = None + """HMAC-based One Time Password counter, used for + two-factor authentication.""" + + one_time_password: str | None = None + """One time password used for email or sms two-factor authentication.""" + + one_time_password_emission_date: datetime.datetime | None = None + """A DateTime indicating when the user last emitted an email or sms one-time password.""" + _readable_fields = None _writable_fields = None _permissions = None @@ -258,10 +284,14 @@ class User(Model): def __getattribute__(self, name): prefix = "can_" - if name.startswith(prefix) and name != "can_read": - return self.can(name[len(prefix) :]) - return super().__getattribute__(name) + try: + return super().__getattribute__(name) + + except AttributeError: + if name.startswith(prefix) and name != "can_read": + return self.can(name[len(prefix) :]) + raise def can(self, *permissions: Permission): """Whether or not the user has the @@ -318,6 +348,110 @@ class User(Model): self._writable_fields |= set(details["WRITE"]) return self._writable_fields + def initialize_otp(self): + self.secret_token = secrets.token_hex(32) + self.last_otp_login = None + if current_app.features.otp_method == "HOTP": + self.hotp_counter = 1 + + def generate_otp(self, counter_delta=0): + import otpauth + + method = current_app.features.otp_method + if method == "TOTP": + totp = otpauth.TOTP(bytes(self.secret_token, "utf-8")) + return totp.string_code(totp.generate()) + elif method == "HOTP": + hotp = otpauth.HOTP(bytes(self.secret_token, "utf-8")) + return hotp.string_code(hotp.generate(self.hotp_counter + counter_delta)) + else: # pragma: no cover + raise RuntimeError("Invalid one-time password method") + + def generate_sms_or_mail_otp(self): + otp = string_code(secrets.randbelow(10**OTP_DIGITS), OTP_DIGITS) + self.one_time_password = otp + self.one_time_password_emission_date = datetime.datetime.now( + datetime.timezone.utc + ) + return otp + + def generate_and_send_otp_mail(self): + otp = self.generate_sms_or_mail_otp() + if send_one_time_password_mail(self.preferred_email, otp): + return otp + return False + + def generate_and_send_otp_sms(self): + otp = self.generate_sms_or_mail_otp() + if send_one_time_password_sms(self.phone_numbers[0], otp): + return otp + return False + + def get_otp_authentication_setup_uri(self): + import otpauth + + method = current_app.features.otp_method + if method == "TOTP": + return otpauth.TOTP(bytes(self.secret_token, "utf-8")).to_uri( + label=self.user_name, issuer=current_app.config["CANAILLE"]["NAME"] + ) + elif method == "HOTP": + return otpauth.HOTP(bytes(self.secret_token, "utf-8")).to_uri( + label=self.user_name, + issuer=current_app.config["CANAILLE"]["NAME"], + counter=self.hotp_counter, + ) + else: # pragma: no cover + raise RuntimeError("Invalid one-time password method") + + def is_otp_valid(self, user_otp, method): + if method == "TOTP": + return self.is_totp_valid(user_otp) + elif method == "HOTP": + return self.is_hotp_valid(user_otp) + elif method == "EMAIL_OTP" or method == "SMS_OTP": + return self.is_email_or_sms_otp_valid(user_otp) + else: # pragma: no cover + raise RuntimeError("Invalid one-time password method") + + def is_totp_valid(self, user_otp): + import otpauth + + return otpauth.TOTP(bytes(self.secret_token, "utf-8")).verify(user_otp) + + def is_hotp_valid(self, user_otp): + import otpauth + + counter = self.hotp_counter + is_valid = False + # if user token's counter is ahead of canaille's, try to catch up to it + while counter - self.hotp_counter <= HOTP_LOOK_AHEAD_WINDOW: + is_valid = otpauth.HOTP(bytes(self.secret_token, "utf-8")).verify( + user_otp, counter + ) + counter += 1 + if is_valid: + self.hotp_counter = counter + return True + return False + + def is_email_or_sms_otp_valid(self, user_otp): + return user_otp == self.one_time_password and self.is_otp_still_valid() + + def is_otp_still_valid(self): + return datetime.datetime.now( + datetime.timezone.utc + ) - self.one_time_password_emission_date < datetime.timedelta( + seconds=OTP_VALIDITY + ) + + def can_send_new_otp(self): + return self.one_time_password_emission_date is None or ( + datetime.datetime.now(datetime.timezone.utc) + - self.one_time_password_emission_date + >= datetime.timedelta(seconds=SEND_NEW_OTP_DELAY) + ) + class Group(Model): """User model, based on the `SCIM Group schema @@ -347,3 +481,15 @@ class Group(Model): """ description: str | None = None + + +def string_code(code: int, digit: int) -> str: + """Add leading 0 if the code length does not match the defined length. + + For instance, parameter ``digit=6``, but ``code=123``, this method would + return ``000123``:: + + >>> otp.string_code(123) + '000123' + """ + return f"{code:0{digit}}" diff --git a/canaille/core/sms.py b/canaille/core/sms.py new file mode 100644 index 00000000..dcfc4de0 --- /dev/null +++ b/canaille/core/sms.py @@ -0,0 +1,15 @@ +from flask import current_app + +from canaille.app.sms import send_sms +from canaille.app.themes import render_template + + +def send_one_time_password_sms(phone_number, otp): + website_name = current_app.config["CANAILLE"]["NAME"] + + text_body = render_template( + "sms/sms_otp.txt", + website_name=website_name, + otp=otp, + ) + return send_sms(recipient=phone_number, sender=website_name, text=text_body) diff --git a/canaille/core/templates/mails/admin.html b/canaille/core/templates/mails/admin.html index c628689e..a8bf5419 100644 --- a/canaille/core/templates/mails/admin.html +++ b/canaille/core/templates/mails/admin.html @@ -160,6 +160,18 @@ +
+
+ {% if logo %}
+
+ |
+ |
+ {% trans %}
+ Someone, probably you, asked for a one-time password at {{ site_name }}. If you did not ask for this email, please ignore it. If you did, please enter the one-time password below at {{ site_name }} to complete your authentication.
+ {% endtrans %}
+ {% trans %}Your one-time password{% endtrans %}:
+ |
+ |
+ {{ site_name }} + | +