# canaille-globuzma/canaille/backends/ldap/backend.py
#
# 223 lines, 7.1 KiB, Python
# (extracted from the repository's web file view)

import logging
import os
import uuid
from contextlib import contextmanager
import ldap.modlist
import ldif
from flask import current_app
from canaille.app import models
from canaille.app.configuration import ConfigurationException
from canaille.app.i18n import gettext as _
2023-06-05 16:10:37 +00:00
from canaille.backends import BaseBackend
2023-06-22 11:09:44 +00:00
from .utils import listify
@contextmanager
def ldap_connection(config):
    """Yield an LDAP connection bound with the configured credentials.

    The connection is opened against ``config["BACKENDS"]["LDAP"]["URI"]``,
    gets the optional ``TIMEOUT`` setting as network timeout, and is bound
    with ``BIND_DN`` / ``BIND_PW``.

    :param config: The application configuration mapping.
    :yields: The bound python-ldap connection object.

    The connection is unbound when the context exits, including when the
    option setup or the bind itself raises.
    """
    ldap_config = config["BACKENDS"]["LDAP"]
    conn = ldap.initialize(ldap_config["URI"])
    try:
        # Option setup and bind live inside the ``try`` so that a failure
        # at this stage still releases the handle in ``finally``.
        conn.set_option(ldap.OPT_NETWORK_TIMEOUT, ldap_config.get("TIMEOUT"))
        conn.simple_bind_s(ldap_config["BIND_DN"], ldap_config["BIND_PW"])
        yield conn
    finally:
        conn.unbind_s()
def install_schema(config, schema_path):
    """Add every record of an LDIF schema file to the LDAP directory.

    :param config: The application configuration mapping, used to open the
        LDAP connection.
    :param schema_path: Path of the LDIF file to load.
    :raises InstallationException: When the bind user is not allowed to add
        the schema entries.
    """
    from canaille.app.installation import InstallationException

    # Read the whole LDIF file first; the parsed records outlive the file.
    with open(schema_path) as schema_file:
        records = ldif.LDIFRecordList(schema_file)
        records.parse()

    try:
        with ldap_connection(config) as connection:
            for entry_dn, attributes in records.all_records:
                connection.add_s(entry_dn, ldap.modlist.addModlist(attributes))

    except ldap.INSUFFICIENT_ACCESS as exc:
        raise InstallationException(
            f"The user '{config['BACKENDS']['LDAP']['BIND_DN']}' has insufficient permissions to install LDAP schemas."
        ) from exc
