import datetime
import hmac

from authlib.oauth2.rfc6749 import AuthorizationCodeMixin
from authlib.oauth2.rfc6749 import ClientMixin
from authlib.oauth2.rfc6749 import TokenMixin
from authlib.oauth2.rfc6749 import util

from canaille.app import models

# Reference instant used to turn aware datetimes into integer timestamps.
_EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)


def _seconds_since_epoch(moment):
    """Return the seconds elapsed between the Unix epoch and *moment*.

    *moment* must be timezone-aware: subtracting the aware epoch from a naive
    datetime raises ``TypeError``.
    """
    return (moment - _EPOCH).total_seconds()


class Client(ClientMixin):
    """OAuth2 client behaviour built on authlib's ``ClientMixin``.

    The attributes read here (``client_id``, ``scope``, ``redirect_uris``...)
    are not defined in this class.
    NOTE(review): presumably they come from a backend model class combined
    with this mixin -- confirm against the backend implementation.
    """

    # Attributes exposed through :attr:`client_info`.
    client_info_attributes = [
        "client_id",
        "client_secret",
        "client_id_issued_at",
        "client_secret_expires_at",
    ]

    # Attributes exposed through :attr:`client_metadata`.
    client_metadata_attributes = [
        "client_name",
        "contacts",
        "client_uri",
        "redirect_uris",
        "logo_uri",
        "grant_types",
        "response_types",
        "scope",
        "tos_uri",
        "policy_uri",
        "jwks_uri",
        "jwk",
        "token_endpoint_auth_method",
        "software_id",
        "software_version",
    ]

    def get_client_id(self):
        """Return the client identifier."""
        return self.client_id

    def get_default_redirect_uri(self):
        """Return the first registered redirect URI, or ``None`` if there is none."""
        if not self.redirect_uris:
            return None
        return self.redirect_uris[0]

    def get_allowed_scope(self, scope):
        """Return the subset of the requested *scope* this client may use.

        :param scope: space separated scope string (a list, tuple or set is
            accepted too); ``None`` or an empty value yields ``""``.
        :return: space separated string of the allowed scopes, listed in the
            order of the client's own ``scope`` attribute.
        """
        requested = util.scope_to_list(scope)
        if not requested:
            return ""

        # Compare whole scope names: testing membership against the raw
        # request string would be a substring match ("pro" in "profile").
        requested = set(requested)
        return util.list_to_scope(
            [piece for piece in (self.scope or []) if piece in requested]
        )

    def check_redirect_uri(self, redirect_uri):
        """Tell whether *redirect_uri* is one of the registered redirect URIs."""
        return redirect_uri in self.redirect_uris

    def check_client_secret(self, client_secret):
        """Tell whether *client_secret* matches the stored client secret.

        The comparison runs in constant time so that response timing does not
        reveal how much of the secret matched. A missing or non-string secret
        on either side never matches.
        """
        expected = self.client_secret
        if not isinstance(client_secret, str) or not isinstance(expected, str):
            return False

        # compare_digest rejects non-ASCII str arguments, hence the encoding.
        return hmac.compare_digest(
            client_secret.encode("utf-8"), expected.encode("utf-8")
        )

    def check_endpoint_auth_method(self, method, endpoint):
        """Tell whether *method* may be used to authenticate on *endpoint*.

        Only the ``"token"`` endpoint is restricted, to the client's
        ``token_endpoint_auth_method``; every other endpoint accepts any method.
        """
        if endpoint != "token":
            return True
        return method == self.token_endpoint_auth_method

    def check_response_type(self, response_type):
        """Tell whether every space separated part of *response_type* is allowed."""
        return all(part in self.response_types for part in response_type.split(" "))

    def check_grant_type(self, grant_type):
        """Tell whether *grant_type* is one of the client's grant types."""
        return grant_type in self.grant_types

    @property
    def client_info(self):
        """Return the ``client_info_attributes`` values as a dict.

        ``client_id_issued_at`` is converted to an integer timestamp.
        """
        info = {name: getattr(self, name) for name in self.client_info_attributes}
        # NOTE(review): client_secret_expires_at is passed through untouched;
        # confirm whether it should also be converted to a timestamp.
        info["client_id_issued_at"] = int(
            datetime.datetime.timestamp(info["client_id_issued_at"])
        )
        return info

    @property
    def client_metadata(self):
        """Return the ``client_metadata_attributes`` values as a dict.

        ``scope`` is flattened into a single space separated string.
        """
        metadata = {
            name: getattr(self, name) for name in self.client_metadata_attributes
        }
        metadata["scope"] = " ".join(metadata["scope"])
        return metadata

    def delete(self):
        """Delete the client's consents, authorization codes and tokens, then itself."""
        for consent in models.Consent.query(client=self):
            consent.delete()

        for code in models.AuthorizationCode.query(client=self):
            code.delete()

        for token in models.Token.query(client=self):
            token.delete()

        # Dependent objects go first; the next class in the MRO then removes
        # the client itself.
        super().delete()


