import logging import os import uuid from contextlib import contextmanager import ldap.modlist import ldif from flask import current_app from ldap.controls import DecodeControlTuples from ldap.controls.ppolicy import PasswordPolicyControl from ldap.controls.ppolicy import PasswordPolicyError from ldap.controls.readentry import PostReadControl from canaille.app import models from canaille.app.configuration import ConfigurationException from canaille.app.i18n import gettext as _ from canaille.backends import Backend from canaille.backends import get_lockout_delay_message from .utils import listify from .utils import python_attrs_to_ldap @contextmanager def ldap_connection(config): conn = ldap.initialize(config["CANAILLE_LDAP"]["URI"]) conn.set_option(ldap.OPT_NETWORK_TIMEOUT, config["CANAILLE_LDAP"]["TIMEOUT"]) conn.simple_bind_s( config["CANAILLE_LDAP"]["BIND_DN"], config["CANAILLE_LDAP"]["BIND_PW"] ) try: yield conn finally: conn.unbind_s() def install_schema(config, schema_path): from canaille.app.installation import InstallationException with open(schema_path) as fd: parser = ldif.LDIFRecordList(fd) parser.parse() try: with ldap_connection(config) as conn: for dn, entry in parser.all_records: add_modlist = ldap.modlist.addModlist(entry) conn.add_s(dn, add_modlist) except ldap.INSUFFICIENT_ACCESS as exc: raise InstallationException( f"The user '{config['CANAILLE_LDAP']['BIND_DN']}' has insufficient permissions to install LDAP schemas." ) from exc class LDAPBackend(Backend): def __init__(self, config): super().__init__(config) self._connection = None setup_ldap_models(config) @classmethod def install(cls, config): cls.setup_schemas(config) with cls(config).session(): models.Token.install() models.AuthorizationCode.install() models.Client.install() models.Consent.install() @classmethod def setup_schemas(cls, config): from .ldapobject import LDAPObject with cls(config).session(): if "oauthClient" not in LDAPObject.ldap_object_classes(force=True): install_schema( config, os.path.dirname(__file__) + "/schemas/oauth2-openldap.ldif", ) @property def connection(self): if self._connection: return self._connection try: self._connection = ldap.initialize(self.config["CANAILLE_LDAP"]["URI"]) self._connection.set_option( ldap.OPT_NETWORK_TIMEOUT, self.config["CANAILLE_LDAP"]["TIMEOUT"], ) self._connection.simple_bind_s( self.config["CANAILLE_LDAP"]["BIND_DN"], self.config["CANAILLE_LDAP"]["BIND_PW"], ) except ldap.SERVER_DOWN as exc: message = _("Could not connect to the LDAP server '{uri}'").format( uri=self.config["CANAILLE_LDAP"]["URI"] ) logging.error(message) raise ConfigurationException(message) from exc except ldap.INVALID_CREDENTIALS as exc: message = _("LDAP authentication failed with user '{user}'").format( user=self.config["CANAILLE_LDAP"]["BIND_DN"] ) logging.error(message) raise ConfigurationException(message) from exc return self._connection def teardown(self): if self._connection: # pragma: no branch self._connection.unbind_s() self._connection = None @classmethod def validate(cls, config): from canaille.app import models with cls(config).session(): try: user = models.User( formatted_name=f"canaille_{uuid.uuid4()}", family_name=f"canaille_{uuid.uuid4()}", user_name=f"canaille_{uuid.uuid4()}", emails=f"canaille_{uuid.uuid4()}@mydomain.example", password="correct horse battery staple", ) Backend.instance.save(user) Backend.instance.delete(user) except ldap.INSUFFICIENT_ACCESS as exc: raise ConfigurationException( f'LDAP user \'{config["CANAILLE_LDAP"]["BIND_DN"]}\' cannot create ' f'users at \'{config["CANAILLE_LDAP"]["USER_BASE"]}\'' ) from exc try: models.Group.ldap_object_classes() user = models.User( cn=f"canaille_{uuid.uuid4()}", family_name=f"canaille_{uuid.uuid4()}", user_name=f"canaille_{uuid.uuid4()}", emails=f"canaille_{uuid.uuid4()}@mydomain.example", password="correct horse battery staple", ) Backend.instance.save(user) group = models.Group( display_name=f"canaille_{uuid.uuid4()}", members=[user], ) Backend.instance.save(group) Backend.instance.delete(group) except ldap.INSUFFICIENT_ACCESS as exc: raise ConfigurationException( f'LDAP user \'{config["CANAILLE_LDAP"]["BIND_DN"]}\' cannot create ' f'groups at \'{config["CANAILLE_LDAP"]["GROUP_BASE"]}\'' ) from exc finally: Backend.instance.delete(user) @classmethod def login_placeholder(cls): user_filter = current_app.config["CANAILLE_LDAP"]["USER_FILTER"] placeholders = [] if "cn={{login" in user_filter.replace(" ", ""): placeholders.append(_("John Doe")) if "uid={{login" in user_filter.replace(" ", ""): placeholders.append(_("jdoe")) if "mail={{login" in user_filter.replace(" ", "") or not placeholders: placeholders.append(_("john.doe@example.com")) return _(" or ").join(placeholders) def has_account_lockability(self): from .ldapobject import LDAPObject try: return "pwdEndTime" in LDAPObject.ldap_object_attributes() except ldap.SERVER_DOWN: # pragma: no cover return False def get_user_from_login(self, login=None): from .models import User raw_filter = current_app.config["CANAILLE_LDAP"]["USER_FILTER"] filter = ( ( current_app.jinja_env.from_string(raw_filter).render( login=ldap.filter.escape_filter_chars(login) ) ) if login else None ) return self.get(User, filter=filter) def check_user_password(self, user, password): if current_app.features.has_intruder_lockout: if current_lockout_delay := user.get_intruder_lockout_delay(): self.save(user) return (False, get_lockout_delay_message(current_lockout_delay)) conn = ldap.initialize(current_app.config["CANAILLE_LDAP"]["URI"]) conn.set_option( ldap.OPT_NETWORK_TIMEOUT, current_app.config["CANAILLE_LDAP"]["TIMEOUT"], ) message = None try: res = conn.simple_bind_s( user.dn, password, serverctrls=[PasswordPolicyControl()] ) controls = res[3] result = True except ldap.INVALID_CREDENTIALS as exc: controls = DecodeControlTuples(exc.args[0]["ctrls"]) result = False finally: conn.unbind_s() for control in controls: def gettext(x): return x if ( control.controlType == PasswordPolicyControl.controlType and control.error == PasswordPolicyError.namedValues["accountLocked"] ): message = gettext("Your account has been locked.") elif ( control.controlType == PasswordPolicyControl.controlType and control.error == PasswordPolicyError.namedValues["changeAfterReset"] ): message = gettext("You should change your password.") return result, message def set_user_password(self, user, password): conn = self.connection conn.passwd_s( user.dn, None, password.encode("utf-8"), ) def query(self, model, dn=None, filter=None, **kwargs): from .ldapobjectquery import LDAPObjectQuery base = dn if dn is None: base = f"{model.base},{model.root_dn}" elif "=" not in base: base = ldap.dn.escape_dn_chars(base) base = f"{model.rdn_attribute}={base},{model.base},{model.root_dn}" class_filter = ( "".join([f"(objectClass={oc})" for oc in model.ldap_object_class]) if model.ldap_object_class else "" ) if class_filter: class_filter = f"(|{class_filter})" arg_filter = "" ldap_args = python_attrs_to_ldap( { model.python_attribute_to_ldap(name): values for name, values in kwargs.items() if values is not None }, encode=False, ) for key, value in ldap_args.items(): if len(value) == 1: escaped_value = ldap.filter.escape_filter_chars(value[0]) arg_filter += f"({key}={escaped_value})" else: values = [ldap.filter.escape_filter_chars(v) for v in value] arg_filter += ( "(|" + "".join([f"({key}={value})" for value in values]) + ")" ) if not filter: filter = "" ldapfilter = f"(&{class_filter}{arg_filter}{filter})" base = base or f"{model.base},{model.root_dn}" try: result = self.connection.search_s( base, ldap.SCOPE_SUBTREE, ldapfilter or None, ["+", "*"] ) except ldap.NO_SUCH_OBJECT: result = [] return LDAPObjectQuery(model, result) def fuzzy(self, model, query, attributes=None, **kwargs): query = ldap.filter.escape_filter_chars(query) attributes = attributes or model.may() + model.must() attributes = [model.python_attribute_to_ldap(name) for name in attributes] filter = ( "(|" + "".join(f"({attribute}=*{query}*)" for attribute in attributes) + ")" ) return self.query(model, filter=filter, **kwargs) def get(self, model, identifier=None, /, **kwargs): try: return self.query(model, identifier, **kwargs)[0] except (IndexError, ldap.NO_SUCH_OBJECT): if identifier and model.base: return ( self.get(model, **{model.identifier_attribute: identifier}) or self.get(model, id=identifier) or None ) return None def save(self, instance): # run the instance save callback if existing save_callback = instance.save() if hasattr(instance, "save") else iter([]) next(save_callback, None) current_object_classes = instance.get_ldap_attribute("objectClass") or [] instance.set_ldap_attribute( "objectClass", list(set(instance.ldap_object_class) | set(current_object_classes)), ) # PostReadControl allows to read the updated object attributes on creation/edition attributes = ["objectClass"] + [ instance.python_attribute_to_ldap(name) for name in instance.attributes ] read_post_control = PostReadControl(criticality=True, attrList=attributes) # Object already exists in the LDAP database if instance.exists: deletions = [ name for name, value in instance.changes.items() if ( value is None or value == [] or (isinstance(value, list) and len(value) == 1 and not value[0]) ) and name in instance.state ] changes = { name: value for name, value in instance.changes.items() if name not in deletions and instance.state.get(name) != value } formatted_changes = python_attrs_to_ldap(changes, null_allowed=False) modlist = [(ldap.MOD_DELETE, name, None) for name in deletions] + [ (ldap.MOD_REPLACE, name, values) for name, values in formatted_changes.items() ] _, _, _, [result] = self.connection.modify_ext_s( instance.dn, modlist, serverctrls=[read_post_control] ) # Object does not exist yet in the LDAP database else: changes = { name: value for name, value in {**instance.state, **instance.changes}.items() if value and value[0] } formatted_changes = python_attrs_to_ldap(changes, null_allowed=False) modlist = [(name, values) for name, values in formatted_changes.items()] _, _, _, [result] = self.connection.add_ext_s( instance.dn, modlist, serverctrls=[read_post_control] ) instance.exists = True instance.state = {**result.entry, **instance.changes} instance.changes = {} # run the instance save callback again if existing next(save_callback, None) def delete(self, instance): # run the instance delete callback if existing save_callback = instance.delete() if hasattr(instance, "delete") else iter([]) next(save_callback, None) try: self.connection.delete_s(instance.dn) except ldap.NO_SUCH_OBJECT: pass # run the instance delete callback again if existing next(save_callback, None) def reload(self, instance): # run the instance reload callback if existing reload_callback = instance.reload() if hasattr(instance, "reload") else iter([]) next(reload_callback, None) result = self.connection.search_s( instance.dn, ldap.SCOPE_SUBTREE, None, ["+", "*"] ) instance.changes = {} instance.state = result[0][1] # run the instance reload callback again if existing next(reload_callback, None) def setup_ldap_models(config): from canaille.app import models from .ldapobject import LDAPObject LDAPObject.root_dn = config["CANAILLE_LDAP"]["ROOT_DN"] user_base = config["CANAILLE_LDAP"]["USER_BASE"].replace( f',{config["CANAILLE_LDAP"]["ROOT_DN"]}', "" ) models.User.base = user_base models.User.rdn_attribute = config["CANAILLE_LDAP"]["USER_RDN"] object_class = config["CANAILLE_LDAP"]["USER_CLASS"] models.User.ldap_object_class = listify(object_class) group_base = config["CANAILLE_LDAP"]["GROUP_BASE"].replace( f',{config["CANAILLE_LDAP"]["ROOT_DN"]}', "" ) models.Group.base = group_base or None models.Group.rdn_attribute = config["CANAILLE_LDAP"]["GROUP_RDN"] object_class = config["CANAILLE_LDAP"]["GROUP_CLASS"] models.Group.ldap_object_class = listify(object_class)