Properly handle LDAP date timezones

This commit is contained in:
Éloi Rivard 2023-03-18 00:38:56 +01:00
parent 6b87788d23
commit 61940844e6
16 changed files with 118 additions and 58 deletions

View file

@ -12,6 +12,7 @@ Changed
*******
- UX rework. Submenu addition. :pr:`114`
- Properly handle LDAP date timezones. :pr:`117`
[0.0.22] - 2023-03-13
=====================

View file

@ -1,8 +1,7 @@
import datetime
import io
from dataclasses import astuple
from dataclasses import dataclass
from datetime import datetime
from datetime import timedelta
from typing import List
import pkg_resources
@ -205,11 +204,13 @@ class Invitation:
@property
def creation_date(self):
return datetime.fromisoformat(self.creation_date_isoformat)
return datetime.datetime.fromisoformat(self.creation_date_isoformat)
def has_expired(self):
DEFAULT_INVITATION_DURATION = 2 * 24 * 60 * 60
return datetime.now() - self.creation_date > timedelta(
return datetime.datetime.now(
datetime.timezone.utc
) - self.creation_date > datetime.timedelta(
seconds=current_app.config.get(
"INVITATION_EXPIRATION", DEFAULT_INVITATION_DURATION
)
@ -234,7 +235,7 @@ def user_invitation(user):
if request.form and form.validate():
form_validated = True
invitation = Invitation(
datetime.now().isoformat(),
datetime.datetime.now(datetime.timezone.utc).isoformat(),
form.uid.data,
form.uid_editable.data,
form.mail.data,

View file

@ -34,8 +34,11 @@ def ldap_to_python(value, syntax):
if value == LDAP_NULL_DATE:
# python cannot represent datetimes with year 0
return datetime.datetime.min
else:
return datetime.datetime.strptime(value, "%Y%m%d%H%M%SZ") if value else None
if value.endswith("Z"):
return datetime.datetime.strptime(value, "%Y%m%d%H%M%SZ").replace(
tzinfo=datetime.timezone.utc
)
return datetime.datetime.strptime(value, "%Y%m%d%H%M%S%z")
if syntax == Syntax.INTEGER:
return int(value.decode("utf-8"))
@ -57,8 +60,10 @@ def python_to_ldap(value, syntax, encode=True):
if syntax == Syntax.GENERALIZED_TIME and isinstance(value, datetime.datetime):
if value == datetime.datetime.min:
value = LDAP_NULL_DATE
else:
elif value.tzinfo == datetime.timezone.utc:
value = value.strftime("%Y%m%d%H%M%SZ")
else:
value = value.strftime("%Y%m%d%H%M%S%z")
if syntax == Syntax.INTEGER and isinstance(value, int):
value = str(value)

View file

@ -51,7 +51,7 @@ def add(user):
)
client_id = gen_salt(24)
client_id_issued_at = datetime.datetime.now()
client_id_issued_at = datetime.datetime.now(datetime.timezone.utc)
client = Client(
client_id=client_id,
client_id_issued_at=client_id_issued_at,

View file

@ -96,7 +96,7 @@ def restore(user, consent_id):
else:
consent.restore()
if not consent.issue_date:
consent.issue_date = datetime.datetime.now()
consent.issue_date = datetime.datetime.now(datetime.timezone.utc)
consent.save()
flash(_("The access has been restored"), "success")

View file

@ -149,7 +149,7 @@ def authorize():
client=client,
subject=user,
scope=scopes,
issue_date=datetime.datetime.now(),
issue_date=datetime.datetime.now(datetime.timezone.utc),
)
consent.save()

View file

@ -141,13 +141,17 @@ class AuthorizationCode(LDAPObject, AuthorizationCodeMixin):
return self.nonce
def is_expired(self):
return (
self.issue_date + datetime.timedelta(seconds=int(self.lifetime))
< datetime.datetime.now()
)
return self.issue_date + datetime.timedelta(
seconds=int(self.lifetime)
) < datetime.datetime.now(datetime.timezone.utc)
def get_auth_time(self):
return int((self.issue_date - datetime.datetime(1970, 1, 1)).total_seconds())
return int(
(
self.issue_date
- datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
).total_seconds()
)
class Token(LDAPObject, TokenMixin):
@ -185,11 +189,17 @@ class Token(LDAPObject, TokenMixin):
return int(self.lifetime)
def get_issued_at(self):
return int((self.issue_date - datetime.datetime(1970, 1, 1)).total_seconds())
return int(
(
self.issue_date
- datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
).total_seconds()
)
def get_expires_at(self):
issue_timestamp = (
self.issue_date - datetime.datetime(1970, 1, 1)
self.issue_date
- datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
).total_seconds()
return int(issue_timestamp) + int(self.lifetime)
@ -197,13 +207,12 @@ class Token(LDAPObject, TokenMixin):
if self.revokation_date:
return False
return self.expire_date >= datetime.datetime.now()
return self.expire_date >= datetime.datetime.now(datetime.timezone.utc)
def is_expired(self):
return (
self.issue_date + datetime.timedelta(seconds=int(self.lifetime))
< datetime.datetime.now()
)
return self.issue_date + datetime.timedelta(
seconds=int(self.lifetime)
) < datetime.datetime.now(datetime.timezone.utc)
def is_revoked(self):
return bool(self.revokation_date)
@ -231,7 +240,7 @@ class Consent(LDAPObject):
return bool(self.revokation_date)
def revoke(self):
self.revokation_date = datetime.datetime.now()
self.revokation_date = datetime.datetime.now(datetime.timezone.utc)
self.save()
tokens = Token.query(

View file

@ -124,7 +124,7 @@ def generate_user_claims(user, claims, jwt_mapping_config=None):
def save_authorization_code(code, request):
nonce = request.data.get("nonce")
now = datetime.datetime.now()
now = datetime.datetime.now(datetime.timezone.utc)
scope = request.client.get_allowed_scope(request.scope)
code = AuthorizationCode(
authorization_code_id=gen_salt(48),
@ -193,7 +193,7 @@ class RefreshTokenGrant(_RefreshTokenGrant):
return credential.subject
def revoke_old_credential(self, credential):
credential.revokation_date = datetime.datetime.now()
credential.revokation_date = datetime.datetime.now(datetime.timezone.utc)
credential.save()
@ -235,7 +235,7 @@ def query_client(client_id):
def save_token(token, request):
now = datetime.datetime.now()
now = datetime.datetime.now(datetime.timezone.utc)
t = Token(
token_id=gen_salt(48),
type=token["token_type"],
@ -278,7 +278,7 @@ class RevocationEndpoint(_RevocationEndpoint):
return query_token(token, token_type_hint)
def revoke_token(self, token, request):
token.revokation_date = datetime.datetime.now()
token.revokation_date = datetime.datetime.now(datetime.timezone.utc)
token.save()
@ -340,7 +340,7 @@ class ClientRegistrationEndpoint(ClientManagementMixin, _ClientRegistrationEndpo
def save_client(self, client_info, client_metadata, request):
client_info["client_id_issued_at"] = datetime.datetime.fromtimestamp(
client_info["client_id_issued_at"]
client_info["client_id_issued_at"], datetime.timezone.utc
)
if "scope" in client_metadata and not isinstance(
client_metadata["scope"], list

View file

@ -53,7 +53,7 @@ def revoke(user, token_id):
if not token:
abort(404)
token.revokation_date = datetime.datetime.now()
token.revokation_date = datetime.datetime.now(datetime.timezone.utc)
token.save()
flash(_("The token has successfully been revoked."), "success")

View file

@ -100,12 +100,30 @@ def test_fuzzy(slapd_connection, user, moderator, admin):
def test_ldap_to_python():
assert (
python_to_ldap(datetime.datetime.min, Syntax.GENERALIZED_TIME)
== b"000001010000Z"
python_to_ldap(
datetime.datetime(2000, 1, 2, 3, 4, 5, tzinfo=datetime.timezone.utc),
Syntax.GENERALIZED_TIME,
)
== b"20000102030405Z"
)
assert (
python_to_ldap(datetime.datetime(2000, 1, 2, 3, 4, 5), Syntax.GENERALIZED_TIME)
== b"20000102030405Z"
python_to_ldap(
datetime.datetime(
2000,
1,
2,
3,
4,
5,
tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=79200)),
),
Syntax.GENERALIZED_TIME,
)
== b"20000102030405-0200"
)
assert (
python_to_ldap(datetime.datetime.min, Syntax.GENERALIZED_TIME)
== b"000001010000Z"
)
assert python_to_ldap(1337, Syntax.INTEGER) == b"1337"
@ -121,7 +139,18 @@ def test_ldap_to_python():
def test_python_to_ldap():
assert ldap_to_python(
b"20000102030405Z", Syntax.GENERALIZED_TIME
) == datetime.datetime(2000, 1, 2, 3, 4, 5)
) == datetime.datetime(2000, 1, 2, 3, 4, 5, tzinfo=datetime.timezone.utc)
assert ldap_to_python(
b"20000102030405-0200", Syntax.GENERALIZED_TIME
) == datetime.datetime(
2000,
1,
2,
3,
4,
5,
tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=79200)),
)
assert (
ldap_to_python(b"000001010000Z", Syntax.GENERALIZED_TIME)
== datetime.datetime.min

View file

@ -92,7 +92,7 @@ def client(testclient, other_client, slapd_connection):
"https://mydomain.tld/redirect2",
],
logo_uri="https://mydomain.tld/logo.png",
client_id_issued_at=datetime.datetime.now(),
client_id_issued_at=datetime.datetime.now(datetime.timezone.utc),
client_secret=gen_salt(48),
grant_types=[
"password",
@ -128,7 +128,7 @@ def other_client(testclient, slapd_connection):
"https://myotherdomain.tld/redirect2",
],
logo_uri="https://myotherdomain.tld/logo.png",
client_id_issued_at=datetime.datetime.now(),
client_id_issued_at=datetime.datetime.now(datetime.timezone.utc),
client_secret=gen_salt(48),
grant_types=[
"password",
@ -163,7 +163,7 @@ def authorization(testclient, user, client, slapd_connection):
response_type="code",
scope="openid profile",
nonce="nonce",
issue_date=datetime.datetime(2020, 1, 1),
issue_date=datetime.datetime(2020, 1, 1, tzinfo=datetime.timezone.utc),
lifetime="3600",
challenge="challenge",
challenge_method="method",
@ -185,7 +185,7 @@ def token(testclient, client, user, slapd_connection):
token_type=None,
refresh_token=gen_salt(48),
scope="openid profile",
issue_date=datetime.datetime.now(),
issue_date=datetime.datetime.now(datetime.timezone.utc),
lifetime=str(3600),
)
t.save()
@ -210,7 +210,7 @@ def consent(testclient, client, user, slapd_connection):
client=client,
subject=user,
scope=["openid", "profile"],
issue_date=datetime.datetime.now(),
issue_date=datetime.datetime.now(datetime.timezone.utc),
)
t.save()
yield t

View file

@ -16,7 +16,7 @@ def test_clean_command(testclient, slapd_connection, client, user):
response_type="code",
scope="openid profile",
nonce="nonce",
issue_date=datetime.datetime.now().replace(microsecond=0),
issue_date=datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0),
lifetime=3600,
challenge="challenge",
challenge_method="method",
@ -33,7 +33,8 @@ def test_clean_command(testclient, slapd_connection, client, user):
scope="openid profile",
nonce="nonce",
issue_date=(
datetime.datetime.now().replace(microsecond=0) - datetime.timedelta(days=1)
datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0)
- datetime.timedelta(days=1)
),
lifetime=3600,
challenge="challenge",
@ -50,7 +51,9 @@ def test_clean_command(testclient, slapd_connection, client, user):
type=None,
refresh_token=gen_salt(48),
scope="openid profile",
issue_date=(datetime.datetime.now().replace(microsecond=0)),
issue_date=(
datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0)
),
lifetime=3600,
)
valid_token.save()
@ -63,7 +66,8 @@ def test_clean_command(testclient, slapd_connection, client, user):
refresh_token=gen_salt(48),
scope="openid profile",
issue_date=(
datetime.datetime.now().replace(microsecond=0) - datetime.timedelta(days=1)
datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0)
- datetime.timedelta(days=1)
),
lifetime=3600,
)

