diff --git a/canaille/oidc/oauth.py b/canaille/oidc/oauth.py index 180deb82..ffa31052 100644 --- a/canaille/oidc/oauth.py +++ b/canaille/oidc/oauth.py @@ -287,7 +287,8 @@ class RefreshTokenGrant(_RefreshTokenGrant): return token[0] def authenticate_user(self, credential): - return credential.subject + if credential.subject and not credential.subject.locked: + return credential.subject def revoke_old_credential(self, credential): credential.revokation_date = datetime.datetime.now(datetime.timezone.utc) diff --git a/tests/oidc/test_refresh_token.py b/tests/oidc/test_refresh_token.py index 855c41b8..f8b2a1d1 100644 --- a/tests/oidc/test_refresh_token.py +++ b/tests/oidc/test_refresh_token.py @@ -1,3 +1,4 @@ +import datetime from urllib.parse import parse_qs from urllib.parse import urlsplit @@ -148,3 +149,55 @@ def test_refresh_token_with_invalid_user(testclient, client): "error_description": 'There is no "user" for this token.', } models.Token.get(access_token=access_token).delete() + + +def test_cannot_refresh_token_for_locked_users(testclient, user, client): + """Canaille should not issue new tokens for locked users.""" + res = testclient.get( + "/oauth/authorize", + params=dict( + response_type="code", + client_id=client.client_id, + scope="openid profile", + nonce="somenonce", + ), + ) + res = res.follow() + + res.form["login"] = "user" + res = res.form.submit(name="answer", value="accept") + res = res.follow() + res.form["password"] = "correct horse battery staple" + res = res.form.submit(name="answer", value="accept") + res = res.follow() + res = res.form.submit(name="answer", value="accept") + + assert res.location.startswith(client.redirect_uris[0]) + params = parse_qs(urlsplit(res.location).query) + code = params["code"][0] + + res = testclient.post( + "/oauth/token", + params=dict( + grant_type="authorization_code", + code=code, + scope="openid profile", + redirect_uri=client.redirect_uris[0], + ), + headers={"Authorization": f"Basic {client_credentials(client)}"}, + status=200, + ) + + user.lock_date = datetime.datetime.now(datetime.timezone.utc) + user.save() + + res = testclient.post( + "/oauth/token", + params=dict( + grant_type="refresh_token", + refresh_token=res.json["refresh_token"], + ), + headers={"Authorization": f"Basic {client_credentials(client)}"}, + status=400, + ) + assert "access_token" not in res.json