canaille-globuzma/website/oauth2.py

173 lines
5.1 KiB
Python
Raw Normal View History

2020-08-16 17:39:14 +00:00
import datetime
2020-08-14 13:26:14 +00:00
from authlib.integrations.flask_oauth2 import AuthorizationServer, ResourceProtector
from authlib.oauth2.rfc6749.grants import (
AuthorizationCodeGrant as _AuthorizationCodeGrant,
ResourceOwnerPasswordCredentialsGrant as _ResourceOwnerPasswordCredentialsGrant,
RefreshTokenGrant as _RefreshTokenGrant,
2020-08-14 11:18:08 +00:00
)
2020-08-14 13:26:14 +00:00
from authlib.oauth2.rfc6750 import BearerTokenValidator as _BearerTokenValidator
from authlib.oidc.core.grants import (
OpenIDCode as _OpenIDCode,
OpenIDImplicitGrant as _OpenIDImplicitGrant,
OpenIDHybridGrant as _OpenIDHybridGrant,
)
from authlib.oidc.core import UserInfo
from werkzeug.security import gen_salt
from .models import Client, AuthorizationCode, Token, User
DUMMY_JWT_CONFIG = {
"key": "secret-key",
"alg": "HS256",
"iss": "https://authlib.org",
"exp": 3600,
}
def exists_nonce(nonce, req):
exists = AuthorizationCode.query.filter_by(
client_id=req.client_id, nonce=nonce
).first()
return bool(exists)
def generate_user_info(user, scope):
return UserInfo(sub=str(user.id), name=user.username)
def create_authorization_code(client, grant_user, request):
raise NotImplementedError()
code = gen_salt(48)
nonce = request.data.get("nonce")
item = AuthorizationCode(
code=code,
client_id=client.client_id,
redirect_uri=request.redirect_uri,
scope=request.scope,
user_id=grant_user.id,
nonce=nonce,
)
return code
2020-08-14 11:18:08 +00:00
2020-08-14 13:26:14 +00:00
class AuthorizationCodeGrant(_AuthorizationCodeGrant):
def create_authorization_code(self, client, grant_user, request):
return create_authorization_code(client, grant_user, request)
2020-08-14 11:18:08 +00:00
2020-08-14 13:26:14 +00:00
def parse_authorization_code(self, code, client):
item = AuthorizationCode.query.filter_by(
2020-08-14 11:18:08 +00:00
code=code, client_id=client.client_id
).first()
2020-08-14 13:26:14 +00:00
if item and not item.is_expired():
return item
2020-08-14 11:18:08 +00:00
def delete_authorization_code(self, authorization_code):
raise NotImplementedError()
def authenticate_user(self, authorization_code):
return User.query.get(authorization_code.user_id)
2020-08-14 13:26:14 +00:00
class OpenIDCode(_OpenIDCode):
def exists_nonce(self, nonce, request):
return exists_nonce(nonce, request)
def get_jwt_config(self, grant):
return DUMMY_JWT_CONFIG
def generate_user_info(self, user, scope):
return generate_user_info(user, scope)
class PasswordGrant(_ResourceOwnerPasswordCredentialsGrant):
2020-08-14 11:18:08 +00:00
def authenticate_user(self, username, password):
user = User.get(username)
if user is not None and user.check_password(password):
return user
2020-08-14 13:26:14 +00:00
class RefreshTokenGrant(_RefreshTokenGrant):
2020-08-14 11:18:08 +00:00
def authenticate_refresh_token(self, refresh_token):
raise NotImplementedError()
token = Token.query.filter_by(refresh_token=refresh_token).first()
if token and token.is_refresh_token_active():
return token
def authenticate_user(self, credential):
raise NotImplementedError()
return User.query.get(credential.user_id)
def revoke_old_credential(self, credential):
raise NotImplementedError()
credential.revoked = True
2020-08-14 13:26:14 +00:00
2020-08-16 17:39:14 +00:00
2020-08-14 13:26:14 +00:00
class ImplicitGrant(_OpenIDImplicitGrant):
def exists_nonce(self, nonce, request):
return exists_nonce(nonce, request)
def get_jwt_config(self, grant):
return DUMMY_JWT_CONFIG
def generate_user_info(self, user, scope):
return generate_user_info(user, scope)
class HybridGrant(_OpenIDHybridGrant):
def create_authorization_code(self, client, grant_user, request):
return create_authorization_code(client, grant_user, request)
def exists_nonce(self, nonce, request):
return exists_nonce(nonce, request)
def get_jwt_config(self):
return DUMMY_JWT_CONFIG
def generate_user_info(self, user, scope):
return generate_user_info(user, scope)
2020-08-14 11:18:08 +00:00
def query_client(client_id):
return Client.get(client_id)
def save_token(token, request):
2020-08-16 17:39:14 +00:00
now = datetime.datetime.now()
token = Token(
oauthTokenType=token["token_type"],
oauthAccessToken=token["access_token"],
oauthRefreshToken=token["refresh_token"],
oauthIssueDate=now.strftime("%Y%m%d%H%M%SZ"),
oauthTokenLifetime=str(token["expires_in"]),
2020-08-17 07:45:35 +00:00
oauthScope=token["scope"],
2020-08-16 17:39:14 +00:00
oauthClientID=request.client.oauthClientID[0],
)
token.save()
2020-08-14 11:18:08 +00:00
2020-08-14 13:26:14 +00:00
class BearerTokenValidator(_BearerTokenValidator):
2020-08-14 11:18:08 +00:00
def authenticate_token(self, token_string):
return Token.get(token_string)
def request_invalid(self, request):
return False
def token_revoked(self, token):
return False
2020-08-14 13:26:14 +00:00
authorization = AuthorizationServer()
2020-08-14 11:18:08 +00:00
require_oauth = ResourceProtector()
def config_oauth(app):
2020-08-14 13:26:14 +00:00
authorization.init_app(app, query_client=query_client, save_token=save_token)
2020-08-14 11:18:08 +00:00
2020-08-14 13:26:14 +00:00
authorization.register_grant(
AuthorizationCodeGrant, [OpenIDCode(require_nonce=True)]
)
authorization.register_grant(ImplicitGrant)
authorization.register_grant(HybridGrant)
2020-08-14 11:18:08 +00:00
authorization.register_grant(PasswordGrant)
require_oauth.register_token_validator(BearerTokenValidator())