import datetime import ldap import os import toml import canaille.admin import canaille.admin.authorizations import canaille.admin.clients import canaille.admin.mail import canaille.admin.tokens import canaille.consents import canaille.configuration import canaille.oauth import canaille.account import canaille.groups import canaille.well_known from flask import Flask, g, request, session from flask_babel import Babel from flask_themer import Themer, render_template, FileSystemThemeLoader from logging.config import dictConfig from .flaskutils import current_user from .ldaputils import LDAPObject from .oauth2utils import config_oauth from .models import User, Group try: # pragma: no cover import sentry_sdk from sentry_sdk.integrations.flask import FlaskIntegration SENTRY = True except Exception: SENTRY = False def create_app(config=None): app = Flask(__name__) dir_path = os.path.dirname(os.path.realpath(__file__)) app.config.from_mapping( { "SESSION_COOKIE_NAME": "canaille", "OAUTH2_REFRESH_TOKEN_GENERATOR": True, } ) if config: app.config.from_mapping(config) elif "CONFIG" in os.environ: app.config.from_mapping(toml.load(os.environ.get("CONFIG"))) elif os.path.exists(os.path.join(dir_path, "conf", "config.toml")): app.config.from_mapping( toml.load(os.path.join(dir_path, "conf", "config.toml")) ) else: raise Exception( "No configuration file found. " "Either create conf/config.toml or set the 'CONFIG' variable environment." ) if os.environ.get("FLASK_ENV") == "development": canaille.configuration.setup_dev_keypair(app.config) canaille.configuration.validate(app.config) setup_app(app) return app def setup_ldap_connection(app): g.ldap = ldap.initialize(app.config["LDAP"]["URI"]) if app.config["LDAP"].get("TIMEOUT"): g.ldap.set_option(ldap.OPT_NETWORK_TIMEOUT, app.config["LDAP"]["TIMEOUT"]) g.ldap.simple_bind_s(app.config["LDAP"]["BIND_DN"], app.config["LDAP"]["BIND_PW"]) def teardown_ldap_connection(app): if "ldap" in g: g.ldap.unbind_s() def setup_logging(app): log_level = app.config.get("LOGGING", {}).get("LEVEL", "WARNING") if not app.config.get("LOGGING", {}).get("PATH"): handler = { "class": "logging.StreamHandler", "stream": "ext://flask.logging.wsgi_errors_stream", "formatter": "default", } else: handler = { "class": "logging.handlers.WatchedFileHandler", "filename": app.config["LOGGING"]["PATH"], "formatter": "default", } dictConfig( { "version": 1, "formatters": { "default": { "format": "[%(asctime)s] %(levelname)s in %(module)s: %(message)s", } }, "handlers": {"wsgi": handler}, "root": {"level": log_level, "handlers": ["wsgi"]}, } ) def setup_app(app): if SENTRY and app.config.get("SENTRY_DSN"): sentry_sdk.init(dsn=app.config["SENTRY_DSN"], integrations=[FlaskIntegration()]) try: setup_logging(app) LDAPObject.root_dn = app.config["LDAP"]["ROOT_DN"] user_base = app.config["LDAP"]["USER_BASE"] if user_base.endswith(app.config["LDAP"]["ROOT_DN"]): user_base = user_base[: -len(app.config["LDAP"]["ROOT_DN"]) - 1] User.base = user_base group_base = app.config["LDAP"].get("GROUP_BASE") Group.base = group_base app.url_map.strict_slashes = False config_oauth(app) app.register_blueprint(canaille.account.bp) app.register_blueprint(canaille.groups.bp, url_prefix="/groups") app.register_blueprint(canaille.oauth.bp, url_prefix="/oauth") app.register_blueprint(canaille.consents.bp, url_prefix="/consent") app.register_blueprint(canaille.well_known.bp, url_prefix="/.well-known") app.register_blueprint(canaille.admin.tokens.bp, url_prefix="/admin/token") app.register_blueprint( canaille.admin.authorizations.bp, url_prefix="/admin/authorization" ) app.register_blueprint(canaille.admin.clients.bp, url_prefix="/admin/client") app.register_blueprint(canaille.admin.mail.bp, url_prefix="/admin/mail") babel = Babel(app) additional_themes_dir = ( os.path.dirname(app.config["THEME"]) if app.config.get("THEME") and os.path.exists(app.config["THEME"]) else None ) themer = Themer( app, loaders=[FileSystemThemeLoader(additional_themes_dir)] if additional_themes_dir else None, ) @babel.localeselector def get_locale(): user = getattr(g, "user", None) if user is not None: return user.locale if app.config.get("LANGUAGE"): return app.config.get("LANGUAGE") return request.accept_languages.best_match(["fr_FR", "en_US"]) @babel.timezoneselector def get_timezone(): user = getattr(g, "user", None) if user is not None: return user.timezone @themer.current_theme_loader def get_current_theme(): # if config['THEME'] may be a theme name or an absolute path return app.config.get("THEME", "default").split("/")[-1] @app.before_request def before_request(): setup_ldap_connection(app) @app.after_request def after_request(response): teardown_ldap_connection(app) return response @app.before_request def make_session_permanent(): session.permanent = True app.permanent_session_lifetime = datetime.timedelta(days=365) @app.context_processor def global_processor(): return { "logo_url": app.config.get("LOGO"), "favicon_url": app.config.get("FAVICON", app.config.get("LOGO")), "website_name": app.config.get("NAME"), "user": current_user(), "menu": True, } @app.errorhandler(400) def bad_request(e): return render_template("error.html", error=400), 400 @app.errorhandler(403) def unauthorized(e): return render_template("error.html", error=403), 403 @app.errorhandler(404) def page_not_found(e): return render_template("error.html", error=404), 404 @app.errorhandler(500) def server_error(e): return render_template("error.html", error=500), 500 except Exception as exc: if SENTRY and app.config.get("SENTRY_DSN"): sentry_sdk.capture_exception(exc) raise