canaille-globuzma/canaille/__init__.py
2021-12-26 19:59:20 +01:00

285 lines
8.3 KiB
Python

import datetime
import logging
import os
from logging.config import dictConfig
import canaille.account
import canaille.admin
import canaille.configuration
import canaille.consents
import canaille.groups
import canaille.installation
import canaille.oauth
import canaille.well_known
import ldap
import toml
from flask import Flask
from flask import g
from flask import request
from flask import session
from flask_babel import Babel
from flask_babel import gettext as _
from flask_themer import FileSystemThemeLoader
from flask_themer import render_template
from flask_themer import Themer
from .flaskutils import current_user
from .ldaputils import LDAPObject
from .models import Group
from .models import User
from .oauth2utils import setup_oauth
try: # pragma: no cover
import sentry_sdk
from sentry_sdk.integrations.flask import FlaskIntegration
SENTRY = True
except Exception:
SENTRY = False
def setup_config(app, config=None, validate=True):
dir_path = os.path.dirname(os.path.realpath(__file__))
app.config.from_mapping(
{
"SESSION_COOKIE_NAME": "canaille",
"OAUTH2_REFRESH_TOKEN_GENERATOR": True,
}
)
if config:
app.config.from_mapping(config)
elif "CONFIG" in os.environ:
app.config.from_mapping(toml.load(os.environ.get("CONFIG")))
elif os.path.exists(os.path.join(dir_path, "conf", "config.toml")):
app.config.from_mapping(
toml.load(os.path.join(dir_path, "conf", "config.toml"))
)
else:
raise Exception(
"No configuration file found. "
"Either create conf/config.toml or set the 'CONFIG' variable environment."
)
if os.environ.get("FLASK_ENV") == "development":
canaille.installation.setup_keypair(app.config)
if validate:
canaille.configuration.validate(app.config)
def setup_logging(app):
log_level = app.config.get("LOGGING", {}).get("LEVEL", "WARNING")
if not app.config.get("LOGGING", {}).get("PATH"):
handler = {
"class": "logging.StreamHandler",
"stream": "ext://flask.logging.wsgi_errors_stream",
"formatter": "default",
}
else:
handler = {
"class": "logging.handlers.WatchedFileHandler",
"filename": app.config["LOGGING"]["PATH"],
"formatter": "default",
}
dictConfig(
{
"version": 1,
"formatters": {
"default": {
"format": "[%(asctime)s] %(levelname)s in %(module)s: %(message)s",
}
},
"handlers": {"wsgi": handler},
"root": {"level": log_level, "handlers": ["wsgi"]},
}
)
def setup_ldap_models(app):
LDAPObject.root_dn = app.config["LDAP"]["ROOT_DN"]
user_base = app.config["LDAP"]["USER_BASE"]
if user_base.endswith(app.config["LDAP"]["ROOT_DN"]):
user_base = user_base[: -len(app.config["LDAP"]["ROOT_DN"]) - 1]
User.base = user_base
User.id = app.config["LDAP"].get("USER_ID_ATTRIBUTE", User.DEFAULT_ID_ATTRIBUTE)
group_base = app.config["LDAP"].get("GROUP_BASE")
if group_base.endswith(app.config["LDAP"]["ROOT_DN"]):
group_base = group_base[: -len(app.config["LDAP"]["ROOT_DN"]) - 1]
Group.base = group_base
Group.id = app.config["LDAP"].get("GROUP_ID_ATTRIBTUE", Group.DEFAULT_ID_ATTRIBUTE)
def setup_ldap_connection(app):
try: # pragma: no-cover
if request.endpoint == "static":
return
except RuntimeError:
pass
try:
g.ldap = ldap.initialize(app.config["LDAP"]["URI"])
if app.config["LDAP"].get("TIMEOUT"):
g.ldap.set_option(ldap.OPT_NETWORK_TIMEOUT, app.config["LDAP"]["TIMEOUT"])
g.ldap.simple_bind_s(
app.config["LDAP"]["BIND_DN"], app.config["LDAP"]["BIND_PW"]
)
except ldap.SERVER_DOWN:
message = _("Could not connect to the LDAP server '{uri}'").format(
uri=app.config["LDAP"]["URI"]
)
logging.error(message)
return (
render_template(
"error.html",
error=500,
icon="database",
debug=app.config.get("DEBUG", False),
description=message,
),
500,
)
except ldap.INVALID_CREDENTIALS:
message = _("LDAP authentication failed with user '{user}'").format(
user=app.config["LDAP"]["BIND_DN"]
)
logging.error(message)
return (
render_template(
"error.html",
error=500,
icon="key",
debug=app.config.get("DEBUG", False),
description=message,
),
500,
)
def teardown_ldap_connection(app):
if "ldap" in g:
g.ldap.unbind_s()
def setup_jinja(app):
app.jinja_env.filters["len"] = len
def setup_babel(app):
babel = Babel(app)
@babel.localeselector
def get_locale():
user = getattr(g, "user", None)
if user is not None:
return user.locale
if app.config.get("LANGUAGE"):
return app.config.get("LANGUAGE")
return request.accept_languages.best_match(["fr_FR", "en_US"])
@babel.timezoneselector
def get_timezone():
user = getattr(g, "user", None)
if user is not None:
return user.timezone
def setup_themer(app):
additional_themes_dir = (
os.path.dirname(app.config["THEME"])
if app.config.get("THEME") and os.path.exists(app.config["THEME"])
else None
)
themer = Themer(
app,
loaders=[FileSystemThemeLoader(additional_themes_dir)]
if additional_themes_dir
else None,
)
@themer.current_theme_loader
def get_current_theme():
# if config['THEME'] may be a theme name or an absolute path
return app.config.get("THEME", "default").split("/")[-1]
def setup_blueprints(app):
app.url_map.strict_slashes = False
app.register_blueprint(canaille.account.bp)
app.register_blueprint(canaille.admin.bp, url_prefix="/admin")
app.register_blueprint(canaille.groups.bp, url_prefix="/groups")
app.register_blueprint(canaille.oauth.bp, url_prefix="/oauth")
app.register_blueprint(canaille.consents.bp, url_prefix="/consent")
app.register_blueprint(canaille.well_known.bp, url_prefix="/.well-known")
def create_app(config=None, validate=True):
app = Flask(__name__)
setup_config(app, config, validate)
if SENTRY and app.config.get("SENTRY_DSN"):
sentry_sdk.init(dsn=app.config["SENTRY_DSN"], integrations=[FlaskIntegration()])
try:
setup_logging(app)
setup_ldap_models(app)
setup_oauth(app)
setup_blueprints(app)
setup_jinja(app)
setup_babel(app)
setup_themer(app)
@app.before_request
def before_request():
return setup_ldap_connection(app)
@app.after_request
def after_request(response):
teardown_ldap_connection(app)
return response
@app.before_request
def make_session_permanent():
session.permanent = True
app.permanent_session_lifetime = datetime.timedelta(days=365)
@app.context_processor
def global_processor():
return {
"has_smtp": "SMTP" in app.config,
"logo_url": app.config.get("LOGO"),
"favicon_url": app.config.get("FAVICON", app.config.get("LOGO")),
"website_name": app.config.get("NAME", "Canaille"),
"user": current_user(),
"menu": True,
}
@app.errorhandler(400)
def bad_request(e):
return render_template("error.html", error=400), 400
@app.errorhandler(403)
def unauthorized(e):
return render_template("error.html", error=403), 403
@app.errorhandler(404)
def page_not_found(e):
return render_template("error.html", error=404), 404
@app.errorhandler(500)
def server_error(e):
return render_template("error.html", error=500), 500
except Exception as exc:
if SENTRY and app.config.get("SENTRY_DSN"):
sentry_sdk.capture_exception(exc)
raise
return app