diff --git a/canaille/__init__.py b/canaille/__init__.py index e4dc4021..5bf5cee5 100644 --- a/canaille/__init__.py +++ b/canaille/__init__.py @@ -1,4 +1,5 @@ import datetime +import logging import sys from logging.config import dictConfig from logging.config import fileConfig @@ -30,6 +31,11 @@ def setup_sentry(app): # pragma: no cover def setup_logging(app): conf = app.config["CANAILLE"]["LOGGING"] + + security_level_name = "SECURITY" + if not hasattr(logging, security_level_name): + addLoggingLevel(security_level_name, logging.INFO + 5) + if conf is None: log_level = "DEBUG" if app.debug else "INFO" dictConfig( @@ -157,3 +163,52 @@ def create_app( raise return app + + +def addLoggingLevel(levelName, levelNum, methodName=None): + """Comprehensively adds a new logging level to the `logging` module and the + currently configured logging class. + + `levelName` becomes an attribute of the `logging` module with the value + `levelNum`. `methodName` becomes a convenience method for both `logging` + itself and the class returned by `logging.getLoggerClass()` (usually just + `logging.Logger`). If `methodName` is not specified, `levelName.lower()` is + used. + + To avoid accidental clobberings of existing attributes, this method will + raise an `AttributeError` if the level name is already an attribute of the + `logging` module or if the method name is already present + + Example + ------- + >>> addLoggingLevel("TRACE", logging.DEBUG - 5) + >>> logging.getLogger(__name__).setLevel("TRACE") + >>> logging.getLogger(__name__).trace("that worked") + >>> logging.trace("so did this") + >>> logging.TRACE + 5 + """ + if not methodName: + methodName = levelName.lower() + + if hasattr(logging, levelName): + raise AttributeError(f"{levelName} already defined in logging module") + if hasattr(logging, methodName): + raise AttributeError(f"{methodName} already defined in logging module") + if hasattr(logging.getLoggerClass(), methodName): + raise AttributeError(f"{methodName} already defined in logger class") + + # This method was inspired by the answers to Stack Overflow post + # http://stackoverflow.com/q/2183233/2988730, especially + # http://stackoverflow.com/a/13638084/2988730 + def logForLevel(self, message, *args, **kwargs): + if self.isEnabledFor(levelNum): + self._log(levelNum, message, args, **kwargs) + + def logToRoot(message, *args, **kwargs): + logging.log(levelNum, message, *args, **kwargs) + + logging.addLevelName(levelNum, levelName) + setattr(logging, levelName, levelNum) + setattr(logging.getLoggerClass(), methodName, logForLevel) + setattr(logging, methodName, logToRoot) diff --git a/canaille/app/__init__.py b/canaille/app/__init__.py index c71af59e..dc1a3c32 100644 --- a/canaille/app/__init__.py +++ b/canaille/app/__init__.py @@ -66,7 +66,3 @@ class classproperty: def __get__(self, obj, owner): return self.f(owner) - - -def generate_security_log(message): - return "[SECURITY] " + message diff --git a/canaille/core/endpoints/account.py b/canaille/core/endpoints/account.py index 5327c8c9..d020f8e3 100644 --- a/canaille/core/endpoints/account.py +++ b/canaille/core/endpoints/account.py @@ -23,7 +23,6 @@ from werkzeug.datastructures import FileStorage from canaille.app import b64_to_obj from canaille.app import build_hash from canaille.app import default_fields -from canaille.app import generate_security_log from canaille.app import models from canaille.app import obj_to_b64 from canaille.app.flask import current_user @@ -576,17 +575,6 @@ def profile_edition_remove_email(user, edited_user, email): return True -def has_email_changed(old_emails): - emailstr = "emails-" - i = 0 - new_emails = set() - while request.form.get(emailstr + str(i)): - new_emails.add(request.form.get(emailstr + str(i))) - i += 1 - - return set(old_emails) != new_emails - - @bp.route("/profile/", methods=("GET", "POST")) @user_needed() def profile_edition(user, edited_user): @@ -595,8 +583,6 @@ def profile_edition(user, edited_user): ): abort(404) - is_email_modified = has_email_changed(user.emails) - request_ip = request.remote_addr or "unknown IP" menuitem = "profile" if edited_user.id == user.id else "users" emails_readonly = ( @@ -610,6 +596,10 @@ def profile_edition(user, edited_user): else None ) + has_email_changed = "emails" in profile_form and set( + profile_form["emails"].data + ) != set(user.emails) + render_context = { "menuitem": menuitem, "edited_user": edited_user, @@ -627,11 +617,9 @@ def profile_edition(user, edited_user): profile_edition_main_form_validation(user, edited_user, profile_form) - if is_email_modified: - current_app.logger.info( - generate_security_log( - f"Updated email for {edited_user.user_name} from {request_ip}" - ) + if has_email_changed: + current_app.logger.security( + f"Updated email for {edited_user.user_name} from {request_ip}" ) flash(_("Profile updated successfully."), "success") @@ -806,10 +794,8 @@ def profile_settings_edit(editor, edited_user): and request.form["action"] == "edit-settings" ): Backend.instance.set_user_password(edited_user, form["password1"].data) - current_app.logger.info( - generate_security_log( - f"Changed password in settings for {edited_user.user_name} from {request_ip}" - ) + current_app.logger.security( + f"Changed password in settings for {edited_user.user_name} from {request_ip}" ) Backend.instance.save(edited_user) diff --git a/canaille/core/endpoints/auth.py b/canaille/core/endpoints/auth.py index 0190d33f..9891755a 100644 --- a/canaille/core/endpoints/auth.py +++ b/canaille/core/endpoints/auth.py @@ -8,7 +8,6 @@ from flask import session from flask import url_for from canaille.app import build_hash -from canaille.app import generate_security_log from canaille.app.flask import current_user from canaille.app.flask import login_user from canaille.app.flask import logout_user @@ -96,20 +95,16 @@ def password(): request_ip = request.remote_addr or "unknown IP" if not success: logout_user() - current_app.logger.info( - generate_security_log( - f'Failed login attempt for {session["attempt_login"]} from {request_ip}' - ) + current_app.logger.security( + f'Failed login attempt for {session["attempt_login"]} from {request_ip}' ) flash(message or _("Login failed, please check your information"), "error") return render_template( "password.html", form=form, username=session["attempt_login"] ) - current_app.logger.info( - generate_security_log( - f'Succeed login attempt for {session["attempt_login"]} from {request_ip}' - ) + current_app.logger.security( + f'Succeed login attempt for {session["attempt_login"]} from {request_ip}' ) del session["attempt_login"] login_user(user) @@ -126,9 +121,7 @@ def logout(): if user: request_ip = request.remote_addr or "unknown IP" - current_app.logger.info( - generate_security_log(f"Logout {user.identifier} from {request_ip}") - ) + current_app.logger.security(f"Logout {user.identifier} from {request_ip}") flash( _( @@ -209,10 +202,8 @@ def forgotten(): for email in user.emails: if not send_password_reset_mail(user, email): success = False - current_app.logger.info( - generate_security_log( - f"Sending a reset password mail to {email} for {user.user_name} from {request_ip}" - ) + current_app.logger.security( + f"Sending a reset password mail to {email} for {user.user_name} from {request_ip}" ) if success: diff --git a/canaille/oidc/endpoints/consents.py b/canaille/oidc/endpoints/consents.py index 5895316c..a9976214 100644 --- a/canaille/oidc/endpoints/consents.py +++ b/canaille/oidc/endpoints/consents.py @@ -8,7 +8,6 @@ from flask import redirect from flask import request from flask import url_for -from canaille.app import generate_security_log from canaille.app import models from canaille.app.flask import user_needed from canaille.app.i18n import gettext as _ @@ -81,10 +80,8 @@ def revoke(user, consent): else: consent.revoke() request_ip = request.remote_addr or "unknown IP" - current_app.logger.info( - generate_security_log( - f"Consent revoked for {user.user_name} in client {consent.client.client_name} from {request_ip}" - ) + current_app.logger.security( + f"Consent revoked for {user.user_name} in client {consent.client.client_name} from {request_ip}" ) flash(_("The access has been revoked"), "success") diff --git a/canaille/oidc/endpoints/oauth.py b/canaille/oidc/endpoints/oauth.py index 4449bcf8..a4d63f5e 100644 --- a/canaille/oidc/endpoints/oauth.py +++ b/canaille/oidc/endpoints/oauth.py @@ -17,7 +17,6 @@ from flask import url_for from werkzeug.datastructures import CombinedMultiDict from canaille import csrf -from canaille.app import generate_security_log from canaille.app import models from canaille.app.flask import current_user from canaille.app.flask import logout_user @@ -180,10 +179,8 @@ def authorize_consent(client, user): ) Backend.instance.save(consent) request_ip = request.remote_addr or "unknown IP" - current_app.logger.info( - generate_security_log( - f"New consent for {user.user_name} in client {consent.client.client_name} from {request_ip}" - ) + current_app.logger.security( + f"New consent for {user.user_name} in client {consent.client.client_name} from {request_ip}" ) response = authorization.create_authorization_response(grant_user=grant_user) @@ -195,7 +192,6 @@ def authorize_consent(client, user): @csrf.exempt def issue_token(): request_params = request.form.to_dict(flat=False) - print(request_params["grant_type"]) grant_type = request_params["grant_type"][0] current_app.logger.debug("token endpoint request: POST: %s", request_params) response = authorization.create_token_response() @@ -205,10 +201,8 @@ def issue_token(): access_token = response.json["access_token"] token = Backend.instance.get(models.Token, access_token=access_token) request_ip = request.remote_addr or "unknown IP" - current_app.logger.info( - generate_security_log( - f"Issued {grant_type} token for {token.subject.user_name} in client {token.client.client_name} from {request_ip}" - ) + current_app.logger.security( + f"Issued {grant_type} token for {token.subject.user_name} in client {token.client.client_name} from {request_ip}" ) return response diff --git a/canaille/oidc/endpoints/tokens.py b/canaille/oidc/endpoints/tokens.py index e91a122b..543e4ace 100644 --- a/canaille/oidc/endpoints/tokens.py +++ b/canaille/oidc/endpoints/tokens.py @@ -6,7 +6,6 @@ from flask import current_app from flask import flash from flask import request -from canaille.app import generate_security_log from canaille.app import models from canaille.app.flask import permissions_needed from canaille.app.flask import render_htmx_template @@ -45,10 +44,8 @@ def view(user, token): token.revokation_date = datetime.datetime.now(datetime.timezone.utc) Backend.instance.save(token) request_ip = request.remote_addr or "unknown IP" - current_app.logger.info( - generate_security_log( - f"Revoked token for {token.subject.user_name} in client {token.client.client_name} by {user.user_name} from {request_ip}" - ) + current_app.logger.security( + f"Revoked token for {token.subject.user_name} in client {token.client.client_name} by {user.user_name} from {request_ip}" ) flash(_("The token has successfully been revoked."), "success") diff --git a/tests/core/test_auth.py b/tests/core/test_auth.py index 107cd92b..86e460a7 100644 --- a/tests/core/test_auth.py +++ b/tests/core/test_auth.py @@ -1,8 +1,6 @@ import datetime import logging -from canaille.app import generate_security_log - def test_signin_and_out(testclient, user, caplog): with testclient.session_transaction() as session: @@ -26,8 +24,8 @@ def test_signin_and_out(testclient, user, caplog): ) in res.flashes assert ( "canaille", - logging.INFO, - generate_security_log("Succeed login attempt for user from unknown IP"), + logging.SECURITY, + "Succeed login attempt for user from unknown IP", ) in caplog.record_tuples res = res.follow(status=302) res = res.follow(status=200) @@ -45,8 +43,8 @@ def test_signin_and_out(testclient, user, caplog): ) in res.flashes assert ( "canaille", - logging.INFO, - generate_security_log("Logout user from unknown IP"), + logging.SECURITY, + "Logout user from unknown IP", ) in caplog.record_tuples res = res.follow(status=302) res = res.follow(status=200) @@ -82,8 +80,8 @@ def test_signin_wrong_password(testclient, user, caplog): assert ("error", "Login failed, please check your information") in res.flashes assert ( "canaille", - logging.INFO, - generate_security_log("Failed login attempt for user from unknown IP"), + logging.SECURITY, + "Failed login attempt for user from unknown IP", ) in caplog.record_tuples diff --git a/tests/core/test_forgotten_password.py b/tests/core/test_forgotten_password.py index aaec88c1..fdb189ec 100644 --- a/tests/core/test_forgotten_password.py +++ b/tests/core/test_forgotten_password.py @@ -1,8 +1,6 @@ import logging from unittest import mock -from canaille.app import generate_security_log - def test_password_forgotten_disabled(smtpd, testclient, user): testclient.app.config["CANAILLE"]["ENABLE_PASSWORD_RECOVERY"] = False @@ -26,10 +24,8 @@ def test_password_forgotten(smtpd, testclient, user, caplog): ) in res.flashes assert ( "canaille", - logging.INFO, - generate_security_log( - "Sending a reset password mail to john@doe.com for user from unknown IP" - ), + logging.SECURITY, + "Sending a reset password mail to john@doe.com for user from unknown IP", ) in caplog.record_tuples res.mustcontain("Send again") @@ -52,10 +48,8 @@ def test_password_forgotten_multiple_mails(smtpd, testclient, user, backend, cap for email in user.emails: assert ( "canaille", - logging.INFO, - generate_security_log( - f"Sending a reset password mail to {email} for user from unknown IP" - ), + logging.SECURITY, + f"Sending a reset password mail to {email} for user from unknown IP", ) in caplog.record_tuples res.mustcontain("Send again") diff --git a/tests/core/test_profile_edition.py b/tests/core/test_profile_edition.py index 9b0ef383..5bc720ca 100644 --- a/tests/core/test_profile_edition.py +++ b/tests/core/test_profile_edition.py @@ -4,7 +4,6 @@ import pytest from flask import g from webtest import Upload -from canaille.app import generate_security_log from canaille.core.populate import fake_users @@ -130,8 +129,8 @@ def test_edition(testclient, logged_user, admin, jpeg_photo, backend, caplog): ], res.text assert ( "canaille", - logging.INFO, - generate_security_log("Updated email for user from unknown IP"), + logging.SECURITY, + "Updated email for user from unknown IP", ) in caplog.record_tuples res = res.follow() diff --git a/tests/core/test_profile_settings.py b/tests/core/test_profile_settings.py index 27ca0ac4..d106b026 100644 --- a/tests/core/test_profile_settings.py +++ b/tests/core/test_profile_settings.py @@ -4,7 +4,6 @@ from unittest import mock from flask import g -from canaille.app import generate_security_log from canaille.app import models @@ -141,8 +140,8 @@ def test_password_change(testclient, logged_user, backend, caplog): assert ( "canaille", - logging.INFO, - generate_security_log("Changed password in settings for user from unknown IP"), + logging.SECURITY, + "Changed password in settings for user from unknown IP", ) in caplog.record_tuples res = res.follow() diff --git a/tests/oidc/test_authorization_code_flow.py b/tests/oidc/test_authorization_code_flow.py index 0be6a917..8eda77b9 100644 --- a/tests/oidc/test_authorization_code_flow.py +++ b/tests/oidc/test_authorization_code_flow.py @@ -9,7 +9,6 @@ from authlib.oauth2.rfc7636 import create_s256_code_challenge from flask import g from werkzeug.security import gen_salt -from canaille.app import generate_security_log from canaille.app import models from . import client_credentials @@ -33,10 +32,8 @@ def test_nominal_case( res = res.form.submit(name="answer", value="accept", status=302) assert ( "canaille", - logging.INFO, - generate_security_log( - "New consent for user in client Some client from unknown IP" - ), + logging.SECURITY, + "New consent for user in client Some client from unknown IP", ) in caplog.record_tuples assert res.location.startswith(client.redirect_uris[0]) @@ -99,10 +96,8 @@ def test_nominal_case( assert claims["aud"] == [client.client_id, trusted_client.client_id] assert ( "canaille", - logging.INFO, - generate_security_log( - "Issued authorization_code token for user in client Some client from unknown IP" - ), + logging.SECURITY, + "Issued authorization_code token for user in client Some client from unknown IP", ) in caplog.record_tuples res = testclient.get( "/oauth/userinfo", diff --git a/tests/oidc/test_consent.py b/tests/oidc/test_consent.py index eda35f25..33336f31 100644 --- a/tests/oidc/test_consent.py +++ b/tests/oidc/test_consent.py @@ -2,7 +2,6 @@ import logging from urllib.parse import parse_qs from urllib.parse import urlsplit -from canaille.app import generate_security_log from canaille.app import models from . import client_credentials @@ -24,10 +23,8 @@ def test_revokation(testclient, client, consent, logged_user, token, backend, ca assert ("success", "The access has been revoked") in res.flashes assert ( "canaille", - logging.INFO, - generate_security_log( - f"Consent revoked for {logged_user.user_name} in client {client.client_name} from unknown IP" - ), + logging.SECURITY, + f"Consent revoked for {logged_user.user_name} in client {client.client_name} from unknown IP", ) in caplog.record_tuples res = res.follow(status=200) res.mustcontain(no="Revoke access") diff --git a/tests/oidc/test_refresh_token.py b/tests/oidc/test_refresh_token.py index 0a02090f..be9f99dc 100644 --- a/tests/oidc/test_refresh_token.py +++ b/tests/oidc/test_refresh_token.py @@ -3,7 +3,6 @@ import logging from urllib.parse import parse_qs from urllib.parse import urlsplit -from canaille.app import generate_security_log from canaille.app import models from . import client_credentials @@ -63,10 +62,8 @@ def test_refresh_token(testclient, logged_user, client, backend, caplog): assert old_token.access_token != new_token.access_token assert ( "canaille", - logging.INFO, - generate_security_log( - "Issued refresh_token token for user in client Some client from unknown IP" - ), + logging.SECURITY, + "Issued refresh_token token for user in client Some client from unknown IP", ) in caplog.record_tuples backend.reload(old_token) assert old_token.revokation_date diff --git a/tests/oidc/test_token_admin.py b/tests/oidc/test_token_admin.py index 2c35e693..10834a8c 100644 --- a/tests/oidc/test_token_admin.py +++ b/tests/oidc/test_token_admin.py @@ -3,7 +3,6 @@ import logging from werkzeug.security import gen_salt -from canaille.app import generate_security_log from canaille.app import models @@ -145,10 +144,8 @@ def test_revoke_token(testclient, token, logged_admin, backend, caplog): assert ("success", "The token has successfully been revoked.") in res.flashes assert ( "canaille", - logging.INFO, - generate_security_log( - "Revoked token for user in client Some client by admin from unknown IP" - ), + logging.SECURITY, + "Revoked token for user in client Some client by admin from unknown IP", ) in caplog.record_tuples backend.reload(token)