Merge branch '177-security-events-logging-policy' into 'main'

feat: Added security logs for email update, forgotten password mail, token...

Closes #177

See merge request yaal/canaille!185
This commit is contained in:
Éloi Rivard 2024-10-23 07:21:35 +00:00
commit 40136fa394
17 changed files with 192 additions and 28 deletions

View file

@ -1,6 +1,10 @@
[0.0.56] - Unreleased
---------------------
Added
^^^^^
- New security events logs :issue:`177`
Changed
^^^^^^^
- Update to HTMX 2.0.3 :pr:`184`

View file

@ -1,4 +1,5 @@
import datetime
import logging
import sys
from logging.config import dictConfig
from logging.config import fileConfig
@ -30,6 +31,11 @@ def setup_sentry(app): # pragma: no cover
def setup_logging(app):
conf = app.config["CANAILLE"]["LOGGING"]
security_level_name = "SECURITY"
if not hasattr(logging, security_level_name):
addLoggingLevel(security_level_name, logging.INFO + 5)
if conf is None:
log_level = "DEBUG" if app.debug else "INFO"
dictConfig(
@ -157,3 +163,52 @@ def create_app(
raise
return app
def addLoggingLevel(levelName, levelNum, methodName=None):
"""Comprehensively adds a new logging level to the `logging` module and the
currently configured logging class.
`levelName` becomes an attribute of the `logging` module with the value
`levelNum`. `methodName` becomes a convenience method for both `logging`
itself and the class returned by `logging.getLoggerClass()` (usually just
`logging.Logger`). If `methodName` is not specified, `levelName.lower()` is
used.
To avoid accidental clobberings of existing attributes, this method will
raise an `AttributeError` if the level name is already an attribute of the
`logging` module or if the method name is already present
Example
-------
>>> addLoggingLevel("TRACE", logging.DEBUG - 5)
>>> logging.getLogger(__name__).setLevel("TRACE")
>>> logging.getLogger(__name__).trace("that worked")
>>> logging.trace("so did this")
>>> logging.TRACE
5
"""
if not methodName:
methodName = levelName.lower()
if hasattr(logging, levelName):
raise AttributeError(f"{levelName} already defined in logging module")
if hasattr(logging, methodName):
raise AttributeError(f"{methodName} already defined in logging module")
if hasattr(logging.getLoggerClass(), methodName):
raise AttributeError(f"{methodName} already defined in logger class")
# This method was inspired by the answers to Stack Overflow post
# http://stackoverflow.com/q/2183233/2988730, especially
# http://stackoverflow.com/a/13638084/2988730
def logForLevel(self, message, *args, **kwargs):
if self.isEnabledFor(levelNum):
self._log(levelNum, message, args, **kwargs)
def logToRoot(message, *args, **kwargs):
logging.log(levelNum, message, *args, **kwargs)
logging.addLevelName(levelNum, levelName)
setattr(logging, levelName, levelNum)
setattr(logging.getLoggerClass(), methodName, logForLevel)
setattr(logging, methodName, logToRoot)

View file

@ -583,6 +583,7 @@ def profile_edition(user, edited_user):
):
abort(404)
request_ip = request.remote_addr or "unknown IP"
menuitem = "profile" if edited_user.id == user.id else "users"
emails_readonly = (
current_app.features.has_email_confirmation and not user.can_manage_users
@ -595,6 +596,10 @@ def profile_edition(user, edited_user):
else None
)
has_email_changed = "emails" in profile_form and set(
profile_form["emails"].data
) != set(user.emails)
render_context = {
"menuitem": menuitem,
"edited_user": edited_user,
@ -611,7 +616,14 @@ def profile_edition(user, edited_user):
return render_template("profile_edit.html", **render_context)
profile_edition_main_form_validation(user, edited_user, profile_form)
if has_email_changed:
current_app.logger.security(
f"Updated email for {edited_user.user_name} from {request_ip}"
)
flash(_("Profile updated successfully."), "success")
return redirect(
url_for("core.account.profile_edition", edited_user=edited_user)
)
@ -744,6 +756,7 @@ def profile_settings(user, edited_user):
def profile_settings_edit(editor, edited_user):
menuitem = "profile" if editor.id == editor.id else "users"
fields = editor.readable_fields | editor.writable_fields
request_ip = request.remote_addr or "unknown IP"
available_fields = {"password", "groups", "user_name", "lock_date"}
data = {
@ -781,6 +794,9 @@ def profile_settings_edit(editor, edited_user):
and request.form["action"] == "edit-settings"
):
Backend.instance.set_user_password(edited_user, form["password1"].data)
current_app.logger.security(
f"Changed password in settings for {edited_user.user_name} from {request_ip}"
)
Backend.instance.save(edited_user)
flash(_("Profile updated successfully."), "success")

View file

@ -95,7 +95,7 @@ def password():
request_ip = request.remote_addr or "unknown IP"
if not success:
logout_user()
current_app.logger.info(
current_app.logger.security(
f'Failed login attempt for {session["attempt_login"]} from {request_ip}'
)
flash(message or _("Login failed, please check your information"), "error")
@ -103,7 +103,7 @@ def password():
"password.html", form=form, username=session["attempt_login"]
)
current_app.logger.info(
current_app.logger.security(
f'Succeed login attempt for {session["attempt_login"]} from {request_ip}'
)
del session["attempt_login"]
@ -121,7 +121,7 @@ def logout():
if user:
request_ip = request.remote_addr or "unknown IP"
current_app.logger.info(f"Logout {user.identifier} from {request_ip}")
current_app.logger.security(f"Logout {user.identifier} from {request_ip}")
flash(
_(
@ -197,8 +197,14 @@ def forgotten():
)
return render_template("forgotten-password.html", form=form)
statuses = [send_password_reset_mail(user, email) for email in user.emails]
success = all(statuses)
request_ip = request.remote_addr or "unknown IP"
success = True
for email in user.emails:
if not send_password_reset_mail(user, email):
success = False
current_app.logger.security(
f"Sending a reset password mail to {email} for {user.user_name} from {request_ip}"
)
if success:
flash(success_message, "success")

View file

@ -2,8 +2,10 @@ import datetime
import uuid
from flask import Blueprint
from flask import current_app
from flask import flash
from flask import redirect
from flask import request
from flask import url_for
from canaille.app import models
@ -77,6 +79,10 @@ def revoke(user, consent):
else:
consent.revoke()
request_ip = request.remote_addr or "unknown IP"
current_app.logger.security(
f"Consent revoked for {user.user_name} in client {consent.client.client_name} from {request_ip}"
)
flash(_("The access has been revoked"), "success")
return redirect(url_for("oidc.consents.consents"))

View file

@ -178,6 +178,10 @@ def authorize_consent(client, user):
issue_date=datetime.datetime.now(datetime.timezone.utc),
)
Backend.instance.save(consent)
request_ip = request.remote_addr or "unknown IP"
current_app.logger.security(
f"New consent for {user.user_name} in client {consent.client.client_name} from {request_ip}"
)
response = authorization.create_authorization_response(grant_user=grant_user)
current_app.logger.debug("authorization endpoint response: %s", response.location)
@ -187,11 +191,19 @@ def authorize_consent(client, user):
@bp.route("/token", methods=["POST"])
@csrf.exempt
def issue_token():
current_app.logger.debug(
"token endpoint request: POST: %s", request.form.to_dict(flat=False)
)
request_params = request.form.to_dict(flat=False)
grant_type = request_params["grant_type"][0]
current_app.logger.debug("token endpoint request: POST: %s", request_params)
response = authorization.create_token_response()
current_app.logger.debug("token endpoint response: %s", response.json)
if response.json.get("access_token"):
access_token = response.json["access_token"]
token = Backend.instance.get(models.Token, access_token=access_token)
request_ip = request.remote_addr or "unknown IP"
current_app.logger.security(
f"Issued {grant_type} token for {token.subject.user_name} in client {token.client.client_name} from {request_ip}"
)
return response

View file

@ -2,6 +2,7 @@ import datetime
from flask import Blueprint
from flask import abort
from flask import current_app
from flask import flash
from flask import request
@ -42,6 +43,10 @@ def view(user, token):
elif request.form.get("action") == "revoke":
token.revokation_date = datetime.datetime.now(datetime.timezone.utc)
Backend.instance.save(token)
request_ip = request.remote_addr or "unknown IP"
current_app.logger.security(
f"Revoked token for {token.subject.user_name} in client {token.client.client_name} by {user.user_name} from {request_ip}"
)
flash(_("The token has successfully been revoked."), "success")
else:

1
demo/.gitignore vendored
View file

@ -1,2 +1,3 @@
env
*.pem
var/

View file

@ -263,6 +263,18 @@ Logging
Canaille writes :attr:`logs <canaille.core.configuration.CoreSettings.LOGGING>` for every important event happening, to help administrators understand what is going on and debug funky situations.
The following security events are logged with the tag [SECURITY] for easy retrieval :
- Authentication attempt
- Password update
- Email update
- Forgotten password mail sent to user
- Token emission
- Token refresh
- Token revokation
- New consent given for client application
- Consent revokation
.. _feature_development:
A tool for your development and tests

View file

@ -24,7 +24,7 @@ def test_signin_and_out(testclient, user, caplog):
) in res.flashes
assert (
"canaille",
logging.INFO,
logging.SECURITY,
"Succeed login attempt for user from unknown IP",
) in caplog.record_tuples
res = res.follow(status=302)
@ -43,7 +43,7 @@ def test_signin_and_out(testclient, user, caplog):
) in res.flashes
assert (
"canaille",
logging.INFO,
logging.SECURITY,
"Logout user from unknown IP",
) in caplog.record_tuples
res = res.follow(status=302)
@ -80,7 +80,7 @@ def test_signin_wrong_password(testclient, user, caplog):
assert ("error", "Login failed, please check your information") in res.flashes
assert (
"canaille",
logging.INFO,
logging.SECURITY,
"Failed login attempt for user from unknown IP",
) in caplog.record_tuples

View file

@ -1,3 +1,4 @@
import logging
from unittest import mock
@ -11,7 +12,7 @@ def test_password_forgotten_disabled(smtpd, testclient, user):
res.mustcontain(no="Forgotten password")
def test_password_forgotten(smtpd, testclient, user):
def test_password_forgotten(smtpd, testclient, user, caplog):
res = testclient.get("/reset", status=200)
res.form["login"] = "user"
@ -21,12 +22,17 @@ def test_password_forgotten(smtpd, testclient, user):
"A password reset link has been sent at your email address. You should receive "
"it within a few minutes.",
) in res.flashes
assert (
"canaille",
logging.SECURITY,
"Sending a reset password mail to john@doe.com for user from unknown IP",
) in caplog.record_tuples
res.mustcontain("Send again")
assert len(smtpd.messages) == 1
def test_password_forgotten_multiple_mails(smtpd, testclient, user, backend):
def test_password_forgotten_multiple_mails(smtpd, testclient, user, backend, caplog):
user.emails = ["foo@bar.com", "foo@baz.com", "foo@foo.com"]
backend.save(user)
@ -39,6 +45,12 @@ def test_password_forgotten_multiple_mails(smtpd, testclient, user, backend):
"A password reset link has been sent at your email address. You should receive "
"it within a few minutes.",
) in res.flashes
for email in user.emails:
assert (
"canaille",
logging.SECURITY,
f"Sending a reset password mail to {email} for user from unknown IP",
) in caplog.record_tuples
res.mustcontain("Send again")
assert len(smtpd.messages) == 3

