canaille-globuzma/canaille/oidc/oauth.py

340 lines
10 KiB
Python
Raw Normal View History

2020-09-17 10:01:21 +00:00
import datetime
2022-05-20 12:07:56 +00:00
from urllib.parse import urlsplit
from urllib.parse import urlunsplit
2021-12-20 22:57:27 +00:00
2020-09-25 09:26:41 +00:00
from authlib.integrations.flask_oauth2 import current_token
2020-08-28 14:07:39 +00:00
from authlib.jose import jwk
2022-05-20 12:07:56 +00:00
from authlib.jose import jwt
2020-08-17 15:49:49 +00:00
from authlib.oauth2 import OAuth2Error
2021-12-20 22:57:27 +00:00
from flask import abort
from flask import Blueprint
from flask import current_app
from flask import flash
from flask import jsonify
from flask import redirect
from flask import request
from flask import session
2022-05-20 12:07:56 +00:00
from flask import url_for
2021-12-20 22:57:27 +00:00
from flask_babel import gettext
from flask_babel import lazy_gettext as _
from flask_themer import render_template
2022-05-20 12:07:56 +00:00
from werkzeug.datastructures import CombinedMultiDict
2021-12-20 22:57:27 +00:00
2022-01-11 18:49:06 +00:00
from ..flaskutils import current_user
from ..forms import FullLoginForm
from ..models import User
2022-05-20 12:07:56 +00:00
from .forms import LogoutForm
2021-12-20 22:57:27 +00:00
from .models import Client
from .models import Consent
from .oauth2utils import authorization
from .oauth2utils import DEFAULT_JWT_ALG
from .oauth2utils import DEFAULT_JWT_KTY
from .oauth2utils import generate_user_info
from .oauth2utils import IntrospectionEndpoint
from .oauth2utils import require_oauth
from .oauth2utils import RevocationEndpoint
2020-08-17 15:49:49 +00:00
2022-01-11 18:49:06 +00:00
# Blueprint on which every endpoint of this module is registered; its routes
# are served under the "/oauth" URL prefix.
# NOTE(review): url_for() calls below use the "oidc.oauth.*" endpoint names,
# so this blueprint is presumably nested under an "oidc" blueprint -- confirm.
bp = Blueprint("oauth", __name__, url_prefix="/oauth")
2020-08-17 15:49:49 +00:00
2020-10-29 14:28:19 +00:00
# Scope name -> (label, lazily translated description).
# The mapping is handed to the consent template as ``claims`` by authorize().
# NOTE(review): the first tuple item looks like an icon name used by the
# template -- confirm against "oidc/user/authorize.html".
CLAIMS = dict(
    profile=(
        "id card outline",
        _("Personnal information about yourself, such as your name or your gender."),
    ),
    email=("at", _("Your email address.")),
    address=("envelope open outline", _("Your postal address.")),
    phone=("phone", _("Your phone number.")),
    groups=("users", _("Groups you are belonging to")),
)
2020-08-17 15:49:49 +00:00
2022-05-20 12:07:56 +00:00
def get_public_key():
    """Return the content of the file configured as ``JWT.PUBLIC_KEY``.

    The path is read from the current Flask application configuration and
    the file is read as text in one go.
    """
    key_path = current_app.config["JWT"]["PUBLIC_KEY"]
    with open(key_path) as key_file:
        content = key_file.read()
    return content