2023-06-05 16:10:37 +00:00
class Backend(BaseBackend):
    """LDAP implementation of the canaille backend interface.

    The backend owns a single python-ldap connection that is opened lazily
    by the :attr:`connection` property and released by :meth:`teardown`.
    """

    def __init__(self, config):
        """Store the configuration and configure the LDAP model classes.

        :param config: The application configuration mapping.
        """
        super().__init__(config)
        # Created on first access of ``connection``; reset by ``teardown``.
        self._connection = None
        setup_ldap_models(config)

    @classmethod
    def install(cls, config):
        """Install the LDAP schemas, then run each OIDC model's ``install``.

        :param config: The application configuration mapping.
        """
        cls.setup_schemas(config)
        with cls(config).session():
            models.Token.install()
            models.AuthorizationCode.install()
            models.Client.install()
            models.Consent.install()

    @classmethod
    def setup_schemas(cls, config):
        """Install the bundled OAuth2 schema when the server lacks it.

        The schema is considered missing when the ``oauthClient`` object
        class is not among the object classes reported by the server.

        :param config: The application configuration mapping.
        """
        from .ldapobject import LDAPObject

        with cls(config).session():
            # ``force=True`` is passed through to ``ldap_object_classes``;
            # presumably it bypasses a cached result -- confirm in LDAPObject.
            if "oauthClient" not in LDAPObject.ldap_object_classes(force=True):
                install_schema(
                    config,
                    os.path.dirname(__file__) + "/schemas/oauth2-openldap.ldif",
                )

    @property
    def connection(self):
        """Return the bound LDAP connection, opening it on first access.

        :raises ConfigurationException: When the server cannot be reached or
            the configured bind credentials are rejected.
        """
        if self._connection:
            return self._connection

        ldap_config = self.config["BACKENDS"]["LDAP"]
        try:
            # Work on a local handle: it is only cached once the bind has
            # succeeded, so a failed attempt is retried on the next access
            # instead of handing out an unbound connection.
            conn = ldap.initialize(ldap_config["URI"])
            conn.set_option(ldap.OPT_NETWORK_TIMEOUT, ldap_config.get("TIMEOUT"))
            conn.simple_bind_s(ldap_config["BIND_DN"], ldap_config["BIND_PW"])

        except ldap.SERVER_DOWN as exc:
            message = _("Could not connect to the LDAP server '{uri}'").format(
                uri=ldap_config["URI"]
            )
            logging.error(message)
            raise ConfigurationException(message) from exc

        except ldap.INVALID_CREDENTIALS as exc:
            message = _("LDAP authentication failed with user '{user}'").format(
                user=ldap_config["BIND_DN"]
            )
            logging.error(message)
            raise ConfigurationException(message) from exc

        self._connection = conn
        return self._connection

    def teardown(self):
        """Unbind the cached connection, if any, and forget it."""
        if self._connection:  # pragma: no branch
            self._connection.unbind_s()
            self._connection = None

    @classmethod
    def validate(cls, config):
        """Check that the bind user can create users and groups.

        A throw-away user is created and deleted, then a second throw-away
        user is created together with a group that contains it; both are
        deleted afterwards.

        :param config: The application configuration mapping.
        :raises ConfigurationException: When the server refuses the creation
            of users or groups with an insufficient-access error.
        """
        from canaille.app import models

        with cls(config).session():
            try:
                user = models.User(
                    formatted_name=f"canaille_{uuid.uuid4()}",
                    family_name=f"canaille_{uuid.uuid4()}",
                    user_name=f"canaille_{uuid.uuid4()}",
                    emails=f"canaille_{uuid.uuid4()}@mydomain.tld",
                    password="correct horse battery staple",
                )
                user.save()
                user.delete()

            except ldap.INSUFFICIENT_ACCESS as exc:
                raise ConfigurationException(
                    f'LDAP user \'{config["BACKENDS"]["LDAP"]["BIND_DN"]}\' cannot create '
                    f'users at \'{config["BACKENDS"]["LDAP"]["USER_BASE"]}\''
                ) from exc

            # Only set once the group member has really been saved, so the
            # cleanup below never tries to delete the user already removed
            # above or an entry that was never written.
            saved_member = None
            try:
                models.Group.ldap_object_classes()

                # NOTE(review): this user is built with ``cn=`` while the one
                # above uses ``formatted_name=`` -- confirm both are accepted.
                user = models.User(
                    cn=f"canaille_{uuid.uuid4()}",
                    family_name=f"canaille_{uuid.uuid4()}",
                    user_name=f"canaille_{uuid.uuid4()}",
                    emails=f"canaille_{uuid.uuid4()}@mydomain.tld",
                    password="correct horse battery staple",
                )
                user.save()
                saved_member = user

                group = models.Group(
                    display_name=f"canaille_{uuid.uuid4()}",
                    members=[user],
                )
                group.save()
                group.delete()

            except ldap.INSUFFICIENT_ACCESS as exc:
                raise ConfigurationException(
                    f'LDAP user \'{config["BACKENDS"]["LDAP"]["BIND_DN"]}\' cannot create '
                    f'groups at \'{config["BACKENDS"]["LDAP"]["GROUP_BASE"]}\''
                ) from exc

            finally:
                if saved_member is not None:
                    saved_member.delete()

    @classmethod
    def login_placeholder(cls):
        """Build the example text shown in the login field.

        The examples are chosen from the attributes the configured user
        filter matches the login against (``cn``, ``uid``, ``mail``); the
        e-mail example is also used when nothing else matched.

        :returns: The translated examples joined by a translated " or ".
        """
        user_filter = current_app.config["BACKENDS"]["LDAP"].get(
            "USER_FILTER", models.User.DEFAULT_FILTER
        )
        # Whitespace is irrelevant for the detection; strip it only once.
        compact_filter = user_filter.replace(" ", "")

        placeholders = []
        if "cn={{login" in compact_filter:
            placeholders.append(_("John Doe"))

        if "uid={{login" in compact_filter:
            placeholders.append(_("jdoe"))

        if "mail={{login" in compact_filter or not placeholders:
            placeholders.append(_("john@doe.com"))

        return _(" or ").join(placeholders)

    def has_account_lockability(self):
        """Tell whether the server knows the ``pwdEndTime`` attribute.

        :returns: ``True`` when ``pwdEndTime`` is among the attributes
            reported by ``LDAPObject.ldap_object_attributes()``; ``False``
            when it is not, or when the server is down.
        """
        from .ldapobject import LDAPObject

        try:
            return "pwdEndTime" in LDAPObject.ldap_object_attributes()
        except ldap.SERVER_DOWN:  # pragma: no cover
            return False
2022-11-01 11:25:21 +00:00
2023-04-08 18:42:38 +00:00
def setup_ldap_models(config):
    """Configure the LDAP model classes from the ``BACKENDS.LDAP`` settings.

    Sets ``LDAPObject.root_dn`` and, for both ``models.User`` and
    ``models.Group``, the search base, the RDN attribute and the object
    class list. Optional settings fall back to the ``DEFAULT_*`` constants
    of the corresponding model.

    :param config: The application configuration mapping.
    """
    from canaille.app import models

    from .ldapobject import LDAPObject

    ldap_settings = config["BACKENDS"]["LDAP"]

    LDAPObject.root_dn = ldap_settings["ROOT_DN"]

    # Bases are stored without the ",<ROOT_DN>" part.
    root_suffix = f',{ldap_settings["ROOT_DN"]}'

    models.User.base = ldap_settings["USER_BASE"].replace(root_suffix, "")
    models.User.rdn_attribute = ldap_settings.get(
        "USER_RDN", models.User.DEFAULT_RDN
    )
    user_classes = ldap_settings.get(
        "USER_CLASS", models.User.DEFAULT_OBJECT_CLASS
    )
    models.User.ldap_object_class = listify(user_classes)

    # GROUP_BASE is optional: an empty result is stored as ``None``.
    relative_group_base = ldap_settings.get("GROUP_BASE", "").replace(
        root_suffix, ""
    )
    models.Group.base = relative_group_base or None
    models.Group.rdn_attribute = ldap_settings.get(
        "GROUP_RDN", models.Group.DEFAULT_RDN
    )
    group_classes = ldap_settings.get(
        "GROUP_CLASS", models.Group.DEFAULT_OBJECT_CLASS
    )
    models.Group.ldap_object_class = listify(group_classes)