From 31423cde1a274b065f29e3c0c724b106d4efdba7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=89loi=20Rivard?= Date: Wed, 17 Apr 2024 11:25:01 +0200 Subject: [PATCH] refactor: split oidc.authorize endpoint in several smaller functions --- canaille/oidc/endpoints/oauth.py | 26 +++- tests/oidc/test_authorization_code_flow.py | 142 ------------------- tests/oidc/test_refresh_token.py | 150 +++++++++++++++++++++ 3 files changed, 169 insertions(+), 149 deletions(-) create mode 100644 tests/oidc/test_refresh_token.py diff --git a/canaille/oidc/endpoints/oauth.py b/canaille/oidc/endpoints/oauth.py index cff3be25..c3bce4b5 100644 --- a/canaille/oidc/endpoints/oauth.py +++ b/canaille/oidc/endpoints/oauth.py @@ -49,10 +49,24 @@ def authorize(): request.form.to_dict(flat=False), ) + client = models.Client.get(client_id=request.args["client_id"]) + user = current_user() + + if response := authorize_guards(client): + return response + + if response := authorize_login(user): + return response + + response = authorize_consent(client, user) + + return response + + +def authorize_guards(client): if "client_id" not in request.args: abort(400, "client_id parameter is missing.") - client = models.Client.get(client_id=request.args["client_id"]) if not client: abort(400, "Invalid client.") @@ -73,12 +87,8 @@ def authorize(): "error_description": f"prompt '{request.args['prompt'] }' value is not supported", }, 400 - user = current_user() - requested_scopes = request.args.get("scope", "").split(" ") - allowed_scopes = client.get_allowed_scope(requested_scopes).split(" ") - - # LOGIN +def authorize_login(user): if not user: if request.args.get("prompt") == "none": return jsonify({"error": "login_required"}) @@ -95,8 +105,10 @@ def authorize(): 403, "The user does not have the permission to achieve OIDC authentication." ) - # CONSENT +def authorize_consent(client, user): + requested_scopes = request.args.get("scope", "").split(" ") + allowed_scopes = client.get_allowed_scope(requested_scopes).split(" ") consents = models.Consent.query( client=client, subject=user, diff --git a/tests/oidc/test_authorization_code_flow.py b/tests/oidc/test_authorization_code_flow.py index fd1500c8..41f40ce4 100644 --- a/tests/oidc/test_authorization_code_flow.py +++ b/tests/oidc/test_authorization_code_flow.py @@ -314,85 +314,6 @@ def test_deny(testclient, logged_user, client): assert not models.Consent.query() -def test_refresh_token(testclient, user, client): - assert not models.Consent.query() - - with freezegun.freeze_time("2020-01-01 01:00:00"): - res = testclient.get( - "/oauth/authorize", - params=dict( - response_type="code", - client_id=client.client_id, - scope="openid profile", - nonce="somenonce", - ), - ) - res = res.follow() - - res.form["login"] = "user" - res = res.form.submit(name="answer", value="accept") - res = res.follow() - res.form["password"] = "correct horse battery staple" - res = res.form.submit(name="answer", value="accept") - res = res.follow() - res = res.form.submit(name="answer", value="accept") - - assert res.location.startswith(client.redirect_uris[0]) - params = parse_qs(urlsplit(res.location).query) - code = params["code"][0] - authcode = models.AuthorizationCode.get(code=code) - assert authcode is not None - - consents = models.Consent.query(client=client, subject=user) - assert "profile" in consents[0].scope - - with freezegun.freeze_time("2020-01-01 00:01:00"): - res = testclient.post( - "/oauth/token", - params=dict( - grant_type="authorization_code", - code=code, - scope="openid profile", - redirect_uri=client.redirect_uris[0], - ), - headers={"Authorization": f"Basic {client_credentials(client)}"}, - status=200, - ) - access_token = res.json["access_token"] - old_token = models.Token.get(access_token=access_token) - assert old_token is not None - assert not old_token.revokation_date - - with freezegun.freeze_time("2020-01-01 00:02:00"): - res = testclient.post( - "/oauth/token", - params=dict( - grant_type="refresh_token", - refresh_token=res.json["refresh_token"], - ), - headers={"Authorization": f"Basic {client_credentials(client)}"}, - status=200, - ) - access_token = res.json["access_token"] - new_token = models.Token.get(access_token=access_token) - assert new_token is not None - assert old_token.access_token != new_token.access_token - - old_token.reload() - assert old_token.revokation_date - - with freezegun.freeze_time("2020-01-01 00:03:00"): - res = testclient.get( - "/oauth/userinfo", - headers={"Authorization": f"Bearer {access_token}"}, - status=200, - ) - assert res.json["name"] == "John (johnny) Doe" - - for consent in consents: - consent.delete() - - def test_code_challenge(testclient, logged_user, client): assert not models.Consent.query() @@ -806,69 +727,6 @@ def test_code_with_invalid_user(testclient, admin, client): authcode.delete() -def test_refresh_token_with_invalid_user(testclient, client): - user = models.User( - formatted_name="John Doe", - family_name="Doe", - user_name="temp", - emails=["temp@temp.com"], - password="correct horse battery staple", - ) - user.save() - - res = testclient.get( - "/oauth/authorize", - params=dict( - response_type="code", - client_id=client.client_id, - scope="openid profile", - nonce="somenonce", - ), - ).follow() - - res.form["login"] = "temp" - res = res.form.submit(name="answer", value="accept", status=302).follow() - - res.form["password"] = "correct horse battery staple" - res = res.form.submit(name="answer", value="accept", status=302).follow() - res = res.form.submit(name="answer", value="accept", status=302) - - params = parse_qs(urlsplit(res.location).query) - code = params["code"][0] - models.AuthorizationCode.get(code=code) - - res = testclient.post( - "/oauth/token", - params=dict( - grant_type="authorization_code", - code=code, - scope="openid profile", - redirect_uri=client.redirect_uris[0], - ), - headers={"Authorization": f"Basic {client_credentials(client)}"}, - status=200, - ) - - refresh_token = res.json["refresh_token"] - access_token = res.json["access_token"] - user.delete() - - res = testclient.post( - "/oauth/token", - params=dict( - grant_type="refresh_token", - refresh_token=refresh_token, - ), - headers={"Authorization": f"Basic {client_credentials(client)}"}, - status=400, - ) - assert res.json == { - "error": "invalid_request", - "error_description": 'There is no "user" for this token.', - } - models.Token.get(access_token=access_token).delete() - - def test_token_default_expiration_date(testclient, logged_user, client, keypair): res = testclient.get( "/oauth/authorize", diff --git a/tests/oidc/test_refresh_token.py b/tests/oidc/test_refresh_token.py new file mode 100644 index 00000000..855c41b8 --- /dev/null +++ b/tests/oidc/test_refresh_token.py @@ -0,0 +1,150 @@ +from urllib.parse import parse_qs +from urllib.parse import urlsplit + +import freezegun + +from canaille.app import models + +from . import client_credentials + + +def test_refresh_token(testclient, user, client): + assert not models.Consent.query() + + with freezegun.freeze_time("2020-01-01 01:00:00"): + res = testclient.get( + "/oauth/authorize", + params=dict( + response_type="code", + client_id=client.client_id, + scope="openid profile", + nonce="somenonce", + ), + ) + res = res.follow() + + res.form["login"] = "user" + res = res.form.submit(name="answer", value="accept") + res = res.follow() + res.form["password"] = "correct horse battery staple" + res = res.form.submit(name="answer", value="accept") + res = res.follow() + res = res.form.submit(name="answer", value="accept") + + assert res.location.startswith(client.redirect_uris[0]) + params = parse_qs(urlsplit(res.location).query) + code = params["code"][0] + authcode = models.AuthorizationCode.get(code=code) + assert authcode is not None + + consents = models.Consent.query(client=client, subject=user) + assert "profile" in consents[0].scope + + with freezegun.freeze_time("2020-01-01 00:01:00"): + res = testclient.post( + "/oauth/token", + params=dict( + grant_type="authorization_code", + code=code, + scope="openid profile", + redirect_uri=client.redirect_uris[0], + ), + headers={"Authorization": f"Basic {client_credentials(client)}"}, + status=200, + ) + access_token = res.json["access_token"] + old_token = models.Token.get(access_token=access_token) + assert old_token is not None + assert not old_token.revokation_date + + with freezegun.freeze_time("2020-01-01 00:02:00"): + res = testclient.post( + "/oauth/token", + params=dict( + grant_type="refresh_token", + refresh_token=res.json["refresh_token"], + ), + headers={"Authorization": f"Basic {client_credentials(client)}"}, + status=200, + ) + access_token = res.json["access_token"] + new_token = models.Token.get(access_token=access_token) + assert new_token is not None + assert old_token.access_token != new_token.access_token + + old_token.reload() + assert old_token.revokation_date + + with freezegun.freeze_time("2020-01-01 00:03:00"): + res = testclient.get( + "/oauth/userinfo", + headers={"Authorization": f"Bearer {access_token}"}, + status=200, + ) + assert res.json["name"] == "John (johnny) Doe" + + for consent in consents: + consent.delete() + + +def test_refresh_token_with_invalid_user(testclient, client): + user = models.User( + formatted_name="John Doe", + family_name="Doe", + user_name="temp", + emails=["temp@temp.com"], + password="correct horse battery staple", + ) + user.save() + + res = testclient.get( + "/oauth/authorize", + params=dict( + response_type="code", + client_id=client.client_id, + scope="openid profile", + nonce="somenonce", + ), + ).follow() + + res.form["login"] = "temp" + res = res.form.submit(name="answer", value="accept", status=302).follow() + + res.form["password"] = "correct horse battery staple" + res = res.form.submit(name="answer", value="accept", status=302).follow() + res = res.form.submit(name="answer", value="accept", status=302) + + params = parse_qs(urlsplit(res.location).query) + code = params["code"][0] + models.AuthorizationCode.get(code=code) + + res = testclient.post( + "/oauth/token", + params=dict( + grant_type="authorization_code", + code=code, + scope="openid profile", + redirect_uri=client.redirect_uris[0], + ), + headers={"Authorization": f"Basic {client_credentials(client)}"}, + status=200, + ) + + refresh_token = res.json["refresh_token"] + access_token = res.json["access_token"] + user.delete() + + res = testclient.post( + "/oauth/token", + params=dict( + grant_type="refresh_token", + refresh_token=refresh_token, + ), + headers={"Authorization": f"Basic {client_credentials(client)}"}, + status=400, + ) + assert res.json == { + "error": "invalid_request", + "error_description": 'There is no "user" for this token.', + } + models.Token.get(access_token=access_token).delete()