2020-08-17 15:49:49 +00:00
@bp.route("/authorize", methods=["GET", "POST"])
def authorize():
    """Authorization endpoint: log the user in, collect consent, then grant.

    The request goes through three stages:

    1. The ``client_id`` query parameter must designate a known client,
       otherwise the request is aborted with a 400.
    2. If nobody is logged in, the login form is displayed (GET) or
       processed (POST). With ``prompt=none`` a ``login_required`` JSON
       error is returned instead.
    3. For a logged-in user allowed to use OIDC, a GET either answers
       immediately (client pre-consent, or an unrevoked consent covering
       every requested scope) or renders the consent page; a POST handles
       the ``answer`` form field (``logout``, ``deny`` or ``accept``).

    Any other ``answer`` value is rejected with a 400.
    """
    current_app.logger.debug(
        "authorization endpoint request:\nGET: %s\nPOST: %s",
        request.args.to_dict(flat=False),
        request.form.to_dict(flat=False),
    )

    if "client_id" not in request.args:
        abort(400)

    client = Client.get(request.args["client_id"])
    if not client:
        abort(400)

    user = current_user()
    scopes = request.args.get("scope", "").split(" ")

    # LOGIN
    if not user:
        if request.args.get("prompt") == "none":
            return jsonify({"error": "login_required"})

        form = FullLoginForm(request.form or None)
        if request.method == "GET":
            return render_template("login.html", form=form, menu=False)

        if not form.validate() or not User.authenticate(
            form.login.data, form.password.data, True
        ):
            flash(gettext("Login failed, please check your information"), "error")
            return render_template("login.html", form=form, menu=False)

        # Come back to the same URL so the consent stage runs with a user.
        return redirect(request.url)

    if not user.can_use_oidc:
        abort(400)

    # CONSENT
    consents = Consent.filter(
        client=client.dn,
        subject=user.dn,
    )
    # Revoked consents must not be reused.
    consents = [c for c in consents if not c.revokation_date]
    consent = consents[0] if consents else None

    if request.method == "GET":
        if client.preconsent or (
            consent and set(scopes).issubset(consent.scope)
        ):
            return authorization.create_authorization_response(grant_user=user.dn)

        elif request.args.get("prompt") == "none":
            response = {"error": "consent_required"}
            current_app.logger.debug("authorization endpoint response: %s", response)
            return jsonify(response)

        try:
            grant = authorization.get_consent_grant(end_user=user)
        except OAuth2Error as error:
            response = dict(error.get_body())
            current_app.logger.debug("authorization endpoint response: %s", response)
            return jsonify(response)

        return render_template(
            "oidc/user/authorize.html",
            user=user,
            grant=grant,
            client=client,
            claims=CLAIMS,
            menu=False,
            ignored_claims=["openid"],
        )

    if request.method == "POST":
        answer = request.form["answer"]

        if answer == "logout":
            del session["user_dn"]
            flash(gettext("You have been successfully logged out."), "success")
            return redirect(request.url)

        if answer == "deny":
            grant_user = None

        elif answer == "accept":
            grant_user = user.dn

            # Remember the decision: widen the existing consent, or record a
            # new one for this client/user pair.
            if consent:
                consent.scope = list(set(scopes + consent.scope))
            else:
                consent = Consent(
                    client=client.dn,
                    subject=user.dn,
                    scope=scopes,
                    issue_date=datetime.datetime.now(),
                )
            consent.save()

        else:
            # Unknown answers used to leave ``grant_user`` unbound and crash
            # with an UnboundLocalError; reject them explicitly instead.
            abort(400)

        response = authorization.create_authorization_response(grant_user=grant_user)
        current_app.logger.debug(
            "authorization endpoint response: %s", response.location
        )
        return response
2020-08-17 15:49:49 +00:00
@bp.route("/token", methods=["POST"])
def issue_token():
    """Token endpoint: delegate to the authorization server and log both ways."""
    logger = current_app.logger
    form_payload = request.form.to_dict(flat=False)
    logger.debug("token endpoint request: POST: %s", form_payload)

    token_response = authorization.create_token_response()

    logger.debug("token endpoint response: %s", token_response.json)
    return token_response
2020-08-24 12:44:32 +00:00
@bp.route("/introspect", methods=["POST"])
def introspect_token():
    """Introspection endpoint: run ``IntrospectionEndpoint`` and log the exchange."""
    logger = current_app.logger
    form_payload = request.form.to_dict(flat=False)
    logger.debug("introspection endpoint request: POST: %s", form_payload)

    endpoint_name = IntrospectionEndpoint.ENDPOINT_NAME
    endpoint_response = authorization.create_endpoint_response(endpoint_name)

    logger.debug("introspection endpoint response: %s", endpoint_response.json)
    return endpoint_response
2020-08-24 13:56:30 +00:00
@bp.route("/revoke", methods=["POST"])
def revoke_token():
    """Revocation endpoint: run ``RevocationEndpoint`` and log the exchange."""
    logger = current_app.logger
    form_payload = request.form.to_dict(flat=False)
    logger.debug("revokation endpoint request: POST: %s", form_payload)

    endpoint_name = RevocationEndpoint.ENDPOINT_NAME
    endpoint_response = authorization.create_endpoint_response(endpoint_name)

    logger.debug("revokation endpoint response: %s", endpoint_response.json)
    return endpoint_response
2020-08-26 09:54:35 +00:00
@bp.route("/jwks.json")
def jwks():
    """Publish the configured public key as a one-entry JSON Web Key Set.

    The key type and algorithm come from the ``JWT`` configuration section
    (``KTY`` / ``ALG``), falling back to the module defaults.
    """
    public_key = get_public_key()
    jwt_settings = current_app.config["JWT"]
    key_fields = jwk.dumps(public_key, jwt_settings.get("KTY", DEFAULT_JWT_KTY))

    entry = {
        "kid": None,
        "use": "sig",
        "alg": jwt_settings.get("ALG", DEFAULT_JWT_ALG),
    }
    # Fields produced by jwk.dumps take precedence over the defaults above.
    entry.update(key_fields)

    return jsonify({"keys": [entry]})
2020-09-25 09:26:41 +00:00
@bp.route("/userinfo")
@require_oauth("profile")
def userinfo():
    """UserInfo endpoint: return the claims generated for the current token.

    The claims are built by ``generate_user_info`` from the token subject
    and the first item of the token scope.
    """
    logger = current_app.logger
    logger.debug("userinfo endpoint request: %s", request.args)

    subject = current_token.subject
    scope = current_token.scope[0]
    user_info = generate_user_info(subject, scope)

    logger.debug("userinfo endpoint response: %s", user_info)
    return jsonify(user_info)