class AuthorizationCode(AuthorizationCodeMixin):
    """Authorization code behaviour built on authlib's ``AuthorizationCodeMixin``."""

    def get_redirect_uri(self):
        """Return the redirect URI bound to this code."""
        return self.redirect_uri

    def get_scope(self):
        """Return the scopes of this code as a list of scope names.

        Every stored entry is split on whitespace, so a single space separated
        entry and a list of individual names both give the same flat list.
        An empty or missing scope gives ``[]``.
        """
        stored = self.scope
        entries = [stored] if isinstance(stored, str) else (stored or [])
        return [piece for entry in entries for piece in entry.split()]

    def get_nonce(self):
        """Return the nonce bound to this code."""
        return self.nonce

    def is_expired(self):
        """Tell whether ``issue_date`` plus ``lifetime`` seconds is in the past."""
        expiration = self.issue_date + datetime.timedelta(seconds=int(self.lifetime))
        return expiration < datetime.datetime.now(datetime.timezone.utc)

    def get_auth_time(self):
        """Return ``issue_date`` as an integer number of seconds since the epoch."""
        return int(_seconds_since_epoch(self.issue_date))


class Token(TokenMixin):
    """Token behaviour built on authlib's ``TokenMixin``."""

    @property
    def expire_date(self):
        """Datetime at which the token expires (``issue_date`` + ``lifetime``)."""
        return self.issue_date + datetime.timedelta(seconds=int(self.lifetime))

    @property
    def revoked(self):
        """Whether a revokation date has been set on the token."""
        return bool(self.revokation_date)

    def get_scope(self):
        """Return the token scopes as a space separated string."""
        return " ".join(self.scope)

    def get_expires_in(self):
        """Return the token lifetime as an integer number of seconds."""
        return int(self.lifetime)

    def get_issued_at(self):
        """Return ``issue_date`` as an integer number of seconds since the epoch."""
        return int(_seconds_since_epoch(self.issue_date))

    def get_expires_at(self):
        """Return the expiration instant as integer seconds since the epoch."""
        return int(_seconds_since_epoch(self.issue_date)) + int(self.lifetime)

    def is_refresh_token_active(self):
        """Tell whether the token is neither revoked nor past its expiration."""
        if self.revoked:
            return False
        return self.expire_date >= datetime.datetime.now(datetime.timezone.utc)

    def is_expired(self):
        """Tell whether the expiration date is in the past."""
        return self.expire_date < datetime.datetime.now(datetime.timezone.utc)

    def is_revoked(self):
        """Tell whether a revokation date has been set on the token."""
        return self.revoked

    def check_client(self, client):
        """Tell whether the token was issued to *client* (same ``client_id``)."""
        return client.client_id == self.client.client_id


class Consent:
    """Consent behaviour: revoking and restoring a consent."""

    @property
    def revoked(self):
        """Whether a revokation date has been set on the consent."""
        return bool(self.revokation_date)

    def revoke(self):
        """Revoke the consent and the matching tokens that are still valid.

        Tokens with the same client and subject that are not yet revoked get
        the same revokation date as the consent.
        """
        self.revokation_date = datetime.datetime.now(datetime.timezone.utc)
        self.save()

        candidates = models.Token.query(
            client=self.client,
            subject=self.subject,
        )
        # Select the tokens before touching any of them, then revoke each one.
        still_valid = [token for token in candidates if not token.revoked]
        for token in still_valid:
            token.revokation_date = self.revokation_date
            token.save()

    def restore(self):
        """Clear the revokation date and save the consent."""
        self.revokation_date = None
        self.save()