View file

@ -1,3 +1,5 @@
import logging
import pytest
from flask import g
from webtest import Upload
@ -101,13 +103,7 @@ def test_edition_permission(
testclient.get("/profile/user", status=200)
def test_edition(
testclient,
logged_user,
admin,
jpeg_photo,
backend,
):
def test_edition(testclient, logged_user, admin, jpeg_photo, backend, caplog):
res = testclient.get("/profile/user", status=200)
form = res.forms["baseform"]
form["given_name"] = "given_name"
@ -131,6 +127,11 @@ def test_edition(
assert res.flashes == [
("success", "Le profil a été mis à jour avec succès.")
], res.text
assert (
"canaille",
logging.SECURITY,
"Updated email for user from unknown IP",
) in caplog.record_tuples
res = res.follow()
backend.reload(logged_user)

View file

@ -1,4 +1,5 @@
import datetime
import logging
from unittest import mock
from flask import g
@ -118,7 +119,7 @@ def test_edition_without_groups(
backend.save(logged_user)
def test_password_change(testclient, logged_user, backend):
def test_password_change(testclient, logged_user, backend, caplog):
res = testclient.get("/profile/user/settings", status=200)
res.form["password1"] = "new_password"
@ -136,6 +137,13 @@ def test_password_change(testclient, logged_user, backend):
res = res.form.submit(name="action", value="edit-settings")
assert ("success", "Profile updated successfully.") in res.flashes
assert (
"canaille",
logging.SECURITY,
"Changed password in settings for user from unknown IP",
) in caplog.record_tuples
res = res.follow()
backend.reload(logged_user)

View file

@ -1,4 +1,5 @@
import datetime
import logging
from urllib.parse import parse_qs
from urllib.parse import urlsplit
@ -14,7 +15,7 @@ from . import client_credentials
def test_nominal_case(
testclient, logged_user, client, keypair, trusted_client, backend
testclient, logged_user, client, keypair, trusted_client, backend, caplog
):
assert not backend.query(models.Consent)
@ -28,8 +29,12 @@ def test_nominal_case(
),
status=200,
)
res = res.form.submit(name="answer", value="accept", status=302)
assert (
"canaille",
logging.SECURITY,
"New consent for user in client Some client from unknown IP",
) in caplog.record_tuples
assert res.location.startswith(client.redirect_uris[0])
params = parse_qs(urlsplit(res.location).query)
@ -89,7 +94,11 @@ def test_nominal_case(
assert claims["sub"] == logged_user.user_name
assert claims["name"] == logged_user.formatted_name
assert claims["aud"] == [client.client_id, trusted_client.client_id]
assert (
"canaille",
logging.SECURITY,
"Issued authorization_code token for user in client Some client from unknown IP",
) in caplog.record_tuples
res = testclient.get(
"/oauth/userinfo",
headers={"Authorization": f"Bearer {access_token}"},

View file

@ -1,3 +1,4 @@
import logging
from urllib.parse import parse_qs
from urllib.parse import urlsplit
@ -10,7 +11,7 @@ def test_no_logged_no_access(testclient):
testclient.get("/consent", status=403)
def test_revokation(testclient, client, consent, logged_user, token, backend):
def test_revokation(testclient, client, consent, logged_user, token, backend, caplog):
res = testclient.get("/consent", status=200)
res.mustcontain(client.client_name)
res.mustcontain("Revoke access")
@ -20,6 +21,11 @@ def test_revokation(testclient, client, consent, logged_user, token, backend):
res = testclient.get(f"/consent/revoke/{consent.consent_id}", status=302)
assert ("success", "The access has been revoked") in res.flashes
assert (
"canaille",
logging.SECURITY,
f"Consent revoked for {logged_user.user_name} in client {client.client_name} from unknown IP",
) in caplog.record_tuples
res = res.follow(status=200)
res.mustcontain(no="Revoke access")
res.mustcontain("Restore access")

View file

@ -1,4 +1,5 @@
import datetime
import logging
from urllib.parse import parse_qs
from urllib.parse import urlsplit
@ -7,7 +8,7 @@ from canaille.app import models
from . import client_credentials
def test_refresh_token(testclient, logged_user, client, backend):
def test_refresh_token(testclient, logged_user, client, backend, caplog):
assert not backend.query(models.Consent)
res = testclient.get(
@ -59,7 +60,11 @@ def test_refresh_token(testclient, logged_user, client, backend):
new_token = backend.get(models.Token, access_token=access_token)
assert new_token is not None
assert old_token.access_token != new_token.access_token
assert (
"canaille",
logging.SECURITY,
"Issued refresh_token token for user in client Some client from unknown IP",
) in caplog.record_tuples
backend.reload(old_token)
assert old_token.revokation_date

View file

@ -1,4 +1,5 @@
import datetime
import logging
from werkzeug.security import gen_salt
@ -134,13 +135,18 @@ def test_revoke_bad_request(testclient, token, logged_admin):
res = res.form.submit(name="action", value="invalid", status=400)
def test_revoke_token(testclient, token, logged_admin, backend):
def test_revoke_token(testclient, token, logged_admin, backend, caplog):
assert not token.revoked
res = testclient.get(f"/admin/token/{token.token_id}")
res = res.form.submit(name="action", value="confirm-revoke")
res = res.form.submit(name="action", value="revoke")
assert ("success", "The token has successfully been revoked.") in res.flashes
assert (
"canaille",
logging.SECURITY,
"Revoked token for user in client Some client by admin from unknown IP",
) in caplog.record_tuples
backend.reload(token)
assert token.revoked