forked from Github-Mirrors/canaille
CSRF protection everywhere
This commit is contained in:
parent
98df762666
commit
f97dc3b2c6
17 changed files with 151 additions and 89 deletions
|
@ -15,6 +15,11 @@ Changed
|
|||
- UX rework. Submenu addition. :pr:`114`
|
||||
- Properly handle LDAP date timezones. :pr:`117`
|
||||
|
||||
Fixed
|
||||
*****
|
||||
|
||||
- CSRF protection on every forms. :pr:`119`
|
||||
|
||||
[0.0.22] - 2023-03-13
|
||||
=====================
|
||||
|
||||
|
|
|
@ -10,10 +10,10 @@ from flask import session
|
|||
from flask_themer import FileSystemThemeLoader
|
||||
from flask_themer import render_template
|
||||
from flask_themer import Themer
|
||||
from flask_wtf.csrf import CSRFProtect
|
||||
|
||||
from .i18n import setup_i18n
|
||||
from .ldap_backend.backend import init_backend
|
||||
from .oidc.oauth import setup_oauth
|
||||
|
||||
csrf = CSRFProtect()
|
||||
|
||||
|
||||
def setup_config(app, config=None, validate=True):
|
||||
|
@ -128,12 +128,54 @@ def setup_blueprints(app):
|
|||
app.register_blueprint(canaille.oidc.bp)
|
||||
|
||||
|
||||
def setup_flask(app):
|
||||
csrf.init_app(app)
|
||||
|
||||
@app.before_request
|
||||
def make_session_permanent():
|
||||
session.permanent = True
|
||||
app.permanent_session_lifetime = datetime.timedelta(days=365)
|
||||
|
||||
@app.context_processor
|
||||
def global_processor():
|
||||
from .flaskutils import current_user
|
||||
|
||||
return {
|
||||
"has_smtp": "SMTP" in app.config,
|
||||
"logo_url": app.config.get("LOGO"),
|
||||
"favicon_url": app.config.get("FAVICON", app.config.get("LOGO")),
|
||||
"website_name": app.config.get("NAME", "Canaille"),
|
||||
"user": current_user(),
|
||||
"menu": True,
|
||||
}
|
||||
|
||||
@app.errorhandler(400)
|
||||
def bad_request(e):
|
||||
return render_template("error.html", error=400), 400
|
||||
|
||||
@app.errorhandler(403)
|
||||
def unauthorized(e):
|
||||
return render_template("error.html", error=403), 403
|
||||
|
||||
@app.errorhandler(404)
|
||||
def page_not_found(e):
|
||||
return render_template("error.html", error=404), 404
|
||||
|
||||
@app.errorhandler(500)
|
||||
def server_error(e): # pragma: no cover
|
||||
return render_template("error.html", error=500), 500
|
||||
|
||||
|
||||
def create_app(config=None, validate=True):
|
||||
app = Flask(__name__)
|
||||
setup_config(app, config, validate)
|
||||
|
||||
sentry_sdk = setup_sentry(app)
|
||||
try:
|
||||
from .oidc.oauth import setup_oauth
|
||||
from .ldap_backend.backend import init_backend
|
||||
from .i18n import setup_i18n
|
||||
|
||||
setup_logging(app)
|
||||
init_backend(app)
|
||||
setup_oauth(app)
|
||||
|
@ -141,40 +183,7 @@ def create_app(config=None, validate=True):
|
|||
setup_jinja(app)
|
||||
setup_i18n(app)
|
||||
setup_themer(app)
|
||||
|
||||
@app.before_request
|
||||
def make_session_permanent():
|
||||
session.permanent = True
|
||||
app.permanent_session_lifetime = datetime.timedelta(days=365)
|
||||
|
||||
@app.context_processor
|
||||
def global_processor():
|
||||
from .flaskutils import current_user
|
||||
|
||||
return {
|
||||
"has_smtp": "SMTP" in app.config,
|
||||
"logo_url": app.config.get("LOGO"),
|
||||
"favicon_url": app.config.get("FAVICON", app.config.get("LOGO")),
|
||||
"website_name": app.config.get("NAME", "Canaille"),
|
||||
"user": current_user(),
|
||||
"menu": True,
|
||||
}
|
||||
|
||||
@app.errorhandler(400)
|
||||
def bad_request(e):
|
||||
return render_template("error.html", error=400), 400
|
||||
|
||||
@app.errorhandler(403)
|
||||
def unauthorized(e):
|
||||
return render_template("error.html", error=403), 403
|
||||
|
||||
@app.errorhandler(404)
|
||||
def page_not_found(e):
|
||||
return render_template("error.html", error=404), 404
|
||||
|
||||
@app.errorhandler(500)
|
||||
def server_error(e): # pragma: no cover
|
||||
return render_template("error.html", error=500), 500
|
||||
setup_flask(app)
|
||||
|
||||
except Exception as exc: # pragma: no cover
|
||||
if sentry_sdk:
|
||||
|
|
|
@ -163,10 +163,7 @@ def firstlogin(uid):
|
|||
if not request.form:
|
||||
return render_template("firstlogin.html", form=form, uid=uid)
|
||||
|
||||
if not form.validate():
|
||||
flash(_("Could not send the password initialization link."), "error")
|
||||
return render_template("firstlogin.html", form=form, uid=uid)
|
||||
|
||||
form.validate()
|
||||
if send_password_initialization_mail(user):
|
||||
flash(
|
||||
_(
|
||||
|
|
|
@ -5,6 +5,7 @@ from authlib.integrations.flask_oauth2 import current_token
|
|||
from authlib.jose import JsonWebKey
|
||||
from authlib.jose import jwt
|
||||
from authlib.oauth2 import OAuth2Error
|
||||
from canaille import csrf
|
||||
from flask import abort
|
||||
from flask import Blueprint
|
||||
from flask import current_app
|
||||
|
@ -159,6 +160,7 @@ def authorize():
|
|||
|
||||
|
||||
@bp.route("/token", methods=["POST"])
|
||||
@csrf.exempt
|
||||
def issue_token():
|
||||
current_app.logger.debug(
|
||||
"token endpoint request: POST: %s", request.form.to_dict(flat=False)
|
||||
|
@ -169,6 +171,7 @@ def issue_token():
|
|||
|
||||
|
||||
@bp.route("/introspect", methods=["POST"])
|
||||
@csrf.exempt
|
||||
def introspect_token():
|
||||
current_app.logger.debug(
|
||||
"introspection endpoint request: POST: %s", request.form.to_dict(flat=False)
|
||||
|
@ -181,6 +184,7 @@ def introspect_token():
|
|||
|
||||
|
||||
@bp.route("/revoke", methods=["POST"])
|
||||
@csrf.exempt
|
||||
def revoke_token():
|
||||
current_app.logger.debug(
|
||||
"revokation endpoint request: POST: %s", request.form.to_dict(flat=False)
|
||||
|
@ -191,6 +195,7 @@ def revoke_token():
|
|||
|
||||
|
||||
@bp.route("/register", methods=["POST"])
|
||||
@csrf.exempt
|
||||
def client_registration():
|
||||
current_app.logger.debug(
|
||||
"client registration endpoint request: POST: %s",
|
||||
|
@ -204,6 +209,7 @@ def client_registration():
|
|||
|
||||
|
||||
@bp.route("/register/<client_id>", methods=["GET", "PUT", "DELETE"])
|
||||
@csrf.exempt
|
||||
def client_registration_management(client_id):
|
||||
current_app.logger.debug(
|
||||
"client registration management endpoint request: POST: %s",
|
||||
|
@ -336,10 +342,7 @@ def end_session():
|
|||
@bp.route("/end_session_confirm", methods=["POST"])
|
||||
def end_session_submit():
|
||||
form = LogoutForm(request.form)
|
||||
if not form.validate():
|
||||
flash(_("An error happened during the logout"), "error")
|
||||
client = Client.get(session.get("end_session_data", {}).get("client_id"))
|
||||
return render_template("oidc/user/logout.html", form=form, client=client)
|
||||
form.validate()
|
||||
|
||||
data = session["end_session_data"]
|
||||
del session["end_session_data"]
|
||||
|
|
|
@ -92,8 +92,8 @@ display=true
|
|||
{%- endmacro %}
|
||||
|
||||
{% macro render_fields(form) %}
|
||||
{{ form.hidden_tag() if form.hidden_tag }}
|
||||
{% if caller %}
|
||||
{{ form.hidden_tag() if form.hidden_tag }}
|
||||
{{ caller() }}
|
||||
{% else %}
|
||||
{% for field in form %}
|
||||
|
|
|
@ -34,6 +34,7 @@
|
|||
|
||||
<div class="ui center aligned container">
|
||||
<form action="{{ request.url }}" method="post">
|
||||
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
|
||||
<div class="ui stackable buttons">
|
||||
<button name="answer" type="submit" class="ui negative button" value="deny" id="deny">
|
||||
{% trans %}Deny{% endtrans %}
|
||||
|
|
|
@ -53,7 +53,7 @@
|
|||
class="ui form info profile-form"
|
||||
>
|
||||
|
||||
{#{ render_field(form.csrf_token) }#}
|
||||
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
|
||||
|
||||
<h4 class="ui dividing header">{% trans %}Personal information{% endtrans %}</h4>
|
||||
|
||||
|
|
|
@ -72,7 +72,7 @@
|
|||
class="ui form info{% if user.can_manage_users and edited_user and not edited_user.has_password() %} warning{% endif %} profile-form"
|
||||
>
|
||||
|
||||
{#{ render_field(form.csrf_token) }#}
|
||||
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
|
||||
|
||||
{% if "jpegPhoto" in form %}
|
||||
<div class="ui grid">
|
||||
|
|
|
@ -78,7 +78,7 @@
|
|||
class="ui form info{% if user.can_manage_users and not edited_user.has_password() %} warning{% endif %} profile-form"
|
||||
>
|
||||
|
||||
{#{ render_field(form.csrf_token) }#}
|
||||
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
|
||||
|
||||
{% if "uid" in form %}
|
||||
{% block uid_field scoped %}{{ render_field(form.uid) }}{% endblock %}
|
||||
|
|
|
@ -57,13 +57,17 @@ def test_client_list_bad_pages(testclient, logged_admin):
|
|||
res = testclient.get("/admin/client")
|
||||
form = res.forms["next"]
|
||||
testclient.post(
|
||||
"/admin/client", {"csrf_token": form["csrf_token"], "page": "2"}, status=404
|
||||
"/admin/client",
|
||||
{"csrf_token": form["csrf_token"].value, "page": "2"},
|
||||
status=404,
|
||||
)
|
||||
|
||||
res = testclient.get("/admin/client")
|
||||
form = res.forms["next"]
|
||||
testclient.post(
|
||||
"/admin/client", {"csrf_token": form["csrf_token"], "page": "-1"}, status=404
|
||||
"/admin/client",
|
||||
{"csrf_token": form["csrf_token"].value, "page": "-1"},
|
||||
status=404,
|
||||
)
|
||||
|
||||
|
||||
|
@ -217,8 +221,13 @@ def test_client_delete(testclient, logged_admin):
|
|||
assert not Consent.get()
|
||||
|
||||
|
||||
def test_client_delete_invalid_client(testclient, logged_admin):
|
||||
testclient.post("/admin/client/edit/invalid", {"action": "delete"}, status=404)
|
||||
def test_client_delete_invalid_client(testclient, logged_admin, client):
|
||||
res = testclient.get(f"/admin/client/edit/{client.client_id}")
|
||||
testclient.post(
|
||||
"/admin/client/edit/invalid",
|
||||
{"action": "delete", "csrf_token": res.forms["clientadd"]["csrf_token"].value},
|
||||
status=404,
|
||||
)
|
||||
|
||||
|
||||
def test_invalid_request(testclient, logged_admin, client):
|
||||
|
|
|
@ -51,7 +51,7 @@ def test_authorization_list_bad_pages(testclient, logged_admin):
|
|||
form = res.forms["next"]
|
||||
testclient.post(
|
||||
"/admin/authorization",
|
||||
{"csrf_token": form["csrf_token"], "page": "2"},
|
||||
{"csrf_token": form["csrf_token"].value, "page": "2"},
|
||||
status=404,
|
||||
)
|
||||
|
||||
|
@ -59,7 +59,7 @@ def test_authorization_list_bad_pages(testclient, logged_admin):
|
|||
form = res.forms["next"]
|
||||
testclient.post(
|
||||
"/admin/authorization",
|
||||
{"csrf_token": form["csrf_token"], "page": "-1"},
|
||||
{"csrf_token": form["csrf_token"].value, "page": "-1"},
|
||||
status=404,
|
||||
)
|
||||
|
||||
|
|
|
@ -374,19 +374,7 @@ def test_no_jwt_bad_csrf(testclient, slapd_connection, logged_user, client):
|
|||
|
||||
form = res.form
|
||||
form["csrf_token"] = "foobar"
|
||||
res = form.submit(name="answer", value="logout", status=200)
|
||||
|
||||
assert ("error", "An error happened during the logout") in res.flashes
|
||||
|
||||
res = res.form.submit(name="answer", value="logout", status=302)
|
||||
res = res.follow(status=302)
|
||||
|
||||
with testclient.session_transaction() as sess:
|
||||
assert not sess.get("user_id")
|
||||
|
||||
assert res.location == f"{post_logout_redirect_url}?state=foobar"
|
||||
|
||||
testclient.get(f"/profile/{logged_user.uid[0]}", status=403)
|
||||
res = form.submit(name="answer", value="logout", status=400)
|
||||
|
||||
|
||||
def test_end_session_already_disconnected(
|
||||
|
|
|
@ -60,13 +60,17 @@ def test_token_list_bad_pages(testclient, logged_admin):
|
|||
res = testclient.get("/admin/token")
|
||||
form = res.forms["next"]
|
||||
testclient.post(
|
||||
"/admin/token", {"csrf_token": form["csrf_token"], "page": "2"}, status=404
|
||||
"/admin/token",
|
||||
{"csrf_token": form["csrf_token"].value, "page": "2"},
|
||||
status=404,
|
||||
)
|
||||
|
||||
res = testclient.get("/admin/token")
|
||||
form = res.forms["next"]
|
||||
testclient.post(
|
||||
"/admin/token", {"csrf_token": form["csrf_token"], "page": "-1"}, status=404
|
||||
"/admin/token",
|
||||
{"csrf_token": form["csrf_token"].value, "page": "-1"},
|
||||
status=404,
|
||||
)
|
||||
|
||||
|
||||
|
|
|
@ -174,8 +174,7 @@ def test_first_login_form_error(testclient, slapd_connection, smtpd):
|
|||
|
||||
res = testclient.get("/firstlogin/temp", status=200)
|
||||
res.form["csrf_token"] = "invalid"
|
||||
res = res.form.submit(name="action", value="sendmail")
|
||||
assert ("error", "Could not send the password initialization link.") in res.flashes
|
||||
res = res.form.submit(name="action", value="sendmail", status=400)
|
||||
assert len(smtpd.messages) == 0
|
||||
u.delete()
|
||||
|
||||
|
|
|
@ -38,13 +38,13 @@ def test_group_list_bad_pages(testclient, logged_admin):
|
|||
res = testclient.get("/groups")
|
||||
form = res.forms["next"]
|
||||
testclient.post(
|
||||
"/groups", {"csrf_token": form["csrf_token"], "page": "2"}, status=404
|
||||
"/groups", {"csrf_token": form["csrf_token"].value, "page": "2"}, status=404
|
||||
)
|
||||
|
||||
res = testclient.get("/groups")
|
||||
form = res.forms["next"]
|
||||
testclient.post(
|
||||
"/groups", {"csrf_token": form["csrf_token"], "page": "-1"}, status=404
|
||||
"/groups", {"csrf_token": form["csrf_token"].value, "page": "-1"}, status=404
|
||||
)
|
||||
|
||||
|
||||
|
@ -156,7 +156,12 @@ def test_moderator_can_create_edit_and_delete_group(
|
|||
|
||||
|
||||
def test_cannot_create_already_existing_group(testclient, logged_moderator, foo_group):
|
||||
res = testclient.post("/groups/add", {"display_name": "foo"}, status=200)
|
||||
res = testclient.get("/groups/add")
|
||||
res = testclient.post(
|
||||
"/groups/add",
|
||||
{"csrf_token": res.form["csrf_token"].value, "display_name": "foo"},
|
||||
status=200,
|
||||
)
|
||||
|
||||
res.mustcontain("Group creation failed.")
|
||||
res.mustcontain("The group 'foo' already exists")
|
||||
|
@ -196,7 +201,7 @@ def test_invalid_form_request(testclient, logged_moderator, foo_group):
|
|||
def test_edition_failed(testclient, logged_moderator, foo_group):
|
||||
res = testclient.get("/groups/foo")
|
||||
form = res.forms["editgroupform"]
|
||||
form["csrf_token"] = "invalid"
|
||||
form["display_name"] = ""
|
||||
res = form.submit(name="action", value="edit")
|
||||
res.mustcontain("Group edition failed.")
|
||||
foo_group = Group.get(foo_group.id)
|
||||
|
@ -234,13 +239,15 @@ def test_user_list_bad_pages(testclient, logged_admin, foo_group):
|
|||
res = testclient.get("/groups/foo")
|
||||
form = res.forms["next"]
|
||||
testclient.post(
|
||||
"/groups/foo", {"csrf_token": form["csrf_token"], "page": "2"}, status=404
|
||||
"/groups/foo", {"csrf_token": form["csrf_token"].value, "page": "2"}, status=404
|
||||
)
|
||||
|
||||
res = testclient.get("/groups/foo")
|
||||
form = res.forms["next"]
|
||||
testclient.post(
|
||||
"/groups/foo", {"csrf_token": form["csrf_token"], "page": "-1"}, status=404
|
||||
"/groups/foo",
|
||||
{"csrf_token": form["csrf_token"].value, "page": "-1"},
|
||||
status=404,
|
||||
)
|
||||
|
||||
|
||||
|
|
|
@ -31,13 +31,13 @@ def test_user_list_bad_pages(testclient, logged_admin):
|
|||
res = testclient.get("/users")
|
||||
form = res.forms["next"]
|
||||
testclient.post(
|
||||
"/users", {"csrf_token": form["csrf_token"], "page": "2"}, status=404
|
||||
"/users", {"csrf_token": form["csrf_token"].value, "page": "2"}, status=404
|
||||
)
|
||||
|
||||
res = testclient.get("/users")
|
||||
form = res.forms["next"]
|
||||
testclient.post(
|
||||
"/users", {"csrf_token": form["csrf_token"], "page": "-1"}, status=404
|
||||
"/users", {"csrf_token": form["csrf_token"].value, "page": "-1"}, status=404
|
||||
)
|
||||
|
||||
|
||||
|
@ -169,7 +169,12 @@ def test_field_permissions_none(testclient, slapd_server, logged_user):
|
|||
assert "telephoneNumber" not in res.form.fields
|
||||
|
||||
testclient.post(
|
||||
"/profile/user", {"action": "edit", "telephoneNumber": "000-000-000"}
|
||||
"/profile/user",
|
||||
{
|
||||
"action": "edit",
|
||||
"telephoneNumber": "000-000-000",
|
||||
"csrf_token": res.form["csrf_token"].value,
|
||||
},
|
||||
)
|
||||
user = User.get(id=logged_user.id)
|
||||
assert user.telephoneNumber == ["555-666-777"]
|
||||
|
@ -189,7 +194,12 @@ def test_field_permissions_read(testclient, slapd_server, logged_user):
|
|||
assert "telephoneNumber" in res.form.fields
|
||||
|
||||
testclient.post(
|
||||
"/profile/user", {"action": "edit", "telephoneNumber": "000-000-000"}
|
||||
"/profile/user",
|
||||
{
|
||||
"action": "edit",
|
||||
"telephoneNumber": "000-000-000",
|
||||
"csrf_token": res.form["csrf_token"].value,
|
||||
},
|
||||
)
|
||||
user = User.get(id=logged_user.id)
|
||||
assert user.telephoneNumber == ["555-666-777"]
|
||||
|
@ -209,17 +219,30 @@ def test_field_permissions_write(testclient, slapd_server, logged_user):
|
|||
assert "telephoneNumber" in res.form.fields
|
||||
|
||||
testclient.post(
|
||||
"/profile/user", {"action": "edit", "telephoneNumber": "000-000-000"}
|
||||
"/profile/user",
|
||||
{
|
||||
"action": "edit",
|
||||
"telephoneNumber": "000-000-000",
|
||||
"csrf_token": res.form["csrf_token"].value,
|
||||
},
|
||||
)
|
||||
user = User.get(id=logged_user.id)
|
||||
assert user.telephoneNumber == ["000-000-000"]
|
||||
|
||||
|
||||
def test_simple_user_cannot_edit_other(testclient, logged_user):
|
||||
testclient.get("/profile/user", status=200)
|
||||
res = testclient.get("/profile/user", status=200)
|
||||
testclient.get("/profile/admin", status=403)
|
||||
testclient.post("/profile/admin", {"action": "edit"}, status=403)
|
||||
testclient.post("/profile/admin", {"action": "delete"}, status=403)
|
||||
testclient.post(
|
||||
"/profile/admin",
|
||||
{"action": "edit", "csrf_token": res.form["csrf_token"].value},
|
||||
status=403,
|
||||
)
|
||||
testclient.post(
|
||||
"/profile/admin",
|
||||
{"action": "delete", "csrf_token": res.form["csrf_token"].value},
|
||||
status=403,
|
||||
)
|
||||
testclient.get("/users", status=403)
|
||||
|
||||
|
||||
|
|
|
@ -176,9 +176,13 @@ def test_password_initialization_invalid_user(
|
|||
smtpd, testclient, slapd_connection, logged_admin
|
||||
):
|
||||
assert len(smtpd.messages) == 0
|
||||
res = testclient.get("/profile/admin/settings")
|
||||
testclient.post(
|
||||
"/profile/invalid/settings",
|
||||
{"action": "password-initialization-mail"},
|
||||
{
|
||||
"action": "password-initialization-mail",
|
||||
"csrf_token": res.form["csrf_token"].value,
|
||||
},
|
||||
status=404,
|
||||
)
|
||||
assert len(smtpd.messages) == 0
|
||||
|
@ -186,20 +190,33 @@ def test_password_initialization_invalid_user(
|
|||
|
||||
def test_password_reset_invalid_user(smtpd, testclient, slapd_connection, logged_admin):
|
||||
assert len(smtpd.messages) == 0
|
||||
res = testclient.get("/profile/admin/settings")
|
||||
testclient.post(
|
||||
"/profile/invalid/settings", {"action": "password-reset-mail"}, status=404
|
||||
"/profile/invalid/settings",
|
||||
{"action": "password-reset-mail", "csrf_token": res.form["csrf_token"].value},
|
||||
status=404,
|
||||
)
|
||||
assert len(smtpd.messages) == 0
|
||||
|
||||
|
||||
def test_delete_invalid_user(testclient, slapd_connection, logged_admin):
|
||||
testclient.post("/profile/invalid/settings", {"action": "delete"}, status=404)
|
||||
res = testclient.get("/profile/admin/settings")
|
||||
testclient.post(
|
||||
"/profile/invalid/settings",
|
||||
{"action": "delete", "csrf_token": res.form["csrf_token"].value},
|
||||
status=404,
|
||||
)
|
||||
|
||||
|
||||
def test_impersonate_invalid_user(testclient, slapd_connection, logged_admin):
|
||||
testclient.get("/impersonate/invalid", status=404)
|
||||
|
||||
|
||||
def test_invalid_form_request(testclient, logged_admin):
|
||||
res = testclient.get("/profile/admin/settings")
|
||||
res = res.form.submit(name="action", value="invalid-action", status=400)
|
||||
|
||||
|
||||
def test_password_reset_email(smtpd, testclient, slapd_connection, logged_admin):
|
||||
u = User(
|
||||
cn="Temp User",
|
||||
|
|
Loading…
Reference in a new issue