View file

@ -197,7 +197,9 @@ def test_client_delete(testclient, logged_admin):
client = Client(client_id="client_id")
client.save()
token = Token(
token_id="id", client=client, issue_datetime=datetime.datetime.utcnow()
token_id="id",
client=client,
issue_datetime=datetime.datetime.now(datetime.timezone.utc),
)
token.save()
consent = Consent(

View file

@ -30,7 +30,9 @@ def test_token_list_pagination(testclient, logged_admin, client):
type=None,
refresh_token=gen_salt(48),
scope="openid profile",
issue_date=(datetime.datetime.now().replace(microsecond=0)),
issue_date=(
datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0)
),
lifetime=3600,
)
token.save()
@ -77,7 +79,9 @@ def test_token_list_search(testclient, logged_admin, client):
type=None,
refresh_token=gen_salt(48),
scope="openid profile",
issue_date=(datetime.datetime.now().replace(microsecond=0)),
issue_date=(
datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0)
),
lifetime=3600,
)
token1.save()
@ -89,7 +93,9 @@ def test_token_list_search(testclient, logged_admin, client):
type=None,
refresh_token=gen_salt(48),
scope="openid profile",
issue_date=(datetime.datetime.now().replace(microsecond=0)),
issue_date=(
datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0)
),
lifetime=3600,
)
token2.save()

