import ldap from flask import g class LDAPObjectHelper: _object_class_by_name = None _attribute_type_by_name = None may = None must = None base = None root_dn = None id = None def __init__(self, dn=None, **kwargs): self.attrs = {} self.changes = {} for k, v in kwargs.items(): self.attrs[k] = [v] if not isinstance(v, list) else v self.attrs.setdefault("objectClass", self.objectClass) by_name = self.ocs_by_name() ocs = [by_name[name] for name in self.objectClass] self.may = [] self.must = [] for oc in ocs: self.may.extend(oc.may) self.must.extend(oc.must) def __repr__(self): return "<{} {}={}>".format( self.__class__.__name__, self.id, getattr(self, self.id) ) @classmethod def ldap(cls): return g.ldap def keys(self): return self.must + self.may def __getitem__(self, item): return getattr(self, item) def update(self, **kwargs): for k, v in kwargs.items(): self.__setattr__(k, v) def delete(self): self.ldap().delete_s(self.dn) @property def dn(self): if not self.id in self.attrs: return None return f"{self.id}={self.attrs[self.id][0]},{self.base},{self.root_dn}" @classmethod def initialize(cls, conn=None): conn = conn or cls.ldap() cls.ocs_by_name(conn) cls.attr_type_by_name(conn) dn = f"{cls.base},{cls.root_dn}" conn.add_s( dn, [ ("objectClass", [b"organizationalUnit"]), ("ou", [cls.base.encode("utf-8")]), ], ) @classmethod def ocs_by_name(cls, conn=None): if cls._object_class_by_name: return cls._object_class_by_name conn = conn or cls.ldap() res = conn.search_s( "cn=subschema", ldap.SCOPE_BASE, "(objectclass=*)", ["*", "+"] ) subschema_entry = res[0] subschema_subentry = ldap.cidict.cidict(subschema_entry[1]) subschema = ldap.schema.SubSchema(subschema_subentry) object_class_oids = subschema.listall(ldap.schema.models.ObjectClass) cls._object_class_by_name = {} for oid in object_class_oids: oc = subschema.get_obj(ldap.schema.models.ObjectClass, oid) for name in oc.names: cls._object_class_by_name[name] = oc return cls._object_class_by_name @classmethod def attr_type_by_name(cls, conn=None): if cls._attribute_type_by_name: return cls._attribute_type_by_name conn = conn or cls.ldap() res = conn.search_s( "cn=subschema", ldap.SCOPE_BASE, "(objectclass=*)", ["*", "+"] ) subschema_entry = res[0] subschema_subentry = ldap.cidict.cidict(subschema_entry[1]) subschema = ldap.schema.SubSchema(subschema_subentry) attribute_type_oids = subschema.listall(ldap.schema.models.AttributeType) cls._attribute_type_by_name = {} for oid in attribute_type_oids: oc = subschema.get_obj(ldap.schema.models.AttributeType, oid) for name in oc.names: cls._attribute_type_by_name[name] = oc return cls._attribute_type_by_name def reload(self, conn=None): conn = conn or self.ldap() result = conn.search_s(self.dn, ldap.SCOPE_SUBTREE) self.changes = {} self.attrs = { k: [elt.decode("utf-8") for elt in v] for k, v in result[0][1].items() } def save(self, conn=None): conn = conn or self.ldap() try: match = bool(conn.search_s(self.dn, ldap.SCOPE_SUBTREE)) except ldap.NO_SUCH_OBJECT: match = False if match: mods = { k: v for k, v in self.changes.items() if v and v[0] and self.attrs.get(k) != v } attributes = [ (ldap.MOD_REPLACE, k, [elt.encode("utf-8") for elt in v]) for k, v in mods.items() ] conn.modify_s(self.dn, attributes) else: mods = {} for k, v in self.attrs.items(): if v and v[0]: mods[k] = v for k, v in self.changes.items(): if v and v[0]: mods[k] = v attributes = [ (k, [elt.encode("utf-8") for elt in v]) for k, v in mods.items() ] conn.add_s(self.dn, attributes) for k, v in self.changes.items(): self.attrs[k] = v self.changes = {} @classmethod def get(cls, dn=None, filter=None, conn=None): conn = conn or cls.ldap() if dn is None: dn = f"{cls.base},{cls.root_dn}" elif "=" not in dn: dn = f"{cls.id}={dn},{cls.base},{cls.root_dn}" result = conn.search_s(dn, ldap.SCOPE_SUBTREE, filter) if not result: return None o = cls( **{k: [elt.decode("utf-8") for elt in v] for k, v in result[0][1].items()} ) return o @classmethod def filter(cls, base=None, **kwargs): class_filter = "".join([f"(objectClass={oc})" for oc in cls.objectClass]) arg_filter = "".join(f"({k}={v})" for k, v in kwargs.items()) ldapfilter = f"(&{class_filter}{arg_filter})" base = base or f"{cls.base},{cls.root_dn}" result = cls.ldap().search_s(base, ldap.SCOPE_SUBTREE, ldapfilter) return [ cls(**{k: [elt.decode("utf-8") for elt in v] for k, v in args.items()},) for _, args in result ] def __getattr__(self, name): if (not self.may or name not in self.may) and ( not self.must or name not in self.must ): return super().__getattribute__(name) if ( not self.attr_type_by_name() or not self.attr_type_by_name()[name].single_value ): return self.changes.get(name, self.attrs.get(name, [])) return self.changes.get(name, self.attrs.get(name, [None]))[0] def __setattr__(self, name, value): super().__setattr__(name, value) if (self.may and name in self.may) or (self.must and name in self.must): if self.attr_type_by_name()[name].single_value: self.changes[name] = [value] else: self.changes[name] = value