refactor: models attributes cardinality is closer to SCIM models

This commit is contained in:
Éloi Rivard 2023-11-15 18:20:13 +01:00
parent 0ee374dea7
commit 1fd8af2cf4
No known key found for this signature in database
GPG key ID: 7EDA204EA57DD184
39 changed files with 305 additions and 424 deletions

View file

@ -3,6 +3,11 @@ All notable changes to this project will be documented in this file.
The format is based on `Keep a Changelog <https://keepachangelog.com/en/1.0.0/>`_, The format is based on `Keep a Changelog <https://keepachangelog.com/en/1.0.0/>`_,
and this project adheres to `Semantic Versioning <https://semver.org/spec/v2.0.0.html>`_. and this project adheres to `Semantic Versioning <https://semver.org/spec/v2.0.0.html>`_.
Changed
*******
- Model attributes cardinality is closer to SCIM model. :pr:`155`
[0.0.34] - 2023-10-02 [0.0.34] - 2023-10-02
===================== =====================

View file

@ -58,3 +58,11 @@ def validate_uri(value):
re.IGNORECASE, re.IGNORECASE,
) )
return re.match(regex, value) is not None return re.match(regex, value) is not None
class classproperty:
def __init__(self, f):
self.f = f
def __get__(self, obj, owner):
return self.f(owner)

View file

