canaille-globuzma/canaille/oidc/oauth.py

451 lines
15 KiB
Python
Raw Normal View History

2020-09-17 10:01:21 +00:00
import datetime
2022-10-06 11:32:41 +00:00
from authlib.integrations.flask_oauth2 import AuthorizationServer
from authlib.integrations.flask_oauth2 import ResourceProtector
from authlib.oauth2.rfc6749.grants import (
AuthorizationCodeGrant as _AuthorizationCodeGrant,
)
from authlib.oauth2.rfc6749.grants import ClientCredentialsGrant
from authlib.oauth2.rfc6749.grants import ImplicitGrant
from authlib.oauth2.rfc6749.grants import RefreshTokenGrant as _RefreshTokenGrant
from authlib.oauth2.rfc6749.grants import (
ResourceOwnerPasswordCredentialsGrant as _ResourceOwnerPasswordCredentialsGrant,
)
from authlib.oauth2.rfc6750 import BearerTokenValidator as _BearerTokenValidator
from authlib.oauth2.rfc7009 import RevocationEndpoint as _RevocationEndpoint
from authlib.oauth2.rfc7591 import (
ClientRegistrationEndpoint as _ClientRegistrationEndpoint,
)
from authlib.oauth2.rfc7592 import (
ClientConfigurationEndpoint as _ClientConfigurationEndpoint,
)
2022-10-06 11:32:41 +00:00
from authlib.oauth2.rfc7636 import CodeChallenge as _CodeChallenge
from authlib.oauth2.rfc7662 import IntrospectionEndpoint as _IntrospectionEndpoint
from authlib.oidc.core import UserInfo
from authlib.oidc.core.grants import OpenIDCode as _OpenIDCode
from authlib.oidc.core.grants import OpenIDHybridGrant as _OpenIDHybridGrant
from authlib.oidc.core.grants import OpenIDImplicitGrant as _OpenIDImplicitGrant
from authlib.oidc.core.grants.util import generate_id_token
from canaille.app import models
2021-12-20 22:57:27 +00:00
from flask import current_app
from flask import request
2022-10-06 11:32:41 +00:00
from werkzeug.security import gen_salt
DEFAULT_JWT_KTY = "RSA"
DEFAULT_JWT_ALG = "RS256"
DEFAULT_JWT_EXP = 3600
AUTHORIZATION_CODE_LIFETIME = 84400
DEFAULT_JWT_MAPPING = {
"SUB": "{{ user.user_name[0] }}",
"NAME": "{% if user.formatted_name %}{{ user.formatted_name[0] }}{% endif %}",
"PHONE_NUMBER": "{% if user.phone_number %}{{ user.phone_number[0] }}{% endif %}",
"EMAIL": "{% if user.email %}{{ user.email[0] }}{% endif %}",
"GIVEN_NAME": "{% if user.given_name %}{{ user.given_name[0] }}{% endif %}",
"FAMILY_NAME": "{% if user.family_name %}{{ user.family_name[0] }}{% endif %}",
"PREFERRED_USERNAME": "{% if user.display_name %}{{ user.display_name }}{% endif %}",
"LOCALE": "{% if user.preferred_language %}{{ user.preferred_language }}{% endif %}",
"ADDRESS": "{% if user.formatted_address %}{{ user.formatted_address[0] }}{% endif %}",
"PICTURE": "{% if user.photo %}{{ url_for('account.photo', user_name=user.user_name[0], field='photo', _external=True) }}{% endif %}",
"WEBSITE": "{% if user.profile_url %}{{ user.profile_url[0] }}{% endif %}",
}
2022-10-06 11:32:41 +00:00
def exists_nonce(nonce, req):
client = models.Client.get(id=req.client_id)
exists = models.AuthorizationCode.query(client=client, nonce=nonce)
2022-10-06 11:32:41 +00:00
return bool(exists)
2021-09-28 10:06:41 +00:00
2020-10-26 18:09:38 +00:00
def get_issuer():
if current_app.config["OIDC"]["JWT"].get("ISS"):
return current_app.config["OIDC"]["JWT"].get("ISS")
if current_app.config.get("SERVER_NAME"):
return current_app.config.get("SERVER_NAME")
return request.url_root
2022-10-06 11:32:41 +00:00
def get_jwt_config(grant):
with open(current_app.config["OIDC"]["JWT"]["PRIVATE_KEY"]) as pk:
2022-10-06 11:32:41 +00:00
return {
"key": pk.read(),
"alg": current_app.config["OIDC"]["JWT"].get("ALG", DEFAULT_JWT_ALG),
"iss": get_issuer(),
"exp": current_app.config["OIDC"]["JWT"].get("EXP", DEFAULT_JWT_EXP),
2022-10-06 11:32:41 +00:00
}
2022-12-24 00:44:16 +00:00
def claims_from_scope(scope):
claims = {"sub"}
2022-10-06 11:32:41 +00:00
if "profile" in scope:
2022-12-24 00:44:16 +00:00
claims |= {
2022-10-06 11:32:41 +00:00
"name",
"family_name",
"given_name",
"nickname",
"preferred_username",
"profile",
"picture",
"website",
"gender",
"birthdate",
"zoneinfo",
"locale",
"updated_at",
2022-12-24 00:44:16 +00:00
}
2022-10-06 11:32:41 +00:00
if "email" in scope:
2022-12-24 00:44:16 +00:00
claims |= {"email", "email_verified"}
2022-10-06 11:32:41 +00:00
if "address" in scope:
2022-12-24 00:44:16 +00:00
claims |= {"address"}
2022-10-06 11:32:41 +00:00
if "phone" in scope:
2022-12-24 00:44:16 +00:00
claims |= {"phone_number", "phone_number_verified"}
2022-10-06 11:32:41 +00:00
if "groups" in scope:
2022-12-24 00:44:16 +00:00
claims |= {"groups"}
return claims
2022-10-06 11:32:41 +00:00
2022-12-24 00:44:16 +00:00
def generate_user_info(user, scope):
claims = claims_from_scope(scope)
2022-10-06 11:32:41 +00:00
data = generate_user_claims(user, claims)
return UserInfo(**data)
def generate_user_claims(user, claims, jwt_mapping_config=None):
jwt_mapping_config = {
**DEFAULT_JWT_MAPPING,
**current_app.config["OIDC"]["JWT"].get("MAPPING", {}),
**(jwt_mapping_config or {}),
}
2022-10-06 11:32:41 +00:00
data = {}
for claim in claims:
raw_claim = jwt_mapping_config.get(claim.upper())
if raw_claim:
formatted_claim = current_app.jinja_env.from_string(raw_claim).render(
user=user
)
if formatted_claim:
# According to https://openid.net/specs/openid-connect-core-1_0.html#UserInfoResponse
# it's better to not insert a null or empty string value
data[claim] = formatted_claim
if claim == "groups":
data[claim] = [group.display_name for group in user.groups]
2022-10-06 11:32:41 +00:00
return data
def save_authorization_code(code, request):
nonce = request.data.get("nonce")
2023-03-17 23:38:56 +00:00
now = datetime.datetime.now(datetime.timezone.utc)
2022-10-06 11:32:41 +00:00
scope = request.client.get_allowed_scope(request.scope)
code = models.AuthorizationCode(
2022-10-06 11:32:41 +00:00
authorization_code_id=gen_salt(48),
code=code,
subject=request.user,
client=request.client,
2022-10-06 11:32:41 +00:00
redirect_uri=request.redirect_uri or request.client.redirect_uris[0],
scope=scope,
nonce=nonce,
issue_date=now,
lifetime=AUTHORIZATION_CODE_LIFETIME,
2022-10-06 11:32:41 +00:00
challenge=request.data.get("code_challenge"),
challenge_method=request.data.get("code_challenge_method"),
2022-07-07 14:05:34 +00:00
)
2022-10-06 11:32:41 +00:00
code.save()
return code.code
2020-09-17 08:00:39 +00:00
2020-08-17 15:49:49 +00:00
2022-10-06 11:32:41 +00:00
class AuthorizationCodeGrant(_AuthorizationCodeGrant):
TOKEN_ENDPOINT_AUTH_METHODS = ["client_secret_basic", "client_secret_post", "none"]
2020-09-17 08:00:39 +00:00
2022-10-06 11:32:41 +00:00
def save_authorization_code(self, code, request):
return save_authorization_code(code, request)
2020-08-17 15:49:49 +00:00
2022-10-06 11:32:41 +00:00
def query_authorization_code(self, code, client):
item = models.AuthorizationCode.query(code=code, client=client)
2022-10-06 11:32:41 +00:00
if item and not item[0].is_expired():
return item[0]
2020-08-17 15:49:49 +00:00
2022-10-06 11:32:41 +00:00
def delete_authorization_code(self, authorization_code):
authorization_code.delete()
2020-08-17 15:49:49 +00:00
2022-10-06 11:32:41 +00:00
def authenticate_user(self, authorization_code):
return authorization_code.subject
2021-12-06 23:07:32 +00:00
2020-09-17 08:00:39 +00:00
2022-10-06 11:32:41 +00:00
class OpenIDCode(_OpenIDCode):
def exists_nonce(self, nonce, request):
return exists_nonce(nonce, request)
2020-08-24 12:44:32 +00:00
2022-10-06 11:32:41 +00:00
def get_jwt_config(self, grant):
return get_jwt_config(grant)
2020-08-24 12:44:32 +00:00
2022-10-06 11:32:41 +00:00
def generate_user_info(self, user, scope):
return generate_user_info(user, scope)
2020-08-24 13:56:30 +00:00
2022-10-06 11:32:41 +00:00
def get_audiences(self, request):
client = request.client
return [aud.client_id for aud in client.audience]
2020-08-24 13:56:30 +00:00
2020-08-26 09:54:35 +00:00
2022-10-06 11:32:41 +00:00
class PasswordGrant(_ResourceOwnerPasswordCredentialsGrant):
TOKEN_ENDPOINT_AUTH_METHODS = ["client_secret_basic", "client_secret_post", "none"]
2020-08-26 09:54:35 +00:00
2022-10-06 11:32:41 +00:00
def authenticate_user(self, username, password):
user = models.User.get_from_login(username)
2022-11-01 11:25:21 +00:00
if not user:
return None
2023-05-17 10:48:14 +00:00
2022-11-01 11:25:21 +00:00
success, _ = user.check_password(password)
if not success:
2023-05-17 10:48:14 +00:00
return None
return user
2020-09-25 09:26:41 +00:00
2022-10-06 11:32:41 +00:00
class RefreshTokenGrant(_RefreshTokenGrant):
def authenticate_refresh_token(self, refresh_token):
token = models.Token.query(refresh_token=refresh_token)
2022-10-06 11:32:41 +00:00
if token and token[0].is_refresh_token_active():
return token[0]
2022-05-20 12:07:56 +00:00
2022-10-06 11:32:41 +00:00
def authenticate_user(self, credential):
return credential.subject
2022-05-20 12:07:56 +00:00
2022-10-06 11:32:41 +00:00
def revoke_old_credential(self, credential):
2023-03-17 23:38:56 +00:00
credential.revokation_date = datetime.datetime.now(datetime.timezone.utc)
2022-10-06 11:32:41 +00:00
credential.save()
2022-05-20 12:07:56 +00:00
2022-10-06 11:32:41 +00:00
class OpenIDImplicitGrant(_OpenIDImplicitGrant):
def exists_nonce(self, nonce, request):
return exists_nonce(nonce, request)
2022-05-20 12:07:56 +00:00
2022-10-06 11:32:41 +00:00
def get_jwt_config(self, grant=None):
return get_jwt_config(grant)
2022-05-20 12:07:56 +00:00
2022-10-06 11:32:41 +00:00
def generate_user_info(self, user, scope):
return generate_user_info(user, scope)
2022-05-20 12:07:56 +00:00
2022-10-06 11:32:41 +00:00
def get_audiences(self, request):
client = request.client
return [aud.client_id for aud in client.audience]
2022-05-20 12:07:56 +00:00
2022-10-06 11:32:41 +00:00
class OpenIDHybridGrant(_OpenIDHybridGrant):
def save_authorization_code(self, code, request):
return save_authorization_code(code, request)
2022-05-20 12:07:56 +00:00
2022-10-06 11:32:41 +00:00
def exists_nonce(self, nonce, request):
return exists_nonce(nonce, request)
2022-05-20 12:07:56 +00:00
2022-10-06 11:32:41 +00:00
def get_jwt_config(self, grant=None):
return get_jwt_config(grant)
2022-05-20 12:07:56 +00:00
2022-10-06 11:32:41 +00:00
def generate_user_info(self, user, scope):
return generate_user_info(user, scope)
2022-05-20 12:07:56 +00:00
2022-10-06 11:32:41 +00:00
def get_audiences(self, request):
client = request.client
return [aud.client_id for aud in client.audience]
2022-10-06 11:32:41 +00:00
def query_client(client_id):
return models.Client.get(client_id=client_id)
2022-05-20 12:07:56 +00:00
2022-10-06 11:32:41 +00:00
def save_token(token, request):
2023-03-17 23:38:56 +00:00
now = datetime.datetime.now(datetime.timezone.utc)
t = models.Token(
2022-10-06 11:32:41 +00:00
token_id=gen_salt(48),
type=token["token_type"],
access_token=token["access_token"],
issue_date=now,
lifetime=token["expires_in"],
2022-10-06 11:32:41 +00:00
scope=token["scope"],
client=request.client,
2022-10-06 11:32:41 +00:00
refresh_token=token.get("refresh_token"),
subject=request.user,
audience=request.client.audience,
)
t.save()
class BearerTokenValidator(_BearerTokenValidator):
def authenticate_token(self, token_string):
return models.Token.get(access_token=token_string)
2022-10-06 11:32:41 +00:00
def query_token(token, token_type_hint):
if token_type_hint == "access_token":
return models.Token.get(access_token=token)
elif token_type_hint == "refresh_token":
return models.Token.get(refresh_token=token)
item = models.Token.get(access_token=token)
if item:
return item
2022-10-06 11:32:41 +00:00
item = models.Token.get(refresh_token=token)
if item:
return item
2022-10-06 11:32:41 +00:00
return None
2022-10-06 11:32:41 +00:00
class RevocationEndpoint(_RevocationEndpoint):
def query_token(self, token, token_type_hint):
return query_token(token, token_type_hint)
2022-10-06 11:32:41 +00:00
def revoke_token(self, token, request):
2023-03-17 23:38:56 +00:00
token.revokation_date = datetime.datetime.now(datetime.timezone.utc)
2022-10-06 11:32:41 +00:00
token.save()
class IntrospectionEndpoint(_IntrospectionEndpoint):
def query_token(self, token, token_type_hint):
return query_token(token, token_type_hint)
2022-10-06 11:32:41 +00:00
def check_permission(self, token, client, request):
return client in token.audience
2022-10-06 11:32:41 +00:00
def introspect_token(self, token):
audience = [aud.client_id for aud in token.audience]
2022-10-06 11:32:41 +00:00
return {
"active": True,
"client_id": token.client.client_id,
2022-10-06 11:32:41 +00:00
"token_type": token.type,
"username": token.subject.formatted_name[0],
2022-10-06 11:32:41 +00:00
"scope": token.get_scope(),
"sub": token.subject.user_name[0],
2022-10-06 11:32:41 +00:00
"aud": audience,
"iss": get_issuer(),
2022-10-06 11:32:41 +00:00
"exp": token.get_expires_at(),
"iat": token.get_issued_at(),
}
2022-05-20 12:07:56 +00:00
class ClientManagementMixin:
def authenticate_token(self, request):
if current_app.config.get("OIDC", {}).get(
"DYNAMIC_CLIENT_REGISTRATION_OPEN", False
):
return True
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.lower().startswith("bearer "):
return None
bearer_token = auth_header.split()[1]
if bearer_token not in current_app.config.get("OIDC", {}).get(
"DYNAMIC_CLIENT_REGISTRATION_TOKENS", []
):
return None
return True
def get_server_metadata(self):
2022-12-15 22:00:52 +00:00
from .well_known import openid_configuration
2022-12-15 22:00:52 +00:00
result = openid_configuration()
return result
def resolve_public_key(self, request):
# At the moment the only keypair accepted in software statement
# is the one used to isues JWTs. This might change somedays.
with open(current_app.config["OIDC"]["JWT"]["PUBLIC_KEY"], "rb") as fd:
return fd.read()
class ClientRegistrationEndpoint(ClientManagementMixin, _ClientRegistrationEndpoint):
software_statement_alg_values_supported = ["RS256"]
def save_client(self, client_info, client_metadata, request):
client_info["client_id_issued_at"] = datetime.datetime.fromtimestamp(
2023-03-17 23:38:56 +00:00
client_info["client_id_issued_at"], datetime.timezone.utc
)
if "scope" in client_metadata and not isinstance(
client_metadata["scope"], list
):
client_metadata["scope"] = client_metadata["scope"].split(" ")
client = models.Client(**client_info, **client_metadata)
client.save()
return client
class ClientConfigurationEndpoint(ClientManagementMixin, _ClientConfigurationEndpoint):
def authenticate_client(self, request):
client_id = request.uri.split("/")[-1]
return models.Client.get(client_id=client_id)
def revoke_access_token(self, request, token):
pass
def check_permission(self, client, request):
return True
def delete_client(self, client, request):
client.delete()
def update_client(self, client, client_metadata, request):
2023-01-28 17:35:39 +00:00
client_metadata["scope"] = client_metadata["scope"].split(" ")
for key, value in client_metadata.items():
setattr(client, key, value)
client.save()
return client
def generate_client_registration_info(self, client, request):
access_token = request.headers["Authorization"].split(" ")[1]
return {
"registration_client_uri": request.uri,
"registration_access_token": access_token,
}
2022-10-06 11:32:41 +00:00
class CodeChallenge(_CodeChallenge):
def get_authorization_code_challenge(self, authorization_code):
return authorization_code.challenge
2022-05-20 12:07:56 +00:00
2022-10-06 11:32:41 +00:00
def get_authorization_code_challenge_method(self, authorization_code):
return authorization_code.challenge_method
2022-05-20 12:07:56 +00:00
2022-10-06 11:32:41 +00:00
authorization = AuthorizationServer()
require_oauth = ResourceProtector()
2022-05-20 12:07:56 +00:00
def generate_access_token(client, grant_type, user, scope):
audience = [client.client_id for client in client.audience]
bearer_token_generator = authorization._token_generators["default"]
kwargs = {
"token": {},
"user_info": generate_user_info(user, scope),
"aud": audience,
**get_jwt_config(grant_type),
}
kwargs["exp"] = bearer_token_generator._get_expires_in(client, grant_type)
return generate_id_token(**kwargs)
2022-10-06 11:32:41 +00:00
def setup_oauth(app):
authorization.init_app(app, query_client=query_client, save_token=save_token)
2022-05-20 12:07:56 +00:00
2022-10-06 11:32:41 +00:00
authorization.register_grant(PasswordGrant)
authorization.register_grant(ImplicitGrant)
authorization.register_grant(RefreshTokenGrant)
authorization.register_grant(ClientCredentialsGrant)
authorization.register_grant(
AuthorizationCodeGrant,
[OpenIDCode(require_nonce=True), CodeChallenge(required=True)],
)
authorization.register_grant(OpenIDImplicitGrant)
authorization.register_grant(OpenIDHybridGrant)
2022-05-20 12:07:56 +00:00
2022-10-06 11:32:41 +00:00
require_oauth.register_token_validator(BearerTokenValidator())
2022-05-20 12:07:56 +00:00
2022-10-06 11:32:41 +00:00
authorization.register_endpoint(IntrospectionEndpoint)
authorization.register_endpoint(RevocationEndpoint)
authorization.register_endpoint(ClientRegistrationEndpoint)
authorization.register_endpoint(ClientConfigurationEndpoint)