2022-05-20 12:07:56 +00:00
def set_parameter_in_url_query(url, **kwargs):
    """Return *url* with the given parameters appended to its query string.

    Existing query parameters are kept untouched and the new ones are added
    after them. Keys and values are percent-encoded, so values containing
    ``&``, ``=``, ``#`` or spaces cannot corrupt the URL or inject extra
    parameters. When no parameter is given the query is left as is.

    :param url: The URL to extend.
    :param kwargs: The query parameters to append.
    :return: The resulting URL, as a string.
    """
    from urllib.parse import urlencode

    parts = urlsplit(url)
    extra = urlencode(kwargs)
    # Skip empty pieces to avoid leading or trailing '&' separators.
    query = "&".join(piece for piece in (parts.query, extra) if piece)
    return urlunsplit(parts._replace(query=query))
@bp.route("/end_session", methods=["GET", "POST"])
def end_session():
    """End-session (logout) endpoint.

    Parameters are read from both the query string and the form. The user
    is asked to confirm the logout, unless a confirmation is already stored
    in the session, when:

    - no ``id_token_hint`` is provided;
    - ``logout_hint`` does not match the logged-in user;
    - the ``sub`` of the ``id_token_hint`` does not match the logged-in user.

    Once logged out, the user is redirected to ``post_logout_redirect_uri``
    (with ``state`` appended when provided) if that URI is registered for
    one of the involved clients, and to the account index otherwise.

    When nobody is logged in there is nothing to end, and the user is sent
    to the account index.
    """
    data = CombinedMultiDict((request.args, request.form))
    user = current_user()

    if not user:
        # Without this guard the ``user.uid`` / ``user.logout()`` accesses
        # below raise AttributeError for anonymous visitors.
        return redirect(url_for("account.index"))

    form = LogoutForm(request.form)
    form.action = url_for("oidc.oauth.end_session_submit")

    client = None
    valid_uris = []

    if "client_id" in data:
        client = Client.get(data["client_id"])
        if client:
            # Copy so that later changes never touch the client's own list.
            valid_uris = list(client.post_logout_redirect_uris or [])

    confirmed = session.get("end_session_confirmation")

    if (
        not data.get("id_token_hint")
        or (data.get("logout_hint") and data["logout_hint"] != user.uid[0])
    ) and not confirmed:
        session["end_session_data"] = data
        return render_template(
            "oidc/user/logout.html", form=form, client=client, menu=False
        )

    if data.get("id_token_hint"):
        id_token = jwt.decode(data["id_token_hint"], get_public_key())
        if not id_token["iss"] == current_app.config["JWT"]["ISS"]:
            return jsonify(
                {
                    "status": "error",
                    "message": "id_token_hint has not been issued here",
                }
            )

        # ``aud`` is either a single string or a list of strings. Normalize
        # it so the membership test below is never a substring test.
        audiences = (
            id_token["aud"]
            if isinstance(id_token["aud"], list)
            else [id_token["aud"]]
        )

        if "client_id" in data:
            if data["client_id"] not in audiences:
                return jsonify(
                    {
                        "status": "error",
                        "message": "id_token_hint and client_id don't match",
                    }
                )

        else:
            # No explicit client: accept the logout URIs of every known
            # client listed in the token audience.
            for client_id in audiences:
                client = Client.get(client_id)
                if client:
                    valid_uris.extend(client.post_logout_redirect_uris or [])

        if user.uid[0] != id_token["sub"] and not confirmed:
            session["end_session_data"] = data
            return render_template(
                "oidc/user/logout.html", form=form, client=client, menu=False
            )

    user.logout()

    # The confirmation is single-use.
    session.pop("end_session_confirmation", None)

    if (
        "post_logout_redirect_uri" in data
        and data["post_logout_redirect_uri"] in valid_uris
    ):
        url = data["post_logout_redirect_uri"]
        if "state" in data:
            url = set_parameter_in_url_query(url, state=data["state"])
        # Redirect to the computed URL so that ``state`` is not dropped.
        return redirect(url)

    flash(_("You have been disconnected"), "success")
    return redirect(url_for("account.index"))
@bp.route("/end_session_confirm", methods=["POST"])
def end_session_submit():
    """Process the logout confirmation form.

    On an invalid form the confirmation page is rendered again with an
    error message. Otherwise the parameters saved in the session by the
    end-session endpoint are consumed and, if the user answered ``logout``,
    the user is redirected back to that endpoint with those parameters and
    a confirmation flag stored in the session. Any other answer sends the
    user to the account index without logging out.
    """
    form = LogoutForm(request.form)
    if not form.validate():
        flash(_("An error happened during the logout"), "error")
        client = Client.get(session.get("end_session_data", {}).get("client_id"))
        return render_template("oidc/user/logout.html", form=form, client=client)

    # The saved parameters may be missing (form submitted twice, expired
    # session); fall back to no parameters instead of raising a KeyError.
    data = session.pop("end_session_data", {})

    if request.form["answer"] == "logout":
        session["end_session_confirmation"] = True
        url = set_parameter_in_url_query(url_for("oidc.oauth.end_session"), **data)
        return redirect(url)

    flash(_("You have not been disconnected"), "info")
    return redirect(url_for("account.index"))