forked from Github-Mirrors/canaille
User token list view
This commit is contained in:
parent
5ab64429de
commit
955de489db
16 changed files with 154 additions and 49 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -13,3 +13,4 @@ htmlcov
|
|||
*.egg-info
|
||||
build
|
||||
dist
|
||||
python-ldap-test*
|
||||
|
|
4
TODO.md
4
TODO.md
|
@ -1,8 +1,6 @@
|
|||
- RFC 8414
|
||||
- Extract ldaputils in another library
|
||||
- Admin filter
|
||||
- User "Manage my permissions" screen
|
||||
- Admin tokens screen
|
||||
- Admin codes screen
|
||||
- Limit login attempts by time interval
|
||||
- Cleanup LDAP connections
|
||||
- Test with wrong inputs
|
||||
|
|
|
@ -9,6 +9,7 @@ def test_login_and_out(testclient, slapd_connection, user, client):
|
|||
res.form["password"] = "correct horse battery staple"
|
||||
res = res.form.submit()
|
||||
res = res.follow()
|
||||
res = res.follow()
|
||||
assert 200 == res.status_code
|
||||
|
||||
with testclient.session_transaction() as session:
|
||||
|
@ -59,6 +60,8 @@ def test_login_with_alternate_attribute(testclient, slapd_connection, user, clie
|
|||
res.form["password"] = "correct horse battery staple"
|
||||
res = res.form.submit()
|
||||
res = res.follow()
|
||||
assert 302 == res.status_code
|
||||
res = res.follow()
|
||||
assert 200 == res.status_code
|
||||
|
||||
with testclient.session_transaction() as session:
|
||||
|
|
|
@ -40,7 +40,8 @@ def test_authorization_code_flow(testclient, slapd_connection, logged_user, clie
|
|||
access_token = res.json["access_token"]
|
||||
|
||||
token = Token.get(access_token, conn=slapd_connection)
|
||||
assert token is not None
|
||||
assert token.oauthClientID == client.oauthClientID
|
||||
assert token.oauthSubject == logged_user.dn
|
||||
|
||||
res = testclient.get("/api/me", headers={"Authorization": f"Bearer {access_token}"})
|
||||
assert 200 == res.status_code
|
||||
|
@ -100,7 +101,8 @@ def test_logout_login(testclient, slapd_connection, logged_user, client):
|
|||
access_token = res.json["access_token"]
|
||||
|
||||
token = Token.get(access_token, conn=slapd_connection)
|
||||
assert token is not None
|
||||
assert token.oauthClientID == client.oauthClientID
|
||||
assert token.oauthSubject == logged_user.dn
|
||||
|
||||
res = testclient.get("/api/me", headers={"Authorization": f"Bearer {access_token}"})
|
||||
assert 200 == res.status_code
|
||||
|
@ -147,7 +149,8 @@ def test_refresh_token(testclient, slapd_connection, logged_user, client):
|
|||
res = testclient.post(
|
||||
"/oauth/token",
|
||||
params=dict(
|
||||
grant_type="refresh_token", refresh_token=res.json["refresh_token"],
|
||||
grant_type="refresh_token",
|
||||
refresh_token=res.json["refresh_token"],
|
||||
),
|
||||
headers={"Authorization": f"Basic {client_credentials(client)}"},
|
||||
)
|
||||
|
@ -207,7 +210,8 @@ def test_code_challenge(testclient, slapd_connection, logged_user, client):
|
|||
access_token = res.json["access_token"]
|
||||
|
||||
token = Token.get(access_token, conn=slapd_connection)
|
||||
assert token is not None
|
||||
assert token.oauthClientID == client.oauthClientID
|
||||
assert token.oauthSubject == logged_user.dn
|
||||
|
||||
res = testclient.get("/api/me", headers={"Authorization": f"Bearer {access_token}"})
|
||||
assert 200 == res.status_code
|
||||
|
|
18
tests/test_token.py
Normal file
18
tests/test_token.py
Normal file
|
@ -0,0 +1,18 @@
|
|||
def test_no_logged_no_access(testclient):
|
||||
testclient.get("/token", status=403)
|
||||
|
||||
|
||||
def test_client_list(testclient, slapd_connection, client, token, logged_user):
|
||||
res = testclient.get("/token")
|
||||
assert 200 == res.status_code
|
||||
assert token.oauthAccessToken in res.text
|
||||
|
||||
res = testclient.get(f"/token/delete/{token.oauthAccessToken}")
|
||||
assert 302 == res.status_code
|
||||
|
||||
res = res.follow()
|
||||
assert 200 == res.status_code
|
||||
assert token.oauthAccessToken not in res.text
|
||||
|
||||
token.revoked = False
|
||||
token.save(conn=slapd_connection)
|
|
@ -4,7 +4,9 @@ from . import client_credentials
|
|||
def test_token_introspection(testclient, user, client, token):
|
||||
res = testclient.post(
|
||||
"/oauth/introspect",
|
||||
params=dict(token=token.oauthAccessToken,),
|
||||
params=dict(
|
||||
token=token.oauthAccessToken,
|
||||
),
|
||||
headers={"Authorization": f"Basic {client_credentials(client)}"},
|
||||
)
|
||||
assert 200 == res.status_code
|
||||
|
|
|
@ -6,7 +6,9 @@ def test_token_revocation(testclient, user, client, token, slapd_connection):
|
|||
|
||||
res = testclient.post(
|
||||
"/oauth/revoke",
|
||||
params=dict(token=token.oauthAccessToken,),
|
||||
params=dict(
|
||||
token=token.oauthAccessToken,
|
||||
),
|
||||
headers={"Authorization": f"Basic {client_credentials(client)}"},
|
||||
)
|
||||
assert 200 == res.status_code
|
||||
|
|
|
@ -10,7 +10,9 @@ bp = Blueprint(__name__, "authorizations")
|
|||
@admin_needed()
|
||||
def index():
|
||||
authorizations = AuthorizationCode.filter()
|
||||
return render_template("admin/authorization_list.html", authorizations=authorizations)
|
||||
return render_template(
|
||||
"admin/authorization_list.html", authorizations=authorizations
|
||||
)
|
||||
|
||||
|
||||
@bp.route("/<authorization_id>", methods=["GET", "POST"])
|
||||
|
|
|
@ -152,7 +152,8 @@ def add():
|
|||
)
|
||||
client.save()
|
||||
flash(
|
||||
gettext("The client has been created."), "success",
|
||||
gettext("The client has been created."),
|
||||
"success",
|
||||
)
|
||||
|
||||
return redirect(url_for("web.admin.clients.edit", client_id=client_id))
|
||||
|
@ -196,7 +197,8 @@ def edit(client_id):
|
|||
)
|
||||
client.save()
|
||||
flash(
|
||||
gettext("The client has been edited."), "success",
|
||||
gettext("The client has been edited."),
|
||||
"success",
|
||||
)
|
||||
|
||||
return render_template("admin/client_edit.html", form=form, client=client)
|
||||
|
|
|
@ -13,9 +13,10 @@ def user_needed():
|
|||
def wrapper(view_function):
|
||||
@wraps(view_function)
|
||||
def decorator(*args, **kwargs):
|
||||
if not current_user():
|
||||
user = current_user()
|
||||
if not user:
|
||||
abort(403)
|
||||
return view_function(*args, **kwargs)
|
||||
return view_function(*args, user=user, **kwargs)
|
||||
|
||||
return decorator
|
||||
|
||||
|
|
|
@ -181,13 +181,22 @@ class LDAPObjectHelper:
|
|||
@classmethod
|
||||
def filter(cls, base=None, **kwargs):
|
||||
class_filter = "".join([f"(objectClass={oc})" for oc in cls.objectClass])
|
||||
arg_filter = "".join(f"({k}={v})" for k, v in kwargs.items())
|
||||
arg_filter = ""
|
||||
for k, v in kwargs.items():
|
||||
if not isinstance(v, list):
|
||||
arg_filter += f"({k}={v})"
|
||||
elif len(v) == 1:
|
||||
arg_filter += f"({k}={v[0]})"
|
||||
else:
|
||||
arg_filter += "(|" + "".join([f"({k}={_v})" for _v in v]) + ")"
|
||||
ldapfilter = f"(&{class_filter}{arg_filter})"
|
||||
base = base or f"{cls.base},{cls.root_dn}"
|
||||
result = cls.ldap().search_s(base, ldap.SCOPE_SUBTREE, ldapfilter)
|
||||
|
||||
return [
|
||||
cls(**{k: [elt.decode("utf-8") for elt in v] for k, v in args.items()},)
|
||||
cls(
|
||||
**{k: [elt.decode("utf-8") for elt in v] for k, v in args.items()},
|
||||
)
|
||||
for _, args in result
|
||||
]
|
||||
|
||||
|
|
|
@ -183,6 +183,12 @@ class Token(LDAPObjectHelper, TokenMixin):
|
|||
def issue_date(self):
|
||||
return datetime.datetime.strptime(self.oauthIssueDate, "%Y%m%d%H%M%SZ")
|
||||
|
||||
@property
|
||||
def expire_date(self):
|
||||
return datetime.datetime.strptime(
|
||||
self.oauthIssueDate, "%Y%m%d%H%M%SZ"
|
||||
) + datetime.timedelta(seconds=int(self.oauthTokenLifetime))
|
||||
|
||||
@property
|
||||
def revoked(self):
|
||||
return self.oauthRevoked in ("yes", "YES", 1, "on", "ON", "TRUE", "true")
|
||||
|
@ -216,8 +222,4 @@ class Token(LDAPObjectHelper, TokenMixin):
|
|||
if self.revoked:
|
||||
return False
|
||||
|
||||
return (
|
||||
datetime.datetime.strptime(self.oauthIssueDate, "%Y%m%d%H%M%SZ")
|
||||
+ datetime.timedelta(seconds=int(self.oauthTokenLifetime))
|
||||
>= datetime.datetime.now()
|
||||
)
|
||||
return self.expire_date >= datetime.datetime.now()
|
||||
|
|
|
@ -37,6 +37,7 @@ def get_jwt_config(grant):
|
|||
|
||||
|
||||
def generate_user_info(user, scope):
|
||||
user = User.get(user)
|
||||
fields = ["sub"]
|
||||
if "profile" in scope:
|
||||
fields += [
|
||||
|
@ -108,7 +109,7 @@ class AuthorizationCodeGrant(_AuthorizationCodeGrant):
|
|||
authorization_code.delete()
|
||||
|
||||
def authenticate_user(self, authorization_code):
|
||||
return User.get(authorization_code.oauthSubject)
|
||||
return User.get(authorization_code.oauthSubject).dn
|
||||
|
||||
|
||||
class OpenIDCode(_OpenIDCode):
|
||||
|
@ -124,7 +125,7 @@ class OpenIDCode(_OpenIDCode):
|
|||
|
||||
class PasswordGrant(_ResourceOwnerPasswordCredentialsGrant):
|
||||
def authenticate_user(self, username, password):
|
||||
return User.authenticate(username, password)
|
||||
return User.authenticate(username, password).dn
|
||||
|
||||
|
||||
class RefreshTokenGrant(_RefreshTokenGrant):
|
||||
|
@ -134,7 +135,7 @@ class RefreshTokenGrant(_RefreshTokenGrant):
|
|||
return token[0]
|
||||
|
||||
def authenticate_user(self, credential):
|
||||
return User.get(credential.oauthSubject)
|
||||
return User.get(credential.oauthSubject).dn
|
||||
|
||||
def revoke_old_credential(self, credential):
|
||||
credential.revoked = True
|
||||
|
@ -149,7 +150,6 @@ class OpenIDImplicitGrant(_OpenIDImplicitGrant):
|
|||
return get_jwt_config(grant)
|
||||
|
||||
def generate_user_info(self, user, scope):
|
||||
user = User.get(user)
|
||||
return generate_user_info(user, scope)
|
||||
|
||||
|
||||
|
@ -168,7 +168,6 @@ class OpenIDHybridGrant(_OpenIDHybridGrant):
|
|||
return get_jwt_config(grant)
|
||||
|
||||
def generate_user_info(self, user, scope):
|
||||
user = User.get(user)
|
||||
return generate_user_info(user, scope)
|
||||
|
||||
|
||||
|
@ -186,6 +185,7 @@ def save_token(token, request):
|
|||
oauthScope=token["scope"],
|
||||
oauthClientID=request.client.oauthClientID,
|
||||
oauthRefreshToken=token.get("refresh_token"),
|
||||
oauthSubject=request.user,
|
||||
)
|
||||
t.save()
|
||||
|
||||
|
|
|
@ -15,8 +15,7 @@ bp = Blueprint(__name__, "home")
|
|||
def index():
|
||||
if not current_user():
|
||||
return redirect(url_for("web.routes.login"))
|
||||
|
||||
return render_template("home.html")
|
||||
return redirect(url_for("web.tokens.tokens"))
|
||||
|
||||
|
||||
@bp.route("/login", methods=("GET", "POST"))
|
||||
|
|
|
@ -12,22 +12,58 @@
|
|||
{% endblock %}
|
||||
|
||||
{% block content %}
|
||||
|
||||
<table class="ui table">
|
||||
<thead>
|
||||
<th>{% trans %}Token{% endtrans %}</th>
|
||||
<th>{% trans %}Client{% endtrans %}</th>
|
||||
<th>{% trans %}Subject{% endtrans %}</th>
|
||||
<th>{% trans %}Created{% endtrans %}</th>
|
||||
</thead>
|
||||
{% for token in tokens %}
|
||||
<tr>
|
||||
<td><a href="{{ url_for('web.admin.tokens.view', token_id=token.oauthAccessToken) }}">{{ token.oauthAccessToken }}</a></td>
|
||||
<td><a href="{{ url_for('web.admin.clients.edit', client_id=token.oauthClientID) }}">{{ token.oauthClientID }}</a></td>
|
||||
<td>{{ token.oauthSubject }}</td>
|
||||
<td>{{ token.issue_date }}</td>
|
||||
</tr>
|
||||
<div class="ui segment">
|
||||
<h1 class="ui header">{% trans %}My accesses{% endtrans %}</h1>
|
||||
{% with messages = get_flashed_messages(with_categories=true) %}
|
||||
{% for category, message in messages %}
|
||||
<div class="ui attached message {{ category }}">
|
||||
{{ message }}
|
||||
</div>
|
||||
{% endfor %}
|
||||
</table>
|
||||
{% endwith %}
|
||||
|
||||
{% if tokens %}
|
||||
<div class="ui cards">
|
||||
{% for token in tokens %}
|
||||
{% set client = clients[token.oauthClientID] %}
|
||||
<div class="ui card">
|
||||
<div class="content">
|
||||
{% if client.oauthLogoURI %}
|
||||
<img class="right floated mini ui image" src="{{ client.oauthLogoURI }}">
|
||||
{% endif %}
|
||||
{% if client.oauthClientURI %}
|
||||
<a href="{{ client.oauthClientURI }}" class="header">{{ client.oauthClientName }}</a>
|
||||
{% else %}
|
||||
<div class="header">{{ client.oauthClientName }}</div>
|
||||
{% endif %}
|
||||
<div class="meta">{% trans %}From:{% endtrans %} {{ token.issue_date.strftime("%d/%m/%Y %H:%M:%S") }}</div>
|
||||
<div class="meta">{% trans %}Until:{% endtrans %} {{ token.expire_date.strftime("%d/%m/%Y %H:%M:%S") }}</div>
|
||||
<div class="meta"><span class="ui small text">{{ token.oauthAccessToken }}</span></div>
|
||||
<div class="description">
|
||||
<p>{% trans %}Has access to:{% endtrans %}</p>
|
||||
<ul>
|
||||
{% for s in token.oauthScope[0].split(" ") %}
|
||||
<li>{{ s }}</li>
|
||||
{% endfor %}
|
||||
</ul>
|
||||
</div>
|
||||
</div>
|
||||
<a class="ui bottom attached button" href="{{ url_for('web.tokens.delete', token_id=token.oauthAccessToken ) }}">
|
||||
<i class="remove icon"></i>
|
||||
{% trans %}Remove access{% endtrans %}
|
||||
</a>
|
||||
</div>
|
||||
{% endfor %}
|
||||
</div>
|
||||
{% else %}
|
||||
<div class="ui center aligned">
|
||||
<i class="massive smile outline icon image ui"></i>
|
||||
|
||||
<h2 class="ui center aligned header">
|
||||
<div class="content">{% trans %}Nothing here{% endtrans %}</div>
|
||||
<div class="sub header">{% trans %}You did not authorize applications yet.{% endtrans %}</div>
|
||||
</h2>
|
||||
</div>
|
||||
{% endif %}
|
||||
</div>
|
||||
{% endblock %}
|
||||
|
|
|
@ -1,11 +1,37 @@
|
|||
from flask import Blueprint, render_template
|
||||
from web.models import Token
|
||||
from flask import Blueprint, render_template, flash, redirect, url_for
|
||||
from flask_babel import gettext
|
||||
from web.models import Token, Client
|
||||
from web.flaskutils import user_needed
|
||||
|
||||
|
||||
bp = Blueprint(__name__, "tokens")
|
||||
|
||||
|
||||
@bp.route("/")
|
||||
def tokens():
|
||||
tokens = Token.filter()
|
||||
return render_template("token_list.html", tokens=tokens)
|
||||
@user_needed()
|
||||
def tokens(user):
|
||||
tokens = Token.filter(oauthSubject=user.dn)
|
||||
tokens = [t for t in tokens if t.is_refresh_token_active()]
|
||||
client_ids = list(set(t.oauthClientID for t in tokens))
|
||||
clients = (
|
||||
{c.oauthClientID: c for c in Client.filter(oauthClientID=client_ids)}
|
||||
if client_ids
|
||||
else {}
|
||||
)
|
||||
return render_template("token_list.html", tokens=tokens, clients=clients)
|
||||
|
||||
|
||||
@bp.route("/delete/<token_id>")
|
||||
@user_needed()
|
||||
def delete(user, token_id):
|
||||
token = Token.get(token_id)
|
||||
|
||||
if not token or token.oauthSubject != user.dn:
|
||||
flash(gettext("Could not delete this access"), "error")
|
||||
|
||||
else:
|
||||
token.revoked = True
|
||||
token.save()
|
||||
flash(gettext("The access has been revoked"), "success")
|
||||
|
||||
return redirect(url_for("web.tokens.tokens"))
|
||||
|
|
Loading…
Reference in a new issue