canaille-globuzma/canaille/oidc/models.py

258 lines
7.7 KiB
Python
Raw Normal View History

2022-01-11 18:49:06 +00:00
import datetime
import hmac

from authlib.oauth2.rfc6749 import AuthorizationCodeMixin
from authlib.oauth2.rfc6749 import ClientMixin
from authlib.oauth2.rfc6749 import TokenMixin
from authlib.oauth2.rfc6749 import util

from canaille.backends.ldap.ldapobject import LDAPObject
2022-01-11 18:49:06 +00:00
class Client(LDAPObject, ClientMixin):
    """OAuth2 client persisted as an ``oauthClient`` LDAP entry.

    The class maps Python-side attribute names to LDAP attribute names and
    implements the hooks declared by authlib's ``ClientMixin``.
    """

    ldap_object_class = ["oauthClient"]
    base = "ou=clients,ou=oauth"
    rdn_attribute = "oauthClientID"

    # Attributes exposed through the ``client_info`` property.
    client_info_attributes = {
        "client_id": "oauthClientID",
        "client_secret": "oauthClientSecret",
        "client_id_issued_at": "oauthIssueDate",
        "client_secret_expires_at": "oauthClientSecretExpDate",
    }

    # Attributes exposed through the ``client_metadata`` property.
    client_metadata_attributes = {
        "client_name": "oauthClientName",
        "contacts": "oauthClientContact",
        "client_uri": "oauthClientURI",
        "redirect_uris": "oauthRedirectURIs",
        "logo_uri": "oauthLogoURI",
        "grant_types": "oauthGrantType",
        "response_types": "oauthResponseType",
        "scope": "oauthScope",
        "tos_uri": "oauthTermsOfServiceURI",
        "policy_uri": "oauthPolicyURI",
        "jwks_uri": "oauthJWKURI",
        "jwk": "oauthJWK",
        "token_endpoint_auth_method": "oauthTokenEndpointAuthMethod",
        "software_id": "oauthSoftwareID",
        "software_version": "oauthSoftwareVersion",
    }

    # Full Python-name -> LDAP-name mapping for this object class.
    attributes = {
        "id": "dn",
        "description": "description",
        "preconsent": "oauthPreconsent",
        # post_logout_redirect_uris is not yet supported by authlib
        "post_logout_redirect_uris": "oauthPostLogoutRedirectURI",
        "audience": "oauthAudience",
        **client_info_attributes,
        **client_metadata_attributes,
    }

    def get_client_id(self):
        """Return the client identifier."""
        return self.client_id

    def get_default_redirect_uri(self):
        """Return the first registered redirect URI."""
        return self.redirect_uris[0]

    def get_allowed_scope(self, scope):
        """Return the requested scopes this client is allowed to use.

        ``scope`` is the requested scope, either a space-separated string or
        a list/tuple/set of scope names (``None`` is treated as empty).  The
        result is a space-separated string holding the client's scopes, in
        the client's own order, that were requested.
        """
        # Tokenise the request first: testing ``piece in "a b c"`` directly
        # would be a substring test ("profile" would match "profiles").
        requested = set(util.scope_to_list(scope) or ())
        return util.list_to_scope(
            [scope_piece for scope_piece in self.scope if scope_piece in requested]
        )

    def check_redirect_uri(self, redirect_uri):
        """Tell whether ``redirect_uri`` is one of the registered URIs."""
        return redirect_uri in self.redirect_uris

    def check_client_secret(self, client_secret):
        """Tell whether ``client_secret`` matches the stored secret.

        String secrets are compared with ``hmac.compare_digest`` so the
        comparison time does not depend on how many leading characters match.
        """
        expected = self.client_secret
        if isinstance(client_secret, str) and isinstance(expected, str):
            return hmac.compare_digest(
                client_secret.encode("utf-8"), expected.encode("utf-8")
            )
        # Non-string values (e.g. a missing secret) keep plain equality.
        return client_secret == expected

    def check_endpoint_auth_method(self, method, endpoint):
        """Tell whether ``method`` may be used to authenticate on ``endpoint``.

        Only the ``"token"`` endpoint is restricted to the configured
        ``token_endpoint_auth_method``; any other endpoint is accepted.
        """
        if endpoint == "token":
            return method == self.token_endpoint_auth_method
        return True

    def check_response_type(self, response_type):
        """Tell whether every space-separated response type is registered."""
        return all(r in self.response_types for r in response_type.split(" "))

    def check_grant_type(self, grant_type):
        """Tell whether ``grant_type`` is registered for this client."""
        return grant_type in self.grant_types

    @property
    def client_info(self):
        """Dict of the ``client_info_attributes`` values.

        ``client_id_issued_at`` is converted from a datetime to an integer
        POSIX timestamp.
        """
        result = {
            attribute_name: getattr(self, attribute_name)
            for attribute_name in self.client_info_attributes
        }
        result["client_id_issued_at"] = int(
            datetime.datetime.timestamp(result["client_id_issued_at"])
        )
        return result

    @property
    def client_metadata(self):
        """Dict of the ``client_metadata_attributes`` values.

        ``scope`` is flattened into a single space-separated string.
        """
        metadata = {
            attribute_name: getattr(self, attribute_name)
            for attribute_name in self.client_metadata_attributes
        }
        metadata["scope"] = " ".join(metadata["scope"])
        return metadata

    def delete(self):
        """Delete the client after its consents, authorization codes and tokens."""
        for consent in Consent.query(client=self):
            consent.delete()

        for code in AuthorizationCode.query(client=self):
            code.delete()

        for token in Token.query(client=self):
            token.delete()

        super().delete()
