refactor: no more explicit conn argument in the LDAP backend

This commit is contained in:
Éloi Rivard 2023-12-25 14:03:47 +01:00
parent 95882c737b
commit d0dbaa588c
No known key found for this signature in database
GPG key ID: 7EDA204EA57DD184
3 changed files with 51 additions and 81 deletions

View file

@ -57,20 +57,18 @@ class Backend(BaseBackend):
@classmethod
def install(cls, config, debug=False):
cls.setup_schemas(config)
with ldap_connection(config) as conn:
models.Token.install(conn)
models.AuthorizationCode.install(conn)
models.Client.install(conn)
models.Consent.install(conn)
with cls(config).session():
models.Token.install()
models.AuthorizationCode.install()
models.Client.install()
models.Consent.install()
@classmethod
def setup_schemas(cls, config):
from .ldapobject import LDAPObject
with ldap_connection(config) as conn:
if "oauthClient" not in LDAPObject.ldap_object_classes(
conn=conn, force=True
):
with cls(config).session():
if "oauthClient" not in LDAPObject.ldap_object_classes(force=True):
install_schema(
config,
os.path.dirname(__file__) + "/schemas/oauth2-openldap.ldif",
@ -126,15 +124,10 @@ class Backend(BaseBackend):
def validate(cls, config):
from canaille.app import models
backend = cls(config)
try:
conn = ldap.initialize(config["BACKENDS"]["LDAP"]["URI"])
conn.set_option(
ldap.OPT_NETWORK_TIMEOUT, config["BACKENDS"]["LDAP"].get("TIMEOUT")
)
conn.simple_bind_s(
config["BACKENDS"]["LDAP"]["BIND_DN"],
config["BACKENDS"]["LDAP"]["BIND_PW"],
)
backend.setup()
models.User.ldap_object_classes()
except ldap.SERVER_DOWN as exc:
raise ConfigurationException(
@ -147,7 +140,6 @@ class Backend(BaseBackend):
) from exc
try:
models.User.ldap_object_classes(conn)
user = models.User(
formatted_name=f"canaille_{uuid.uuid4()}",
family_name=f"canaille_{uuid.uuid4()}",
@ -155,8 +147,8 @@ class Backend(BaseBackend):
emails=f"canaille_{uuid.uuid4()}@mydomain.tld",
password="correct horse battery staple",
)
user.save(conn)
user.delete(conn)
user.save()
user.delete()
except ldap.INSUFFICIENT_ACCESS as exc:
raise ConfigurationException(
@ -165,7 +157,7 @@ class Backend(BaseBackend):
) from exc
try:
models.Group.ldap_object_classes(conn)
models.Group.ldap_object_classes()
user = models.User(
cn=f"canaille_{uuid.uuid4()}",
@ -174,14 +166,14 @@ class Backend(BaseBackend):
emails=f"canaille_{uuid.uuid4()}@mydomain.tld",
password="correct horse battery staple",
)
user.save(conn)
user.save()
group = models.Group(
display_name=f"canaille_{uuid.uuid4()}",
members=[user],
)
group.save(conn)
group.delete(conn)
group.save()
group.delete()
except ldap.INSUFFICIENT_ACCESS as exc:
raise ConfigurationException(
@ -190,9 +182,9 @@ class Backend(BaseBackend):
) from exc
finally:
user.delete(conn)
user.delete()
conn.unbind_s()
backend.teardown()
@classmethod
def login_placeholder(cls):

View file

@ -232,8 +232,8 @@ class LDAPObject(Model, metaclass=LDAPObjectMetaclass):
return cls._must
@classmethod
def install(cls, conn=None):
conn = conn or Backend.get().connection
def install(cls):
conn = Backend.get().connection
cls.ldap_object_classes(conn)
cls.ldap_object_attributes(conn)
@ -254,11 +254,11 @@ class LDAPObject(Model, metaclass=LDAPObjectMetaclass):
pass
@classmethod
def ldap_object_classes(cls, conn=None, force=False):
def ldap_object_classes(cls, force=False):
if cls._object_class_by_name and not force:
return cls._object_class_by_name
conn = conn or Backend.get().connection
conn = Backend.get().connection
res = conn.search_s(
"cn=subschema", ldap.SCOPE_BASE, "(objectclass=*)", ["*", "+"]
@ -276,11 +276,11 @@ class LDAPObject(Model, metaclass=LDAPObjectMetaclass):
return cls._object_class_by_name
@classmethod
def ldap_object_attributes(cls, conn=None, force=False):
def ldap_object_attributes(cls, force=False):
if cls._attribute_type_by_name and not force:
return cls._attribute_type_by_name
conn = conn or Backend.get().connection
conn = Backend.get().connection
res = conn.search_s(
"cn=subschema", ldap.SCOPE_BASE, "(objectclass=*)", ["*", "+"]
@ -298,15 +298,15 @@ class LDAPObject(Model, metaclass=LDAPObjectMetaclass):
return cls._attribute_type_by_name
@classmethod
def get(cls, id=None, filter=None, conn=None, **kwargs):
def get(cls, id=None, filter=None, **kwargs):
try:
return cls.query(id, filter, conn, **kwargs)[0]
return cls.query(id, filter, **kwargs)[0]
except (IndexError, ldap.NO_SUCH_OBJECT):
return None
@classmethod
def query(cls, id=None, filter=None, conn=None, **kwargs):
conn = conn or Backend.get().connection
def query(cls, id=None, filter=None, **kwargs):
conn = Backend.get().connection
base = id or kwargs.get("id")
if base is None:
@ -397,14 +397,14 @@ class LDAPObject(Model, metaclass=LDAPObjectMetaclass):
def python_attribute_to_ldap(cls, name):
return cls.attribute_map.get(name, name) if cls.attribute_map else None
def reload(self, conn=None):
conn = conn or Backend.get().connection
def reload(self):
conn = Backend.get().connection
result = conn.search_s(self.id, ldap.SCOPE_SUBTREE, None, ["+", "*"])
self.changes = {}
self.state = result[0][1]
def save(self, conn=None):
conn = conn or Backend.get().connection
def save(self):
conn = Backend.get().connection
self.set_ldap_attribute("objectClass", self.ldap_object_class)
@ -446,6 +446,6 @@ class LDAPObject(Model, metaclass=LDAPObjectMetaclass):
self.state = {**self.state, **self.changes}
self.changes = {}
def delete(self, conn=None):
conn = conn or Backend.get().connection
def delete(self):
conn = Backend.get().connection
conn.delete_s(self.id)

View file

@ -1,4 +1,3 @@
import ldap
import pytest
from canaille import create_app
from canaille.app.installation import InstallationException
@ -52,18 +51,13 @@ def test_install_schemas(configuration, slapd_server):
configuration["BACKENDS"]["LDAP"]["BIND_DN"] = slapd_server.root_dn
configuration["BACKENDS"]["LDAP"]["BIND_PW"] = slapd_server.root_pw
conn = ldap.ldapobject.SimpleLDAPObject(slapd_server.ldap_uri)
conn.protocol_version = 3
conn.simple_bind_s(slapd_server.root_dn, slapd_server.root_pw)
assert "oauthClient" not in LDAPObject.ldap_object_classes(conn=conn, force=True)
with Backend(configuration).session():
assert "oauthClient" not in LDAPObject.ldap_object_classes(force=True)
Backend.setup_schemas(configuration)
assert "oauthClient" in LDAPObject.ldap_object_classes(conn=conn, force=True)
conn.unbind_s()
slapd_server.stop()
with Backend(configuration).session():
assert "oauthClient" in LDAPObject.ldap_object_classes(force=True)
def test_install_schemas_twice(configuration, slapd_server):
@ -72,21 +66,16 @@ def test_install_schemas_twice(configuration, slapd_server):
configuration["BACKENDS"]["LDAP"]["BIND_DN"] = slapd_server.root_dn
configuration["BACKENDS"]["LDAP"]["BIND_PW"] = slapd_server.root_pw
conn = ldap.ldapobject.SimpleLDAPObject(slapd_server.ldap_uri)
conn.protocol_version = 3
conn.simple_bind_s(slapd_server.root_dn, slapd_server.root_pw)
assert "oauthClient" not in LDAPObject.ldap_object_classes(conn=conn, force=True)
with Backend(configuration).session():
assert "oauthClient" not in LDAPObject.ldap_object_classes(force=True)
Backend.setup_schemas(configuration)
assert "oauthClient" in LDAPObject.ldap_object_classes(conn=conn, force=True)
with Backend(configuration).session():
assert "oauthClient" in LDAPObject.ldap_object_classes(force=True)
Backend.setup_schemas(configuration)
conn.unbind_s()
slapd_server.stop()
def test_install_no_permissions_to_install_schemas(configuration, slapd_server):
configuration["BACKENDS"]["LDAP"]["ROOT_DN"] = slapd_server.suffix
@ -96,19 +85,13 @@ def test_install_no_permissions_to_install_schemas(configuration, slapd_server):
] = "uid=admin,ou=users,dc=mydomain,dc=tld"
configuration["BACKENDS"]["LDAP"]["BIND_PW"] = "admin"
conn = ldap.ldapobject.SimpleLDAPObject(slapd_server.ldap_uri)
conn.protocol_version = 3
conn.simple_bind_s(slapd_server.root_dn, slapd_server.root_pw)
assert "oauthClient" not in LDAPObject.ldap_object_classes(conn=conn, force=True)
with Backend(configuration).session():
assert "oauthClient" not in LDAPObject.ldap_object_classes(force=True)
with pytest.raises(InstallationException):
Backend.setup_schemas(configuration)
assert "oauthClient" not in LDAPObject.ldap_object_classes(conn=conn, force=True)
conn.unbind_s()
slapd_server.stop()
assert "oauthClient" not in LDAPObject.ldap_object_classes(force=True)
def test_install_schemas_command(configuration, slapd_server):
@ -117,18 +100,13 @@ def test_install_schemas_command(configuration, slapd_server):
configuration["BACKENDS"]["LDAP"]["BIND_DN"] = slapd_server.root_dn
configuration["BACKENDS"]["LDAP"]["BIND_PW"] = slapd_server.root_pw
conn = ldap.ldapobject.SimpleLDAPObject(slapd_server.ldap_uri)
conn.protocol_version = 3
conn.simple_bind_s(slapd_server.root_dn, slapd_server.root_pw)
assert "oauthClient" not in LDAPObject.ldap_object_classes(conn=conn, force=True)
with Backend(configuration).session():
assert "oauthClient" not in LDAPObject.ldap_object_classes(force=True)
testclient = TestApp(create_app(configuration, validate=False))
runner = testclient.app.test_cli_runner()
res = runner.invoke(cli, ["install"])
assert res.exit_code == 0, res.stdout
assert "oauthClient" in LDAPObject.ldap_object_classes(conn=conn, force=True)
conn.unbind_s()
slapd_server.stop()
with Backend(configuration).session():
assert "oauthClient" in LDAPObject.ldap_object_classes(force=True)