canaille-globuzma/web/oauth.py
2020-08-26 15:37:15 +02:00

78 lines
2.4 KiB
Python

from authlib.common.encoding import urlsafe_b64encode
from authlib.oauth2 import OAuth2Error
from flask import Blueprint, request, session, redirect
from flask import render_template, jsonify, flash, current_app
from flask_babel import gettext
from .models import User, Client
from .oauth2utils import authorization, IntrospectionEndpoint, RevocationEndpoint
from .forms import LoginForm
from .flaskutils import current_user
bp = Blueprint(__name__, "oauth")
@bp.route("/authorize", methods=["GET", "POST"])
def authorize():
user = current_user()
client = Client.get(request.values["client_id"])
if not user:
form = LoginForm(request.form or None)
if request.method == "GET":
return render_template("login.html", form=form, menu=False)
if not form.validate() or not User.authenticate(
form.login.data, form.password.data, True
):
flash(gettext("Login failed, please check your information"), "error")
return render_template("login.html", form=form, menu=False)
return redirect(request.url)
if request.method == "GET":
try:
grant = authorization.validate_consent_request(end_user=user)
except OAuth2Error as error:
return jsonify(dict(error.get_body()))
return render_template(
"authorize.html", user=user, grant=grant, client=client, menu=False
)
if request.form["answer"] == "logout":
del session["user_dn"]
flash(gettext("You have been successfully logged out."), "success")
return redirect(request.url)
if request.form["answer"] == "deny":
grant_user = None
if request.form["answer"] == "accept":
grant_user = user.dn
return authorization.create_authorization_response(grant_user=grant_user)
@bp.route("/token", methods=["POST"])
def issue_token():
return authorization.create_token_response()
@bp.route("/introspect", methods=["POST"])
def introspect_token():
return authorization.create_endpoint_response(IntrospectionEndpoint.ENDPOINT_NAME)
@bp.route("/revoke", methods=["POST"])
def revoke_token():
return authorization.create_endpoint_response(RevocationEndpoint.ENDPOINT_NAME)
@bp.route("/jwks.json")
def jwks():
# TODO: Do not share secrets here!
key = urlsafe_b64encode(current_app.config["JWT"]["KEY"].encode("utf-8")).decode(
"utf-8"
)
return jsonify({"keys": [{"kid": None, "kty": "oct", "k": key}]})