2022-01-11 18:49:06 +00:00
class AuthorizationCode(LDAPObject, AuthorizationCodeMixin):
    """OAuth2 authorization code persisted as an ``oauthAuthorizationCode`` entry."""

    ldap_object_class = ["oauthAuthorizationCode"]
    base = "ou=authorizations,ou=oauth"
    rdn_attribute = "oauthAuthorizationCodeID"

    # Python-name -> LDAP-name mapping for this object class.
    attributes = {
        "id": "dn",
        "authorization_code_id": "oauthAuthorizationCodeID",
        "description": "description",
        "code": "oauthCode",
        "client": "oauthClient",
        "subject": "oauthSubject",
        "redirect_uri": "oauthRedirectURI",
        "response_type": "oauthResponseType",
        "scope": "oauthScope",
        "nonce": "oauthNonce",
        "issue_date": "oauthAuthorizationDate",
        "lifetime": "oauthAuthorizationLifetime",
        "challenge": "oauthCodeChallenge",
        "challenge_method": "oauthCodeChallengeMethod",
        "revokation_date": "oauthRevokationDate",
    }

    def get_redirect_uri(self):
        """Return the redirect URI recorded with this code."""
        return self.redirect_uri

    def get_scope(self):
        """Return the first stored scope value split on single spaces."""
        first_value = self.scope[0]
        return first_value.split(" ")

    def get_nonce(self):
        """Return the nonce recorded with this code."""
        return self.nonce

    def is_expired(self):
        """Tell whether ``issue_date + lifetime`` seconds is before the current UTC time."""
        expires_at = self.issue_date + datetime.timedelta(seconds=int(self.lifetime))
        return expires_at < datetime.datetime.now(datetime.timezone.utc)

    def get_auth_time(self):
        """Return ``issue_date`` as whole seconds since the Unix epoch (UTC)."""
        epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
        elapsed = self.issue_date - epoch
        return int(elapsed.total_seconds())
