canaille-globuzma/canaille/oauth.py

139 lines
4 KiB
Python
Raw Normal View History

2020-09-17 10:01:21 +00:00
import datetime
2020-09-25 09:26:41 +00:00
from authlib.integrations.flask_oauth2 import current_token
2020-08-28 14:07:39 +00:00
from authlib.jose import jwk
2020-08-17 15:49:49 +00:00
from authlib.oauth2 import OAuth2Error
from flask import Blueprint, request, session, redirect
2020-08-26 09:54:35 +00:00
from flask import render_template, jsonify, flash, current_app
2020-08-17 15:49:49 +00:00
from flask_babel import gettext
2020-09-17 08:00:39 +00:00
from .models import User, Client, Consent
2020-09-25 09:26:41 +00:00
from .oauth2utils import (
authorization,
IntrospectionEndpoint,
RevocationEndpoint,
generate_user_info,
require_oauth,
)
2020-08-19 14:20:57 +00:00
from .forms import LoginForm
from .flaskutils import current_user
2020-08-17 15:49:49 +00:00
# Blueprint that carries every OAuth2 route registered below.
# NOTE(review): Flask documents the signature as Blueprint(name, import_name);
# here __name__ is passed as the blueprint name and "oauth" as the import
# name. Confirm this order is intended before changing it: swapping the
# arguments would rename the endpoints.
bp = Blueprint(__name__, "oauth")
@bp.route("/authorize", methods=["GET", "POST"])
def authorize():
    """Handle the OAuth2 authorization endpoint: login first, then consent.

    Flow:

    1. If ``current_user()`` returns nothing, show the login form (GET) or
       check the posted credentials (POST). A successful login redirects to
       the same URL so the consent step runs on the next request.
    2. On GET, answer immediately when a non-revoked consent already covers
       every requested scope; otherwise render the consent page.
    3. On POST, act on the ``answer`` form field: ``logout`` drops the
       session user, ``accept`` records the consent and grants the request,
       and any other value is treated as a denial.

    When the ``prompt`` query parameter equals ``none`` no page is rendered;
    a JSON ``login_required`` / ``consent_required`` error is returned
    instead.

    Returns:
        A Flask response: a rendered template, a redirect, a JSON error
        body, or the result of
        ``authorization.create_authorization_response``.
    """
    user = current_user()
    client = Client.get(request.values["client_id"])
    scopes = request.args.get("scope", "").split(" ")
    prompt_none = request.args.get("prompt") == "none"

    # LOGIN
    if not user:
        if prompt_none:
            return jsonify({"error": "login_required"})

        form = LoginForm(request.form or None)
        if request.method == "GET":
            return render_template("login.html", form=form, menu=False)

        # NOTE(review): the third positional argument is presumably a
        # "sign the user in" flag -- confirm against User.authenticate.
        if not form.validate() or not User.authenticate(
            form.login.data, form.password.data, True
        ):
            flash(gettext("Login failed, please check your information"), "error")
            return render_template("login.html", form=form, menu=False)

        return redirect(request.url)

    # CONSENT
    consents = Consent.filter(
        oauthClient=client.dn,
        oauthSubject=user.dn,
    )
    # Only the first consent without a revocation date is considered.
    consent = next((c for c in consents if not c.oauthRevokationDate), None)

    if request.method == "GET":
        if consent:
            # Build the lookup set once rather than once per requested scope.
            granted = set(consent.oauthScope)
            if all(scope in granted for scope in scopes):
                return authorization.create_authorization_response(
                    grant_user=user.dn
                )

        if prompt_none:
            return jsonify({"error": "consent_required"})

        try:
            grant = authorization.validate_consent_request(end_user=user)
        except OAuth2Error as error:
            return jsonify(dict(error.get_body()))

        # Nothing is persisted here: displaying the consent page must not
        # count as consenting. The consent is stored only on "accept" below.
        return render_template(
            "authorize.html", user=user, grant=grant, client=client, menu=False
        )

    answer = request.form["answer"]

    if answer == "logout":
        # pop() instead of del so a missing key cannot raise KeyError.
        session.pop("user_dn", None)
        flash(gettext("You have been successfully logged out."), "success")
        return redirect(request.url)

    if answer != "accept":
        # "deny" and any unexpected value both refuse the grant; previously
        # an unknown value left grant_user unbound and raised.
        return authorization.create_authorization_response(grant_user=None)

    if consent:
        # Extend the existing consent with the newly requested scopes.
        consent.oauthScope = list(set(scopes) | set(consent.oauthScope))
    else:
        consent = Consent(
            oauthClient=client.dn,
            oauthSubject=user.dn,
            oauthScope=scopes,
            # The trailing "Z" denotes UTC, so the timestamp must be taken
            # in UTC rather than in naive local time.
            oauthIssueDate=datetime.datetime.now(datetime.timezone.utc).strftime(
                "%Y%m%d%H%M%SZ"
            ),
        )
    consent.save()

    return authorization.create_authorization_response(grant_user=user.dn)
@bp.route("/token", methods=["POST"])
def issue_token():
    """Return the response built by ``authorization.create_token_response``."""
    token_response = authorization.create_token_response()
    return token_response
2020-08-24 12:44:32 +00:00
@bp.route("/introspect", methods=["POST"])
def introspect_token():
    """Return the response of the endpoint named by ``IntrospectionEndpoint``."""
    endpoint_name = IntrospectionEndpoint.ENDPOINT_NAME
    return authorization.create_endpoint_response(endpoint_name)
2020-08-24 13:56:30 +00:00
@bp.route("/revoke", methods=["POST"])
def revoke_token():
    """Return the response of the endpoint named by ``RevocationEndpoint``."""
    endpoint_name = RevocationEndpoint.ENDPOINT_NAME
    return authorization.create_endpoint_response(endpoint_name)
2020-08-26 09:54:35 +00:00
@bp.route("/jwks.json")
def jwks():
    """Serve a JSON document whose ``keys`` list holds the configured public key.

    The key file path, key type and algorithm are read from the ``JWT``
    section of the application configuration. The fields produced by
    ``jwk.dumps`` are merged last, so they take precedence over the
    ``kid`` / ``use`` / ``alg`` defaults set here.
    """
    with open(current_app.config["JWT"]["PUBLIC_KEY"]) as key_file:
        public_key = key_file.read()

    dumped_key = jwk.dumps(public_key, current_app.config["JWT"]["KTY"])

    key_entry = {
        "kid": None,
        "use": "sig",
        "alg": current_app.config["JWT"]["ALG"],
    }
    key_entry.update(dumped_key)

    return jsonify({"keys": [key_entry]})
2020-09-25 09:26:41 +00:00
@bp.route("/userinfo")
@require_oauth("profile")
def userinfo():
    """Return, as JSON, the user info generated for the current token.

    ``generate_user_info`` receives the token's ``oauthSubject`` and the
    first entry of its ``oauthScope``.
    """
    subject = current_token.oauthSubject
    first_scope = current_token.oauthScope[0]
    user_info = generate_user_info(subject, first_scope)
    return jsonify(user_info)