View file

@ -64,7 +64,9 @@ def test_revoke_refresh_token_with_hint(testclient, user, client, token):
def test_cannot_refresh_after_revocation(testclient, user, client, token):
token.revokation_date = datetime.datetime.utcnow() - datetime.timedelta(days=7)
token.revokation_date = datetime.datetime.now(
datetime.timezone.utc
) - datetime.timedelta(days=7)
token.save()
res = testclient.post(

View file

@ -1,5 +1,4 @@
from datetime import datetime
from datetime import timedelta
import datetime
from canaille.account import Invitation
from canaille.models import User
@ -161,7 +160,7 @@ def test_invitation_login_already_taken(testclient, logged_admin):
def test_registration(testclient, foo_group):
invitation = Invitation(
datetime.now().isoformat(),
datetime.datetime.now(datetime.timezone.utc).isoformat(),
"someoneelse",
False,
"someone@mydomain.tld",
@ -174,7 +173,7 @@ def test_registration(testclient, foo_group):
def test_registration_invalid_hash(testclient, foo_group):
now = datetime.now().isoformat()
now = datetime.datetime.now(datetime.timezone.utc).isoformat()
invitation = Invitation(
now, "anything", False, "someone@mydomain.tld", [foo_group.id]
)
@ -185,7 +184,7 @@ def test_registration_invalid_hash(testclient, foo_group):
def test_registration_invalid_data(testclient, foo_group):
invitation = Invitation(
datetime.now().isoformat(),
datetime.datetime.now(datetime.timezone.utc).isoformat(),
"someoneelse",
False,
"someone@mydomain.tld",
@ -197,7 +196,9 @@ def test_registration_invalid_data(testclient, foo_group):
def test_registration_more_than_48_hours_after_invitation(testclient, foo_group):
two_days_ago = datetime.now() - timedelta(hours=48)
two_days_ago = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(
hours=48
)
invitation = Invitation(
two_days_ago.isoformat(),
"someoneelse",
@ -213,7 +214,7 @@ def test_registration_more_than_48_hours_after_invitation(testclient, foo_group)
def test_registration_no_password(testclient, foo_group):
invitation = Invitation(
datetime.now().isoformat(),
datetime.datetime.now(datetime.timezone.utc).isoformat(),
"someoneelse",
False,
"someone@mydomain.tld",
@ -238,7 +239,7 @@ def test_registration_no_password(testclient, foo_group):
def test_no_registration_if_logged_in(testclient, logged_user, foo_group):
invitation = Invitation(
datetime.now().isoformat(),
datetime.datetime.now(datetime.timezone.utc).isoformat(),
"someoneelse",
False,
"someone@mydomain.tld",
@ -275,7 +276,7 @@ def test_groups_are_saved_even_when_user_does_not_have_read_permission(
] # remove groups from default read permissions
invitation = Invitation(
datetime.now().isoformat(),
datetime.datetime.now(datetime.timezone.utc).isoformat(),
"someoneelse",
False,
"someone@mydomain.tld",