2022-01-11 18:49:06 +00:00
class Token(LDAPObject, TokenMixin):
    """OAuth2 token persisted as an ``oauthToken`` LDAP entry."""

    ldap_object_class = ["oauthToken"]
    base = "ou=tokens,ou=oauth"
    rdn_attribute = "oauthTokenID"

    # Python-name -> LDAP-name mapping for this object class.
    attributes = {
        "id": "dn",
        "token_id": "oauthTokenID",
        "access_token": "oauthAccessToken",
        "description": "description",
        "client": "oauthClient",
        "subject": "oauthSubject",
        "type": "oauthTokenType",
        "refresh_token": "oauthRefreshToken",
        "scope": "oauthScope",
        "issue_date": "oauthIssueDate",
        "lifetime": "oauthTokenLifetime",
        "revokation_date": "oauthRevokationDate",
        "audience": "oauthAudience",
    }

    @property
    def expire_date(self):
        """Datetime at which the token expires: ``issue_date`` plus ``lifetime`` seconds."""
        issued = self.issue_date
        validity = datetime.timedelta(seconds=int(self.lifetime))
        return issued + validity

    @property
    def revoked(self):
        """``True`` when a revokation date is set."""
        return bool(self.revokation_date)

    def get_scope(self):
        """Return the stored scope values joined with single spaces."""
        return " ".join(self.scope)

    def get_expires_in(self):
        """Return the token lifetime in seconds, as an integer."""
        return int(self.lifetime)

    def get_issued_at(self):
        """Return ``issue_date`` as whole seconds since the Unix epoch (UTC)."""
        epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
        elapsed = self.issue_date - epoch
        return int(elapsed.total_seconds())

    def get_expires_at(self):
        """Return the expiry instant as whole seconds since the Unix epoch (UTC)."""
        # Same arithmetic as int(issue timestamp) + int(lifetime).
        return self.get_issued_at() + self.get_expires_in()

    def is_refresh_token_active(self):
        """Tell whether the token is neither revoked nor past its expiry date."""
        return not self.revokation_date and (
            self.expire_date >= datetime.datetime.now(datetime.timezone.utc)
        )

    def is_expired(self):
        """Tell whether the expiry date is before the current UTC time."""
        deadline = self.expire_date
        return deadline < datetime.datetime.now(datetime.timezone.utc)

    def is_revoked(self):
        """Tell whether a revokation date is set."""
        return self.revoked

    def check_client(self, client):
        """Tell whether ``client`` has the same ``client_id`` as this token's client."""
        candidate_id = client.client_id
        owner_id = self.client.client_id
        return candidate_id == owner_id
2022-04-10 14:00:51 +00:00
2022-01-11 18:49:06 +00:00
class Consent(LDAPObject):
    """Consent persisted as an ``oauthConsent`` LDAP entry, linking a subject to a client."""

    ldap_object_class = ["oauthConsent"]
    base = "ou=consents,ou=oauth"
    rdn_attribute = "cn"

    # Python-name -> LDAP-name mapping for this object class.
    attributes = {
        "id": "dn",
        "consent_id": "cn",
        "subject": "oauthSubject",
        "client": "oauthClient",
        "scope": "oauthScope",
        "issue_date": "oauthIssueDate",
        "revokation_date": "oauthRevokationDate",
    }

    @property
    def revoked(self):
        """``True`` when a revokation date is set."""
        return bool(self.revokation_date)

    def revoke(self):
        """Mark the consent revoked now (UTC) and revoke its still-active tokens.

        Tokens are looked up by this consent's client and subject; each one
        not already revoked receives the consent's revokation date.
        """
        self.revokation_date = datetime.datetime.now(datetime.timezone.utc)
        self.save()

        candidates = Token.query(
            oauthClient=self.client,
            oauthSubject=self.subject,
        )
        # Materialise the filtered list before saving anything.
        still_active = [candidate for candidate in candidates if not candidate.revoked]
        for token in still_active:
            token.revokation_date = self.revokation_date
            token.save()

    def restore(self):
        """Clear the revokation date and save the consent."""
        self.revokation_date = None
        self.save()