forked from Github-Mirrors/canaille
Claims are configurable
This commit is contained in:
parent
eedb578ab0
commit
60d30e258b
5 changed files with 83 additions and 41 deletions
|
@ -19,3 +19,22 @@ USER_FILTER = "(|(uid={login})(cn={login}))"
|
|||
# Filter to match admin users. If your server has memberof
|
||||
# you can filter against group membership
|
||||
ADMIN_FILTER = "cn=Jane Doe"
|
||||
|
||||
[JWT]
|
||||
KEY = "secret-key"
|
||||
ALG = "HS256"
|
||||
ISS = "http://mydomain.tld"
|
||||
EXP = 3600
|
||||
MAPPING =
|
||||
SUB = "uid"
|
||||
NAME = "cn"
|
||||
PHONE_NUMBER = "telephoneNumber"
|
||||
# EXAMPLE OF MAPPING FOR inetOrgPerson
|
||||
# PHONE_NUMBER = "telephoneNumber"
|
||||
# EMAIL = "mail"
|
||||
# GIVEN_NAME = "givenName"
|
||||
# PREFERRED_USERNAME = "displayName"
|
||||
# FAMILIY_NAME = "
|
||||
# LOCALE = "preferredLanguage"
|
||||
# PICTURE = "photo"
|
||||
# ADDRESS = "postalAddress"
|
||||
|
|
|
@ -88,6 +88,17 @@ def app(slapd_server):
|
|||
"BIND_PW": slapd_server.root_pw,
|
||||
"USER_FILTER": "(|(uid={login})(cn={login}))",
|
||||
},
|
||||
"JWT": {
|
||||
"KEY": "secret-key",
|
||||
"ALG": "HS256",
|
||||
"ISS": "http://mydomain.tld",
|
||||
"EXP": 3600,
|
||||
"MAPPING": {
|
||||
"SUB": "uid",
|
||||
"NAME": "cn",
|
||||
"PHONE_NUMBER": "telephoneNumber",
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
return app
|
||||
|
|
|
@ -70,10 +70,9 @@ def test_oidc_hybrid(testclient, slapd_connection, logged_user, client):
|
|||
|
||||
id_token = params["id_token"][0]
|
||||
claims = jwt.decode(id_token, "secret-key")
|
||||
assert logged_user.uid[0] == claims['sub']
|
||||
assert logged_user.cn[0] == claims['name']
|
||||
assert "toto@yolo.com" == claims['email']
|
||||
assert client.oauthClientID == claims['aud']
|
||||
assert logged_user.uid[0] == claims["sub"]
|
||||
assert logged_user.cn[0] == claims["name"]
|
||||
assert [client.oauthClientID] == claims["aud"]
|
||||
|
||||
res = testclient.get("/api/me", headers={"Authorization": f"Bearer {access_token}"})
|
||||
assert 200 == res.status_code
|
||||
|
|
|
@ -84,9 +84,9 @@ def test_oidc_implicit(testclient, slapd_connection, user, client):
|
|||
|
||||
id_token = params["id_token"][0]
|
||||
claims = jwt.decode(id_token, "secret-key")
|
||||
assert user.uid[0] == claims['sub']
|
||||
assert user.sn[0] == claims['name']
|
||||
assert client.oauthClientID == claims['aud']
|
||||
assert user.uid[0] == claims["sub"]
|
||||
assert user.cn[0] == claims["name"]
|
||||
assert [client.oauthClientID] == claims["aud"]
|
||||
|
||||
res = testclient.get("/api/me", headers={"Authorization": f"Bearer {access_token}"})
|
||||
assert (200, "application/json") == (res.status_code, res.content_type)
|
||||
|
|
|
@ -14,46 +14,59 @@ from authlib.oidc.core.grants import (
|
|||
OpenIDHybridGrant as _OpenIDHybridGrant,
|
||||
)
|
||||
from authlib.oidc.core import UserInfo
|
||||
from flask import current_app
|
||||
from werkzeug.security import gen_salt
|
||||
from .models import Client, AuthorizationCode, Token, User
|
||||
|
||||
DUMMY_JWT_CONFIG = {
|
||||
"key": "secret-key",
|
||||
"alg": "HS256",
|
||||
"iss": "https://authlib.org",
|
||||
"exp": 3600,
|
||||
}
|
||||
|
||||
|
||||
def exists_nonce(nonce, req):
|
||||
exists = AuthorizationCode.filter(oauthClientID=req.client_id, oauthNonce=nonce)
|
||||
return bool(exists)
|
||||
|
||||
|
||||
def get_jwt_config(grant):
|
||||
return {
|
||||
"key": current_app.config["JWT"]["KEY"],
|
||||
"alg": current_app.config["JWT"]["ALG"],
|
||||
"iss": current_app.config["JWT"]["ISS"],
|
||||
"exp": current_app.config["JWT"]["EXP"],
|
||||
}
|
||||
|
||||
|
||||
def generate_user_info(user, scope):
|
||||
return UserInfo(
|
||||
sub=user.uid[0],
|
||||
name=user.sn[0],
|
||||
email="toto@yolo.com",
|
||||
phone_number=user.telephoneNumber,
|
||||
# given_name
|
||||
# family_name,
|
||||
# middle_name,
|
||||
# nickname,
|
||||
# preferred_username,
|
||||
# profile,
|
||||
# picture,
|
||||
# website,
|
||||
# email,
|
||||
# email_verified,
|
||||
# gender,
|
||||
# birthdate,
|
||||
# zoneinfo,
|
||||
# locale,
|
||||
# phone_number_verified,
|
||||
# address,
|
||||
# updated_at,
|
||||
)
|
||||
fields = ["sub"]
|
||||
if "profile" in scope:
|
||||
fields += [
|
||||
"name",
|
||||
"family_name",
|
||||
"given_name",
|
||||
"nickname",
|
||||
"preferred_username",
|
||||
"profile",
|
||||
"picture",
|
||||
"website",
|
||||
"gender",
|
||||
"birthdate",
|
||||
"zoneinfo",
|
||||
"locale",
|
||||
"updated_at",
|
||||
]
|
||||
if "email" in scope:
|
||||
fields += ["email", "email_verified"]
|
||||
if "address" in scope:
|
||||
fields += ["address"]
|
||||
if "phone" in scope:
|
||||
fields += ["phone_number", "phone_number_verified"]
|
||||
|
||||
data = {}
|
||||
for field in fields:
|
||||
ldap_field_match = current_app.config["JWT"]["MAPPING"].get(field.upper())
|
||||
if ldap_field_match and getattr(user, ldap_field_match, None):
|
||||
data[field] = getattr(user, ldap_field_match)
|
||||
if isinstance(data[field], list):
|
||||
data[field] = data[field][0]
|
||||
|
||||
return UserInfo(**data)
|
||||
|
||||
|
||||
def save_authorization_code(code, request):
|
||||
|
@ -96,7 +109,7 @@ class OpenIDCode(_OpenIDCode):
|
|||
return exists_nonce(nonce, request)
|
||||
|
||||
def get_jwt_config(self, grant):
|
||||
return DUMMY_JWT_CONFIG
|
||||
return get_jwt_config(grant)
|
||||
|
||||
def generate_user_info(self, user, scope):
|
||||
return generate_user_info(user, scope)
|
||||
|
@ -128,7 +141,7 @@ class OpenIDImplicitGrant(_OpenIDImplicitGrant):
|
|||
return exists_nonce(nonce, request)
|
||||
|
||||
def get_jwt_config(self, grant=None):
|
||||
return DUMMY_JWT_CONFIG
|
||||
return get_jwt_config(grant)
|
||||
|
||||
def generate_user_info(self, user, scope):
|
||||
user = User.get(user)
|
||||
|
@ -146,8 +159,8 @@ class OpenIDHybridGrant(_OpenIDHybridGrant):
|
|||
def exists_nonce(self, nonce, request):
|
||||
return exists_nonce(nonce, request)
|
||||
|
||||
def get_jwt_config(self):
|
||||
return DUMMY_JWT_CONFIG
|
||||
def get_jwt_config(self, grant=None):
|
||||
return get_jwt_config(grant)
|
||||
|
||||
def generate_user_info(self, user, scope):
|
||||
user = User.get(user)
|
||||
|
|
Loading…
Reference in a new issue