canaille-globuzma/canaille/__init__.py
2021-10-12 01:06:25 +02:00

220 lines
6.9 KiB
Python

import datetime
import ldap
import os
import toml
import canaille.admin
import canaille.admin.authorizations
import canaille.admin.clients
import canaille.admin.mail
import canaille.admin.tokens
import canaille.consents
import canaille.commands.clean
import canaille.oauth
import canaille.account
import canaille.groups
import canaille.well_known
from cryptography.hazmat.primitives import serialization as crypto_serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend as crypto_default_backend
from flask import Flask, g, request, render_template, session
from flask_babel import Babel
from .flaskutils import current_user
from .ldaputils import LDAPObject
from .oauth2utils import config_oauth
from .models import User, Token, AuthorizationCode, Client, Consent, Group
try: # pragma: no cover
import sentry_sdk
from sentry_sdk.integrations.flask import FlaskIntegration
SENTRY = True
except Exception:
SENTRY = False
def create_app(config=None):
app = Flask(__name__)
dir_path = os.path.dirname(os.path.realpath(__file__))
app.config.from_mapping(
{
"SESSION_COOKIE_NAME": "canaille",
"OAUTH2_REFRESH_TOKEN_GENERATOR": True,
}
)
if config:
app.config.from_mapping(config)
elif "CONFIG" in os.environ:
app.config.from_mapping(toml.load(os.environ.get("CONFIG")))
elif os.path.exists(os.path.join(dir_path, "conf", "config.toml")):
app.config.from_mapping(
toml.load(os.path.join(dir_path, "conf", "config.toml"))
)
else:
raise Exception(
"No configuration file found. "
"Either create conf/config.toml or set the 'CONFIG' variable environment."
)
setup_dev_keypair(app)
if not os.path.exists(app.config["JWT"]["PUBLIC_KEY"]):
raise Exception(f'Public key does not exist {app.config["JWT"]["PUBLIC_KEY"]}')
if not os.path.exists(app.config["JWT"]["PRIVATE_KEY"]):
raise Exception(
f'Private key does not exist {app.config["JWT"]["PRIVATE_KEY"]}'
)
setup_app(app)
return app
def setup_dev_keypair(app):
if not os.environ.get("FLASK_ENV") == "development":
return
if os.path.exists(app.config["JWT"]["PUBLIC_KEY"]) or os.path.exists(
app.config["JWT"]["PRIVATE_KEY"]
):
return
key = rsa.generate_private_key(
backend=crypto_default_backend(), public_exponent=65537, key_size=2048
)
private_key = key.private_bytes(
crypto_serialization.Encoding.PEM,
crypto_serialization.PrivateFormat.PKCS8,
crypto_serialization.NoEncryption(),
)
public_key = key.public_key().public_bytes(
crypto_serialization.Encoding.OpenSSH, crypto_serialization.PublicFormat.OpenSSH
)
with open(app.config["JWT"]["PUBLIC_KEY"], "wb") as fd:
fd.write(public_key)
with open(app.config["JWT"]["PRIVATE_KEY"], "wb") as fd:
fd.write(private_key)
def setup_ldap_tree(app):
conn = ldap.initialize(app.config["LDAP"]["URI"])
if app.config["LDAP"].get("TIMEOUT"):
conn.set_option(ldap.OPT_NETWORK_TIMEOUT, app.config["LDAP"]["TIMEOUT"])
conn.simple_bind_s(app.config["LDAP"]["BIND_DN"], app.config["LDAP"]["BIND_PW"])
Token.initialize(conn)
AuthorizationCode.initialize(conn)
Client.initialize(conn)
Consent.initialize(conn)
conn.unbind_s()
def setup_ldap(app):
g.ldap = ldap.initialize(app.config["LDAP"]["URI"])
g.ldap.simple_bind_s(app.config["LDAP"]["BIND_DN"], app.config["LDAP"]["BIND_PW"])
def teardown_ldap(app):
if "ldap" in g:
g.ldap.unbind_s()
def setup_app(app):
if SENTRY and app.config.get("SENTRY_DSN"):
sentry_sdk.init(dsn=app.config["SENTRY_DSN"], integrations=[FlaskIntegration()])
try:
LDAPObject.root_dn = app.config["LDAP"]["ROOT_DN"]
user_base = app.config["LDAP"]["USER_BASE"]
if user_base.endswith(app.config["LDAP"]["ROOT_DN"]):
user_base = user_base[: -len(app.config["LDAP"]["ROOT_DN"]) - 1]
User.base = user_base
group_base = app.config["LDAP"].get("GROUP_BASE")
Group.base = group_base
app.url_map.strict_slashes = False
config_oauth(app)
setup_ldap_tree(app)
app.register_blueprint(canaille.account.bp)
app.register_blueprint(canaille.groups.bp, url_prefix="/groups")
app.register_blueprint(canaille.oauth.bp, url_prefix="/oauth")
app.register_blueprint(canaille.commands.clean.bp)
app.register_blueprint(canaille.consents.bp, url_prefix="/consent")
app.register_blueprint(canaille.well_known.bp, url_prefix="/.well-known")
app.register_blueprint(canaille.admin.tokens.bp, url_prefix="/admin/token")
app.register_blueprint(
canaille.admin.authorizations.bp, url_prefix="/admin/authorization"
)
app.register_blueprint(canaille.admin.clients.bp, url_prefix="/admin/client")
app.register_blueprint(canaille.admin.mail.bp, url_prefix="/admin/mail")
babel = Babel(app)
@babel.localeselector
def get_locale():
user = getattr(g, "user", None)
if user is not None:
return user.locale
if app.config.get("LANGUAGE"):
return app.config.get("LANGUAGE")
return request.accept_languages.best_match(["fr_FR", "en_US"])
@babel.timezoneselector
def get_timezone():
user = getattr(g, "user", None)
if user is not None:
return user.timezone
@app.before_request
def before_request():
setup_ldap(app)
@app.after_request
def after_request(response):
teardown_ldap(app)
return response
@app.before_request
def make_session_permanent():
session.permanent = True
app.permanent_session_lifetime = datetime.timedelta(days=365)
@app.context_processor
def global_processor():
return {
"logo_url": app.config.get("LOGO"),
"favicon_url": app.config.get("FAVICON", app.config.get("LOGO")),
"website_name": app.config.get("NAME"),
"user": current_user(),
"menu": True,
}
@app.errorhandler(400)
def bad_request(e):
return render_template("error.html", error=400), 400
@app.errorhandler(403)
def unauthorized(e):
return render_template("error.html", error=403), 403
@app.errorhandler(404)
def page_not_found(e):
return render_template("error.html", error=404), 404
@app.errorhandler(500)
def server_error(e):
return render_template("error.html", error=500), 500
except Exception as exc:
if SENTRY and app.config.get("SENTRY_DSN"):
sentry_sdk.capture_exception(exc)
raise