2020-08-28 14:07:39 +00:00
|
|
|
from authlib.jose import jwk
|
2020-08-17 15:49:49 +00:00
|
|
|
from authlib.oauth2 import OAuth2Error
|
|
|
|
from flask import Blueprint, request, session, redirect
|
2020-08-26 09:54:35 +00:00
|
|
|
from flask import render_template, jsonify, flash, current_app
|
2020-08-17 15:49:49 +00:00
|
|
|
from flask_babel import gettext
|
|
|
|
from .models import User, Client
|
2020-08-24 13:56:30 +00:00
|
|
|
from .oauth2utils import authorization, IntrospectionEndpoint, RevocationEndpoint
|
2020-08-19 14:20:57 +00:00
|
|
|
from .forms import LoginForm
|
|
|
|
from .flaskutils import current_user
|
2020-08-17 15:49:49 +00:00
|
|
|
|
|
|
|
|
|
|
|
# Blueprint on which the route handlers of this module are registered.
# NOTE(review): Flask documents the signature as Blueprint(name, import_name);
# here __name__ is passed as the name and "oauth" as the import name, which
# looks swapped. Endpoint names derive from the current value, so confirm
# against url_for() callers before changing it.
bp = Blueprint(__name__, "oauth")
|
|
|
|
|
|
|
|
|
|
|
|
@bp.route("/authorize", methods=["GET", "POST"])
def authorize():
    """Handle the OAuth2 authorization endpoint (login, consent, answer).

    The request is processed in this order:

    * No current user: a GET renders the login form. A POST validates the
      form and calls ``User.authenticate``; on failure the form is rendered
      again with an error flash, otherwise the client is redirected back to
      the same URL.
    * Current user, GET: the consent request is validated and the consent
      page is rendered. An ``OAuth2Error`` is returned as its JSON body.
    * Current user, POST: the ``answer`` form field decides the outcome.
      ``logout`` removes ``user_dn`` from the session and redirects back to
      the same URL; ``accept`` grants with the user's ``dn``; every other
      value is treated as a refusal.

    ``client_id`` and ``answer`` are read by indexing, so a request without
    them still fails on the lookup exactly as before.

    Returns:
        Whatever the selected branch produces: a rendered template, a
        redirect, a JSON error body, or the response built by
        ``authorization.create_authorization_response``.
    """
    user = current_user()
    client = Client.get(request.values["client_id"])

    if not user:
        form = LoginForm(request.form or None)
        if request.method == "GET":
            return render_template("login.html", form=form, menu=False)

        if not form.validate() or not User.authenticate(
            form.login.data, form.password.data, True
        ):
            flash(gettext("Login failed, please check your information"), "error")
            return render_template("login.html", form=form, menu=False)

        # Reload the same URL so the request is handled again with the
        # freshly authenticated user.
        return redirect(request.url)

    if request.method == "GET":
        try:
            grant = authorization.validate_consent_request(end_user=user)
        except OAuth2Error as error:
            return jsonify(dict(error.get_body()))

        return render_template(
            "authorize.html", user=user, grant=grant, client=client, menu=False
        )

    answer = request.form["answer"]

    if answer == "logout":
        del session["user_dn"]
        flash(gettext("You have been successfully logged out."), "success")
        return redirect(request.url)

    # Only an explicit "accept" grants access. "deny" and any unrecognised
    # value refuse; previously an unrecognised value left ``grant_user``
    # unbound and the view crashed with UnboundLocalError.
    grant_user = user.dn if answer == "accept" else None

    return authorization.create_authorization_response(grant_user=grant_user)
|
|
|
|
|
|
|
|
|
|
|
|
@bp.route("/token", methods=["POST"])
def issue_token():
    """Return the response built by ``authorization.create_token_response``."""
    token_response = authorization.create_token_response()
    return token_response
|
2020-08-24 12:44:32 +00:00
|
|
|
|
|
|
|
|
|
|
|
@bp.route("/introspect", methods=["POST"])
def introspect_token():
    """Return the endpoint response for ``IntrospectionEndpoint``.

    The endpoint is selected by its ``ENDPOINT_NAME`` and the response is
    produced by ``authorization.create_endpoint_response``.
    """
    endpoint_name = IntrospectionEndpoint.ENDPOINT_NAME
    return authorization.create_endpoint_response(endpoint_name)
|
2020-08-24 13:56:30 +00:00
|
|
|
|
|
|
|
|
|
|
|
@bp.route("/revoke", methods=["POST"])
def revoke_token():
    """Return the endpoint response for ``RevocationEndpoint``.

    The endpoint is selected by its ``ENDPOINT_NAME`` and the response is
    produced by ``authorization.create_endpoint_response``.
    """
    endpoint_name = RevocationEndpoint.ENDPOINT_NAME
    return authorization.create_endpoint_response(endpoint_name)
|
2020-08-26 09:54:35 +00:00
|
|
|
|
|
|
|
|
|
|
|
@bp.route("/jwks.json")
def jwks():
    """Serve the configured public key as a JSON document with one key entry.

    Reads the file named by ``JWT["PUBLIC_KEY"]`` in the application config,
    passes its content to ``jwk.dumps`` together with ``JWT["KTY"]``, and
    returns a JSON object whose ``keys`` list holds a single entry combining
    the fixed ``kid``/``use``/``alg`` fields with the dumped key fields.
    """
    jwt_settings = current_app.config["JWT"]

    with open(jwt_settings["PUBLIC_KEY"]) as key_file:
        public_key = key_file.read()

    key_fields = jwk.dumps(public_key, jwt_settings["KTY"])

    key_entry = {"kid": None, "use": "sig", "alg": jwt_settings["ALG"]}
    # The dumped fields are applied last, so they win on any name clash.
    key_entry.update(key_fields)

    return jsonify({"keys": [key_entry]})
|