Admin login

This commit is contained in:
Éloi Rivard 2020-08-19 16:20:57 +02:00
parent 6bfb1002bf
commit 6595b16e28
14 changed files with 177 additions and 59 deletions

View file

@ -10,3 +10,5 @@ URI = "ldap://ldap"
ROOT_DN = "dc=mydomain,dc=tld"
BIND_DN = "cn=admin,dc=mydomain,dc=tld"
BIND_PW = "admin"
ADMIN_FILTER = "cn=Jane Doe"

View file

@ -127,7 +127,7 @@ def client(app, slapd_connection):
@pytest.fixture
def user(app, slapd_connection):
u = User(cn="John Doe", sn="Doe",)
u = User(cn="John Doe", sn="Doe")
u.save(slapd_connection)
slapd_connection.passwd_s(
u.dn.encode("utf-8"), None, "correct horse battery staple".encode("utf-8"),

View file

@ -29,7 +29,7 @@ def test_success(testclient, slapd_connection, user, client):
assert res.location.startswith(client.oauthRedirectURIs[0])
params = parse_qs(urlsplit(res.location).query)
code = params["code"][0]
authcode = AuthorizationCode.get(code, slapd_connection)
authcode = AuthorizationCode.get(code, conn=slapd_connection)
assert authcode is not None
res = testclient.post(
@ -45,7 +45,7 @@ def test_success(testclient, slapd_connection, user, client):
assert 200 == res.status_code
access_token = res.json["access_token"]
token = Token.get(access_token, slapd_connection)
token = Token.get(access_token, conn=slapd_connection)
assert token is not None
res = testclient.get("/api/me", headers={"Authorization": f"Bearer {access_token}"})

View file

@ -19,7 +19,7 @@ def test_success(testclient, slapd_connection, user, client):
assert res.json["token_type"] == "Bearer"
access_token = res.json["access_token"]
token = Token.get(access_token, slapd_connection)
token = Token.get(access_token, conn=slapd_connection)
assert token is not None
res = testclient.get("/api/me", headers={"Authorization": f"Bearer {access_token}"})

View file

@ -3,11 +3,12 @@ import os
import toml
from . import routes, clients, oauth
from flask import Flask, g, request
from flask import Flask, g, request, render_template
from flask_babel import Babel
from .oauth2utils import config_oauth
from .flaskutils import current_user
from .ldaputils import LDAPObjectHelper
from .oauth2utils import config_oauth
def create_app(config=None):
@ -54,6 +55,7 @@ def setup_app(app):
return {
"logo_url": app.config.get("LOGO"),
"website_name": app.config.get("NAME"),
"user": current_user(),
}
@babel.localeselector
@ -72,3 +74,15 @@ def setup_app(app):
user = getattr(g, "user", None)
if user is not None:
return user.timezone
@app.errorhandler(403)
def unauthorized(e):
return render_template("error.html", error=403), 403
@app.errorhandler(404)
def page_not_found(e):
return render_template("error.html", error=404), 404
@app.errorhandler(500)
def server_error(e):
return render_template("error.html", error=500), 500

View file

@ -6,12 +6,14 @@ from flask_wtf import FlaskForm
from flask_babel import gettext
from werkzeug.security import gen_salt
from .models import Client
from .flaskutils import admin_needed
bp = Blueprint(__name__, "clients")
@bp.route("/")
@admin_needed()
def index():
clients = Client.filter()
return render_template("client_list.html", clients=clients)
@ -107,6 +109,7 @@ class ClientAdd(FlaskForm):
@bp.route("/add", methods=["GET", "POST"])
@admin_needed()
def add():
form = ClientAdd(request.form or None)
@ -153,6 +156,7 @@ def add():
@bp.route("/edit/<client_id>", methods=["GET", "POST"])
@admin_needed()
def edit(client_id):
client = Client.get(client_id)
data = dict(client)

38
web/flaskutils.py Normal file
View file

@ -0,0 +1,38 @@
from functools import wraps
from flask import session, abort
from web.models import User
def current_user():
if "user_dn" in session:
return User.get(session["user_dn"])
return None
def user_needed():
def wrapper(view_function):
@wraps(view_function)
def decorator(*args, **kwargs):
if not current_user():
abort(403)
return view_function(*args, **kwargs)
return decorator
return wrapper
def admin_needed():
def wrapper(view_function):
@wraps(view_function)
def decorator(*args, **kwargs):
user = current_user()
if not user:
abort(403)
if not user.is_admin:
abort(404)
return view_function(*args, **kwargs)
return decorator
return wrapper

14
web/forms.py Normal file
View file

@ -0,0 +1,14 @@
import wtforms
from flask_babel import gettext
from flask_wtf import FlaskForm
class LoginForm(FlaskForm):
login = wtforms.StringField(
gettext("Username"),
validators=[wtforms.validators.DataRequired()],
render_kw={"placeholder": "mdupont"},
)
password = wtforms.PasswordField(
gettext("Password"), validators=[wtforms.validators.DataRequired()]
)

View file

@ -133,11 +133,11 @@ class LDAPObjectHelper:
conn.add_s(self.dn, attributes)
@classmethod
def get(cls, dn, conn=None):
def get(cls, dn, filter=None, conn=None):
conn = conn or cls.ldap()
if "=" not in dn:
dn = f"{cls.id}={dn},{cls.base},{cls.root_dn}"
result = conn.search_s(dn, ldap.SCOPE_SUBTREE)
result = conn.search_s(dn, ldap.SCOPE_SUBTREE, filter)
if not result:
return None

View file

@ -7,7 +7,7 @@ from authlib.oauth2.rfc6749 import (
TokenMixin,
AuthorizationCodeMixin,
)
from flask import current_app
from flask import current_app, session
from .ldaputils import LDAPObjectHelper
@ -15,6 +15,29 @@ class User(LDAPObjectHelper):
objectClass = ["person"]
base = "ou=users"
id = "cn"
admin = False
@classmethod
def get(cls, dn, filter=None, conn=None):
conn = conn or cls.ldap()
user = super().get(dn, filter, conn)
admin_filter = current_app.config["LDAP"].get("ADMIN_FILTER")
if (
admin_filter
and user
and conn.search_s(user.dn, ldap.SCOPE_SUBTREE, admin_filter)
):
user.admin = True
return user
def login(self, password):
if not self.check_password(password):
return False
session["user_dn"] = self.dn
return True
def check_password(self, password):
conn = ldap.initialize(current_app.config["LDAP"]["URI"])

View file

@ -1,30 +1,19 @@
import wtforms
from authlib.oauth2 import OAuth2Error
from flask import Blueprint, request, session, redirect
from flask import render_template, jsonify, flash
from flask_babel import gettext
from flask_wtf import FlaskForm
from .models import User, Client
from .oauth2utils import authorization
from .forms import LoginForm
from .flaskutils import current_user
bp = Blueprint(__name__, "oauth")
class LoginForm(FlaskForm):
login = wtforms.StringField(
gettext("Username"),
validators=[wtforms.validators.DataRequired()],
render_kw={"placeholder": "mdupont"},
)
password = wtforms.PasswordField(
gettext("Password"), validators=[wtforms.validators.DataRequired()]
)
@bp.route("/authorize", methods=["GET", "POST"])
def authorize():
user = User.get(session["user_dn"]) if "user_dn" in session else None
user = current_user()
client = Client.get(request.values["client_id"])
if not user:
@ -37,11 +26,10 @@ def authorize():
return render_template("login.html", form=form)
user = User.get(form.login.data)
if not user or not user.check_password(form.password.data):
if not user or not user.login(form.password.data):
flash(gettext("Login failed, please check your information"), "error")
return render_template("login.html", form=form)
session["user_dn"] = form.login.data
return redirect(request.url)
if request.method == "GET":

View file

@ -1,31 +1,45 @@
from flask import Blueprint, request, session
from flask import Blueprint, request, session, flash, url_for
from flask import render_template, redirect, jsonify
from .models import User, Client
from flask_babel import gettext
from .forms import LoginForm
from .flaskutils import current_user
from .models import User
from .oauth2utils import require_oauth
bp = Blueprint(__name__, "home")
@bp.route("/", methods=("GET", "POST"))
def home():
if request.method == "POST":
username = request.form.get("username")
user = User.get(username)
@bp.route("/")
def index():
if not current_user():
return redirect(url_for("web.routes.login"))
if not user:
user = User(cn=username, sn=username)
user.save()
return render_template("home.html")
session["user_dn"] = user.dn
return redirect("/")
clients = Client.filter()
return render_template("home.html", clients=clients)
@bp.route("/login", methods=("GET", "POST"))
def login():
form = LoginForm(request.form or None)
if request.form:
if not form.validate():
flash(gettext("Login failed, please check your information"), "error")
return render_template("login.html", form=form)
user = User.get(form.login.data)
if not user or not user.login(form.password.data):
flash(gettext("Login failed, please check your information"), "error")
return render_template("login.html", form=form)
return redirect(url_for("web.routes.index"))
return render_template("login.html", form=form)
@bp.route("/logout")
def logout():
if "user_dn" in session:
del session["user_dn"]
return redirect("/")

View file

@ -23,6 +23,7 @@
<body>
<div class="container">
{% block menu %}
{% if user %}
<nav class="ui labeled icon menu">
{% if logo_url %}
<div class="header item">
@ -31,6 +32,7 @@
</a>
</div>
{% endif %}
{% if user.admin %}
<a class="item" href="{{ url_for('web.clients.index') }}">
<i class="plug icon"></i>
{% trans %}Clients{% endtrans %}
@ -43,7 +45,13 @@
<i class="user secret icon"></i>
{% trans %}Codes{% endtrans %}
</a>
{% endif %}
<a class="item" href="{{ url_for('web.routes.logout') }}">
<i class="sign out alternate icon"></i>
{% trans %}Log out{% endtrans %}
</a>
</nav>
{% endif %}
{% endblock %}
<div class="content">
{% block content %}{% endblock %}

13
web/templates/error.html Normal file
View file

@ -0,0 +1,13 @@
{% extends 'base.html' %}
{% block content %}
<div class="ui error message">
{% if error == 400 %}
Accès interdit.
{% elif error == 404 %}
Page non trouvée.
{% elif error == 500 %}
Erreur technique.
{% endif %}
</div>
{% endblock %}