forked from Github-Mirrors/canaille
fix: locked users cannot use refresh tokens
This commit is contained in:
parent
31423cde1a
commit
fc8c0da912
2 changed files with 55 additions and 1 deletions
|
@ -287,7 +287,8 @@ class RefreshTokenGrant(_RefreshTokenGrant):
|
||||||
return token[0]
|
return token[0]
|
||||||
|
|
||||||
def authenticate_user(self, credential):
|
def authenticate_user(self, credential):
|
||||||
return credential.subject
|
if credential.subject and not credential.subject.locked:
|
||||||
|
return credential.subject
|
||||||
|
|
||||||
def revoke_old_credential(self, credential):
|
def revoke_old_credential(self, credential):
|
||||||
credential.revokation_date = datetime.datetime.now(datetime.timezone.utc)
|
credential.revokation_date = datetime.datetime.now(datetime.timezone.utc)
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import datetime
|
||||||
from urllib.parse import parse_qs
|
from urllib.parse import parse_qs
|
||||||
from urllib.parse import urlsplit
|
from urllib.parse import urlsplit
|
||||||
|
|
||||||
|
@ -148,3 +149,55 @@ def test_refresh_token_with_invalid_user(testclient, client):
|
||||||
"error_description": 'There is no "user" for this token.',
|
"error_description": 'There is no "user" for this token.',
|
||||||
}
|
}
|
||||||
models.Token.get(access_token=access_token).delete()
|
models.Token.get(access_token=access_token).delete()
|
||||||
|
|
||||||
|
|
||||||
|
def test_cannot_refresh_token_for_locked_users(testclient, user, client):
|
||||||
|
"""Canaille should not issue new tokens for locked users."""
|
||||||
|
res = testclient.get(
|
||||||
|
"/oauth/authorize",
|
||||||
|
params=dict(
|
||||||
|
response_type="code",
|
||||||
|
client_id=client.client_id,
|
||||||
|
scope="openid profile",
|
||||||
|
nonce="somenonce",
|
||||||
|
),
|
||||||
|
)
|
||||||
|
res = res.follow()
|
||||||
|
|
||||||
|
res.form["login"] = "user"
|
||||||
|
res = res.form.submit(name="answer", value="accept")
|
||||||
|
res = res.follow()
|
||||||
|
res.form["password"] = "correct horse battery staple"
|
||||||
|
res = res.form.submit(name="answer", value="accept")
|
||||||
|
res = res.follow()
|
||||||
|
res = res.form.submit(name="answer", value="accept")
|
||||||
|
|
||||||
|
assert res.location.startswith(client.redirect_uris[0])
|
||||||
|
params = parse_qs(urlsplit(res.location).query)
|
||||||
|
code = params["code"][0]
|
||||||
|
|
||||||
|
res = testclient.post(
|
||||||
|
"/oauth/token",
|
||||||
|
params=dict(
|
||||||
|
grant_type="authorization_code",
|
||||||
|
code=code,
|
||||||
|
scope="openid profile",
|
||||||
|
redirect_uri=client.redirect_uris[0],
|
||||||
|
),
|
||||||
|
headers={"Authorization": f"Basic {client_credentials(client)}"},
|
||||||
|
status=200,
|
||||||
|
)
|
||||||
|
|
||||||
|
user.lock_date = datetime.datetime.now(datetime.timezone.utc)
|
||||||
|
user.save()
|
||||||
|
|
||||||
|
res = testclient.post(
|
||||||
|
"/oauth/token",
|
||||||
|
params=dict(
|
||||||
|
grant_type="refresh_token",
|
||||||
|
refresh_token=res.json["refresh_token"],
|
||||||
|
),
|
||||||
|
headers={"Authorization": f"Basic {client_credentials(client)}"},
|
||||||
|
status=400,
|
||||||
|
)
|
||||||
|
assert "access_token" not in res.json
|
||||||
|
|
Loading…
Reference in a new issue