@ -6,6 +6,7 @@ import ldap.filter
from canaille.backends.models import Model from canaille.backends.models import Model
from .backend import Backend from .backend import Backend
from .utils import cardinalize_attribute
from .utils import ldap_to_python from .utils import ldap_to_python
from .utils import listify from .utils import listify
from .utils import python_to_ldap from .utils import python_to_ldap
@ -109,7 +110,7 @@ class LDAPObject(Model, metaclass=LDAPObjectMetaclass):
base = None base = None
root_dn = None root_dn = None
rdn_attribute = None rdn_attribute = None
attributes = None attribute_map = None
ldap_object_class = None ldap_object_class = None
def __init__(self, dn=None, **kwargs): def __init__(self, dn=None, **kwargs):
@ -121,8 +122,7 @@ class LDAPObject(Model, metaclass=LDAPObjectMetaclass):
setattr(self, name, value) setattr(self, name, value)
def __repr__(self): def __repr__(self):
reverse_attributes = {v: k for k, v in (self.attributes or {}).items()} attribute_name = self.ldap_attribute_to_python(self.rdn_attribute)
attribute_name = reverse_attributes.get(self.rdn_attribute, self.rdn_attribute)
return ( return (
f"<{self.__class__.__name__} {attribute_name}={self.rdn_value}>" f"<{self.__class__.__name__} {attribute_name}={self.rdn_value}>"
if self.rdn_attribute if self.rdn_attribute
@ -130,29 +130,30 @@ class LDAPObject(Model, metaclass=LDAPObjectMetaclass):
) )
def __eq__(self, other): def __eq__(self, other):
ldap_attributes = self.may() + self.must()
if not ( if not (
isinstance(other, self.__class__) isinstance(other, self.__class__)
and self.may() == other.may() and self.may() == other.may()
and self.must() == other.must() and self.must() == other.must()
and all( and all(
hasattr(self, attr) == hasattr(other, attr) self.has_ldap_attribute(attr) == other.has_ldap_attribute(attr)
for attr in self.may() + self.must() for attr in ldap_attributes
) )
): ):
return False return False
self_attributes = python_attrs_to_ldap( self_attributes = python_attrs_to_ldap(
{ {
attr: getattr(self, attr) attr: self.get_ldap_attribute(attr)
for attr in self.may() + self.must() for attr in ldap_attributes
if hasattr(self, attr) if self.has_ldap_attribute(attr)
} }
) )
other_attributes = python_attrs_to_ldap( other_attributes = python_attrs_to_ldap(
{ {
attr: getattr(other, attr) attr: other.get_ldap_attribute(attr)
for attr in self.may() + self.must() for attr in ldap_attributes
if hasattr(self, attr) if other.has_ldap_attribute(attr)
} }
) )
return self_attributes == other_attributes return self_attributes == other_attributes
@ -161,17 +162,40 @@ class LDAPObject(Model, metaclass=LDAPObjectMetaclass):
return hash(self.id) return hash(self.id)
def __getattr__(self, name): def __getattr__(self, name):
name = self.attributes.get(name, name) if name not in self.attributes:
if name not in self.ldap_object_attributes():
return super().__getattribute__(name) return super().__getattribute__(name)
single_value = self.ldap_object_attributes()[name].single_value ldap_name = self.python_attribute_to_ldap(name)
if ldap_name == "dn":
return self.dn_for(self.rdn_value)
python_single_value = "List" not in str(self.__annotations__[name])
ldap_value = self.get_ldap_attribute(ldap_name)
return cardinalize_attribute(python_single_value, ldap_value)
def __setattr__(self, name, value):
if name not in self.attributes:
super().__setattr__(name, value)
ldap_name = self.python_attribute_to_ldap(name)
self.set_ldap_attribute(ldap_name, value)
def __delattr__(self, name):
ldap_name = self.python_attribute_to_ldap(name)
self.delete_ldap_attribute(ldap_name)
def has_ldap_attribute(self, name):
return name in self.ldap_object_attributes() and (
name in self.changes or name in self.state
)
def get_ldap_attribute(self, name):
if name in self.changes: if name in self.changes:
return self.changes[name][0] if single_value else self.changes[name] return self.changes[name]
if not self.state.get(name): if not self.state.get(name):
return None if single_value else [] return None
# Lazy conversion from ldap format to python format # Lazy conversion from ldap format to python format
if any(isinstance(value, bytes) for value in self.state[name]): if any(isinstance(value, bytes) for value in self.state[name]):
@ -180,35 +204,23 @@ class LDAPObject(Model, metaclass=LDAPObjectMetaclass):
ldap_to_python(value, syntax) for value in self.state[name] ldap_to_python(value, syntax) for value in self.state[name]
] ]
if single_value: return self.state.get(name)
return self.state.get(name)[0]
else:
return [value for value in self.state.get(name) if value is not None]
def __setattr__(self, name, value): def set_ldap_attribute(self, name, value):
if self.attributes: if name not in self.ldap_object_attributes():
name = self.attributes.get(name, name) return
if name in self.ldap_object_attributes():
value = listify(value) value = listify(value)
self.changes[name] = value self.changes[name] = value
else: def delete_ldap_attribute(self, name):
super().__setattr__(name, value)
def __delattr__(self, name):
name = self.attributes.get(name, name)
self.changes[name] = [None] self.changes[name] = [None]
@property @property
def rdn_value(self): def rdn_value(self):
value = getattr(self, self.rdn_attribute) value = self.get_ldap_attribute(self.rdn_attribute)
return (value[0] if isinstance(value, list) else value).strip() return (value[0] if isinstance(value, list) else value).strip()
@property
def dn(self):
return self.dn_for(self.rdn_value)
@classmethod @classmethod
def dn_for(cls, rdn): def dn_for(cls, rdn):
return f"{cls.rdn_attribute}={ldap.dn.escape_dn_chars(rdn)},{cls.base},{cls.root_dn}" return f"{cls.rdn_attribute}={ldap.dn.escape_dn_chars(rdn)},{cls.base},{cls.root_dn}"
@ -317,7 +329,7 @@ class LDAPObject(Model, metaclass=LDAPObjectMetaclass):
arg_filter = "" arg_filter = ""
kwargs = python_attrs_to_ldap( kwargs = python_attrs_to_ldap(
{ {
(cls.attributes or {}).get(name, name): values cls.python_attribute_to_ldap(name): values
for name, values in kwargs.items() for name, values in kwargs.items()
}, },
encode=False, encode=False,
@ -350,7 +362,7 @@ class LDAPObject(Model, metaclass=LDAPObjectMetaclass):
def fuzzy(cls, query, attributes=None, **kwargs): def fuzzy(cls, query, attributes=None, **kwargs):
query = ldap.filter.escape_filter_chars(query) query = ldap.filter.escape_filter_chars(query)
attributes = attributes or cls.may() + cls.must() attributes = attributes or cls.may() + cls.must()
attributes = [cls.attributes.get(name, name) for name in attributes] attributes = [cls.python_attribute_to_ldap(name) for name in attributes]
filter = ( filter = (
"(|" + "".join(f"({attribute}=*{query}*)" for attribute in attributes) + ")" "(|" + "".join(f"({attribute}=*{query}*)" for attribute in attributes) + ")"
) )
@ -380,6 +392,15 @@ class LDAPObject(Model, metaclass=LDAPObjectMetaclass):
cls._may = list(set(cls._may)) cls._may = list(set(cls._may))
cls._must = list(set(cls._must)) cls._must = list(set(cls._must))
@classmethod
def ldap_attribute_to_python(cls, name):
reverse_attribute_map = {v: k for k, v in (cls.attribute_map or {}).items()}
return reverse_attribute_map.get(name, name)
@classmethod
def python_attribute_to_ldap(cls, name):
return cls.attribute_map.get(name, name) if cls.attribute_map else None
def reload(self, conn=None): def reload(self, conn=None):
conn = conn or Backend.get().connection conn = conn or Backend.get().connection
result = conn.search_s(self.id, ldap.SCOPE_SUBTREE, None, ["+", "*"]) result = conn.search_s(self.id, ldap.SCOPE_SUBTREE, None, ["+", "*"])
@ -389,7 +410,7 @@ class LDAPObject(Model, metaclass=LDAPObjectMetaclass):
def save(self, conn=None): def save(self, conn=None):
conn = conn or Backend.get().connection conn = conn or Backend.get().connection
setattr(self, "objectClass", self.ldap_object_class) self.set_ldap_attribute("objectClass", self.ldap_object_class)
# Object already exists in the LDAP database # Object already exists in the LDAP database
if self.exists: if self.exists:
@ -429,10 +450,6 @@ class LDAPObject(Model, metaclass=LDAPObjectMetaclass):
self.state = {**self.state, **self.changes} self.state = {**self.state, **self.changes}
self.changes = {} self.changes = {}
def update(self, **kwargs):
for k, v in kwargs.items():
self.__setattr__(k, v)
def delete(self, conn=None): def delete(self, conn=None):
conn = conn or Backend.get().connection conn = conn or Backend.get().connection
conn.delete_s(self.id) conn.delete_s(self.id)

View file

@ -15,7 +15,7 @@ class User(canaille.core.models.User, LDAPObject):
DEFAULT_FILTER = "(|(uid={{ login }})(mail={{ login }}))" DEFAULT_FILTER = "(|(uid={{ login }})(mail={{ login }}))"
DEFAULT_RDN = "cn" DEFAULT_RDN = "cn"
attributes = { attribute_map = {
"id": "dn", "id": "dn",
"user_name": "uid", "user_name": "uid",
"password": "userPassword", "password": "userPassword",
@ -66,7 +66,7 @@ class User(canaille.core.models.User, LDAPObject):
filter_["groups"] = Group.dn_for(filter_["groups"]) filter_["groups"] = Group.dn_for(filter_["groups"])
base = "".join( base = "".join(
f"({cls.attributes.get(key, key)}={value})" f"({cls.python_attribute_to_ldap(key)}={value})"
for key, value in filter_.items() for key, value in filter_.items()
) )
return f"(&{base})" if len(filter_) > 1 else base return f"(&{base})" if len(filter_) > 1 else base
@ -147,7 +147,7 @@ class User(canaille.core.models.User, LDAPObject):
self.load_permissions() self.load_permissions()
def save(self, *args, **kwargs): def save(self, *args, **kwargs):
group_attr = self.attributes.get("groups", "groups") group_attr = self.python_attribute_to_ldap("groups")
new_groups = self.changes.get(group_attr) new_groups = self.changes.get(group_attr)
if not new_groups: if not new_groups:
return super().save(*args, **kwargs) return super().save(*args, **kwargs)
@ -194,7 +194,7 @@ class Group(canaille.core.models.Group, LDAPObject):
DEFAULT_NAME_ATTRIBUTE = "cn" DEFAULT_NAME_ATTRIBUTE = "cn"
DEFAULT_USER_FILTER = "member={user.id}" DEFAULT_USER_FILTER = "member={user.id}"
attributes = { attribute_map = {
"id": "dn", "id": "dn",
"display_name": "cn", "display_name": "cn",
"members": "member", "members": "member",
@ -243,9 +243,8 @@ class Client(canaille.oidc.models.Client, LDAPObject):
"software_version": "oauthSoftwareVersion", "software_version": "oauthSoftwareVersion",
} }
attributes = { attribute_map = {
"id": "dn", "id": "dn",
"description": "description",
"preconsent": "oauthPreconsent", "preconsent": "oauthPreconsent",
# post_logout_redirect_uris is not yet supported by authlib # post_logout_redirect_uris is not yet supported by authlib
"post_logout_redirect_uris": "oauthPostLogoutRedirectURI", "post_logout_redirect_uris": "oauthPostLogoutRedirectURI",
@ -263,10 +262,9 @@ class AuthorizationCode(canaille.oidc.models.AuthorizationCode, LDAPObject):
ldap_object_class = ["oauthAuthorizationCode"] ldap_object_class = ["oauthAuthorizationCode"]
base = "ou=authorizations,ou=oauth" base = "ou=authorizations,ou=oauth"
rdn_attribute = "oauthAuthorizationCodeID" rdn_attribute = "oauthAuthorizationCodeID"
attributes = { attribute_map = {
"id": "dn", "id": "dn",
"authorization_code_id": "oauthAuthorizationCodeID", "authorization_code_id": "oauthAuthorizationCodeID",
"description": "description",
"code": "oauthCode", "code": "oauthCode",
"client": "oauthClient", "client": "oauthClient",
"subject": "oauthSubject", "subject": "oauthSubject",
@ -290,11 +288,10 @@ class Token(canaille.oidc.models.Token, LDAPObject):
ldap_object_class = ["oauthToken"] ldap_object_class = ["oauthToken"]
base = "ou=tokens,ou=oauth" base = "ou=tokens,ou=oauth"
rdn_attribute = "oauthTokenID" rdn_attribute = "oauthTokenID"
attributes = { attribute_map = {
"id": "dn", "id": "dn",
"token_id": "oauthTokenID", "token_id": "oauthTokenID",
"access_token": "oauthAccessToken", "access_token": "oauthAccessToken",
"description": "description",
"client": "oauthClient", "client": "oauthClient",
"subject": "oauthSubject", "subject": "oauthSubject",
"type": "oauthTokenType", "type": "oauthTokenType",
@ -315,7 +312,7 @@ class Consent(canaille.oidc.models.Consent, LDAPObject):
ldap_object_class = ["oauthConsent"] ldap_object_class = ["oauthConsent"]
base = "ou=consents,ou=oauth" base = "ou=consents,ou=oauth"
rdn_attribute = "cn" rdn_attribute = "cn"
attributes = { attribute_map = {
"id": "dn", "id": "dn",
"consent_id": "cn", "consent_id": "cn",
"subject": "oauthSubject", "subject": "oauthSubject",

View file

@ -85,3 +85,13 @@ def python_to_ldap(value, syntax, encode=True):
def listify(value): def listify(value):
return value if isinstance(value, list) else [value] return value if isinstance(value, list) else [value]
def cardinalize_attribute(python_unique, value):
if not value:
return None if python_unique else []
if python_unique:
return value[0]
return [v for v in value if v is not None]

View file

@ -90,6 +90,7 @@ class MemoryModel(Model):
self.index()[self.id] = copy.deepcopy(self.state) self.index()[self.id] = copy.deepcopy(self.state)
# update the index for each attribute # update the index for each attribute
print(self.attributes)
for attribute in self.attributes: for attribute in self.attributes:
attribute_values = listify(getattr(self, attribute)) attribute_values = listify(getattr(self, attribute))
for value in attribute_values: for value in attribute_values:
@ -161,10 +162,6 @@ class MemoryModel(Model):
# update the id index # update the id index
del self.index()[self.id] del self.index()[self.id]
def update(self, **kwargs):
for attribute, value in kwargs.items():
setattr(self, attribute, value)
def reload(self): def reload(self):
self.state = self.__class__.get(id=self.id).state self.state = self.__class__.get(id=self.id).state
self.cache = {} self.cache = {}
@ -193,11 +190,11 @@ class MemoryModel(Model):
] ]
values = [value for value in values if value] values = [value for value in values if value]
if name in self.unique_attributes: unique_attribute = "List" not in str(self.__annotations__[name])
if unique_attribute:
return values[0] if values else None return values[0] if values else None
else: else:
return values or [] return values or []
raise AttributeError()
raise AttributeError() raise AttributeError()
@ -222,43 +219,13 @@ class MemoryModel(Model):
except KeyError: except KeyError:
pass pass
@property
def identifier(self):
return getattr(self, self.identifier_attribute)
class User(canaille.core.models.User, MemoryModel): class User(canaille.core.models.User, MemoryModel):
attributes = [
"id",
"user_name",
"password",
"preferred_language",
"family_name",
"given_name",
"formatted_name",
"display_name",
"emails",
"phone_numbers",
"formatted_address",
"street",
"postal_code",
"locality",
"region",
"photo",
"profile_url",
"employee_number",
"department",
"title",
"organization",
"last_modified",
"groups",
"lock_date",
]
identifier_attribute = "user_name" identifier_attribute = "user_name"
unique_attributes = [
"id",
"display_name",
"employee_number",
"preferred_language",
"last_modified",
"lock_date",
]
model_attributes = { model_attributes = {
"groups": ("Group", "members"), "groups": ("Group", "members"),
} }
@ -297,7 +264,7 @@ class User(canaille.core.models.User, MemoryModel):
).id ).id
return all( return all(
value in getattr(self, attribute, None) getattr(self, attribute) and value in getattr(self, attribute)
for attribute, value in filter.items() for attribute, value in filter.items()
) )
@ -307,10 +274,6 @@ class User(canaille.core.models.User, MemoryModel):
def get_from_login(cls, login=None, **kwargs): def get_from_login(cls, login=None, **kwargs):
return User.get(user_name=login) return User.get(user_name=login)
@property
def identifier(self):
return getattr(self, self.identifier_attribute)[0]
def has_password(self): def has_password(self):
return bool(self.password) return bool(self.password)
@ -335,178 +298,39 @@ class User(canaille.core.models.User, MemoryModel):
class Group(canaille.core.models.Group, MemoryModel): class Group(canaille.core.models.Group, MemoryModel):
attributes = [
"id",
"display_name",
"members",
"description",
]
unique_attributes = ["id", "display_name"]
model_attributes = { model_attributes = {
"members": ("User", "groups"), "members": ("User", "groups"),
} }
identifier_attribute = "display_name" identifier_attribute = "display_name"
@property
def identifier(self):
return getattr(self, self.identifier_attribute)
class Client(canaille.oidc.models.Client, MemoryModel): class Client(canaille.oidc.models.Client, MemoryModel):
attributes = [
"id",
"description",
"preconsent",
"post_logout_redirect_uris",
"audience",
"client_id",
"client_secret",
"client_id_issued_at",
"client_secret_expires_at",
"client_name",
"contacts",
"client_uri",
"redirect_uris",
"logo_uri",
"grant_types",
"response_types",
"scope",
"tos_uri",
"policy_uri",
"jwks_uri",
"jwk",
"token_endpoint_auth_method",
"software_id",
"software_version",
]
identifier_attribute = "client_id" identifier_attribute = "client_id"
unique_attributes = [
"id",
"preconsent",
"client_id",
"client_secret",
"client_id_issued_at",
"client_secret_expires_at",
"client_name",
"client_uri",
"logo_uri",
"tos_uri",
"policy_uri",
"jwks_uri",
"jwk",
"token_endpoint_auth_method",
"software_id",
"software_version",
]
model_attributes = { model_attributes = {
"audience": ("Client", None), "audience": ("Client", None),
} }
@property
def identifier(self):
return getattr(self, self.identifier_attribute)
class AuthorizationCode(canaille.oidc.models.AuthorizationCode, MemoryModel): class AuthorizationCode(canaille.oidc.models.AuthorizationCode, MemoryModel):
attributes = [
"id",
"authorization_code_id",
"description",
"code",
"client",
"subject",
"redirect_uri",
"response_type",
"scope",
"nonce",
"issue_date",
"lifetime",
"challenge",
"challenge_method",
"revokation_date",
]
identifier_attribute = "authorization_code_id" identifier_attribute = "authorization_code_id"
unique_attributes = [
"id",
"authorization_code_id",
"code",
"client",
"subject",
"redirect_uri",
"issue_date",
"lifetime",
"challenge",
"challenge_method",
"revokation_date",
"nonce",
]
model_attributes = { model_attributes = {
"client": ("Client", None), "client": ("Client", None),
"subject": ("User", None), "subject": ("User", None),
} }
@property
def identifier(self):
return getattr(self, self.identifier_attribute)
class Token(canaille.oidc.models.Token, MemoryModel): class Token(canaille.oidc.models.Token, MemoryModel):
attributes = [
"id",
"token_id",
"access_token",
"description",
"client",
"subject",
"type",
"refresh_token",
"scope",
"issue_date",
"lifetime",
"revokation_date",
"audience",
]
identifier_attribute = "token_id" identifier_attribute = "token_id"
unique_attributes = [
"id",
"token_id",
"subject",
"issue_date",
"lifetime",
"access_token",
"refresh_token",
"revokation_date",
"client",
"type",
]
model_attributes = { model_attributes = {
"client": ("Client", None), "client": ("Client", None),
"subject": ("User", None), "subject": ("User", None),
"audience": ("Client", None), "audience": ("Client", None),
} }
@property
def identifier(self):
return getattr(self, self.identifier_attribute)
class Consent(canaille.oidc.models.Consent, MemoryModel): class Consent(canaille.oidc.models.Consent, MemoryModel):
attributes = [
"id",
"consent_id",
"subject",
"client",
"scope",
"issue_date",
"revokation_date",
]
identifier_attribute = "consent_id" identifier_attribute = "consent_id"
unique_attributes = ["id", "subject", "client", "issue_date", "revokation_date"]
model_attributes = { model_attributes = {
"client": ("Client", None), "client": ("Client", None),
"subject": ("User", None), "subject": ("User", None),
} }
@property
def identifier(self):
return getattr(self, self.identifier_attribute)[0]

View file

@ -1,8 +1,19 @@
from collections import ChainMap
from canaille.app import classproperty
class Model: class Model:
""" """
Model abstract class. Model abstract class.
""" """
@classproperty
def attributes(cls):
return ChainMap(
*(c.__annotations__ for c in cls.__mro__ if "__annotations__" in c.__dict__)
)
@classmethod @classmethod
def query(cls, **kwargs): def query(cls, **kwargs):
""" """
@ -74,7 +85,8 @@ class Model:
>>> user.first_name >>> user.first_name
Jane Jane
""" """
raise NotImplementedError() for attribute, value in kwargs.items():
setattr(self, attribute, value)
def reload(self): def reload(self):
""" """

View file

@ -224,17 +224,17 @@ WRITE = [
# User objectClass. # User objectClass.
# {attribute} will be replaced by the user ldap attribute value. # {attribute} will be replaced by the user ldap attribute value.
# Default values fits inetOrgPerson. # Default values fits inetOrgPerson.
# SUB = "{{ user.user_name[0] }}" # SUB = "{{ user.user_name }}"
# NAME = "{{ user.formatted_name[0] }}" # NAME = "{{ user.formatted_name }}"
# PHONE_NUMBER = "{{ user.phone_numbers[0] }}" # PHONE_NUMBER = "{{ user.phone_numbers[0] }}"
# EMAIL = "{{ user.preferred_email }}" # EMAIL = "{{ user.preferred_email }}"
# GIVEN_NAME = "{{ user.given_name[0] }}" # GIVEN_NAME = "{{ user.given_name }}"
# FAMILY_NAME = "{{ user.family_name[0] }}" # FAMILY_NAME = "{{ user.family_name }}"
# PREFERRED_USERNAME = "{{ user.display_name }}" # PREFERRED_USERNAME = "{{ user.display_name }}"
# LOCALE = "{{ user.preferred_language }}" # LOCALE = "{{ user.preferred_language }}"
# ADDRESS = "{{ user.formatted_address[0] }}" # ADDRESS = "{{ user.formatted_address }}"
# PICTURE = "{% if user.photo %}{{ url_for('core.account.photo', user=user, field='photo', _external=True) }}{% endif %}" # PICTURE = "{% if user.photo %}{{ url_for('core.account.photo', user=user, field='photo', _external=True) }}{% endif %}"
# WEBSITE = "{{ user.profile_url[0] }}" # WEBSITE = "{{ user.profile_url }}"
# The SMTP server options. If not set, mail related features such as # The SMTP server options. If not set, mail related features such as
# user invitations, and password reset emails, will be disabled. # user invitations, and password reset emails, will be disabled.

View file

@ -438,7 +438,7 @@ def profile_creation(user):
def profile_create(current_app, form): def profile_create(current_app, form):
user = models.User() user = models.User()
for attribute in form: for attribute in form:
if attribute.name in user.attributes: if attribute.name in models.User.attributes:
if isinstance(attribute.data, FileStorage): if isinstance(attribute.data, FileStorage):
data = attribute.data.stream.read() data = attribute.data.stream.read()
else: else:
@ -449,8 +449,8 @@ def profile_create(current_app, form):
if "photo" in form and form["photo_delete"].data: if "photo" in form and form["photo_delete"].data:
del user.photo del user.photo
given_name = user.given_name[0] if user.given_name else "" given_name = user.given_name if user.given_name else ""
family_name = user.family_name[0] if user.family_name else "" family_name = user.family_name if user.family_name else ""
user.formatted_name = [f"{given_name} {family_name}".strip()] user.formatted_name = [f"{given_name} {family_name}".strip()]
user.save() user.save()
@ -802,7 +802,7 @@ def profile_delete(user, edited_user):
flash( flash(
_( _(
"The user %(user)s has been sucessfuly deleted", "The user %(user)s has been sucessfuly deleted",
user=edited_user.formatted_name[0], user=edited_user.formatted_name,
), ),
"success", "success",
) )
@ -818,7 +818,7 @@ def profile_delete(user, edited_user):
def impersonate(user, puppet): def impersonate(user, puppet):
login_user(puppet) login_user(puppet)
flash( flash(
_("Connection successful. Welcome %(user)s", user=puppet.formatted_name[0]), _("Connection successful. Welcome %(user)s", user=puppet.formatted_name),
"success", "success",
) )
return redirect(url_for("core.account.index")) return redirect(url_for("core.account.index"))
@ -837,11 +837,11 @@ def photo(user, field):
if request.if_none_match and etag in request.if_none_match: if request.if_none_match and etag in request.if_none_match:
return "", 304 return "", 304
photos = getattr(user, field) photo = getattr(user, field)
if not photos: if not photo:
abort(404) abort(404)
stream = io.BytesIO(photos[0]) stream = io.BytesIO(photo)
return send_file( return send_file(
stream, mimetype="image/jpeg", last_modified=user.last_modified, etag=etag stream, mimetype="image/jpeg", last_modified=user.last_modified, etag=etag
) )

View file

@ -89,7 +89,7 @@ def password():
del session["attempt_login"] del session["attempt_login"]
login_user(user) login_user(user)
flash( flash(
_("Connection successful. Welcome %(user)s", user=user.formatted_name[0]), _("Connection successful. Welcome %(user)s", user=user.formatted_name),
"success", "success",
) )
return redirect(session.pop("redirect-after-login", url_for("core.account.index"))) return redirect(session.pop("redirect-after-login", url_for("core.account.index")))
@ -103,7 +103,7 @@ def logout():
flash( flash(
_( _(
"You have been disconnected. See you next time %(user)s", "You have been disconnected. See you next time %(user)s",
user=user.formatted_name[0], user=user.formatted_name,
), ),
"success", "success",
) )
@ -169,7 +169,7 @@ def forgotten():
_( _(
"The user '%(user)s' does not have permissions to update their password. " "The user '%(user)s' does not have permissions to update their password. "
"We cannot send a password reset email.", "We cannot send a password reset email.",
user=user.formatted_name[0], user=user.formatted_name,
), ),
"error", "error",
) )

View file

@ -22,7 +22,7 @@ MINIMUM_PASSWORD_LENGTH = 8
def unique_login(form, field): def unique_login(form, field):
if models.User.get_from_login(field.data) and ( if models.User.get_from_login(field.data) and (
not getattr(form, "user", None) or form.user.user_name[0] != field.data not getattr(form, "user", None) or form.user.user_name != field.data
): ):
raise wtforms.ValidationError( raise wtforms.ValidationError(
_("The login '{login}' already exists").format(login=field.data) _("The login '{login}' already exists").format(login=field.data)

View file

@ -11,7 +11,7 @@
<div class="content"> <div class="content">
<p> <p>
{% if user != edited_user %} {% if user != edited_user %}
{% trans user_name=(edited_user.formatted_name[0] or edited_user.identifier) %} {% trans user_name=(edited_user.formatted_name or edited_user.identifier) %}
Are you sure you want to delete the account of {{ user_name }}? This action is unrevokable and all the data about this user will be removed. Are you sure you want to delete the account of {{ user_name }}? This action is unrevokable and all the data about this user will be removed.
{% endtrans %} {% endtrans %}
{% else %} {% else %}

View file

@ -11,7 +11,7 @@
<div class="content"> <div class="content">
<p> <p>
{% if user != edited_user %} {% if user != edited_user %}
{% trans user_name=(edited_user.formatted_name[0] or edited_user.identifier) %} {% trans user_name=(edited_user.formatted_name or edited_user.identifier) %}
Are you sure you want to lock the account of {{ user_name }} ? The user won't be able to login until their account is unlocked. Are you sure you want to lock the account of {{ user_name }} ? The user won't be able to login until their account is unlocked.
{% endtrans %} {% endtrans %}
{% else %} {% else %}

View file

@ -39,7 +39,7 @@
<td> <td>
<a href="{{ url_for('core.account.profile_edition', edited_user=watched_user) }}"> <a href="{{ url_for('core.account.profile_edition', edited_user=watched_user) }}">
{% if watched_user.user_name %} {% if watched_user.user_name %}
{{ watched_user.user_name[0] }} {{ watched_user.user_name }}
{% else %} {% else %}
{{ watched_user.identifier }} {{ watched_user.identifier }}
{% endif %} {% endif %}
@ -47,7 +47,7 @@
</td> </td>
{% endif %} {% endif %}
{% if user.can_read("family_name") or user.can_read("given_name") %} {% if user.can_read("family_name") or user.can_read("given_name") %}
<td>{{ watched_user.formatted_name[0] }}</td> <td>{{ watched_user.formatted_name }}</td>
{% endif %} {% endif %}
{% if user.can_read("emails") %} {% if user.can_read("emails") %}
<td><a href="mailto:{{ watched_user.preferred_email }}">{{ watched_user.preferred_email }}</a></td> <td><a href="mailto:{{ watched_user.preferred_email }}">{{ watched_user.preferred_email }}</a></td>

View file

@ -27,7 +27,7 @@ class Client(Model):
logo_uri: Optional[str] logo_uri: Optional[str]
grant_types: List[str] grant_types: List[str]
response_types: List[str] response_types: List[str]
scope: Optional[str] scope: List[str]
tos_uri: Optional[str] tos_uri: Optional[str]
policy_uri: Optional[str] policy_uri: Optional[str]
jwks_uri: Optional[str] jwks_uri: Optional[str]
@ -49,7 +49,7 @@ class AuthorizationCode(Model):
subject: User subject: User
redirect_uri: Optional[str] redirect_uri: Optional[str]
response_type: Optional[str] response_type: Optional[str]
scope: Optional[str] scope: List[str]
nonce: Optional[str] nonce: Optional[str]
issue_date: datetime.datetime issue_date: datetime.datetime
lifetime: int lifetime: int
@ -70,7 +70,7 @@ class Token(Model):
subject: User subject: User
type: str type: str
refresh_token: str refresh_token: str
scope: str scope: List[str]
issue_date: datetime.datetime issue_date: datetime.datetime
lifetime: int lifetime: int
revokation_date: datetime.datetime revokation_date: datetime.datetime
@ -86,7 +86,7 @@ class Consent(Model):
consent_id: str consent_id: str
subject: User subject: User
client: "Client" client: "Client"
scope: str scope: List[str]
issue_date: datetime.datetime issue_date: datetime.datetime
revokation_date: datetime.datetime revokation_date: datetime.datetime

View file

@ -242,7 +242,7 @@ def end_session():
if ( if (
not data.get("id_token_hint") not data.get("id_token_hint")
or (data.get("logout_hint") and data["logout_hint"] != user.user_name[0]) or (data.get("logout_hint") and data["logout_hint"] != user.user_name)
) and not session.get("end_session_confirmation"): ) and not session.get("end_session_confirmation"):
session["end_session_data"] = data session["end_session_data"] = data
return render_template("logout.html", form=form, client=client, menu=False) return render_template("logout.html", form=form, client=client, menu=False)
@ -282,7 +282,7 @@ def end_session():
if client: if client:
valid_uris.extend(client.post_logout_redirect_uris or []) valid_uris.extend(client.post_logout_redirect_uris or [])
if user.user_name[0] != id_token["sub"] and not session.get( if user.user_name != id_token["sub"] and not session.get(
"end_session_confirmation" "end_session_confirmation"
): ):
session["end_session_data"] = data session["end_session_data"] = data

View file

@ -37,17 +37,17 @@ DEFAULT_JWT_ALG = "RS256"
DEFAULT_JWT_EXP = 3600 DEFAULT_JWT_EXP = 3600
AUTHORIZATION_CODE_LIFETIME = 84400 AUTHORIZATION_CODE_LIFETIME = 84400
DEFAULT_JWT_MAPPING = { DEFAULT_JWT_MAPPING = {
"SUB": "{{ user.user_name[0] }}", "SUB": "{{ user.user_name }}",
"NAME": "{% if user.formatted_name %}{{ user.formatted_name[0] }}{% endif %}", "NAME": "{% if user.formatted_name %}{{ user.formatted_name }}{% endif %}",
"PHONE_NUMBER": "{% if user.phone_numbers %}{{ user.phone_numbers[0] }}{% endif %}", "PHONE_NUMBER": "{% if user.phone_numbers %}{{ user.phone_numbers[0] }}{% endif %}",
"EMAIL": "{% if user.preferred_email %}{{ user.preferred_email }}{% endif %}", "EMAIL": "{% if user.preferred_email %}{{ user.preferred_email }}{% endif %}",
"GIVEN_NAME": "{% if user.given_name %}{{ user.given_name[0] }}{% endif %}", "GIVEN_NAME": "{% if user.given_name %}{{ user.given_name }}{% endif %}",
"FAMILY_NAME": "{% if user.family_name %}{{ user.family_name[0] }}{% endif %}", "FAMILY_NAME": "{% if user.family_name %}{{ user.family_name }}{% endif %}",
"PREFERRED_USERNAME": "{% if user.display_name %}{{ user.display_name }}{% endif %}", "PREFERRED_USERNAME": "{% if user.display_name %}{{ user.display_name }}{% endif %}",
"LOCALE": "{% if user.preferred_language %}{{ user.preferred_language }}{% endif %}", "LOCALE": "{% if user.preferred_language %}{{ user.preferred_language }}{% endif %}",
"ADDRESS": "{% if user.formatted_address %}{{ user.formatted_address[0] }}{% endif %}", "ADDRESS": "{% if user.formatted_address %}{{ user.formatted_address }}{% endif %}",
"PICTURE": "{% if user.photo %}{{ url_for('core.account.photo', user=user, field='photo', _external=True) }}{% endif %}", "PICTURE": "{% if user.photo %}{{ url_for('core.account.photo', user=user, field='photo', _external=True) }}{% endif %}",
"WEBSITE": "{% if user.profile_url %}{{ user.profile_url[0] }}{% endif %}", "WEBSITE": "{% if user.profile_url %}{{ user.profile_url }}{% endif %}",
} }
@ -332,9 +332,9 @@ class IntrospectionEndpoint(_IntrospectionEndpoint):
"active": True, "active": True,
"client_id": token.client.client_id, "client_id": token.client.client_id,
"token_type": token.type, "token_type": token.type,
"username": token.subject.formatted_name[0], "username": token.subject.formatted_name,
"scope": token.get_scope(), "scope": token.get_scope(),
"sub": token.subject.user_name[0], "sub": token.subject.user_name,
"aud": audience, "aud": audience,
"iss": get_issuer(), "iss": get_issuer(),
"exp": token.get_expires_at(), "exp": token.get_expires_at(),

View file

@ -32,7 +32,7 @@
</div> </div>
<div class="ui center aligned container"> <div class="ui center aligned container">
{{ gettext('You are logged in as %(name)s', name=user.formatted_name[0]) }} {{ gettext('You are logged in as %(name)s', name=user.formatted_name) }}
</div> </div>
<div class="ui center aligned container"> <div class="ui center aligned container">

View file

@ -15,7 +15,7 @@
<td><a href="{{ url_for('oidc.clients.edit', client=authorization.client) }}">{{ authorization.client.client_id }}</a></td> <td><a href="{{ url_for('oidc.clients.edit', client=authorization.client) }}">{{ authorization.client.client_id }}</a></td>
<td> <td>
<a href="{{ url_for("core.account.profile_edition", edited_user=authorization.subject) }}"> <a href="{{ url_for("core.account.profile_edition", edited_user=authorization.subject) }}">
{{ authorization.subject.user_name[0] }} {{ authorization.subject.user_name }}
</a> </a>
</td> </td>
<td>{{ authorization.issue_date }}</td> <td>{{ authorization.issue_date }}</td>

View file

@ -23,7 +23,7 @@
</td> </td>
<td> <td>
<a href="{{ url_for("core.account.profile_edition", edited_user=token.subject) }}"> <a href="{{ url_for("core.account.profile_edition", edited_user=token.subject) }}">
{{ token.subject.user_name[0] }} {{ token.subject.user_name }}
</a> </a>
</td> </td>
<td>{{ token.issue_date }}</td> <td>{{ token.issue_date }}</td>

View file

@ -230,17 +230,17 @@ DYNAMIC_CLIENT_REGISTRATION_TOKENS = [
# User objectClass. # User objectClass.
# {attribute} will be replaced by the user ldap attribute value. # {attribute} will be replaced by the user ldap attribute value.
# Default values fits inetOrgPerson. # Default values fits inetOrgPerson.
# SUB = "{{ user.user_name[0] }}" # SUB = "{{ user.user_name }}"
# NAME = "{{ user.formatted_name[0] }}" # NAME = "{{ user.formatted_name }}"
# PHONE_NUMBER = "{{ user.phone_numbers[0] }}" # PHONE_NUMBER = "{{ user.phone_numbers[0] }}"
# EMAIL = "{{ user.preferred_email }}" # EMAIL = "{{ user.preferred_email }}"
# GIVEN_NAME = "{{ user.given_name[0] }}" # GIVEN_NAME = "{{ user.given_name }}"
# FAMILY_NAME = "{{ user.family_name[0] }}" # FAMILY_NAME = "{{ user.family_name }}"
# PREFERRED_USERNAME = "{{ user.display_name }}" # PREFERRED_USERNAME = "{{ user.display_name }}"
# LOCALE = "{{ user.preferred_language }}" # LOCALE = "{{ user.preferred_language }}"
# ADDRESS = "{{ user.formatted_address[0] }}" # ADDRESS = "{{ user.formatted_address }}"
# PICTURE = "{% if user.photo %}{{ url_for('core.account.photo', user=user, field='photo', _external=True) }}{% endif %}" # PICTURE = "{% if user.photo %}{{ url_for('core.account.photo', user=user, field='photo', _external=True) }}{% endif %}"
# WEBSITE = "{{ user.profile_url[0] }}" # WEBSITE = "{{ user.profile_url }}"
# The SMTP server options. If not set, mail related features such as # The SMTP server options. If not set, mail related features such as
# user invitations, and password reset emails, will be disabled. # user invitations, and password reset emails, will be disabled.

View file

@ -230,17 +230,17 @@ DYNAMIC_CLIENT_REGISTRATION_TOKENS = [
# User objectClass. # User objectClass.
# {attribute} will be replaced by the user ldap attribute value. # {attribute} will be replaced by the user ldap attribute value.
# Default values fits inetOrgPerson. # Default values fits inetOrgPerson.
# SUB = "{{ user.user_name[0] }}" # SUB = "{{ user.user_name }}"
# NAME = "{{ user.formatted_name[0] }}" # NAME = "{{ user.formatted_name }}"
# PHONE_NUMBER = "{{ user.phone_numbers[0] }}" # PHONE_NUMBER = "{{ user.phone_numbers[0] }}"
# EMAIL = "{{ user.preferred_email }}" # EMAIL = "{{ user.preferred_email }}"
# GIVEN_NAME = "{{ user.given_name[0] }}" # GIVEN_NAME = "{{ user.given_name }}"
# FAMILY_NAME = "{{ user.family_name[0] }}" # FAMILY_NAME = "{{ user.family_name }}"
# PREFERRED_USERNAME = "{{ user.display_name }}" # PREFERRED_USERNAME = "{{ user.display_name }}"
# LOCALE = "{{ user.preferred_language }}" # LOCALE = "{{ user.preferred_language }}"
# ADDRESS = "{{ user.formatted_address[0] }}" # ADDRESS = "{{ user.formatted_address }}"
# PICTURE = "{% if user.photo %}{{ url_for('core.account.photo', user=user, field='photo', _external=True) }}{% endif %}" # PICTURE = "{% if user.photo %}{{ url_for('core.account.photo', user=user, field='photo', _external=True) }}{% endif %}"
# WEBSITE = "{{ user.profile_url[0] }}" # WEBSITE = "{{ user.profile_url }}"
# The SMTP server options. If not set, mail related features such as # The SMTP server options. If not set, mail related features such as
# user invitations, and password reset emails, will be disabled. # user invitations, and password reset emails, will be disabled.

View file

@ -234,17 +234,17 @@ DYNAMIC_CLIENT_REGISTRATION_TOKENS = [
# User objectClass. # User objectClass.
# {attribute} will be replaced by the user ldap attribute value. # {attribute} will be replaced by the user ldap attribute value.
# Default values fits inetOrgPerson. # Default values fits inetOrgPerson.
# SUB = "{{ user.user_name[0] }}" # SUB = "{{ user.user_name }}"
# NAME = "{{ user.formatted_name[0] }}" # NAME = "{{ user.formatted_name }}"
# PHONE_NUMBER = "{{ user.phone_numbers[0] }}" # PHONE_NUMBER = "{{ user.phone_numbers[0] }}"
# EMAIL = "{{ user.preferred_email }}" # EMAIL = "{{ user.preferred_email }}"
# GIVEN_NAME = "{{ user.given_name[0] }}" # GIVEN_NAME = "{{ user.given_name }}"
# FAMILY_NAME = "{{ user.family_name[0] }}" # FAMILY_NAME = "{{ user.family_name }}"
# PREFERRED_USERNAME = "{{ user.display_name }}" # PREFERRED_USERNAME = "{{ user.display_name }}"
# LOCALE = "{{ user.preferred_language }}" # LOCALE = "{{ user.preferred_language }}"
# ADDRESS = "{{ user.formatted_address[0] }}" # ADDRESS = "{{ user.formatted_address }}"
# PICTURE = "{% if user.photo %}{{ url_for('core.account.photo', user=user, field='photo', _external=True) }}{% endif %}" # PICTURE = "{% if user.photo %}{{ url_for('core.account.photo', user=user, field='photo', _external=True) }}{% endif %}"
# WEBSITE = "{{ user.profile_url[0] }}" # WEBSITE = "{{ user.profile_url }}"
# The SMTP server options. If not set, mail related features such as # The SMTP server options. If not set, mail related features such as
# user invitations, and password reset emails, will be disabled. # user invitations, and password reset emails, will be disabled.

View file

@ -234,17 +234,17 @@ DYNAMIC_CLIENT_REGISTRATION_TOKENS = [
# User objectClass. # User objectClass.
# {attribute} will be replaced by the user ldap attribute value. # {attribute} will be replaced by the user ldap attribute value.
# Default values fits inetOrgPerson. # Default values fits inetOrgPerson.
# SUB = "{{ user.user_name[0] }}" # SUB = "{{ user.user_name }}"
# NAME = "{{ user.formatted_name[0] }}" # NAME = "{{ user.formatted_name }}"
# PHONE_NUMBER = "{{ user.phone_numbers[0] }}" # PHONE_NUMBER = "{{ user.phone_numbers[0] }}"
# EMAIL = "{{ user.preferred_email }}" # EMAIL = "{{ user.preferred_email }}"
# GIVEN_NAME = "{{ user.given_name[0] }}" # GIVEN_NAME = "{{ user.given_name }}"
# FAMILY_NAME = "{{ user.family_name[0] }}" # FAMILY_NAME = "{{ user.family_name }}"
# PREFERRED_USERNAME = "{{ user.display_name }}" # PREFERRED_USERNAME = "{{ user.display_name }}"
# LOCALE = "{{ user.preferred_language }}" # LOCALE = "{{ user.preferred_language }}"
# ADDRESS = "{{ user.formatted_address[0] }}" # ADDRESS = "{{ user.formatted_address }}"
# PICTURE = "{% if user.photo %}{{ url_for('core.account.photo', user=user, field='photo', _external=True) }}{% endif %}" # PICTURE = "{% if user.photo %}{{ url_for('core.account.photo', user=user, field='photo', _external=True) }}{% endif %}"
# WEBSITE = "{{ user.profile_url[0] }}" # WEBSITE = "{{ user.profile_url }}"
# The SMTP server options. If not set, mail related features such as # The SMTP server options. If not set, mail related features such as
# user invitations, and password reset emails, will be disabled. # user invitations, and password reset emails, will be disabled.

View file

@ -253,7 +253,7 @@ A mapping where keys are JWT claims, and values are LDAP user object attributes.
Attributes are rendered using jinja2, and can use a ``user`` variable. Attributes are rendered using jinja2, and can use a ``user`` variable.
:SUB: :SUB:
*Optional.* Defaults to ``{{ user.user_name[0] }}`` *Optional.* Defaults to ``{{ user.user_name }}``
:NAME: :NAME:
*Optional.* Defaults to ``{{ user.cn[0] }}`` *Optional.* Defaults to ``{{ user.cn[0] }}``
@ -265,10 +265,10 @@ Attributes are rendered using jinja2, and can use a ``user`` variable.
*Optional.* Defaults to ``{{ user.mail[0] }}`` *Optional.* Defaults to ``{{ user.mail[0] }}``
:GIVEN_NAME: :GIVEN_NAME:
*Optional.* Defaults to ``{{ user.given_name[0] }}`` *Optional.* Defaults to ``{{ user.given_name }}``
:FAMILY_NAME: :FAMILY_NAME:
*Optional.* Defaults to ``{{ user.family_name[0] }}`` *Optional.* Defaults to ``{{ user.family_name }}``
:PREFERRED_USERNAME: :PREFERRED_USERNAME:
*Optional.* Defaults to ``{{ user.display_name[0] }}`` *Optional.* Defaults to ``{{ user.display_name[0] }}``
@ -280,10 +280,10 @@ Attributes are rendered using jinja2, and can use a ``user`` variable.
*Optional.* Defaults to ``{{ user.address[0] }}`` *Optional.* Defaults to ``{{ user.address[0] }}``
:PICTURE: :PICTURE:
*Optional.* Defaults to ``{% if user.photo %}{{ url_for('core.account.photo', user_name=user.user_name[0], field='photo', _external=True) }}{% endif %}`` *Optional.* Defaults to ``{% if user.photo %}{{ url_for('core.account.photo', user_name=user.user_name, field='photo', _external=True) }}{% endif %}``
:WEBSITE: :WEBSITE:
*Optional.* Defaults to ``{{ user.profile_url[0] }}`` *Optional.* Defaults to ``{{ user.profile_url }}``
SMTP SMTP

View file

@ -14,6 +14,7 @@ from canaille.backends.ldap.utils import python_to_ldap
from canaille.backends.ldap.utils import Syntax from canaille.backends.ldap.utils import Syntax
# TODO: tester le changement de cardinalité des attributs
def test_object_creation(app, backend): def test_object_creation(app, backend):
user = models.User( user = models.User(
formatted_name="Doe", # leading space formatted_name="Doe", # leading space
@ -45,13 +46,14 @@ def test_dn_when_leading_space_in_id_attribute(testclient, backend):
) )
user.save() user.save()
assert ldap.dn.is_dn(user.dn) dn = user.id
assert ldap.dn.dn2str(ldap.dn.str2dn(user.dn)) == user.dn assert dn == "uid=user,ou=users,dc=mydomain,dc=tld"
assert user.dn == "uid=user,ou=users,dc=mydomain,dc=tld" assert ldap.dn.is_dn(dn)
assert ldap.dn.dn2str(ldap.dn.str2dn(dn)) == dn
assert user == models.User.get(user.identifier) assert user == models.User.get(user.identifier)
assert user == models.User.get(user_name=user.identifier) assert user == models.User.get(user_name=user.identifier)
assert user == models.User.get(id=user.dn) assert user == models.User.get(id=dn)
user.delete() user.delete()
@ -65,13 +67,14 @@ def test_special_chars_in_rdn(testclient, backend):
) )
user.save() user.save()
assert ldap.dn.is_dn(user.dn) dn = user.id
assert ldap.dn.dn2str(ldap.dn.str2dn(user.dn)) == user.dn assert ldap.dn.is_dn(dn)
assert user.dn == "uid=\\#user,ou=users,dc=mydomain,dc=tld" assert ldap.dn.dn2str(ldap.dn.str2dn(dn)) == dn
assert dn == "uid=\\#user,ou=users,dc=mydomain,dc=tld"
assert user == models.User.get(user.identifier) assert user == models.User.get(user.identifier)
assert user == models.User.get(user_name=user.identifier) assert user == models.User.get(user_name=user.identifier)
assert user == models.User.get(id=user.dn) assert user == models.User.get(id=dn)
user.delete() user.delete()
@ -179,10 +182,11 @@ def test_operational_attribute_conversion(backend):
def test_guess_object_from_dn(backend, testclient, foo_group): def test_guess_object_from_dn(backend, testclient, foo_group):
foo_group.members = [foo_group] foo_group.members = [foo_group]
foo_group.save() foo_group.save()
g = LDAPObject.get(id=foo_group.dn) dn = foo_group.id
g = LDAPObject.get(id=dn)
assert isinstance(g, models.Group) assert isinstance(g, models.Group)
assert g == foo_group assert g == foo_group
assert g.cn == foo_group.cn assert g.display_name == foo_group.display_name
ou = LDAPObject.get(id=f"{models.Group.base},{models.Group.root_dn}") ou = LDAPObject.get(id=f"{models.Group.base},{models.Group.root_dn}")
assert isinstance(ou, LDAPObject) assert isinstance(ou, LDAPObject)
@ -195,8 +199,10 @@ def test_object_class_update(backend, testclient):
user1 = models.User(cn="foo1", sn="bar1", user_name="baz1") user1 = models.User(cn="foo1", sn="bar1", user_name="baz1")
user1.save() user1.save()
assert user1.objectClass == ["inetOrgPerson"] assert user1.get_ldap_attribute("objectClass") == ["inetOrgPerson"]
assert models.User.get(id=user1.id).objectClass == ["inetOrgPerson"] assert models.User.get(id=user1.id).get_ldap_attribute("objectClass") == [
"inetOrgPerson"
]
testclient.app.config["BACKENDS"]["LDAP"]["USER_CLASS"] = [ testclient.app.config["BACKENDS"]["LDAP"]["USER_CLASS"] = [
"inetOrgPerson", "inetOrgPerson",
@ -207,18 +213,24 @@ def test_object_class_update(backend, testclient):
user2 = models.User(cn="foo2", sn="bar2", user_name="baz2") user2 = models.User(cn="foo2", sn="bar2", user_name="baz2")
user2.save() user2.save()
assert user2.objectClass == ["inetOrgPerson", "extensibleObject"] assert user2.get_ldap_attribute("objectClass") == [
assert models.User.get(id=user2.id).objectClass == [ "inetOrgPerson",
"extensibleObject",
]
assert models.User.get(id=user2.id).get_ldap_attribute("objectClass") == [
"inetOrgPerson", "inetOrgPerson",
"extensibleObject", "extensibleObject",
] ]
user1 = models.User.get(id=user1.id) user1 = models.User.get(id=user1.id)
assert user1.objectClass == ["inetOrgPerson"] assert user1.get_ldap_attribute("objectClass") == ["inetOrgPerson"]
user1.save() user1.save()
assert user1.objectClass == ["inetOrgPerson", "extensibleObject"] assert user1.get_ldap_attribute("objectClass") == [
assert models.User.get(id=user1.id).objectClass == [ "inetOrgPerson",
"extensibleObject",
]
assert models.User.get(id=user1.id).get_ldap_attribute("objectClass") == [
"inetOrgPerson", "inetOrgPerson",
"extensibleObject", "extensibleObject",
] ]

View file

@ -24,9 +24,6 @@ def test_required_methods(testclient):
with pytest.raises(NotImplementedError): with pytest.raises(NotImplementedError):
obj.delete() obj.delete()
with pytest.raises(NotImplementedError):
obj.update()
with pytest.raises(NotImplementedError): with pytest.raises(NotImplementedError):
obj.reload() obj.reload()
@ -74,11 +71,11 @@ def test_model_lifecycle(testclient, backend):
user.family_name = "new_family_name" user.family_name = "new_family_name"
assert user.family_name == ["new_family_name"] assert user.family_name == "new_family_name"
user.reload() user.reload()
assert user.family_name == ["family_name"] assert user.family_name == "family_name"
user.delete() user.delete()
@ -96,29 +93,30 @@ def test_model_attribute_edition(testclient, backend):
) )
user.save() user.save()
assert user.user_name == ["user_name"] assert user.user_name == "user_name"
assert user.family_name == ["family_name"] assert user.family_name == "family_name"
assert user.emails == ["email1@user.com", "email2@user.com"] assert user.emails == ["email1@user.com", "email2@user.com"]
user = models.User.get(id=user.id) user = models.User.get(id=user.id)
assert user.user_name == ["user_name"] assert user.user_name == "user_name"
assert user.family_name == ["family_name"] assert user.family_name == "family_name"
assert user.emails == ["email1@user.com", "email2@user.com"] assert user.emails == ["email1@user.com", "email2@user.com"]
user.family_name = ["new_family_name"] user.family_name = "new_family_name"
user.emails = ["email1@user.com"] user.emails = ["email1@user.com"]
user.save() user.save()
assert user.family_name == ["new_family_name"] assert user.family_name == "new_family_name"
assert user.emails == ["email1@user.com"] assert user.emails == ["email1@user.com"]
user = models.User.get(id=user.id) user = models.User.get(id=user.id)
assert user.family_name == ["new_family_name"] assert user.family_name == "new_family_name"
assert user.emails == ["email1@user.com"] assert user.emails == ["email1@user.com"]
user.display_name = [""] user.display_name = ""
user.save() assert not user.display_name
user.save()
assert not user.display_name assert not user.display_name
user.delete() user.delete()

View file

@ -152,7 +152,7 @@ def test_moderator_can_create_edit_and_delete_group(
logged_moderator.reload() logged_moderator.reload()
bar_group = models.Group.get(display_name="bar") bar_group = models.Group.get(display_name="bar")
assert bar_group.display_name == "bar" assert bar_group.display_name == "bar"
assert bar_group.description == ["yolo"] assert bar_group.description == "yolo"
assert bar_group.members == [ assert bar_group.members == [
logged_moderator logged_moderator
] # Group cannot be empty so creator is added in it ] # Group cannot be empty so creator is added in it
@ -162,7 +162,7 @@ def test_moderator_can_create_edit_and_delete_group(
res = testclient.get("/groups/bar", status=200) res = testclient.get("/groups/bar", status=200)
form = res.forms["editgroupform"] form = res.forms["editgroupform"]
form["display_name"] = "bar2" form["display_name"] = "bar2"
form["description"] = ["yolo2"] form["description"] = "yolo2"
res = form.submit(name="action", value="edit") res = form.submit(name="action", value="edit")
assert res.flashes == [("error", "Group edition failed.")] assert res.flashes == [("error", "Group edition failed.")]
@ -170,13 +170,13 @@ def test_moderator_can_create_edit_and_delete_group(
bar_group = models.Group.get(display_name="bar") bar_group = models.Group.get(display_name="bar")
assert bar_group.display_name == "bar" assert bar_group.display_name == "bar"
assert bar_group.description == ["yolo"] assert bar_group.description == "yolo"
assert models.Group.get(display_name="bar2") is None assert models.Group.get(display_name="bar2") is None
# Group description can be edited # Group description can be edited
res = testclient.get("/groups/bar", status=200) res = testclient.get("/groups/bar", status=200)
form = res.forms["editgroupform"] form = res.forms["editgroupform"]
form["description"] = ["yolo2"] form["description"] = "yolo2"
res = form.submit(name="action", value="edit") res = form.submit(name="action", value="edit")
assert res.flashes == [("success", "The group bar has been sucessfully edited.")] assert res.flashes == [("success", "The group bar has been sucessfully edited.")]
@ -184,7 +184,7 @@ def test_moderator_can_create_edit_and_delete_group(
bar_group = models.Group.get(display_name="bar") bar_group = models.Group.get(display_name="bar")
assert bar_group.display_name == "bar" assert bar_group.display_name == "bar"
assert bar_group.description == ["yolo2"] assert bar_group.description == "yolo2"
# Group is deleted # Group is deleted
res = res.forms["editgroupform"].submit(name="action", value="confirm-delete") res = res.forms["editgroupform"].submit(name="action", value="confirm-delete")
@ -279,15 +279,15 @@ def test_user_list_search(testclient, logged_admin, foo_group, user, moderator):
res = testclient.get("/groups/foo") res = testclient.get("/groups/foo")
res.mustcontain("3 items") res.mustcontain("3 items")
res.mustcontain(user.formatted_name[0]) res.mustcontain(user.formatted_name)
res.mustcontain(logged_admin.formatted_name[0]) res.mustcontain(logged_admin.formatted_name)
res.mustcontain(moderator.formatted_name[0]) res.mustcontain(moderator.formatted_name)
form = res.forms["search"] form = res.forms["search"]
form["query"] = "ohn" form["query"] = "ohn"
res = form.submit() res = form.submit()
res.mustcontain("1 item") res.mustcontain("1 item")
res.mustcontain(user.formatted_name[0]) res.mustcontain(user.formatted_name)
res.mustcontain(no=logged_admin.formatted_name[0]) res.mustcontain(no=logged_admin.formatted_name)
res.mustcontain(no=moderator.formatted_name[0]) res.mustcontain(no=moderator.formatted_name)

View file

@ -26,7 +26,7 @@ def test_user_creation_edition_and_deletion(
res = res.follow(status=200) res = res.follow(status=200)
george = models.User.get_from_login("george") george = models.User.get_from_login("george")
foo_group.reload() foo_group.reload()
assert "George" == george.given_name[0] assert "George" == george.given_name
assert george.groups == [foo_group] assert george.groups == [foo_group]
assert george.check_password("totoyolo")[0] assert george.check_password("totoyolo")[0]
@ -46,7 +46,7 @@ def test_user_creation_edition_and_deletion(
res = res.form.submit(name="action", value="edit-settings").follow() res = res.form.submit(name="action", value="edit-settings").follow()
george = models.User.get_from_login("george") george = models.User.get_from_login("george")
assert "Georgio" == george.given_name[0] assert "Georgio" == george.given_name
assert george.check_password("totoyolo")[0] assert george.check_password("totoyolo")[0]
foo_group.reload() foo_group.reload()
@ -92,7 +92,7 @@ def test_user_creation_without_password(testclient, logged_moderator):
assert ("success", "User account creation succeed.") in res.flashes assert ("success", "User account creation succeed.") in res.flashes
res = res.follow(status=200) res = res.follow(status=200)
george = models.User.get_from_login("george") george = models.User.get_from_login("george")
assert george.user_name[0] == "george" assert george.user_name == "george"
assert not george.has_password() assert not george.has_password()
george.delete() george.delete()
@ -145,7 +145,7 @@ def test_cn_setting_with_given_name_and_surname(testclient, logged_moderator):
) )
george = models.User.get_from_login("george") george = models.User.get_from_login("george")
assert george.formatted_name[0] == "George Abitbol" assert george.formatted_name == "George Abitbol"
george.delete() george.delete()
@ -160,7 +160,7 @@ def test_cn_setting_with_surname_only(testclient, logged_moderator):
) )
george = models.User.get_from_login("george") george = models.User.get_from_login("george")
assert george.formatted_name[0] == "Abitbol" assert george.formatted_name == "Abitbol"
george.delete() george.delete()

View file

@ -57,16 +57,16 @@ def test_user_list_bad_pages(testclient, logged_admin):
def test_user_list_search(testclient, logged_admin, user, moderator): def test_user_list_search(testclient, logged_admin, user, moderator):
res = testclient.get("/users") res = testclient.get("/users")
res.mustcontain("3 items") res.mustcontain("3 items")
res.mustcontain(moderator.formatted_name[0]) res.mustcontain(moderator.formatted_name)
res.mustcontain(user.formatted_name[0]) res.mustcontain(user.formatted_name)
form = res.forms["search"] form = res.forms["search"]
form["query"] = "Jack" form["query"] = "Jack"
res = form.submit() res = form.submit()
res.mustcontain("1 item") res.mustcontain("1 item")
res.mustcontain(moderator.formatted_name[0]) res.mustcontain(moderator.formatted_name)
res.mustcontain(no=user.formatted_name[0]) res.mustcontain(no=user.formatted_name)
def test_user_list_search_only_allowed_fields( def test_user_list_search_only_allowed_fields(
@ -74,16 +74,16 @@ def test_user_list_search_only_allowed_fields(
): ):
res = testclient.get("/users") res = testclient.get("/users")
res.mustcontain("3 items") res.mustcontain("3 items")
res.mustcontain(moderator.formatted_name[0]) res.mustcontain(moderator.formatted_name)
res.mustcontain(user.formatted_name[0]) res.mustcontain(user.formatted_name)
form = res.forms["search"] form = res.forms["search"]
form["query"] = "user" form["query"] = "user"
res = form.submit() res = form.submit()
res.mustcontain("1 item") res.mustcontain("1 item")
res.mustcontain(user.formatted_name[0]) res.mustcontain(user.formatted_name)
res.mustcontain(no=moderator.formatted_name[0]) res.mustcontain(no=moderator.formatted_name)
testclient.app.config["ACL"]["DEFAULT"]["READ"].remove("user_name") testclient.app.config["ACL"]["DEFAULT"]["READ"].remove("user_name")
g.user.reload() g.user.reload()
@ -93,8 +93,8 @@ def test_user_list_search_only_allowed_fields(
res = form.submit() res = form.submit()
res.mustcontain("0 items") res.mustcontain("0 items")
res.mustcontain(no=user.formatted_name[0]) res.mustcontain(no=user.formatted_name)
res.mustcontain(no=moderator.formatted_name[0]) res.mustcontain(no=moderator.formatted_name)
def test_edition_permission( def test_edition_permission(
@ -143,25 +143,25 @@ def test_edition(
logged_user.reload() logged_user.reload()
assert logged_user.given_name == ["given_name"] assert logged_user.given_name == "given_name"
assert logged_user.family_name == ["family_name"] assert logged_user.family_name == "family_name"
assert logged_user.display_name == "display_name" assert logged_user.display_name == "display_name"
assert logged_user.emails == ["email@mydomain.tld"] assert logged_user.emails == ["email@mydomain.tld"]
assert logged_user.phone_numbers == ["555-666-777"] assert logged_user.phone_numbers == ["555-666-777"]
assert logged_user.formatted_address == ["formatted_address"] assert logged_user.formatted_address == "formatted_address"
assert logged_user.street == ["street"] assert logged_user.street == "street"
assert logged_user.postal_code == ["postal_code"] assert logged_user.postal_code == "postal_code"
assert logged_user.locality == ["locality"] assert logged_user.locality == "locality"
assert logged_user.region == ["region"] assert logged_user.region == "region"
assert logged_user.preferred_language == "fr" assert logged_user.preferred_language == "fr"
assert logged_user.employee_number == "666" assert logged_user.employee_number == "666"
assert logged_user.department == ["1337"] assert logged_user.department == "1337"
assert logged_user.title == ["title"] assert logged_user.title == "title"
assert logged_user.organization == ["organization"] assert logged_user.organization == "organization"
assert logged_user.photo == [jpeg_photo] assert logged_user.photo == jpeg_photo
logged_user.formatted_name = ["John (johnny) Doe"] logged_user.formatted_name = "John (johnny) Doe"
logged_user.family_name = ["Doe"] logged_user.family_name = "Doe"
logged_user.emails = ["john@doe.com"] logged_user.emails = ["john@doe.com"]
logged_user.given_name = None logged_user.given_name = None
logged_user.photo = None logged_user.photo = None
@ -331,7 +331,7 @@ def test_surname_is_mandatory(testclient, logged_user):
logged_user.reload() logged_user.reload()
assert ["Doe"] == logged_user.family_name assert "Doe" == logged_user.family_name
def test_formcontrol(testclient, logged_user): def test_formcontrol(testclient, logged_user):

View file

@ -63,7 +63,7 @@ def test_photo_on_profile_edition(
logged_user.reload() logged_user.reload()
assert [jpeg_photo] == logged_user.photo assert logged_user.photo == jpeg_photo
# No change # No change
res = testclient.get("/profile/user", status=200) res = testclient.get("/profile/user", status=200)
@ -75,7 +75,7 @@ def test_photo_on_profile_edition(
logged_user.reload() logged_user.reload()
assert [jpeg_photo] == logged_user.photo assert logged_user.photo == jpeg_photo
# Photo deletion # Photo deletion
res = testclient.get("/profile/user", status=200) res = testclient.get("/profile/user", status=200)
@ -87,7 +87,7 @@ def test_photo_on_profile_edition(
logged_user.reload() logged_user.reload()
assert logged_user.photo == [] assert logged_user.photo is None
# Photo deletion AND upload, this should never happen # Photo deletion AND upload, this should never happen
res = testclient.get("/profile/user", status=200) res = testclient.get("/profile/user", status=200)
@ -100,7 +100,7 @@ def test_photo_on_profile_edition(
logged_user.reload() logged_user.reload()
assert [] == logged_user.photo assert logged_user.photo is None
def test_photo_on_profile_creation(testclient, jpeg_photo, logged_admin): def test_photo_on_profile_creation(testclient, jpeg_photo, logged_admin):
@ -119,7 +119,7 @@ def test_photo_on_profile_creation(testclient, jpeg_photo, logged_admin):
) )
user = models.User.get_from_login("foobar") user = models.User.get_from_login("foobar")
assert user.photo == [jpeg_photo] assert user.photo == jpeg_photo
user.delete() user.delete()
@ -140,5 +140,5 @@ def test_photo_deleted_on_profile_creation(testclient, jpeg_photo, logged_admin)
) )
user = models.User.get_from_login("foobar") user = models.User.get_from_login("foobar")
assert user.photo == [] assert user.photo is None
user.delete() user.delete()

View file

@ -28,7 +28,7 @@ def test_edition(
assert res.flashes == [("error", "Profile edition failed.")] assert res.flashes == [("error", "Profile edition failed.")]
logged_user.reload() logged_user.reload()
assert logged_user.user_name == ["user"] assert logged_user.user_name == "user"
foo_group.reload() foo_group.reload()
bar_group.reload() bar_group.reload()
@ -38,7 +38,7 @@ def test_edition(
assert logged_user.check_password("correct horse battery staple")[0] assert logged_user.check_password("correct horse battery staple")[0]
logged_user.user_name = ["user"] logged_user.user_name = "user"
logged_user.save() logged_user.save()
@ -72,10 +72,10 @@ def test_edition_without_groups(
logged_user.reload() logged_user.reload()
assert logged_user.user_name == ["user"] assert logged_user.user_name == "user"
assert logged_user.check_password("correct horse battery staple")[0] assert logged_user.check_password("correct horse battery staple")[0]
logged_user.user_name = ["user"] logged_user.user_name = "user"
logged_user.save() logged_user.save()

View file

@ -47,7 +47,6 @@ def test_clean_command(testclient, backend, client, user):
access_token="my-valid-token", access_token="my-valid-token",
client=client, client=client,
subject=user, subject=user,
type=None,
refresh_token=gen_salt(48), refresh_token=gen_salt(48),
scope="openid profile", scope="openid profile",
issue_date=( issue_date=(
@ -61,7 +60,6 @@ def test_clean_command(testclient, backend, client, user):
access_token="my-expired-token", access_token="my-expired-token",
client=client, client=client,
subject=user, subject=user,
type=None,
refresh_token=gen_salt(48), refresh_token=gen_salt(48),
scope="openid profile", scope="openid profile",
issue_date=( issue_date=(
@ -81,5 +79,5 @@ def test_clean_command(testclient, backend, client, user):
res = runner.invoke(cli, ["clean"]) res = runner.invoke(cli, ["clean"])
assert res.exit_code == 0, res.stdout assert res.exit_code == 0, res.stdout
assert models.AuthorizationCode.query() == [valid_code] assert models.AuthorizationCode.get() == valid_code
assert models.Token.query() == [valid_token] assert models.Token.get() == valid_token

View file

@ -80,14 +80,14 @@ def test_authorization_code_flow(
"phone", "phone",
} }
claims = jwt.decode(access_token, keypair[1]) claims = jwt.decode(access_token, keypair[1])
assert claims["sub"] == logged_user.user_name[0] assert claims["sub"] == logged_user.user_name
assert claims["name"] == logged_user.formatted_name[0] assert claims["name"] == logged_user.formatted_name
assert claims["aud"] == [client.client_id, other_client.client_id] assert claims["aud"] == [client.client_id, other_client.client_id]
id_token = res.json["id_token"] id_token = res.json["id_token"]
claims = jwt.decode(id_token, keypair[1]) claims = jwt.decode(id_token, keypair[1])
assert claims["sub"] == logged_user.user_name[0] assert claims["sub"] == logged_user.user_name
assert claims["name"] == logged_user.formatted_name[0] assert claims["name"] == logged_user.formatted_name
assert claims["aud"] == [client.client_id, other_client.client_id] assert claims["aud"] == [client.client_id, other_client.client_id]
res = testclient.get( res = testclient.get(
@ -208,8 +208,8 @@ def test_authorization_code_flow_preconsented(
id_token = res.json["id_token"] id_token = res.json["id_token"]
claims = jwt.decode(id_token, keypair[1]) claims = jwt.decode(id_token, keypair[1])
assert logged_user.user_name[0] == claims["sub"] assert logged_user.user_name == claims["sub"]
assert logged_user.formatted_name[0] == claims["name"] assert logged_user.formatted_name == claims["name"]
assert [client.client_id, other_client.client_id] == claims["aud"] assert [client.client_id, other_client.client_id] == claims["aud"]
res = testclient.get( res = testclient.get(
@ -240,7 +240,7 @@ def test_logout_login(testclient, logged_user, client):
res = res.follow() res = res.follow()
res = res.follow() res = res.follow()
res.form["login"] = logged_user.user_name[0] res.form["login"] = logged_user.user_name
res = res.form.submit() res = res.form.submit()
res = res.follow() res = res.follow()
@ -767,8 +767,8 @@ def test_authorization_code_request_scope_too_large(
id_token = res.json["id_token"] id_token = res.json["id_token"]
claims = jwt.decode(id_token, keypair[1]) claims = jwt.decode(id_token, keypair[1])
assert logged_user.user_name[0] == claims["sub"] assert logged_user.user_name == claims["sub"]
assert logged_user.formatted_name[0] == claims["name"] assert logged_user.formatted_name == claims["name"]
res = testclient.get( res = testclient.get(
"/oauth/userinfo", "/oauth/userinfo",

View file

@ -18,7 +18,7 @@ def test_revokation(testclient, client, consent, logged_user, token):
assert not consent.revoked assert not consent.revoked
assert not token.revoked assert not token.revoked
res = testclient.get(f"/consent/revoke/{consent.consent_id[0]}", status=302) res = testclient.get(f"/consent/revoke/{consent.consent_id}", status=302)
assert ("success", "The access has been revoked") in res.flashes assert ("success", "The access has been revoked") in res.flashes
res = res.follow(status=200) res = res.follow(status=200)
res.mustcontain(no="Revoke access") res.mustcontain(no="Revoke access")
@ -36,7 +36,7 @@ def test_revokation_already_revoked(testclient, client, consent, logged_user):
consent.reload() consent.reload()
assert consent.revoked assert consent.revoked
res = testclient.get(f"/consent/revoke/{consent.consent_id[0]}", status=302) res = testclient.get(f"/consent/revoke/{consent.consent_id}", status=302)
assert ("error", "The access is already revoked") in res.flashes assert ("error", "The access is already revoked") in res.flashes
res = res.follow(status=200) res = res.follow(status=200)
@ -52,7 +52,7 @@ def test_restoration(testclient, client, consent, logged_user, token):
token.reload() token.reload()
assert token.revoked assert token.revoked
res = testclient.get(f"/consent/restore/{consent.consent_id[0]}", status=302) res = testclient.get(f"/consent/restore/{consent.consent_id}", status=302)
assert ("success", "The access has been restored") in res.flashes assert ("success", "The access has been restored") in res.flashes
res = res.follow(status=200) res = res.follow(status=200)
@ -65,7 +65,7 @@ def test_restoration(testclient, client, consent, logged_user, token):
def test_restoration_already_restored(testclient, client, consent, logged_user, token): def test_restoration_already_restored(testclient, client, consent, logged_user, token):
assert not consent.revoked assert not consent.revoked
res = testclient.get(f"/consent/restore/{consent.consent_id[0]}", status=302) res = testclient.get(f"/consent/restore/{consent.consent_id}", status=302)
assert ("error", "The access is not revoked") in res.flashes assert ("error", "The access is not revoked") in res.flashes
res = res.follow(status=200) res = res.follow(status=200)
@ -77,7 +77,7 @@ def test_invalid_consent_revokation(testclient, client, logged_user):
def test_someone_else_consent_revokation(testclient, client, consent, logged_moderator): def test_someone_else_consent_revokation(testclient, client, consent, logged_moderator):
res = testclient.get(f"/consent/revoke/{consent.consent_id[0]}", status=302) res = testclient.get(f"/consent/revoke/{consent.consent_id}", status=302)
assert ("success", "The access has been revoked") not in res.flashes assert ("success", "The access has been revoked") not in res.flashes
assert ("error", "Could not revoke this access") in res.flashes assert ("error", "Could not revoke this access") in res.flashes
@ -91,7 +91,7 @@ def test_invalid_consent_restoration(testclient, client, logged_user):
def test_someone_else_consent_restoration( def test_someone_else_consent_restoration(
testclient, client, consent, logged_moderator testclient, client, consent, logged_moderator
): ):
res = testclient.get(f"/consent/restore/{consent.consent_id[0]}", status=302) res = testclient.get(f"/consent/restore/{consent.consent_id}", status=302)
assert ("success", "The access has been restore") not in res.flashes assert ("success", "The access has been restore") not in res.flashes
assert ("error", "Could not restore this access") in res.flashes assert ("error", "Could not restore this access") in res.flashes
@ -171,7 +171,7 @@ def test_revoke_preconsented_client(testclient, client, logged_user, token):
token.reload() token.reload()
assert token.revoked assert token.revoked
res = testclient.get(f"/consent/restore/{consent.consent_id[0]}", status=302) res = testclient.get(f"/consent/restore/{consent.consent_id}", status=302)
assert ("success", "The access has been restored") in res.flashes assert ("success", "The access has been restored") in res.flashes
consent.reload() consent.reload()
@ -180,7 +180,7 @@ def test_revoke_preconsented_client(testclient, client, logged_user, token):
token.reload() token.reload()
assert token.revoked assert token.revoked
res = testclient.get(f"/consent/revoke/{consent.consent_id[0]}", status=302) res = testclient.get(f"/consent/revoke/{consent.consent_id}", status=302)
assert ("success", "The access has been revoked") in res.flashes assert ("success", "The access has been revoked") in res.flashes
consent.reload() consent.reload()
assert consent.revoked assert consent.revoked

View file

@ -17,7 +17,7 @@ def test_oauth_hybrid(testclient, backend, user, client):
).follow() ).follow()
assert "text/html" == res.content_type, res.json assert "text/html" == res.content_type, res.json
res.form["login"] = user.user_name[0] res.form["login"] = user.user_name
res = res.form.submit(status=302).follow() res = res.form.submit(status=302).follow()
res.form["password"] = "correct horse battery staple" res.form["password"] = "correct horse battery staple"
@ -73,8 +73,8 @@ def test_oidc_hybrid(testclient, backend, logged_user, client, keypair, other_cl
id_token = params["id_token"][0] id_token = params["id_token"][0]
claims = jwt.decode(id_token, keypair[1]) claims = jwt.decode(id_token, keypair[1])
assert logged_user.user_name[0] == claims["sub"] assert logged_user.user_name == claims["sub"]
assert logged_user.formatted_name[0] == claims["name"] assert logged_user.formatted_name == claims["name"]
assert [client.client_id, other_client.client_id] == claims["aud"] assert [client.client_id, other_client.client_id] == claims["aud"]
res = testclient.get( res = testclient.get(

View file

@ -86,8 +86,8 @@ def test_oidc_implicit(testclient, keypair, user, client, other_client):
id_token = params["id_token"][0] id_token = params["id_token"][0]
claims = jwt.decode(id_token, keypair[1]) claims = jwt.decode(id_token, keypair[1])
assert user.user_name[0] == claims["sub"] assert user.user_name == claims["sub"]
assert user.formatted_name[0] == claims["name"] assert user.formatted_name == claims["name"]
assert [client.client_id, other_client.client_id] == claims["aud"] assert [client.client_id, other_client.client_id] == claims["aud"]
res = testclient.get( res = testclient.get(
@ -141,8 +141,8 @@ def test_oidc_implicit_with_group(
id_token = params["id_token"][0] id_token = params["id_token"][0]
claims = jwt.decode(id_token, keypair[1]) claims = jwt.decode(id_token, keypair[1])
assert user.user_name[0] == claims["sub"] assert user.user_name == claims["sub"]
assert user.formatted_name[0] == claims["name"] assert user.formatted_name == claims["name"]
assert [client.client_id, other_client.client_id] == claims["aud"] assert [client.client_id, other_client.client_id] == claims["aud"]
assert ["foo"] == claims["groups"] assert ["foo"] == claims["groups"]

View file

@ -17,9 +17,9 @@ def test_access_token_introspection(testclient, user, client, token):
"active": True, "active": True,
"client_id": client.client_id, "client_id": client.client_id,
"token_type": token.type, "token_type": token.type,
"username": user.formatted_name[0], "username": user.formatted_name,
"scope": token.get_scope(), "scope": token.get_scope(),
"sub": user.user_name[0], "sub": user.user_name,
"aud": [client.client_id], "aud": [client.client_id],
"iss": "https://auth.mydomain.tld", "iss": "https://auth.mydomain.tld",
"exp": token.get_expires_at(), "exp": token.get_expires_at(),
@ -38,9 +38,9 @@ def test_refresh_token_introspection(testclient, user, client, token):
"active": True, "active": True,
"client_id": client.client_id, "client_id": client.client_id,
"token_type": token.type, "token_type": token.type,
"username": user.formatted_name[0], "username": user.formatted_name,
"scope": token.get_scope(), "scope": token.get_scope(),
"sub": user.user_name[0], "sub": user.user_name,
"aud": [client.client_id], "aud": [client.client_id],
"iss": "https://auth.mydomain.tld", "iss": "https://auth.mydomain.tld",
"exp": token.get_expires_at(), "exp": token.get_expires_at(),
@ -108,9 +108,9 @@ def test_full_flow(testclient, logged_user, client, user, other_client):
"active": True, "active": True,
"client_id": client.client_id, "client_id": client.client_id,
"token_type": token.type, "token_type": token.type,
"username": user.formatted_name[0], "username": user.formatted_name,
"scope": token.get_scope(), "scope": token.get_scope(),
"sub": user.user_name[0], "sub": user.user_name,
"iss": "https://auth.mydomain.tld", "iss": "https://auth.mydomain.tld",
"exp": token.get_expires_at(), "exp": token.get_expires_at(),
"iat": token.get_issued_at(), "iat": token.get_issued_at(),

View file

@ -284,7 +284,7 @@ def test_generate_user_standard_claims_with_default_config(testclient, backend,
def test_custom_config_format_claim_is_well_formated(testclient, backend, user): def test_custom_config_format_claim_is_well_formated(testclient, backend, user):
jwt_mapping_config = DEFAULT_JWT_MAPPING.copy() jwt_mapping_config = DEFAULT_JWT_MAPPING.copy()
jwt_mapping_config["EMAIL"] = "{{ user.user_name[0] }}@mydomain.tld" jwt_mapping_config["EMAIL"] = "{{ user.user_name }}@mydomain.tld"
data = generate_user_claims(user, STANDARD_CLAIMS, jwt_mapping_config) data = generate_user_claims(user, STANDARD_CLAIMS, jwt_mapping_config)