canaille-globuzma/canaille/oidc/clients.py
2023-04-02 00:32:27 +02:00

177 lines
5.5 KiB
Python

import datetime
from canaille.oidc.forms import ClientAddForm
from canaille.oidc.models import Client
from canaille.utils.flask import permissions_needed
from canaille.utils.flask import render_htmx_template
from canaille.utils.flask import request_is_htmx
from canaille.utils.forms import TableForm
from flask import abort
from flask import Blueprint
from flask import flash
from flask import redirect
from flask import request
from flask import url_for
from flask_babel import gettext as _
from flask_themer import render_template
from werkzeug.security import gen_salt
bp = Blueprint("clients", __name__, url_prefix="/admin/client")
@bp.route("/", methods=["GET", "POST"])
@permissions_needed("manage_oidc")
def index(user):
table_form = TableForm(Client, formdata=request.form)
if request.form and request.form.get("page") and not table_form.validate():
abort(404)
return render_htmx_template(
"oidc/admin/client_list.html", menuitem="admin", table_form=table_form
)
@bp.route("/add", methods=["GET", "POST"])
@permissions_needed("manage_oidc")
def add(user):
form = ClientAddForm(request.form or None)
if not request.form:
return render_template(
"oidc/admin/client_add.html", form=form, menuitem="admin"
)
if not form.validate():
flash(
_("The client has not been added. Please check your information."),
"error",
)
return render_template(
"oidc/admin/client_add.html", form=form, menuitem="admin"
)
client_id = gen_salt(24)
client_id_issued_at = datetime.datetime.now(datetime.timezone.utc)
client = Client(
client_id=client_id,
client_id_issued_at=client_id_issued_at,
client_name=form["client_name"].data,
contacts=[form["contacts"].data],
client_uri=form["client_uri"].data,
grant_types=form["grant_types"].data,
redirect_uris=[form["redirect_uris"].data],
post_logout_redirect_uris=[form["post_logout_redirect_uris"].data],
response_types=form["response_types"].data,
scope=form["scope"].data.split(" "),
token_endpoint_auth_method=form["token_endpoint_auth_method"].data,
logo_uri=form["logo_uri"].data,
tos_uri=form["tos_uri"].data,
policy_uri=form["policy_uri"].data,
software_id=form["software_id"].data,
software_version=form["software_version"].data,
jwk=form["jwk"].data,
jwks_uri=form["jwks_uri"].data,
preconsent=form["preconsent"].data,
client_secret=""
if form["token_endpoint_auth_method"].data == "none"
else gen_salt(48),
)
client.audience = [client]
client.save()
flash(
_("The client has been created."),
"success",
)
return redirect(url_for("oidc.clients.edit", client_id=client_id))
@bp.route("/edit/<client_id>", methods=["GET", "POST"])
@permissions_needed("manage_oidc")
def edit(user, client_id):
if (
request.method == "GET"
or request.form.get("action") == "edit"
or request_is_htmx()
):
return client_edit(client_id)
if request.form.get("action") == "delete":
return client_delete(client_id)
abort(400)
def client_edit(client_id):
client = Client.get(client_id)
if not client:
abort(404)
data = dict(client)
data["scope"] = " ".join(data["scope"])
data["redirect_uris"] = data["redirect_uris"][0] if data["redirect_uris"] else ""
data["contacts"] = data["contacts"][0] if data["contacts"] else ""
data["post_logout_redirect_uris"] = (
data["post_logout_redirect_uris"][0]
if data["post_logout_redirect_uris"]
else ""
)
data["preconsent"] = client.preconsent
form = ClientAddForm(request.form or None, data=data, client=client)
if not request.form:
return render_template(
"oidc/admin/client_edit.html", form=form, client=client, menuitem="admin"
)
if not form.validate():
flash(
_("The client has not been edited. Please check your information."),
"error",
)
return render_template(
"oidc/admin/client_edit.html", form=form, client=client, menuitem="admin"
)
client.update(
client_name=form["client_name"].data,
contacts=[form["contacts"].data],
client_uri=form["client_uri"].data,
grant_types=form["grant_types"].data,
redirect_uris=[form["redirect_uris"].data],
post_logout_redirect_uris=[form["post_logout_redirect_uris"].data],
response_types=form["response_types"].data,
scope=form["scope"].data.split(" "),
token_endpoint_auth_method=form["token_endpoint_auth_method"].data,
logo_uri=form["logo_uri"].data,
tos_uri=form["tos_uri"].data,
policy_uri=form["policy_uri"].data,
software_id=form["software_id"].data,
software_version=form["software_version"].data,
jwk=form["jwk"].data,
jwks_uri=form["jwks_uri"].data,
audience=[Client.get(id=id) for id in form["audience"].data],
preconsent=form["preconsent"].data,
)
client.save()
flash(
_("The client has been edited."),
"success",
)
return redirect(url_for("oidc.clients.edit", client_id=client_id))
def client_delete(client_id):
client = Client.get(client_id)
if not client:
abort(404)
flash(
_("The client has been deleted."),
"success",
)
client.delete()
return redirect(url_for("oidc.clients.index"))