canaille-globuzma/canaille/app/flask.py

128 lines
3.3 KiB
Python
Raw Normal View History

import logging
2020-08-19 14:20:57 +00:00
from functools import wraps
from urllib.parse import urlsplit
from urllib.parse import urlunsplit
2021-12-20 22:57:27 +00:00
from canaille.app import models
from canaille.app.themes import render_template
2021-12-20 22:57:27 +00:00
from flask import abort
from flask import current_app
2023-08-13 20:08:28 +00:00
from flask import g
from flask import request
2021-12-20 22:57:27 +00:00
from flask import session
from flask_babel import gettext as _
2023-06-28 15:56:49 +00:00
from werkzeug.routing import BaseConverter
2020-08-19 14:20:57 +00:00
def current_user():
    """Return the user bound to the current session, or ``None``.

    The resolved user is cached on ``flask.g`` so that later calls during the
    same request skip the lookup. The ids stored under ``session["user_id"]``
    are tried from the last one to the first one; ids that do not resolve to
    a usable user (unknown, or locked when the backend supports account
    locking) are removed from the session.
    """
    if "user" in g:
        return g.user

    # Iterate over a reversed *copy* so the session list can be pruned safely
    # while looping.
    for user_id in session.get("user_id", [])[::-1]:
        user = models.User.get(id=user_id)
        if user and (
            not current_app.backend.has_account_lockability() or not user.locked
        ):
            g.user = user
            return g.user

        session["user_id"].remove(user_id)
        # In-place changes to a mutable value stored in the session are not
        # detected automatically; flag the session so the pruned list is saved.
        session.modified = True

    return None
2020-08-19 14:20:57 +00:00
def user_needed():
    """Build a view decorator that requires a logged-in user.

    The wrapped view receives the resolved user as the ``user`` keyword
    argument. When ``current_user()`` returns nothing, the request is
    aborted with a 403 status.
    """

    def wrapper(view_function):
        @wraps(view_function)
        def decorator(*view_args, **view_kwargs):
            logged_user = current_user()
            if logged_user:
                return view_function(*view_args, user=logged_user, **view_kwargs)
            # No usable user in the session: refuse access.
            abort(403)

        return decorator

    return wrapper
2021-12-02 17:23:14 +00:00
def permissions_needed(*args):
    """Build a view decorator that requires a user holding given permissions.

    Every positional argument is a permission the current user must have.
    The wrapped view receives the resolved user as the ``user`` keyword
    argument. The request is aborted with a 403 status when there is no
    current user or when at least one required permission is missing from
    ``user.permissions``.
    """
    required = set(args)

    def wrapper(view_function):
        @wraps(view_function)
        def decorator(*view_args, **view_kwargs):
            user = current_user()
            authorized = user and required.issubset(user.permissions)
            if not authorized:
                abort(403)
            return view_function(*view_args, user=user, **view_kwargs)

        return decorator

    return wrapper
def smtp_needed():
    """Build a view decorator that requires an ``SMTP`` configuration entry.

    When the application configuration has no ``SMTP`` key, the view is not
    called: a warning is logged and the ``error.html`` template is rendered
    with a 500 status instead.
    """

    def wrapper(view_function):
        @wraps(view_function)
        def decorator(*args, **kwargs):
            if "SMTP" not in current_app.config:
                message = _("No SMTP server has been configured")
                logging.warning(message)
                error_page = render_template(
                    "error.html",
                    error_code=500,
                    icon="tools",
                    description=message,
                )
                return error_page, 500

            return view_function(*args, **kwargs)

        return decorator

    return wrapper
def set_parameter_in_url_query(url, **kwargs):
    """Return *url* with the given query parameters added or replaced.

    Existing parameters that are not overridden are kept verbatim, including
    value-less parameters (``?flag``) and values that themselves contain
    ``=``. An overridden parameter keeps its original position in the query
    string; new parameters are appended in keyword order. Values are inserted
    as-is, without URL-encoding.
    """
    split = list(urlsplit(url))

    # Map each parameter name to its raw "name=value" chunk. Splitting on the
    # first "=" only avoids failing on chunks without "=" and avoids
    # truncating values that contain "=".
    chunks = {}
    for pair in split[3].split("&"):
        if pair:
            chunks[pair.partition("=")[0]] = pair

    for key, value in kwargs.items():
        chunks[key] = f"{key}={value}"

    split[3] = "&".join(chunks.values())
    return urlunsplit(split)
2023-03-30 21:14:39 +00:00
def request_is_htmx():
    """Tell whether the current request carries ``HX-Request`` but not ``HX-Boosted``.

    Returns the falsy ``HX-Request`` lookup result when that header is missing
    or empty; otherwise returns ``True`` when ``HX-Boosted`` is missing or
    empty and ``False`` when it is set.
    """
    hx_request = request.headers.get("HX-Request", False)
    if not hx_request:
        return hx_request
    return not request.headers.get("HX-Boosted", False)
2023-03-30 21:14:39 +00:00
def render_htmx_template(template, htmx_template=None, **kwargs):
    """Render *template*, or its partial counterpart when ``request_is_htmx()`` is truthy.

    In that case *htmx_template* is rendered when given, otherwise
    ``partial/<template>``. Remaining keyword arguments are forwarded to
    ``render_template``.
    """
    if request_is_htmx():
        template = htmx_template or f"partial/{template}"
    return render_template(template, **kwargs)
2023-06-28 15:56:49 +00:00
def model_converter(model):
    """Build a URL converter class that maps identifiers to *model* instances.

    The returned class converts a URL segment into the object returned by
    ``model.get(identifier)`` and converts an instance back into its
    ``identifier`` attribute when building URLs.
    """

    class ModelConverter(BaseConverter):
        def __init__(self, *args, required=True, **kwargs):
            # When ``required`` is true, an unknown identifier aborts with 404;
            # otherwise the view receives whatever ``model.get`` returned.
            self.required = required
            # ``super()`` already binds ``self``. Passing it again would shift
            # the arguments, so the base class would store the converter
            # itself as its ``map``.
            super().__init__(*args, **kwargs)

        def to_url(self, instance):
            return instance.identifier

        def to_python(self, identifier):
            # Set the backend up before querying the model.
            current_app.backend.setup()
            instance = model.get(identifier)
            if self.required and not instance:
                abort(404)
            return instance

    return ModelConverter