Compare commits

..

3 commits

Author SHA1 Message Date
Félix Rohrlich
6c1557cf27 feat: Added proper SCIM access token verification by client 2024-12-30 22:47:48 +01:00
Félix Rohrlich
6e64f51ad4 feat: Achieved communication with SCIM client 2024-12-17 18:11:08 +01:00
Félix Rohrlich
efe79505fd feat: initialize scim client feature 2024-12-16 16:40:36 +01:00
172 changed files with 18290 additions and 23200 deletions

View file

@ -42,7 +42,7 @@ jobs:
ulimit -n 1024
export TZ=UTC
uv sync --all-extras
uv run pytest --numprocesses auto
uv run pytest --showlocals
minversions:
name: minimum dependency versions
@ -68,7 +68,7 @@ jobs:
ulimit -n 1024
export TZ=UTC
uv sync --all-extras --resolution=lowest-direct
uv run pytest --numprocesses auto
uv run pytest --showlocals
style:
runs-on: ubuntu-latest
@ -107,5 +107,5 @@ jobs:
sudo DEBIAN_FRONTEND=noninteractive apt --yes --quiet install libsasl2-dev python3-dev libldap2-dev libssl-dev slapd ldap-utils
- run: |
export TZ=UTC
uv sync --group doc --all-extras
uv sync --group doc
uv run sphinx-build doc build/sphinx/html --fail-on-warning

1
.gitignore vendored
View file

@ -24,3 +24,4 @@ canaille/conf/*.pem
canaille/conf/*.pub
canaille/conf/*.key
.vscode
dump.json

View file

@ -41,7 +41,7 @@ coverage:
script:
- uv sync --all-extras
- uv pip install coveralls pyyaml tomli
- uv run pytest --cov --cov-fail-under=100 --cov-report term:skip-covered --numprocesses auto
- uv run pytest --cov --cov-fail-under=100 --cov-report term:skip-covered -n auto
- uv run coveralls
- uv cache prune --ci
@ -54,7 +54,7 @@ tests:
stage: test
script:
- uv sync --all-extras
- uv run pytest --numprocesses auto
- uv run pytest
- uv cache prune --ci
minversions:
@ -65,7 +65,7 @@ minversions:
stage: test
script:
- uv sync --all-extras --resolution=lowest-direct
- uv run pytest --numprocesses auto
- uv run pytest
- uv cache prune --ci
doc:
@ -74,6 +74,6 @@ doc:
image: ghcr.io/astral-sh/uv:$UV_VERSION-python$PYTHON_VERSION-$BASE_LAYER
stage: test
script:
- uv sync --group doc --all-extras
- uv sync --group doc
- uv run sphinx-build doc build/sphinx/html --fail-on-warning
- uv cache prune --ci

View file

@ -1,7 +1,7 @@
---
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: 'v0.9.0'
rev: 'v0.8.2'
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]

View file

@ -1,29 +1,4 @@
[0.0.59] - Unreleased
---------------------
Changed
^^^^^^^
- Bump to HTMX 2.0.4
[0.0.58] - 2025-01-10
---------------------
Added
^^^^^
- ``--version`` option to the CLI. :pr:`209`
- Password hashing scheme customization with the :attr:`~canaille.backends.sql.configuration.SQLSettings.PASSWORD_SCHEMES` parameter. :issue:`175`
- `canaille dump` command option to perform full database dumps.
- Automatic SQL database migrations. :issue:`217` :pr:`214`
Changed
^^^^^^^
- CLI commands dump hashed passwords. :issue:`214`
Fixed
^^^^^^^
- A bug on updating user's settings. :issue:`206`
[0.0.57] - 2024-12-31
[0.0.57] - Unreleased
---------------------
Added
@ -45,8 +20,6 @@ Added
- Implement OIDC client_credentials flow. :issue:`207`
- Button in the client admin page to create client tokens.
- Basic SCIM implementation. :issue:`116` :pr:`197`
- Password expiry policy :issue:`176`
- :attr:`~canaille.core.configuration.CoreSettings.PASSWORD_LIFETIME`
Changed
^^^^^^^

View file

@ -31,10 +31,9 @@ Initialize your development environment with:
After having launched the demo you have access to several services:
- A canaille server at `localhost:5000 <http://localhost:5000>`__
- A dummy client at `localhost:5001 <http://localhost:5001>`__
- Another dummy client at `localhost:5002 <http://localhost:5002>`__
- A mail catcher at `localhost:1080 <http://localhost:1080>`__
- A canaille server at `localhost:5000 <http://localhost:5000>`_
- A dummy client at `localhost:5001 <http://localhost:5001>`_
- Another dummy client at `localhost:5002 <http://localhost:5002>`_
The canaille server has some default users:
@ -100,8 +99,6 @@ With the LDAP backend, all data is lost when Canaille stops.
cd demo
docker compose --file docker-compose-ldap.yml up
.. _local_environment:
Local environment
~~~~~~~~~~~~~~~~~

View file

@ -1,55 +0,0 @@
FROM python AS builder
RUN \
apt update && \
apt -y upgrade && \
apt install -y \
build-essential \
libldap2-dev \
libsasl2-dev \
libssl-dev
COPY uv.lock pyproject.toml /opt/canaille/
RUN pip install uv
COPY canaille /opt/canaille/canaille
WORKDIR /opt/canaille
RUN uv sync --all-extras --upgrade
# We build a python wheel to install on the definitive docker image
RUN uv build --wheel
# We create a list of all the requirements to install next to canaille on the definitive docker image
RUN uv pip compile pyproject.toml --all-extras -o requirements.txt
WORKDIR /opt/canaille/dist
# We build these two modules here to avoid including build libraries on the definitive docker image
RUN pip wheel python-ldap uwsgi
FROM python:slim
# We need libxml2 for uwsgi to work
RUN \
apt update && \
apt -y upgrade && \
apt install -y \
libxml2
# I create a volume so that I can mount it in docker using -v ./database:/opt/canaille/database
RUN mkdir -p /opt/canaille/database
COPY --from=builder /opt/canaille/dist /opt/canaille/dist
COPY --from=builder /opt/canaille/requirements.txt /opt/canaille/requirements.txt
RUN pip install --find-links /opt/canaille/dist canaille python-ldap uwsgi
RUN pip install -r /opt/canaille/requirements.txt
COPY uwsgi.ini /opt/canaille
WORKDIR /opt/canaille
ENTRYPOINT ["uwsgi","--ini","uwsgi.ini"]

View file

@ -1,5 +1,5 @@
<div align="center">
<img src="https://gitlab.com/yaal/canaille/-/raw/main/canaille/static/img/canaille-full.webp" height="200" alt="Canaille" />
<img src="canaille/static/img/canaille-full.webp" height="200" alt="Canaille" />
</div>
**Canaille** is a French word meaning *rascal*. It is roughly pronounced **Can I?**,
@ -40,14 +40,19 @@ You have access to:
- a dummy client at [https://demo.client1.yaal.coop](https://demo.client1.yaal.coop)
- another dummy client at [https://demo.client2.yaal.coop](https://demo.client2.yaal.coop)
Authentication details are available on the demo pages. Data is reset every night at 02:00 CEST.
The canaille server has some default users:
- A regular user which login and password are **user**
- A moderator user which login and password are **moderator**
- An admin user which login and password are **admin**
Online demo data are reset every night at 02:00 CEST.
# Documentation
⚠ Canaille is under heavy development and may not fit a production environment yet. However, [contributions](CONTRIBUTING.rst) are welcome! ⚠
- Please have a look on our [documentation](https://canaille.readthedocs.io);
- To **install** canaille, follow the [installation guide](https://canaille.readthedocs.io/en/latest/tutorial/install.html);
- To **install** canaille, just follow the [installation guide](https://canaille.readthedocs.io/en/latest/tutorial/install.html);
- To **contribute** to canaille, please read the [contribution guide](https://canaille.readthedocs.io/en/latest/development/contributing.html).
## Translation status

View file

@ -2,9 +2,12 @@ import datetime
import sys
from flask import Flask
from flask import request
from flask import session
from flask_wtf.csrf import CSRFProtect
from canaille.app.forms import password_strength_calculator
csrf = CSRFProtect()
@ -25,6 +28,12 @@ def setup_sentry(app): # pragma: no cover
return sentry_sdk
def setup_jinja(app):
app.jinja_env.filters["len"] = len
app.jinja_env.filters["password_strength"] = password_strength_calculator
app.jinja_env.policies["ext.i18n.trimmed"] = True
def setup_blueprints(app):
import canaille.core.endpoints
@ -37,15 +46,13 @@ def setup_blueprints(app):
app.register_blueprint(canaille.oidc.endpoints.bp)
if app.features.has_scim_server:
if "CANAILLE_SCIM" in app.config and app.config["CANAILLE_SCIM"]["ENABLE_SERVER"]:
import canaille.scim.endpoints
app.register_blueprint(canaille.scim.endpoints.bp)
def setup_flask(app):
from canaille.app.templating import render_template
csrf.init_app(app)
@app.before_request
@ -53,25 +60,21 @@ def setup_flask(app):
session.permanent = True
app.permanent_session_lifetime = datetime.timedelta(days=365)
@app.errorhandler(400)
def bad_request(error):
return render_template("error.html", description=error, error_code=400), 400
@app.context_processor
def global_processor():
from canaille.app.session import current_user
@app.errorhandler(403)
def unauthorized(error):
return render_template("error.html", description=error, error_code=403), 403
@app.errorhandler(404)
def page_not_found(error):
from canaille.app.flask import redirect_to_bp_handlers
return redirect_to_bp_handlers(app, error) or render_template(
"error.html", description=error, error_code=404
), 404
@app.errorhandler(500)
def server_error(error): # pragma: no cover
return render_template("error.html", description=error, error_code=500), 500
return {
"debug": app.debug or app.config.get("TESTING", False),
"logo_url": app.config["CANAILLE"]["LOGO"],
"favicon_url": app.config["CANAILLE"]["FAVICON"]
or app.config["CANAILLE"]["LOGO"],
"website_name": app.config["CANAILLE"]["NAME"],
"user": current_user(),
"menu": True,
"is_boosted": request.headers.get("HX-Boosted", False),
"features": app.features,
}
def setup_flask_converters(app):
@ -86,7 +89,6 @@ def create_app(
config: dict = None,
validate: bool = True,
backend=None,
init_backend=None,
env_file: str = None,
env_prefix: str = "",
):
@ -102,8 +104,7 @@ def create_app(
from .app.features import setup_features
from .app.i18n import setup_i18n
from .app.logging import setup_logging
from .app.templating import setup_jinja
from .app.templating import setup_themer
from .app.themes import setup_themer
from .backends import setup_backend
app = Flask(__name__)
@ -120,7 +121,7 @@ def create_app(
sentry_sdk = setup_sentry(app)
try:
setup_logging(app)
backend = setup_backend(app, backend, init_backend)
backend = setup_backend(app, backend)
setup_features(app)
setup_flask_converters(app)
setup_blueprints(app)

View file

@ -54,7 +54,7 @@ def install():
from canaille.app.installation import install
try:
install(current_app)
install(current_app.config)
except ConfigurationException as exc: # pragma: no cover
print(exc)

View file

@ -234,7 +234,7 @@ def validate_smtp_configuration(config):
except smtplib.SMTPAuthenticationError as exc:
raise ConfigurationException(
f"SMTP authentication failed with user '{config['LOGIN']}'"
f'SMTP authentication failed with user \'{config["LOGIN"]}\''
) from exc
except smtplib.SMTPNotSupportedError as exc:
@ -270,7 +270,7 @@ def validate_theme(config):
if not os.path.exists(config["THEME"]) and not os.path.exists(
os.path.join(ROOT, "themes", config["THEME"])
):
raise ConfigurationException(f"Cannot find theme '{config['THEME']}'")
raise ConfigurationException(f'Cannot find theme \'{config["THEME"]}\'')
def validate_admin_email(config):

View file

@ -4,28 +4,18 @@ class Features:
@property
def has_smtp(self):
"""Indicate whether the mail sending feature is enabled.
This feature is required to :attr:`validate user email addresses <canaille.app.features.Features.has_email_confirmation>`, send email OTP passwords etc.
It is controlled by the :attr:`CANAILLE.SMTP <canaille.core.configuration.CoreSettings.SMTP>` configuration parameter.
"""
return bool(self.app.config["CANAILLE"]["SMTP"])
@property
def has_password_recovery(self):
"""Indicate whether the password recovery feature is enabled.
def has_oidc(self):
return "CANAILLE_OIDC" in self.app.config
It is controlled by the :attr:`CANAILLE.ENABLE_PASSWORD_RECOVERY <canaille.core.configuration.CoreSettings.ENABLE_PASSWORD_RECOVERY>` configuration parameter.
"""
@property
def has_password_recovery(self):
return self.app.config["CANAILLE"]["ENABLE_PASSWORD_RECOVERY"]
@property
def has_intruder_lockout(self):
"""Indicate whether the intruder lockout feature is enabled.
It is controlled by the :attr:`CANAILLE.ENABLE_INTRUDER_LOCKOUT <canaille.core.configuration.CoreSettings.ENABLE_INTRUDER_LOCKOUT>` configuration parameter.
"""
return self.app.config["CANAILLE"]["ENABLE_INTRUDER_LOCKOUT"]
@property
@ -34,108 +24,30 @@ class Features:
@property
def has_otp(self):
"""Indicate whether the OTP authentication factor is enabled.
It is controlled by the :attr:`CANAILLE.OTP_METHOD <canaille.core.configuration.CoreSettings.OTP_METHOD>` configuration parameter,
and needs the ``otp`` extra package to be installed.
"""
try:
import otpauth # noqa: F401
return bool(self.app.config["CANAILLE"]["OTP_METHOD"])
except ImportError: # pragma: no cover
return False
return bool(self.app.config["CANAILLE"]["OTP_METHOD"])
@property
def has_email_otp(self):
"""Indicate whether the email OTP authentication factor is enabled.
It is controlled by the :attr:`CANAILLE.EMAIL_OTP <canaille.core.configuration.CoreSettings.EMAIL_OTP>` configuration parameter.
"""
return bool(self.app.config["CANAILLE"]["EMAIL_OTP"])
@property
def has_sms_otp(self):
"""Indicate whether the SMS OTP authentication factor is enabled.
It is controlled by the :attr:`CANAILLE.SMS_OTP <canaille.core.configuration.CoreSettings.SMS_OTP>` configuration parameter,
and needs the ``smpp`` extra package to be installed.
"""
try:
import smpplib # noqa: F401
return self.app.config["CANAILLE"]["SMS_OTP"]
except ImportError: # pragma: no cover
return False
return self.app.config["CANAILLE"]["SMS_OTP"]
@property
def has_registration(self):
"""Indicate whether the user account registration is enabled.
It is controlled by the :attr:`CANAILLE.ENABLE_REGISTRATION <canaille.core.configuration.CoreSettings.ENABLE_REGISTRATION>` configuration parameter.
"""
return self.app.config["CANAILLE"]["ENABLE_REGISTRATION"]
@property
def has_account_lockability(self):
"""Indicate whether the user accounts can be locked.
It depends on the backend used by Canaille.
This is only disabled for OpenLDAP versions under 2.6.
"""
return self.app.backend.instance.has_account_lockability()
@property
def has_email_confirmation(self):
"""Indicate whether the user email confirmation is enabled.
It is controlled by the :attr:`CANAILLE.EMAIL_CONFIRMATION <canaille.core.configuration.CoreSettings.EMAIL_CONFIRMATION>` configuration parameter.
"""
return self.app.config["CANAILLE"]["EMAIL_CONFIRMATION"] is True or (
self.app.config["CANAILLE"]["EMAIL_CONFIRMATION"] is None and self.has_smtp
)
@property
def has_oidc(self):
"""Indicate whether the OIDC feature is enabled.
This feature is required to make Canaille an authorization server for other applications and enable SSO.
It is controlled by the :class:`CANAILLE_OIDC <canaille.oidc.configuration.OIDCSettings>` configuration parameter,
and needs the ``oidc`` extra package to be installed.
"""
try:
import authlib # noqa: F401
return "CANAILLE_OIDC" in self.app.config
except ImportError: # pragma: no cover
return False
@property
def has_scim_server(self):
"""Indicate whether the SCIM server feature is enabled.
This feature is required to make Canaille a provisioning server.
It is controlled by the :attr:`CANAILLE_SCIM.ENABLE_SERVER <canaille.scim.configuration.SCIMSettings.ENABLE_SERVER>` configuration parameter,
and needs the ``scim`` extra package to be installed.
"""
try:
import scim2_models # noqa: F401
return (
"CANAILLE_SCIM" in self.app.config
and self.app.config["CANAILLE_SCIM"]["ENABLE_SERVER"]
)
except ImportError: # pragma: no cover
return False
def setup_features(app):
app.features = Features(app)

View file

@ -5,20 +5,31 @@ from urllib.parse import urlunsplit
from flask import abort
from flask import current_app
from flask import flash
from flask import make_response
from flask import redirect
from flask import request
from flask import url_for
from werkzeug.exceptions import HTTPException
from werkzeug.routing import BaseConverter
from canaille.app.i18n import gettext as _
from canaille.app.session import current_user
from canaille.app.templating import render_template
from canaille.app.themes import render_template
def user_needed(*args):
def user_needed():
def wrapper(view_function):
@wraps(view_function)
def decorator(*args, **kwargs):
user = current_user()
if not user:
abort(403)
return view_function(*args, user=user, **kwargs)
return decorator
return wrapper
def permissions_needed(*args):
permissions = set(args)
def wrapper(view_function):
@ -27,19 +38,6 @@ def user_needed(*args):
user = current_user()
if not user or not user.can(*permissions):
abort(403)
if user.has_expired_password():
flash(
_("Your password has expired, please choose a new password."),
"info",
)
return redirect(
url_for(
"core.account.reset",
user=user,
)
)
return view_function(*args, user=user, **kwargs)
return decorator
@ -87,12 +85,9 @@ def request_is_htmx():
def render_htmx_template(template, htmx_template=None, **kwargs):
if request_is_htmx():
if htmx_template:
template = htmx_template
else:
*dirs, file = template.split("/")
template = "/".join([*dirs, "partial", file])
template = (
(htmx_template or f"partial/{template}") if request_is_htmx() else template
)
return render_template(template, **kwargs)

View file

@ -262,13 +262,7 @@ class BaseForm(HTMXFormMixin, I18NFormMixin, wtforms.form.BaseForm):
class TableForm(I18NFormMixin, FlaskForm):
"""
A form for table rendering of object collections.
"""
def __init__(
self, cls=None, page_size: int = 25, fields=None, filter=None, **kwargs
):
def __init__(self, cls=None, page_size=25, fields=None, filter=None, **kwargs):
filter = filter or {}
super().__init__(**kwargs)
if self.query.data:

View file

@ -1,60 +0,0 @@
import os
import flask
from flask import request
try:
import flask_themer
except ImportError:
flask_themer = None
if flask_themer:
render_template = flask_themer.render_template
def setup_themer(app):
theme_config = app.config["CANAILLE"]["THEME"]
additional_themes_dir = (
os.path.abspath(os.path.dirname(theme_config))
if theme_config and os.path.exists(theme_config)
else None
)
themer = flask_themer.Themer(
app,
loaders=[flask_themer.FileSystemThemeLoader(additional_themes_dir)]
if additional_themes_dir
else None,
)
@themer.current_theme_loader
def get_current_theme():
# if config['THEME'] may be a theme name or a path
return app.config["CANAILLE"]["THEME"].split("/")[-1]
else: # pragma: no cover
render_template = flask.render_template
def setup_jinja(app):
from canaille.app.forms import password_strength_calculator
app.jinja_env.filters["len"] = len
app.jinja_env.filters["password_strength"] = password_strength_calculator
app.jinja_env.policies["ext.i18n.trimmed"] = True
@app.context_processor
def global_processor():
from canaille.app.session import current_user
return {
"debug": app.debug or app.config.get("TESTING", False),
"logo_url": app.config["CANAILLE"]["LOGO"],
"favicon_url": app.config["CANAILLE"]["FAVICON"]
or app.config["CANAILLE"]["LOGO"],
"website_name": app.config["CANAILLE"]["NAME"],
"user": current_user(),
"menu": True,
"is_boosted": request.headers.get("HX-Boosted", False),
"features": app.features,
}

62
canaille/app/themes.py Normal file
View file

@ -0,0 +1,62 @@
import os
import flask
try:
import flask_themer
except ImportError:
flask_themer = None
if flask_themer:
render_template = flask_themer.render_template
def setup_themer(app):
theme_config = app.config["CANAILLE"]["THEME"]
additional_themes_dir = (
os.path.abspath(os.path.dirname(theme_config))
if theme_config and os.path.exists(theme_config)
else None
)
themer = flask_themer.Themer(
app,
loaders=[flask_themer.FileSystemThemeLoader(additional_themes_dir)]
if additional_themes_dir
else None,
)
@themer.current_theme_loader
def get_current_theme():
# if config['THEME'] may be a theme name or a path
return app.config["CANAILLE"]["THEME"].split("/")[-1]
@app.errorhandler(400)
def bad_request(error):
return render_template("error.html", description=error, error_code=400), 400
@app.errorhandler(403)
def unauthorized(error):
return render_template("error.html", description=error, error_code=403), 403
@app.errorhandler(404)
def page_not_found(error):
from canaille.app.flask import redirect_to_bp_handlers
return redirect_to_bp_handlers(app, error) or render_template(
"error.html", description=error, error_code=404
), 404
@app.errorhandler(500)
def server_error(error): # pragma: no cover
return render_template("error.html", description=error, error_code=500), 500
else: # pragma: no cover
render_template = flask.render_template
def setup_themer(app):
@app.errorhandler(404)
def page_not_found(error):
from canaille.app.flask import redirect_to_bp_handlers
if not redirect_to_bp_handlers(app, error):
raise error

View file

@ -1,8 +1,5 @@
import datetime
import importlib
import json
import os
import typing
from contextlib import contextmanager
from math import ceil
@ -11,48 +8,8 @@ from flask import g
from canaille.app import classproperty
class ModelEncoder(json.JSONEncoder):
"""JSON serializer that can handle Canaille models."""
@staticmethod
def serialize_model(instance):
def serialize_attribute(attribute_name, value):
"""Replace model instances by their id."""
multiple = typing.get_origin(instance.attributes[attribute_name]) is list
if multiple and isinstance(value, list):
return [serialize_attribute(attribute_name, v) for v in value]
model, _ = instance.get_model_annotations(attribute_name)
if model:
return value.id
return value
result = {}
for attribute in instance.attributes:
if serialized := serialize_attribute(
attribute, getattr(instance, attribute)
):
result[attribute] = serialized
return result
def default(self, obj):
from canaille.backends.models import Model
if isinstance(obj, datetime.datetime):
return obj.isoformat()
if isinstance(obj, Model):
return self.serialize_model(obj)
return super().default(obj)
class Backend:
_instance = None
json_encoder = ModelEncoder
def __init__(self, config):
self.config = config
@ -63,7 +20,7 @@ class Backend:
def instance(cls):
return cls._instance
def init_app(self, app, init_backend=None):
def init_app(self, app):
@app.before_request
def before_request():
return self.setup()
@ -79,7 +36,7 @@ class Backend:
self.teardown()
@classmethod
def install(self, app):
def install(self, config):
"""Prepare the database to host canaille data."""
raise NotImplementedError()
@ -203,7 +160,7 @@ class Backend:
models.register(getattr(backend_models, model_name))
def setup_backend(app, backend=None, init_backend=None):
def setup_backend(app, backend=None):
if not backend:
prefix = "CANAILLE_"
available_backends_names = [
@ -224,7 +181,7 @@ def setup_backend(app, backend=None, init_backend=None):
module, f"{backend_name.title()}Backend", None
) or getattr(module, f"{backend_name.upper()}Backend", None)
backend = backend_class(app.config)
backend.init_app(app, init_backend)
backend.init_app(app)
with app.app_context():
g.backend = backend

View file

@ -1,4 +1,5 @@
import datetime
import inspect
import json
import typing
@ -69,28 +70,60 @@ def is_multiple(attribute_type):
def register(cli):
"""Generate commands using factories that each have one subcommand per
available model."""
cli.add_command(get_command)
cli.add_command(set_command)
cli.add_command(create_command)
cli.add_command(delete_command)
factories = [get_factory, set_factory, create_factory, delete_factory]
for factory in factories:
command_help = inspect.getdoc(factory)
name = factory.__name__.replace("_factory", "")
@cli.command(cls=ModelCommand, factory=factory, name=name, help=command_help)
def factory_command(): ...
cli.add_command(reset_otp)
cli.add_command(dump)
@click.command()
@with_appcontext
@with_backendcontext
def dump():
"""Dump all the available models."""
objects = {}
for model_name, model in MODELS.items():
objects[model_name] = list(Backend.instance.query(model))
def serialize(instance):
"""Quick and dirty serialization method.
output = json.dumps(objects, cls=Backend.instance.json_encoder)
click.echo(output)
This can probably be made simpler when we will use pydantic models.
"""
def serialize_attribute(attribute_name, value):
multiple = is_multiple(instance.attributes[attribute_name])
if multiple and isinstance(value, list):
return [serialize_attribute(attribute_name, v) for v in value]
model, _ = instance.get_model_annotations(attribute_name)
if model:
return value.id
anonymized = ("password",)
if attribute_name in anonymized and value:
return "***"
if isinstance(value, datetime.datetime):
return value.isoformat()
return value
result = {}
for attribute in instance.attributes:
if serialized := serialize_attribute(attribute, getattr(instance, attribute)):
result[attribute] = serialized
return result
def get_factory(model):
"""Read information about models.
Options can be used to filter models::
canaille get user --given-name John --last-name Doe
Displays the matching models in JSON format in the standard output.
"""
command_help = f"""Search for {model.__name__.lower()}s and display the
matching models as JSON."""
@ -102,7 +135,7 @@ def get_factory(model):
attribute: value for attribute, value in kwargs.items() if value is not None
}
items = Backend.instance.query(model, **filter)
output = json.dumps(list(items), cls=Backend.instance.json_encoder)
output = json.dumps([serialize(item) for item in items])
click.echo(output)
for attribute, attribute_type in model.attributes.items():
@ -112,19 +145,16 @@ def get_factory(model):
return command
@click.command(cls=ModelCommand, factory=get_factory, name="get")
def get_command():
"""Read information about models.
def set_factory(model):
"""Update models.
Options can be used to filter models::
The command takes an model ID and edit one or several attributes::
canaille get user --given-name John --last-name Doe
canaille set user 229d112e-1bb5-452f-b2ac-f7680ffe7fb8 --given-name Jack
Displays the matching models in JSON format in the standard output.
Displays the edited model in JSON format in the standard output.
"""
def set_factory(model):
command_help = f"""Update a {model.__name__.lower()} and display the
edited model in JSON format in the standard output.
@ -158,7 +188,7 @@ def set_factory(model):
except Exception as exc: # pragma: no cover
raise click.ClickException(exc) from exc
output = json.dumps(instance, cls=Backend.instance.json_encoder)
output = json.dumps(serialize(instance))
click.echo(output)
attributes = dict(model.attributes)
@ -174,19 +204,16 @@ def set_factory(model):
return command
@click.command(cls=ModelCommand, factory=set_factory, name="set")
def set_command():
"""Update models.
def create_factory(model):
"""Create models.
The command takes an model ID and edit one or several attributes::
The model attributes can be passed as command options::
canaille set user 229d112e-1bb5-452f-b2ac-f7680ffe7fb8 --given-name Jack
canaille create user --given-name John --last-name Doe
Displays the edited model in JSON format in the standard output.
Displays the created model in JSON format in the standard output.
"""
def create_factory(model):
command_help = f"""Create a new {model.__name__.lower()} and display the
created model in JSON format in the standard output.
"""
@ -211,7 +238,7 @@ def create_factory(model):
except Exception as exc: # pragma: no cover
raise click.ClickException(exc) from exc
output = json.dumps(instance, cls=Backend.instance.json_encoder)
output = json.dumps(serialize(instance))
click.echo(output)
attributes = dict(model.attributes)
@ -227,19 +254,14 @@ def create_factory(model):
return command
@click.command(cls=ModelCommand, factory=create_factory, name="create")
def create_command():
"""Create models.
def delete_factory(model):
"""Delete models.
The model attributes can be passed as command options::
The command takes a model ID and deletes it::
canaille create user --given-name John --last-name Doe
Displays the created model in JSON format in the standard output.
canaille delete user --id 229d112e-1bb5-452f-b2ac-f7680ffe7fb8
"""
def delete_factory(model):
command_help = f"""Delete a {model.__name__.lower()}.
IDENTIFIER should be a {model.__name__.lower()} id or
@ -265,16 +287,6 @@ def delete_factory(model):
return command
@click.command(cls=ModelCommand, factory=delete_factory, name="delete")
def delete_command():
"""Delete models.
The command takes a model ID and deletes it::
canaille delete user --id 229d112e-1bb5-452f-b2ac-f7680ffe7fb8
"""
@click.command()
@with_appcontext
@with_backendcontext
@ -300,5 +312,5 @@ def reset_otp(identifier):
except Exception as exc: # pragma: no cover
raise click.ClickException(exc) from exc
output = json.dumps(user, cls=Backend.instance.json_encoder)
output = json.dumps(serialize(user))
click.echo(output)

View file

@ -61,9 +61,9 @@ class LDAPBackend(Backend):
setup_ldap_models(config)
@classmethod
def install(cls, app):
cls.setup_schemas(app.config)
with cls(app.config).session():
def install(cls, config):
cls.setup_schemas(config)
with cls(config).session():
models.Token.install()
models.AuthorizationCode.install()
models.Client.install()
@ -135,8 +135,8 @@ class LDAPBackend(Backend):
except ldap.INSUFFICIENT_ACCESS as exc:
raise ConfigurationException(
f"LDAP user '{config['CANAILLE_LDAP']['BIND_DN']}' cannot create "
f"users at '{config['CANAILLE_LDAP']['USER_BASE']}'"
f'LDAP user \'{config["CANAILLE_LDAP"]["BIND_DN"]}\' cannot create '
f'users at \'{config["CANAILLE_LDAP"]["USER_BASE"]}\''
) from exc
try:
@ -160,8 +160,8 @@ class LDAPBackend(Backend):
except ldap.INSUFFICIENT_ACCESS as exc:
raise ConfigurationException(
f"LDAP user '{config['CANAILLE_LDAP']['BIND_DN']}' cannot create "
f"groups at '{config['CANAILLE_LDAP']['GROUP_BASE']}'"
f'LDAP user \'{config["CANAILLE_LDAP"]["BIND_DN"]}\' cannot create '
f'groups at \'{config["CANAILLE_LDAP"]["GROUP_BASE"]}\''
) from exc
finally:
@ -430,7 +430,7 @@ def setup_ldap_models(config):
LDAPObject.root_dn = config["CANAILLE_LDAP"]["ROOT_DN"]
user_base = config["CANAILLE_LDAP"]["USER_BASE"].replace(
f",{config['CANAILLE_LDAP']['ROOT_DN']}", ""
f',{config["CANAILLE_LDAP"]["ROOT_DN"]}', ""
)
models.User.base = user_base
models.User.rdn_attribute = config["CANAILLE_LDAP"]["USER_RDN"]
@ -438,7 +438,7 @@ def setup_ldap_models(config):
models.User.ldap_object_class = listify(object_class)
group_base = config["CANAILLE_LDAP"]["GROUP_BASE"].replace(
f",{config['CANAILLE_LDAP']['ROOT_DN']}", ""
f',{config["CANAILLE_LDAP"]["ROOT_DN"]}', ""
)
models.Group.base = group_base or None
models.Group.rdn_attribute = config["CANAILLE_LDAP"]["GROUP_RDN"]

View file

@ -1,5 +1,4 @@
import ldap.filter
from flask import current_app
import canaille.core.models
import canaille.oidc.models
@ -41,7 +40,6 @@ class User(canaille.core.models.User, LDAPObject):
"one_time_password": "oathTokenPIN",
"one_time_password_emission_date": "oathSecretTime",
"password_failure_timestamps": "pwdFailureTime",
"password_last_update": "pwdChangedTime",
}
def match_filter(self, filter):
@ -52,8 +50,7 @@ class User(canaille.core.models.User, LDAPObject):
return super().match_filter(filter)
def save(self):
if current_app.features.has_otp and not self.secret_token:
self.initialize_otp()
super().save()
group_attr = self.python_attribute_to_ldap("groups")
if group_attr not in self.changes:

View file

@ -5,7 +5,6 @@ from typing import Any
from flask import current_app
import canaille.backends.memory.models
from canaille.backends import Backend
from canaille.backends import get_lockout_delay_message
@ -41,7 +40,7 @@ class MemoryBackend(Backend):
)
@classmethod
def install(cls, app):
def install(cls, config):
pass
def setup(self):
@ -84,10 +83,6 @@ class MemoryBackend(Backend):
def set_user_password(self, user, password):
user.password = password
user.password_last_update = datetime.datetime.now(
datetime.timezone.utc
).replace(microsecond=0)
self.save(user)
def query(self, model, **kwargs):
@ -142,12 +137,9 @@ class MemoryBackend(Backend):
return results[0] if results else None
def save(self, instance):
if (
isinstance(instance, canaille.backends.memory.models.User)
and current_app.features.has_otp
and not instance.secret_token
):
instance.initialize_otp()
# run the instance save callback if existing
if hasattr(instance, "save"):
instance.save()
if not instance.id:
instance.id = str(uuid.uuid4())
@ -164,14 +156,11 @@ class MemoryBackend(Backend):
def delete(self, instance):
# run the instance delete callback if existing
delete_callback = instance.delete() if hasattr(instance, "delete") else iter([])
next(delete_callback, None)
if hasattr(instance, "delete"):
instance.delete()
self.index_delete(instance)
# run the instance delete callback again if existing
next(delete_callback, None)
def reload(self, instance):
# run the instance reload callback if existing
reload_callback = instance.reload() if hasattr(instance, "reload") else iter([])

View file

@ -1,69 +1,44 @@
import datetime
from pathlib import Path
from flask import current_app
from flask_alembic import Alembic
from sqlalchemy import create_engine
from sqlalchemy import or_
from sqlalchemy import select
from sqlalchemy.orm import Session
from sqlalchemy.orm import declarative_base
from sqlalchemy_utils import Password
from canaille.backends import Backend
from canaille.backends import ModelEncoder
from canaille.backends import get_lockout_delay_message
Base = declarative_base()
class SQLModelEncoder(ModelEncoder):
def default(self, obj):
if isinstance(obj, Password):
return obj.hash.decode()
return super().default(obj)
def db_session(db_uri=None, init=False):
engine = create_engine(db_uri, echo=False, future=True)
if init:
Base.metadata.create_all(engine)
session = Session(engine)
return session
class SQLBackend(Backend):
engine = None
db_session = None
json_encoder = SQLModelEncoder
alembic = None
def __init__(self, config):
super().__init__(config)
SQLBackend.engine = create_engine(
self.config["CANAILLE_SQL"]["DATABASE_URI"], echo=False, future=True
)
SQLBackend.alembic = Alembic(metadatas=Base.metadata, engines=SQLBackend.engine)
@classmethod
def install(cls, app): # pragma: no cover
cls.init_alembic(app)
SQLBackend.alembic.upgrade()
@classmethod
def init_alembic(cls, app):
app.config["ALEMBIC"] = {
"script_location": str(Path(__file__).resolve().parent / "migrations"),
}
SQLBackend.alembic.init_app(app)
def init_app(self, app, init_backend=None):
super().init_app(app)
self.init_alembic(app)
init_backend = (
app.config["CANAILLE_SQL"]["AUTO_MIGRATE"]
if init_backend is None
else init_backend
def install(cls, config): # pragma: no cover
engine = create_engine(
config["CANAILLE_SQL"]["DATABASE_URI"],
echo=False,
future=True,
)
if init_backend: # pragma: no cover
with app.app_context():
self.alembic.upgrade()
Base.metadata.create_all(engine)
def setup(self):
def setup(self, init=False):
if not self.db_session:
self.db_session = Session(SQLBackend.engine)
self.db_session = db_session(
self.config["CANAILLE_SQL"]["DATABASE_URI"],
init=init,
)
def teardown(self):
pass
@ -102,9 +77,6 @@ class SQLBackend(Backend):
def set_user_password(self, user, password):
user.password = password
user.password_last_update = datetime.datetime.now(
datetime.timezone.utc
).replace(microsecond=0)
self.save(user)
def query(self, model, **kwargs):
@ -162,15 +134,12 @@ class SQLBackend(Backend):
def delete(self, instance):
# run the instance delete callback if existing
save_callback = instance.delete() if hasattr(instance, "delete") else iter([])
next(save_callback, None)
if hasattr(instance, "delete"):
instance.delete()
SQLBackend.instance.db_session.delete(instance)
SQLBackend.instance.db_session.commit()
# run the instance delete callback again if existing
next(save_callback, None)
def reload(self, instance):
# run the instance reload callback if existing
reload_callback = instance.reload() if hasattr(instance, "reload") else iter([])

View file

@ -15,21 +15,3 @@ class SQLSettings(BaseModel):
DATABASE_URI = "postgresql://user:password@localhost/database_name"
"""
PASSWORD_SCHEMES: str = "pbkdf2_sha512"
"""Password hashing scheme.
Defines password hashing scheme in SQL database.
examples : "mssql2000", "ldap_salted_sha1", "pbkdf2_sha512"
"""
AUTO_MIGRATE: bool = True
"""Whether to automatically apply database migrations.
If :data:`True`, database migrations will be automatically applied when Canaille web application is launched.
If :data:`False`, migrations must be applied manually with ``canaille db upgrade``.
.. note::
When running the CLI, migrations will never be applied.
"""

View file

@ -1,313 +0,0 @@
"""initial migration
Represents the state of the database in version 0.0.56
Revision ID: 1736443094
Revises:
Create Date: 2025-01-09 18:18:14.276914
"""
from collections.abc import Sequence
import sqlalchemy as sa
import sqlalchemy_utils.types.password
from alembic import op
import canaille.backends.sql.utils
# revision identifiers, used by Alembic.
revision: str = "1736443094"
down_revision: str | None = None
branch_labels: str | Sequence[str] | None = ("default",)
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"client",
sa.Column("id", sa.String(), nullable=False),
sa.Column(
"created",
canaille.backends.sql.utils.TZDateTime(timezone=True),
nullable=True,
),
sa.Column(
"last_modified",
canaille.backends.sql.utils.TZDateTime(timezone=True),
nullable=True,
),
sa.Column("description", sa.String(), nullable=True),
sa.Column("preconsent", sa.Boolean(), nullable=True),
sa.Column("post_logout_redirect_uris", sa.JSON(), nullable=True),
sa.Column("client_id", sa.String(), nullable=True),
sa.Column("client_secret", sa.String(), nullable=True),
sa.Column(
"client_id_issued_at",
canaille.backends.sql.utils.TZDateTime(timezone=True),
nullable=True,
),
sa.Column(
"client_secret_expires_at",
canaille.backends.sql.utils.TZDateTime(timezone=True),
nullable=True,
),
sa.Column("client_name", sa.String(), nullable=True),
sa.Column("contacts", sa.JSON(), nullable=True),
sa.Column("client_uri", sa.String(), nullable=True),
sa.Column("redirect_uris", sa.JSON(), nullable=True),
sa.Column("logo_uri", sa.String(), nullable=True),
sa.Column("grant_types", sa.JSON(), nullable=True),
sa.Column("response_types", sa.JSON(), nullable=True),
sa.Column("scope", sa.JSON(), nullable=True),
sa.Column("tos_uri", sa.String(), nullable=True),
sa.Column("policy_uri", sa.String(), nullable=True),
sa.Column("jwks_uri", sa.String(), nullable=True),
sa.Column("jwk", sa.String(), nullable=True),
sa.Column("token_endpoint_auth_method", sa.String(), nullable=True),
sa.Column("software_id", sa.String(), nullable=True),
sa.Column("software_version", sa.String(), nullable=True),
sa.PrimaryKeyConstraint("id"),
)
op.create_table(
"group",
sa.Column("id", sa.String(), nullable=False),
sa.Column(
"created",
canaille.backends.sql.utils.TZDateTime(timezone=True),
nullable=True,
),
sa.Column(
"last_modified",
canaille.backends.sql.utils.TZDateTime(timezone=True),
nullable=True,
),
sa.Column("display_name", sa.String(), nullable=False),
sa.Column("description", sa.String(), nullable=True),
sa.PrimaryKeyConstraint("id"),
)
op.create_table(
"user",
sa.Column("id", sa.String(), nullable=False),
sa.Column(
"created",
canaille.backends.sql.utils.TZDateTime(timezone=True),
nullable=True,
),
sa.Column(
"last_modified",
canaille.backends.sql.utils.TZDateTime(timezone=True),
nullable=True,
),
sa.Column("user_name", sa.String(), nullable=False),
sa.Column(
"password",
sqlalchemy_utils.types.password.PasswordType(max_length=4096),
nullable=True,
),
sa.Column("preferred_language", sa.String(), nullable=True),
sa.Column("family_name", sa.String(), nullable=True),
sa.Column("given_name", sa.String(), nullable=True),
sa.Column("formatted_name", sa.String(), nullable=True),
sa.Column("display_name", sa.String(), nullable=True),
sa.Column("emails", sa.JSON(), nullable=True),
sa.Column("phone_numbers", sa.JSON(), nullable=True),
sa.Column("formatted_address", sa.String(), nullable=True),
sa.Column("street", sa.String(), nullable=True),
sa.Column("postal_code", sa.String(), nullable=True),
sa.Column("locality", sa.String(), nullable=True),
sa.Column("region", sa.String(), nullable=True),
sa.Column("photo", sa.LargeBinary(), nullable=True),
sa.Column("profile_url", sa.String(), nullable=True),
sa.Column("employee_number", sa.String(), nullable=True),
sa.Column("department", sa.String(), nullable=True),
sa.Column("title", sa.String(), nullable=True),
sa.Column("organization", sa.String(), nullable=True),
sa.Column(
"lock_date",
canaille.backends.sql.utils.TZDateTime(timezone=True),
nullable=True,
),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("user_name"),
)
op.create_table(
"authorization_code",
sa.Column("id", sa.String(), nullable=False),
sa.Column(
"created",
canaille.backends.sql.utils.TZDateTime(timezone=True),
nullable=True,
),
sa.Column(
"last_modified",
canaille.backends.sql.utils.TZDateTime(timezone=True),
nullable=True,
),
sa.Column("authorization_code_id", sa.String(), nullable=True),
sa.Column("code", sa.String(), nullable=True),
sa.Column("client_id", sa.String(), nullable=False),
sa.Column("subject_id", sa.String(), nullable=False),
sa.Column("redirect_uri", sa.String(), nullable=True),
sa.Column("response_type", sa.String(), nullable=True),
sa.Column("scope", sa.JSON(), nullable=True),
sa.Column("nonce", sa.String(), nullable=True),
sa.Column(
"issue_date",
canaille.backends.sql.utils.TZDateTime(timezone=True),
nullable=True,
),
sa.Column("lifetime", sa.Integer(), nullable=True),
sa.Column("challenge", sa.String(), nullable=True),
sa.Column("challenge_method", sa.String(), nullable=True),
sa.Column(
"revokation_date",
canaille.backends.sql.utils.TZDateTime(timezone=True),
nullable=True,
),
sa.ForeignKeyConstraint(
["client_id"],
["client.id"],
),
sa.ForeignKeyConstraint(
["subject_id"],
["user.id"],
),
sa.PrimaryKeyConstraint("id"),
)
op.create_table(
"client_audience_association_table",
sa.Column("audience_id", sa.String(), nullable=True),
sa.Column("client_id", sa.String(), nullable=True),
sa.ForeignKeyConstraint(
["audience_id"],
["client.id"],
),
sa.ForeignKeyConstraint(
["client_id"],
["client.id"],
),
sa.PrimaryKeyConstraint("audience_id", "client_id"),
)
op.create_table(
"consent",
sa.Column("id", sa.String(), nullable=False),
sa.Column(
"created",
canaille.backends.sql.utils.TZDateTime(timezone=True),
nullable=True,
),
sa.Column(
"last_modified",
canaille.backends.sql.utils.TZDateTime(timezone=True),
nullable=True,
),
sa.Column("consent_id", sa.String(), nullable=True),
sa.Column("subject_id", sa.String(), nullable=False),
sa.Column("client_id", sa.String(), nullable=False),
sa.Column("scope", sa.JSON(), nullable=True),
sa.Column(
"issue_date",
canaille.backends.sql.utils.TZDateTime(timezone=True),
nullable=True,
),
sa.Column(
"revokation_date",
canaille.backends.sql.utils.TZDateTime(timezone=True),
nullable=True,
),
sa.ForeignKeyConstraint(
["client_id"],
["client.id"],
),
sa.ForeignKeyConstraint(
["subject_id"],
["user.id"],
),
sa.PrimaryKeyConstraint("id"),
)
op.create_table(
"membership_association_table",
sa.Column("user_id", sa.String(), nullable=False),
sa.Column("group_id", sa.String(), nullable=False),
sa.ForeignKeyConstraint(
["group_id"],
["group.id"],
),
sa.ForeignKeyConstraint(
["user_id"],
["user.id"],
),
sa.PrimaryKeyConstraint("user_id", "group_id"),
)
op.create_table(
"token",
sa.Column("id", sa.String(), nullable=False),
sa.Column(
"created",
canaille.backends.sql.utils.TZDateTime(timezone=True),
nullable=True,
),
sa.Column(
"last_modified",
canaille.backends.sql.utils.TZDateTime(timezone=True),
nullable=True,
),
sa.Column("token_id", sa.String(), nullable=True),
sa.Column("access_token", sa.String(), nullable=True),
sa.Column("client_id", sa.String(), nullable=False),
sa.Column("subject_id", sa.String(), nullable=False),
sa.Column("type", sa.String(), nullable=True),
sa.Column("refresh_token", sa.String(), nullable=True),
sa.Column("scope", sa.JSON(), nullable=True),
sa.Column(
"issue_date",
canaille.backends.sql.utils.TZDateTime(timezone=True),
nullable=True,
),
sa.Column("lifetime", sa.Integer(), nullable=True),
sa.Column(
"revokation_date",
canaille.backends.sql.utils.TZDateTime(timezone=True),
nullable=True,
),
sa.ForeignKeyConstraint(
["client_id"],
["client.id"],
),
sa.ForeignKeyConstraint(
["subject_id"],
["user.id"],
),
sa.PrimaryKeyConstraint("id"),
)
op.create_table(
"token_audience_association_table",
sa.Column("token_id", sa.String(), nullable=True),
sa.Column("client_id", sa.String(), nullable=True),
sa.ForeignKeyConstraint(
["client_id"],
["client.id"],
),
sa.ForeignKeyConstraint(
["token_id"],
["token.id"],
),
sa.PrimaryKeyConstraint("token_id", "client_id"),
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("token_audience_association_table")
op.drop_table("token")
op.drop_table("membership_association_table")
op.drop_table("consent")
op.drop_table("client_audience_association_table")
op.drop_table("authorization_code")
op.drop_table("user")
op.drop_table("group")
op.drop_table("client")
# ### end Alembic commands ###

View file

@ -1,76 +0,0 @@
"""0.0.58
Revision ID: 1736443538
Revises: 1736443094
Create Date: 2025-01-09 18:25:38.443578
"""
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
import canaille.backends.sql.utils
# revision identifiers, used by Alembic.
revision: str = "1736443538"
down_revision: str | None = "1736443094"
branch_labels: str | Sequence[str] | None = ()
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("token") as batch_op:
batch_op.alter_column("subject_id", existing_type=sa.VARCHAR(), nullable=True)
op.add_column(
"user",
sa.Column(
"password_last_update",
canaille.backends.sql.utils.TZDateTime(timezone=True),
nullable=True,
),
)
op.add_column(
"user", sa.Column("_password_failure_timestamps", sa.JSON(), nullable=True)
)
op.add_column(
"user",
sa.Column(
"last_otp_login",
canaille.backends.sql.utils.TZDateTime(timezone=True),
nullable=True,
),
)
op.add_column("user", sa.Column("secret_token", sa.String(), nullable=True))
op.add_column("user", sa.Column("hotp_counter", sa.Integer(), nullable=True))
op.add_column("user", sa.Column("one_time_password", sa.String(), nullable=True))
op.add_column(
"user",
sa.Column(
"one_time_password_emission_date",
canaille.backends.sql.utils.TZDateTime(timezone=True),
nullable=True,
),
)
with op.batch_alter_table("user") as batch_op:
batch_op.create_unique_constraint("uq_user_secret_token", ["secret_token"])
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("user") as batch_op:
batch_op.drop_constraint("uq_user_secret_token", type_="unique")
op.drop_column("user", "one_time_password_emission_date")
op.drop_column("user", "one_time_password")
op.drop_column("user", "hotp_counter")
op.drop_column("user", "secret_token")
op.drop_column("user", "last_otp_login")
op.drop_column("user", "_password_failure_timestamps")
op.drop_column("user", "password_last_update")
with op.batch_alter_table("token") as batch_op:
batch_op.alter_column("subject_id", existing_type=sa.VARCHAR(), nullable=False)
# ### end Alembic commands ###

View file

@ -1,29 +0,0 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from collections.abc import Sequence
import sqlalchemy as sa
import sqlalchemy_utils.types.password
from alembic import op
import canaille.backends.sql.utils
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}

View file

@ -2,7 +2,6 @@ import datetime
import typing
import uuid
from flask import current_app
from sqlalchemy import Boolean
from sqlalchemy import Column
from sqlalchemy import ForeignKey
@ -20,7 +19,6 @@ from sqlalchemy_utils import force_auto_coercion
import canaille.core.models
import canaille.oidc.models
from canaille.backends import Backend
from canaille.backends.models import BackendModel
from .backend import Base
@ -64,13 +62,6 @@ membership_association_table = Table(
class User(canaille.core.models.User, Base, SqlAlchemyModel):
__tablename__ = "user"
@staticmethod
def default_password_arguments(**kwargs):
return dict(
schemes=Backend.instance.config["CANAILLE_SQL"]["PASSWORD_SCHEMES"],
**kwargs,
)
id: Mapped[str] = mapped_column(
String, primary_key=True, default=lambda: str(uuid.uuid4())
)
@ -82,13 +73,7 @@ class User(canaille.core.models.User, Base, SqlAlchemyModel):
)
user_name: Mapped[str] = mapped_column(String, unique=True, nullable=False)
password: Mapped[str] = mapped_column(
PasswordType(
onload=default_password_arguments,
),
nullable=True,
)
password_last_update: Mapped[datetime.datetime] = mapped_column(
TZDateTime(timezone=True), nullable=True
PasswordType(schemes=["pbkdf2_sha512"]), nullable=True
)
_password_failure_timestamps: Mapped[list[str]] = mapped_column(
MutableJson, nullable=True
@ -127,10 +112,6 @@ class User(canaille.core.models.User, Base, SqlAlchemyModel):
TZDateTime(timezone=True), nullable=True
)
def save(self):
if current_app.features.has_otp and not self.secret_token:
self.initialize_otp()
@property
def password_failure_timestamps(self):
if self._password_failure_timestamps:

View file

@ -1,5 +1,3 @@
import importlib.metadata
import click
from flask.cli import FlaskGroup
@ -9,21 +7,13 @@ import canaille.core.commands
import canaille.oidc.commands
from canaille import create_app
version = importlib.metadata.version("canaille")
def create_cli_app(): # pragma: no cover
# Force the non-application of migrations
return create_app(init_backend=False)
@click.group(
cls=FlaskGroup,
create_app=create_cli_app,
create_app=create_app,
add_version_option=False,
add_default_commands=False,
)
@click.version_option(version, prog_name="Canaille")
def cli():
"""Canaille management utilities."""
@ -32,7 +22,3 @@ canaille.app.commands.register(cli)
canaille.backends.commands.register(cli)
canaille.core.commands.register(cli)
canaille.oidc.commands.register(cli)
if __name__ == "__main__": # pragma: no cover
cli()

View file

@ -120,14 +120,6 @@ SECRET_KEY = "change me before you go in production"
# This url should not be modified.
# API_URL_HIBP = "https://api.pwnedpasswords.com/range/"
# Password validity duration.
# If a value is recorded Canaille will check if user's password is expired.
# Then, the user is forced to change his password when the lifetime of the password is over.
# This value is expressed in `ISO8601 format <https://en.wikipedia.org/wiki/ISO_8601#Durations>`_.
# Example for 60 days: "P60D"
# It is possible to disable this option by entering None.
# PASSWORD_LIFETIME = None
# [CANAILLE_SQL]
# The SQL database connection string
# Details on https://docs.sqlalchemy.org/en/20/core/engines.html

View file

@ -374,12 +374,3 @@ class CoreSettings(BaseModel):
PASSWORD_COMPROMISSION_CHECK_API_URL: str = "https://api.pwnedpasswords.com/range/"
"""Have i been pwned api url for compromission checks."""
PASSWORD_LIFETIME: str | None = None
"""Password validity duration.
If set, user passwords expire after this delay.
Users are forced to change their password when the lifetime of the password is over.
The duration value is expressed in `ISO8601 format <https://en.wikipedia.org/wiki/ISO_8601#Durations>`_.
For example, delay of 60 days is written "P60D".
"""

View file

@ -5,7 +5,7 @@ from . import admin
from . import auth
from . import groups
bp = Blueprint("core", __name__)
bp = Blueprint("core", __name__, template_folder="../templates")
bp.register_blueprint(account.bp)
bp.register_blueprint(admin.bp)

View file

@ -24,6 +24,7 @@ from canaille.app import build_hash
from canaille.app import default_fields
from canaille.app import models
from canaille.app import obj_to_b64
from canaille.app.flask import permissions_needed
from canaille.app.flask import render_htmx_template
from canaille.app.flask import request_is_htmx
from canaille.app.flask import smtp_needed
@ -41,7 +42,7 @@ from canaille.app.i18n import reload_translations
from canaille.app.session import current_user
from canaille.app.session import login_user
from canaille.app.session import logout_user
from canaille.app.templating import render_template
from canaille.app.themes import render_template
from canaille.backends import Backend
from ..mails import send_confirmation_email
@ -52,7 +53,6 @@ from ..mails import send_registration_mail
from .forms import EmailConfirmationForm
from .forms import InvitationForm
from .forms import JoinForm
from .forms import PasswordResetForm
from .forms import build_profile_form
bp = Blueprint("account", __name__)
@ -94,7 +94,7 @@ def join():
),
"success",
)
return render_template("core/join.html", form=form)
return render_template("join.html", form=form)
payload = RegistrationPayload(
creation_date_isoformat=datetime.datetime.now(
@ -130,17 +130,17 @@ def join():
"error",
)
return render_template("core/join.html", form=form)
return render_template("join.html", form=form)
@bp.route("/about")
def about():
version = metadata.version("canaille")
return render_template("core/about.html", version=version)
return render_template("about.html", version=version)
@bp.route("/users", methods=["GET", "POST"])
@user_needed("manage_users")
@permissions_needed("manage_users")
def users(user):
table_form = TableForm(
models.User,
@ -151,7 +151,7 @@ def users(user):
abort(404)
return render_htmx_template(
"core/users.html",
"users.html",
menuitem="users",
table_form=table_form,
)
@ -195,7 +195,7 @@ class RegistrationPayload(VerificationPayload):
@bp.route("/invite", methods=["GET", "POST"])
@smtp_needed()
@user_needed("manage_users")
@permissions_needed("manage_users")
def user_invitation(user):
form = InvitationForm(request.form or None)
email_sent = None
@ -221,7 +221,7 @@ def user_invitation(user):
email_sent = send_invitation_mail(form.email.data, registration_url)
return render_template(
"core/invite.html",
"invite.html",
form=form,
menuitems="users",
form_validated=form_validated,
@ -322,26 +322,27 @@ def registration(data=None, hash=None):
]
form["password2"].validators = [
wtforms.validators.DataRequired(),
wtforms.validators.EqualTo(
"password1", message=_("Password and confirmation do not match.")
),
]
form["password1"].flags.required = True
form["password2"].flags.required = True
if not request.form or form.form_control():
return render_template(
"core/profile_add.html",
"profile_add.html",
form=form,
menuitem="users",
edited_user=None,
self_deletion=False,
)
if not form.validate():
flash(_("User account creation failed."), "error")
return render_template(
"core/profile_add.html",
"profile_add.html",
form=form,
menuitem="users",
edited_user=None,
self_deletion=False,
)
user = profile_create(current_app, form)
@ -409,7 +410,7 @@ def email_confirmation(data, hash):
@bp.route("/profile", methods=("GET", "POST"))
@user_needed("manage_users")
@permissions_needed("manage_users")
def profile_creation(user):
form = build_profile_form(user.writable_fields, user.readable_fields)
form.process(CombinedMultiDict((request.files, request.form)) or None)
@ -420,17 +421,21 @@ def profile_creation(user):
if not request.form or form.form_control():
return render_template(
"core/profile_add.html",
"profile_add.html",
form=form,
menuitem="users",
edited_user=None,
self_deletion=False,
)
if not form.validate():
flash(_("User account creation failed."), "error")
return render_template(
"core/profile_add.html",
"profile_add.html",
form=form,
menuitem="users",
edited_user=None,
self_deletion=False,
)
user = profile_create(current_app, form)
@ -500,7 +505,7 @@ def profile_edition_main_form(user, edited_user, emails_readonly):
profile_form = build_profile_form(writable_fields, readable_fields)
profile_form.process(request_data or None, data=data)
profile_form.user = edited_user
profile_form.render_field_macro_file = "core/partial/profile_field.html"
profile_form.render_field_macro_file = "partial/profile_field.html"
profile_form.render_field_extra_context = {
"user": user,
"edited_user": edited_user,
@ -606,12 +611,12 @@ def profile_edition(user, edited_user):
}
if not request.form or profile_form.form_control():
return render_template("core/profile_edit.html", **render_context)
return render_template("profile_edit.html", **render_context)
if request_is_htmx() or request.form.get("action") == "edit-profile":
if not profile_form.validate():
flash(_("Profile edition failed."), "error")
return render_template("core/profile_edit.html", **render_context)
return render_template("profile_edit.html", **render_context)
profile_edition_main_form_validation(user, edited_user, profile_form)
@ -629,7 +634,7 @@ def profile_edition(user, edited_user):
if request.form.get("action") == "add_email":
if not emails_form.validate():
flash(_("Email addition failed."), "error")
return render_template("core/profile_edit.html", **render_context)
return render_template("profile_edit.html", **render_context)
if profile_edition_add_email(user, edited_user, emails_form):
flash(
@ -651,7 +656,7 @@ def profile_edition(user, edited_user):
user, edited_user, request.form.get("email_remove")
):
flash(_("Email deletion failed."), "error")
return render_template("core/profile_edit.html", **render_context)
return render_template("profile_edit.html", **render_context)
flash(_("The email have been successfully deleted."), "success")
return redirect(
@ -677,9 +682,7 @@ def profile_settings(user, edited_user):
return profile_settings_edit(user, edited_user)
if request.form.get("action") == "confirm-delete":
return render_template(
"core/modals/delete-account.html", edited_user=edited_user
)
return render_template("modals/delete-account.html", edited_user=edited_user)
if request.form.get("action") == "delete":
return profile_delete(user, edited_user)
@ -726,7 +729,7 @@ def profile_settings(user, edited_user):
and current_app.features.has_account_lockability
and not edited_user.locked
):
return render_template("core/modals/lock-account.html", edited_user=edited_user)
return render_template("modals/lock-account.html", edited_user=edited_user)
if (
request.form.get("action") == "lock"
@ -754,7 +757,7 @@ def profile_settings(user, edited_user):
request.form.get("action") == "confirm-reset-otp"
and current_app.features.has_otp
):
return render_template("core/modals/reset-otp.html", edited_user=edited_user)
return render_template("modals/reset-otp.html", edited_user=edited_user)
if request.form.get("action") == "reset-otp" and current_app.features.has_otp:
flash(_("One-time password authentication has been reset"), "success")
@ -784,7 +787,7 @@ def profile_settings_edit(editor, edited_user):
if hasattr(edited_user, k) and k in available_fields
}
data["groups"] = edited_user.groups
data["groups"] = [group.id for group in edited_user.groups]
form = build_profile_form(
editor.writable_fields & available_fields,
@ -822,7 +825,7 @@ def profile_settings_edit(editor, edited_user):
)
return render_template(
"core/profile_settings.html",
"profile_settings.html",
form=form,
menuitem=menuitem,
edited_user=edited_user,
@ -850,7 +853,7 @@ def profile_delete(user, edited_user):
@bp.route("/impersonate/<user:puppet>")
@user_needed("impersonate_users")
@permissions_needed("impersonate_users")
def impersonate(user, puppet):
if puppet.locked:
abort(403, _("Locked users cannot be impersonated."))
@ -884,23 +887,3 @@ def photo(user, field):
return send_file(
stream, mimetype="image/jpeg", last_modified=user.last_modified, etag=etag
)
@bp.route("/reset/<user:user>", methods=["GET", "POST"])
def reset(user):
form = PasswordResetForm(request.form)
if user != current_user() or not user.has_expired_password():
abort(403)
if request.form and form.validate():
Backend.instance.set_user_password(user, form.password.data)
login_user(user)
flash(_("Your password has been updated successfully"), "success")
return redirect(
session.pop(
"redirect-after-login",
url_for("core.account.profile_edition", edited_user=user),
)
)
return render_template("core/reset-password.html", form=form, user=user, hash=None)

View file

@ -7,11 +7,11 @@ from wtforms import StringField
from wtforms.validators import DataRequired
from canaille.app import obj_to_b64
from canaille.app.flask import user_needed
from canaille.app.flask import permissions_needed
from canaille.app.forms import Form
from canaille.app.forms import email_validator
from canaille.app.i18n import gettext as _
from canaille.app.templating import render_template
from canaille.app.themes import render_template
from canaille.core.mails import build_hash
from canaille.core.mails import send_test_mail
@ -34,7 +34,7 @@ class MailTestForm(Form):
@bp.route("/mail", methods=["GET", "POST"])
@user_needed("manage_oidc")
@permissions_needed("manage_oidc")
def mail_index(user):
form = MailTestForm(request.form or None)
if request.form and form.validate():
@ -43,15 +43,15 @@ def mail_index(user):
else:
flash(_("The test mail has not been sent correctly"), "error")
return render_template("core/mails/admin.html", form=form, menuitem="admin")
return render_template("mails/admin.html", form=form, menuitem="admin")
@bp.route("/mail/test.html")
@user_needed("manage_oidc")
@permissions_needed("manage_oidc")
def test_html(user):
base_url = url_for("core.account.index", _external=True)
return render_template(
"core/mails/test.html",
"mails/test.html",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
logo=current_app.config["CANAILLE"]["LOGO"],
@ -62,18 +62,18 @@ def test_html(user):
@bp.route("/mail/test.txt")
@user_needed("manage_oidc")
@permissions_needed("manage_oidc")
def test_txt(user):
base_url = url_for("core.account.index", _external=True)
return render_template(
"core/mails/test.txt",
"mails/test.txt",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=current_app.config.get("SERVER_NAME", base_url),
)
@bp.route("/mail/password-init.html")
@user_needed("manage_oidc")
@permissions_needed("manage_oidc")
def password_init_html(user):
base_url = url_for("core.account.index", _external=True)
reset_url = url_for(
@ -87,7 +87,7 @@ def password_init_html(user):
)
return render_template(
"core/mails/firstlogin.html",
"mails/firstlogin.html",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
reset_url=reset_url,
@ -99,7 +99,7 @@ def password_init_html(user):
@bp.route("/mail/password-init.txt")
@user_needed("manage_oidc")
@permissions_needed("manage_oidc")
def password_init_txt(user):
base_url = url_for("core.account.index", _external=True)
reset_url = url_for(
@ -110,7 +110,7 @@ def password_init_txt(user):
)
return render_template(
"core/mails/firstlogin.txt",
"mails/firstlogin.txt",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=current_app.config.get("SERVER_NAME", base_url),
reset_url=reset_url,
@ -118,7 +118,7 @@ def password_init_txt(user):
@bp.route("/mail/reset.html")
@user_needed("manage_oidc")
@permissions_needed("manage_oidc")
def password_reset_html(user):
base_url = url_for("core.account.index", _external=True)
reset_url = url_for(
@ -132,7 +132,7 @@ def password_reset_html(user):
)
return render_template(
"core/mails/reset.html",
"mails/reset.html",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
reset_url=reset_url,
@ -144,7 +144,7 @@ def password_reset_html(user):
@bp.route("/mail/reset.txt")
@user_needed("manage_oidc")
@permissions_needed("manage_oidc")
def password_reset_txt(user):
base_url = url_for("core.account.index", _external=True)
reset_url = url_for(
@ -155,7 +155,7 @@ def password_reset_txt(user):
)
return render_template(
"core/mails/reset.txt",
"mails/reset.txt",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=current_app.config.get("SERVER_NAME", base_url),
reset_url=reset_url,
@ -163,7 +163,7 @@ def password_reset_txt(user):
@bp.route("/mail/<identifier>/<email>/invitation.html")
@user_needed("manage_oidc")
@permissions_needed("manage_oidc")
def invitation_html(user, identifier, email):
base_url = url_for("core.account.index", _external=True)
registration_url = url_for(
@ -174,7 +174,7 @@ def invitation_html(user, identifier, email):
)
return render_template(
"core/mails/invitation.html",
"mails/invitation.html",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
registration_url=registration_url,
@ -186,7 +186,7 @@ def invitation_html(user, identifier, email):
@bp.route("/mail/<identifier>/<email>/invitation.txt")
@user_needed("manage_oidc")
@permissions_needed("manage_oidc")
def invitation_txt(user, identifier, email):
base_url = url_for("core.account.index", _external=True)
registration_url = url_for(
@ -197,7 +197,7 @@ def invitation_txt(user, identifier, email):
)
return render_template(
"core/mails/invitation.txt",
"mails/invitation.txt",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
registration_url=registration_url,
@ -205,7 +205,7 @@ def invitation_txt(user, identifier, email):
@bp.route("/mail/<identifier>/<email>/email-confirmation.html")
@user_needed("manage_oidc")
@permissions_needed("manage_oidc")
def email_confirmation_html(user, identifier, email):
base_url = url_for("core.account.index", _external=True)
email_confirmation_url = url_for(
@ -216,7 +216,7 @@ def email_confirmation_html(user, identifier, email):
)
return render_template(
"core/mails/email-confirmation.html",
"mails/email-confirmation.html",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
confirmation_url=email_confirmation_url,
@ -228,7 +228,7 @@ def email_confirmation_html(user, identifier, email):
@bp.route("/mail/<identifier>/<email>/email-confirmation.txt")
@user_needed("manage_oidc")
@permissions_needed("manage_oidc")
def email_confirmation_txt(user, identifier, email):
base_url = url_for("core.account.index", _external=True)
email_confirmation_url = url_for(
@ -239,7 +239,7 @@ def email_confirmation_txt(user, identifier, email):
)
return render_template(
"core/mails/email-confirmation.txt",
"mails/email-confirmation.txt",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
confirmation_url=email_confirmation_url,
@ -247,7 +247,7 @@ def email_confirmation_txt(user, identifier, email):
@bp.route("/mail/<email>/registration.html")
@user_needed("manage_oidc")
@permissions_needed("manage_oidc")
def registration_html(user, email):
base_url = url_for("core.account.index", _external=True)
registration_url = url_for(
@ -258,7 +258,7 @@ def registration_html(user, email):
)
return render_template(
"core/mails/registration.html",
"mails/registration.html",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
registration_url=registration_url,
@ -270,7 +270,7 @@ def registration_html(user, email):
@bp.route("/mail/<email>/registration.txt")
@user_needed("manage_oidc")
@permissions_needed("manage_oidc")
def registration_txt(user, email):
base_url = url_for("core.account.index", _external=True)
registration_url = url_for(
@ -281,7 +281,7 @@ def registration_txt(user, email):
)
return render_template(
"core/mails/registration.txt",
"mails/registration.txt",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
registration_url=registration_url,
@ -289,7 +289,7 @@ def registration_txt(user, email):
@bp.route("/mail/compromised_password_check_failure.html")
@user_needed("manage_oidc")
@permissions_needed("manage_oidc")
def compromised_password_check_failure_html(user):
base_url = url_for("core.account.index", _external=True)
user_name = "<USER NAME>"
@ -298,7 +298,7 @@ def compromised_password_check_failure_html(user):
user_email = "<USER EMAIL>"
return render_template(
"core/mails/compromised_password_check_failure.html",
"mails/compromised_password_check_failure.html",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
user_name=user_name,
@ -313,7 +313,7 @@ def compromised_password_check_failure_html(user):
@bp.route("/mail/compromised_password_check_failure.txt")
@user_needed("manage_oidc")
@permissions_needed("manage_oidc")
def compromised_password_check_failure_txt(user):
base_url = url_for("core.account.index", _external=True)
user_name = "<USER NAME>"
@ -322,7 +322,7 @@ def compromised_password_check_failure_txt(user):
user_email = "<USER EMAIL>"
return render_template(
"core/mails/compromised_password_check_failure.txt",
"mails/compromised_password_check_failure.txt",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
user_name=user_name,
@ -333,13 +333,13 @@ def compromised_password_check_failure_txt(user):
@bp.route("/mail/email_otp.html")
@user_needed("manage_oidc")
@permissions_needed("manage_oidc")
def email_otp_html(user):
base_url = url_for("core.account.index", _external=True)
otp = "000000"
return render_template(
"core/mails/email_otp.html",
"mails/email_otp.html",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
otp=otp,
@ -351,13 +351,13 @@ def email_otp_html(user):
@bp.route("/mail/email_otp.txt")
@user_needed("manage_oidc")
@permissions_needed("manage_oidc")
def email_otp_txt(user):
base_url = url_for("core.account.index", _external=True)
otp = "000000"
return render_template(
"core/mails/email_otp.txt",
"mails/email_otp.txt",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
otp=otp,

View file

@ -18,7 +18,7 @@ from canaille.app.i18n import gettext as _
from canaille.app.session import current_user
from canaille.app.session import login_user
from canaille.app.session import logout_user
from canaille.app.templating import render_template
from canaille.app.themes import render_template
from canaille.backends import Backend
from canaille.core.endpoints.forms import TwoFactorForm
from canaille.core.models import SEND_NEW_OTP_DELAY
@ -49,11 +49,11 @@ def login():
)
form = LoginForm(request.form or None)
form.render_field_macro_file = "core/partial/login_field.html"
form.render_field_macro_file = "partial/login_field.html"
form["login"].render_kw["placeholder"] = Backend.instance.login_placeholder()
if not request.form or form.form_control():
return render_template("core/login.html", form=form)
return render_template("login.html", form=form)
user = Backend.instance.get_user_from_login(form.login.data)
if user and not user.has_password() and current_app.features.has_smtp:
@ -62,7 +62,7 @@ def login():
if not form.validate():
logout_user()
flash(_("Login failed, please check your information"), "error")
return render_template("core/login.html", form=form)
return render_template("login.html", form=form)
session["attempt_login"] = form.login.data
return redirect(url_for("core.auth.password"))
@ -80,11 +80,11 @@ def password():
return redirect(url_for("core.auth.login"))
form = PasswordForm(request.form or None)
form.render_field_macro_file = "core/partial/login_field.html"
form.render_field_macro_file = "partial/login_field.html"
if not request.form or form.form_control():
return render_template(
"core/password.html", form=form, username=session["attempt_login"]
"password.html", form=form, username=session["attempt_login"]
)
user = Backend.instance.get_user_from_login(session["attempt_login"])
@ -95,7 +95,7 @@ def password():
logout_user()
flash(_("Login failed, please check your information"), "error")
return render_template(
"core/password.html", form=form, username=session["attempt_login"]
"password.html", form=form, username=session["attempt_login"]
)
success, message = Backend.instance.check_user_password(user, form.password.data)
@ -103,11 +103,11 @@ def password():
if not success:
logout_user()
current_app.logger.security(
f"Failed login attempt for {session['attempt_login']} from {request_ip}"
f'Failed login attempt for {session["attempt_login"]} from {request_ip}'
)
flash(message or _("Login failed, please check your information"), "error")
return render_template(
"core/password.html", form=form, username=session["attempt_login"]
"password.html", form=form, username=session["attempt_login"]
)
otp_methods = []
@ -121,12 +121,12 @@ def password():
if otp_methods:
session["remaining_otp_methods"] = otp_methods
session["attempt_login_with_correct_password"] = session.pop("attempt_login")
return redirect_to_verify_mfa(
return redirect_to_verify_2fa(
user, otp_methods[0], request_ip, url_for("core.auth.password")
)
else:
current_app.logger.security(
f"Succeed login attempt for {session['attempt_login']} from {request_ip}"
f'Succeed login attempt for {session["attempt_login"]} from {request_ip}'
)
del session["attempt_login"]
login_user(user)
@ -165,7 +165,7 @@ def firstlogin(user):
form = FirstLoginForm(request.form or None)
if not request.form:
return render_template("core/firstlogin.html", form=form, user=user)
return render_template("firstlogin.html", form=form, user=user)
form.validate()
@ -182,7 +182,7 @@ def firstlogin(user):
else:
flash(_("Could not send the password initialization email"), "error")
return render_template("core/firstlogin.html", form=form)
return render_template("firstlogin.html", form=form)
@bp.route("/reset", methods=["GET", "POST"])
@ -193,11 +193,11 @@ def forgotten():
form = ForgottenPasswordForm(request.form)
if not request.form:
return render_template("core/forgotten-password.html", form=form)
return render_template("forgotten-password.html", form=form)
if not form.validate():
flash(_("Could not send the password reset link."), "error")
return render_template("core/forgotten-password.html", form=form)
return render_template("forgotten-password.html", form=form)
user = Backend.instance.get_user_from_login(form.login.data)
success_message = _(
@ -208,7 +208,7 @@ def forgotten():
not user or not user.can_edit_self
):
flash(success_message, "success")
return render_template("core/forgotten-password.html", form=form)
return render_template("forgotten-password.html", form=form)
if not user.can_edit_self:
flash(
@ -219,7 +219,7 @@ def forgotten():
),
"error",
)
return render_template("core/forgotten-password.html", form=form)
return render_template("forgotten-password.html", form=form)
request_ip = request.remote_addr or "unknown IP"
success = True
@ -238,7 +238,7 @@ def forgotten():
"error",
)
return render_template("core/forgotten-password.html", form=form)
return render_template("forgotten-password.html", form=form)
@bp.route("/reset/<user:user>/<hash>", methods=["GET", "POST"])
@ -274,10 +274,10 @@ def reset(user, hash):
)
)
return render_template("core/reset-password.html", form=form, user=user, hash=hash)
return render_template("reset-password.html", form=form, user=user, hash=hash)
@bp.route("/setup-mfa")
@bp.route("/setup-2fa")
def setup_two_factor_auth():
if not current_app.features.has_otp:
abort(404)
@ -298,14 +298,14 @@ def setup_two_factor_auth():
uri = user.get_otp_authentication_setup_uri()
base64_qr_image = get_b64encoded_qr_image(uri)
return render_template(
"core/setup-mfa.html",
"setup-2fa.html",
secret=user.secret_token,
qr_image=base64_qr_image,
user=user,
username=user.user_name,
)
@bp.route("/verify-mfa", methods=["GET", "POST"])
@bp.route("/verify-2fa", methods=["GET", "POST"])
def verify_two_factor_auth():
if current_user():
return redirect(
@ -331,11 +331,11 @@ def verify_two_factor_auth():
abort(404)
form = TwoFactorForm(request.form or None)
form.render_field_macro_file = "core/partial/login_field.html"
form.render_field_macro_file = "partial/login_field.html"
if not request.form or form.form_control():
return render_template(
"core/verify-mfa.html",
"verify-2fa.html",
form=form,
username=session["attempt_login_with_correct_password"],
method=current_otp_method,
@ -349,7 +349,7 @@ def verify_two_factor_auth():
session["remaining_otp_methods"].pop(0)
request_ip = request.remote_addr or "unknown IP"
if session["remaining_otp_methods"]:
return redirect_to_verify_mfa(
return redirect_to_verify_2fa(
user,
session["remaining_otp_methods"][0],
request_ip,
@ -359,7 +359,7 @@ def verify_two_factor_auth():
user.last_otp_login = datetime.datetime.now(datetime.timezone.utc)
Backend.instance.save(user)
current_app.logger.security(
f"Succeed login attempt for {session['attempt_login_with_correct_password']} from {request_ip}"
f'Succeed login attempt for {session["attempt_login_with_correct_password"]} from {request_ip}'
)
del session["attempt_login_with_correct_password"]
login_user(user)
@ -380,7 +380,7 @@ def verify_two_factor_auth():
)
request_ip = request.remote_addr or "unknown IP"
current_app.logger.security(
f"Failed login attempt (wrong OTP) for {session['attempt_login_with_correct_password']} from {request_ip}"
f'Failed login attempt (wrong OTP) for {session["attempt_login_with_correct_password"]} from {request_ip}'
)
return redirect(url_for("core.auth.verify_two_factor_auth"))
@ -408,7 +408,7 @@ def send_mail_otp():
Backend.instance.save(user)
request_ip = request.remote_addr or "unknown IP"
current_app.logger.security(
f"Sent one-time password for {session['attempt_login_with_correct_password']} to {user.emails[0]} from {request_ip}"
f'Sent one-time password for {session["attempt_login_with_correct_password"]} to {user.emails[0]} from {request_ip}'
)
flash(
"Code successfully sent!",
@ -448,7 +448,7 @@ def send_sms_otp():
Backend.instance.save(user)
request_ip = request.remote_addr or "unknown IP"
current_app.logger.security(
f"Sent one-time password for {session['attempt_login_with_correct_password']} to {user.phone_numbers[0]} from {request_ip}"
f'Sent one-time password for {session["attempt_login_with_correct_password"]} to {user.phone_numbers[0]} from {request_ip}'
)
flash(
"Code successfully sent!",
@ -465,7 +465,7 @@ def send_sms_otp():
return redirect(url_for("core.auth.verify_two_factor_auth"))
def redirect_to_verify_mfa(user, otp_method, request_ip, fail_redirect_url):
def redirect_to_verify_2fa(user, otp_method, request_ip, fail_redirect_url):
if otp_method in ["HOTP", "TOTP"]:
if not user.last_otp_login:
flash(
@ -487,7 +487,7 @@ def redirect_to_verify_mfa(user, otp_method, request_ip, fail_redirect_url):
"info",
)
current_app.logger.security(
f"Sent one-time password for {session['attempt_login_with_correct_password']} to {user.emails[0]} from {request_ip}"
f'Sent one-time password for {session["attempt_login_with_correct_password"]} to {user.emails[0]} from {request_ip}'
)
return redirect(url_for("core.auth.verify_two_factor_auth"))
else:
@ -510,7 +510,7 @@ def redirect_to_verify_mfa(user, otp_method, request_ip, fail_redirect_url):
"info",
)
current_app.logger.security(
f"Sent one-time password for {session['attempt_login_with_correct_password']} to {user.phone_numbers[0]} from {request_ip}"
f'Sent one-time password for {session["attempt_login_with_correct_password"]} to {user.phone_numbers[0]} from {request_ip}'
)
return redirect(url_for("core.auth.verify_two_factor_auth"))
else:

View file

@ -18,16 +18,77 @@ from canaille.app.forms import password_too_long_validator
from canaille.app.forms import phone_number
from canaille.app.forms import set_readonly
from canaille.app.forms import unique_values
from canaille.app.i18n import gettext
from canaille.app.i18n import lazy_gettext as _
from canaille.app.i18n import native_language_name_from_code
from canaille.backends import Backend
from canaille.core.models import OTP_DIGITS
from canaille.core.validators import existing_group_member
from canaille.core.validators import existing_login
from canaille.core.validators import non_empty_groups
from canaille.core.validators import unique_email
from canaille.core.validators import unique_group
from canaille.core.validators import unique_user_name
def unique_user_name(form, field):
if Backend.instance.get(models.User, user_name=field.data) and (
not getattr(form, "user", None) or form.user.user_name != field.data
):
raise wtforms.ValidationError(
_("The user name '{user_name}' already exists").format(user_name=field.data)
)
def unique_email(form, field):
if Backend.instance.get(models.User, emails=field.data) and (
not getattr(form, "user", None) or field.data not in form.user.emails
):
raise wtforms.ValidationError(
_("The email '{email}' is already used").format(email=field.data)
)
def unique_group(form, field):
if Backend.instance.get(models.Group, display_name=field.data):
raise wtforms.ValidationError(
_("The group '{group}' already exists").format(group=field.data)
)
def existing_login(form, field):
if not current_app.config["CANAILLE"][
"HIDE_INVALID_LOGINS"
] and not Backend.instance.get_user_from_login(field.data):
raise wtforms.ValidationError(
_("The login '{login}' does not exist").format(login=field.data)
)
def existing_group_member(form, field):
if field.data is None:
raise wtforms.ValidationError(
gettext("The user you are trying to remove does not exist.")
)
if field.data not in form.group.members:
raise wtforms.ValidationError(
gettext(
"The user '{user}' has already been removed from the group '{group}'"
).format(user=field.data.formatted_name, group=form.group.display_name)
)
def non_empty_groups(form, field):
"""LDAP groups cannot be empty because groupOfNames.member is a MUST
attribute.
https://www.rfc-editor.org/rfc/rfc2256.html#section-7.10
"""
if not form.user:
return
for group in form.user.groups:
if len(group.members) == 1 and group not in field.data:
raise wtforms.ValidationError(
_(
"The group '{group}' cannot be removed, because it must have at least one user left."
).format(group=group.display_name)
)
class LoginForm(Form):
@ -68,12 +129,7 @@ class ForgottenPasswordForm(Form):
class PasswordResetForm(Form):
password = wtforms.PasswordField(
_("Password"),
validators=[
wtforms.validators.DataRequired(),
password_length_validator,
password_too_long_validator,
compromised_password_validator,
],
validators=[wtforms.validators.DataRequired()],
render_kw={
"autocomplete": "new-password",
},
@ -260,13 +316,10 @@ PROFILE_FORM_FIELDS = dict(
groups=wtforms.SelectMultipleField(
_("Groups"),
default=[],
choices=lambda: sorted(
[
(group, group.display_name)
for group in Backend.instance.query(models.Group)
],
key=lambda group: group[0].id,
),
choices=lambda: [
(group, group.display_name)
for group in Backend.instance.query(models.Group)
],
render_kw={"placeholder": _("users, admins …")},
coerce=IDToModel("Group"),
validators=[non_empty_groups],
@ -312,8 +365,6 @@ def build_profile_form(write_field_names, readonly_field_names, user=None):
class CreateGroupForm(Form):
"""The group creation form."""
display_name = wtforms.StringField(
_("Name"),
validators=[wtforms.validators.DataRequired(), unique_group],
@ -328,8 +379,6 @@ class CreateGroupForm(Form):
class EditGroupForm(Form):
"""The group edition form."""
display_name = wtforms.StringField(
_("Name"),
validators=[
@ -373,8 +422,6 @@ class JoinForm(Form):
class InvitationForm(Form):
"""The user invitation form."""
user_name = wtforms.StringField(
_("User name"),
render_kw={"placeholder": _("jdoe")},

View file

@ -6,11 +6,11 @@ from flask import request
from flask import url_for
from canaille.app import models
from canaille.app.flask import permissions_needed
from canaille.app.flask import render_htmx_template
from canaille.app.flask import user_needed
from canaille.app.forms import TableForm
from canaille.app.i18n import gettext as _
from canaille.app.templating import render_template
from canaille.app.themes import render_template
from canaille.backends import Backend
from .forms import CreateGroupForm
@ -21,19 +21,17 @@ bp = Blueprint("groups", __name__, url_prefix="/groups")
@bp.route("/", methods=["GET", "POST"])
@user_needed("manage_groups")
@permissions_needed("manage_groups")
def groups(user):
table_form = TableForm(models.Group, formdata=request.form)
if request.form and request.form.get("page") and not table_form.validate():
abort(404)
return render_htmx_template(
"core/groups.html", menuitem="groups", table_form=table_form
)
return render_htmx_template("groups.html", menuitem="groups", table_form=table_form)
@bp.route("/add", methods=("GET", "POST"))
@user_needed("manage_groups")
@permissions_needed("manage_groups")
def create_group(user):
form = CreateGroupForm(request.form or None)
@ -56,12 +54,12 @@ def create_group(user):
return redirect(url_for("core.groups.group", group=group))
return render_template(
"core/group.html", menuitem="groups", form=form, edited_group=None, members=None
"group.html", menuitem="groups", form=form, edited_group=None, members=None
)
@bp.route("/<group:group>", methods=("GET", "POST"))
@user_needed("manage_groups")
@permissions_needed("manage_groups")
def group(user, group):
if (
request.method == "GET"
@ -71,7 +69,7 @@ def group(user, group):
return edit_group(group)
if request.form.get("action") == "confirm-delete":
return render_template("core/modals/delete-group.html", group=group)
return render_template("modals/delete-group.html", group=group)
if request.form.get("action") == "delete":
return delete_group(group)
@ -118,8 +116,8 @@ def edit_group(group):
flash(_("Group edition failed."), "error")
return render_htmx_template(
"core/group.html",
"core/partial/group-members.html",
"group.html",
"partial/group-members.html",
form=form,
menuitem="groups",
edited_group=group,
@ -139,7 +137,7 @@ def delete_member(group):
elif request.form.get("action") == "confirm-remove-member":
return render_template(
"core/modals/remove-group-member.html", group=group, form=form
"modals/remove-group-member.html", group=group, form=form
)
else:

View file

@ -5,7 +5,7 @@ from canaille.app import build_hash
from canaille.app.i18n import gettext as _
from canaille.app.mails import logo
from canaille.app.mails import send_email
from canaille.app.templating import render_template
from canaille.app.themes import render_template
def send_test_mail(email):
@ -16,12 +16,12 @@ def send_test_mail(email):
website_name=current_app.config["CANAILLE"]["NAME"]
)
text_body = render_template(
"core/mails/test.txt",
"mails/test.txt",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
)
html_body = render_template(
"core/mails/test.html",
"mails/test.html",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
logo=f"cid:{logo_cid[1:-1]}" if logo_cid else None,
@ -55,13 +55,13 @@ def send_password_reset_mail(user, mail):
website_name=current_app.config["CANAILLE"]["NAME"]
)
text_body = render_template(
"core/mails/reset.txt",
"mails/reset.txt",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
reset_url=reset_url,
)
html_body = render_template(
"core/mails/reset.html",
"mails/reset.html",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
reset_url=reset_url,
@ -96,13 +96,13 @@ def send_password_initialization_mail(user, email):
website_name=current_app.config["CANAILLE"]["NAME"]
)
text_body = render_template(
"core/mails/firstlogin.txt",
"mails/firstlogin.txt",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
reset_url=reset_url,
)
html_body = render_template(
"core/mails/firstlogin.html",
"mails/firstlogin.html",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
reset_url=reset_url,
@ -127,13 +127,13 @@ def send_invitation_mail(email, registration_url):
website_name=current_app.config["CANAILLE"]["NAME"]
)
text_body = render_template(
"core/mails/invitation.txt",
"mails/invitation.txt",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
registration_url=registration_url,
)
html_body = render_template(
"core/mails/invitation.html",
"mails/invitation.html",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
registration_url=registration_url,
@ -158,13 +158,13 @@ def send_confirmation_email(email, confirmation_url):
website_name=current_app.config["CANAILLE"]["NAME"]
)
text_body = render_template(
"core/mails/email-confirmation.txt",
"mails/email-confirmation.txt",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
confirmation_url=confirmation_url,
)
html_body = render_template(
"core/mails/email-confirmation.html",
"mails/email-confirmation.html",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
confirmation_url=confirmation_url,
@ -189,13 +189,13 @@ def send_registration_mail(email, registration_url):
website_name=current_app.config["CANAILLE"]["NAME"]
)
text_body = render_template(
"core/mails/registration.txt",
"mails/registration.txt",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
registration_url=registration_url,
)
html_body = render_template(
"core/mails/registration.html",
"mails/registration.html",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
registration_url=registration_url,
@ -222,7 +222,7 @@ def send_compromised_password_check_failure_mail(
website_name=current_app.config["CANAILLE"]["NAME"]
)
text_body = render_template(
"core/mails/compromised_password_check_failure.txt",
"mails/compromised_password_check_failure.txt",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
check_password_url=check_password_url,
@ -231,7 +231,7 @@ def send_compromised_password_check_failure_mail(
hashed_password=hashed_password,
)
html_body = render_template(
"core/mails/compromised_password_check_failure.html",
"mails/compromised_password_check_failure.html",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
check_password_url=check_password_url,
@ -259,13 +259,13 @@ def send_one_time_password_mail(mail, otp):
website_name=current_app.config["CANAILLE"]["NAME"]
)
text_body = render_template(
"core/mails/email_otp.txt",
"mails/email_otp.txt",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
otp=otp,
)
html_body = render_template(
"core/mails/email_otp.html",
"mails/email_otp.html",
site_name=current_app.config["CANAILLE"]["NAME"],
site_url=base_url,
otp=otp,

View file

@ -4,12 +4,18 @@ from typing import Annotated
from typing import ClassVar
from flask import current_app
from pydantic import TypeAdapter
from httpx import Client as httpx_client
from scim2_client.engines.httpx import SyncSCIMClient
from scim2_models import SearchRequest
from werkzeug.security import gen_salt
from canaille.app import models
from canaille.backends import Backend
from canaille.backends.models import Model
from canaille.core.configuration import Permission
from canaille.core.mails import send_one_time_password_mail
from canaille.core.sms import send_one_time_password_sms
from canaille.scim.models import user_from_canaille_to_scim_for_client
HOTP_LOOK_AHEAD_WINDOW = 10
OTP_DIGITS = 6
@ -100,11 +106,6 @@ class User(Model):
"never").
"""
password_last_update: datetime.datetime | None = None
"""Specifies the last time the user password was changed.
By default, the date of creation of the password is retained.
"""
preferred_language: str | None = None
"""Indicates the user's preferred written or spoken languages and is
generally used for selecting a localized user interface.
@ -288,6 +289,14 @@ class User(Model):
_writable_fields = None
_permissions = None
def save(self):
if current_app.features.has_otp and not self.secret_token:
self.initialize_otp()
self.propagate_scim_changes()
def delete(self):
self.propagate_scim_delete()
def has_password(self) -> bool:
"""Check whether a password has been set for the user."""
return self.password is not None
@ -492,22 +501,86 @@ class User(Model):
).total_seconds()
return max(calculated_delay - time_since_last_failed_bind, 0)
def has_expired_password(self):
last_update = self.password_last_update or datetime.datetime.now(
datetime.timezone.utc
)
if current_app.config["CANAILLE"]["PASSWORD_LIFETIME"] is None:
password_expiration = None
else:
password_expiration = TypeAdapter(datetime.timedelta).validate_python(
current_app.config["CANAILLE"]["PASSWORD_LIFETIME"]
def propagate_scim_changes(self):
for client in self.get_clients():
scim_tokens = Backend.instance.query(
models.Token, client=client, subject=None
)
valid_scim_tokens = [
token
for token in scim_tokens
if not token.is_expired() and not token.is_revoked()
]
if valid_scim_tokens:
scim_token = valid_scim_tokens[0]
else:
scim_token = models.Token(
token_id=gen_salt(48),
access_token=gen_salt(48),
subject=None,
audience=[client],
client=client,
refresh_token=gen_salt(48),
scope=["openid", "profile"],
issue_date=datetime.datetime.now(datetime.timezone.utc),
lifetime=3600,
)
Backend.instance.save(scim_token)
return (
password_expiration is not None
and last_update + password_expiration
< datetime.datetime.now(datetime.timezone.utc)
client_httpx = httpx_client(
base_url=client.client_uri,
headers={"Authorization": f"Bearer {scim_token.access_token}"},
)
scim = SyncSCIMClient(client_httpx)
scim.discover()
User = scim.get_resource_model("User")
EnterpriseUser = User.get_extension_model("EnterpriseUser")
user = user_from_canaille_to_scim_for_client(self, User, EnterpriseUser)
req = SearchRequest(filter=f'userName eq "{self.user_name}"')
response = scim.query(User, search_request=req)
if not response.resources:
try:
scim.create(user)
except Exception:
current_app.logger.warning(
f"SCIM User {self.user_name} creation for client {client.client_name} failed"
)
else:
user.id = response.resources[0].id
try:
scim.replace(user)
except:
current_app.logger.warning(
f"SCIM User {self.user_name} update for client {client.client_name} failed"
)
req = SearchRequest(filter=f'userName eq "{self.user_name}"')
response = scim.query(User, search_request=req)
def propagate_scim_delete(self):
client = httpx_client(
base_url="http://localhost:8080",
headers={"Authorization": "Bearer MON_SUPER_TOKEN"},
)
scim = SyncSCIMClient(client)
scim.discover()
User = scim.get_resource_model("User")
try:
scim.delete(User, self.scim_id)
except:
current_app.logger.warning(f"SCIM User {self.user_name} delete failed")
def get_clients(self):
if self.id:
consents = Backend.instance.query(models.Consent, subject=self)
consented_clients = {t.client for t in consents}
preconsented_clients = [
client
for client in Backend.instance.query(models.Client)
if client.preconsent and client not in consented_clients
]
return list(consented_clients) + list(preconsented_clients)
return []
class Group(Model):

View file

@ -1,14 +1,14 @@
from flask import current_app
from canaille.app.sms import send_sms
from canaille.app.templating import render_template
from canaille.app.themes import render_template
def send_one_time_password_sms(phone_number, otp):
website_name = current_app.config["CANAILLE"]["NAME"]
text_body = render_template(
"core/sms/sms_otp.txt",
"sms/sms_otp.txt",
website_name=website_name,
otp=otp,
)

View file

@ -1,11 +1,3 @@
{#
The 'About' page.
This is an informational page, displaying the project links.
:param version: The current Canaille version.
:type version: :class:`str`
#}
{% extends theme('base.html') %}
{% import 'macro/form.html' as fui %}

View file

@ -1,7 +1,3 @@
{# The first login page.
This page is displayed to users who do not have set a password yet.
#}
{% extends theme('base.html') %}
{% import 'macro/form.html' as fui %}

View file

@ -1,7 +1,3 @@
{# Password forgotten page.
This page displays a form asking for the email address of users who cannot remember their password.
#}
{% extends theme('base.html') %}
{% import 'macro/form.html' as fui %}

View file

@ -1,12 +1,3 @@
{# Group edition page.
Displays the group edition or creation form.
:param edited_group: :data:`None` in a creation context. In edition context this is the edited group.
:type edited_group: :class:`~canaille.core.models.Group`
:param form: The group edition/creation form.
:type form: :class:`~canaille.core.endpoints.forms.CreateGroupForm` or :class:`~canaille.core.endpoints.forms.EditGroupForm`
#}
{% extends theme('base.html') %}
{% import 'macro/form.html' as fui %}
{% import "macro/table.html" as table %}
@ -89,7 +80,7 @@ Displays the group edition or creation form.
{{ table.search(table_form, "table.users") }}
</div>
{% include "core/partial/group-members.html" %}
{% include "partial/group-members.html" %}
{% endif %}
{% endblock %}

View file

@ -1,8 +1,3 @@
{# The group list page.
:param table: A :class:`~canaille.core.models.Group` pagination form.
:type table: :class:`~canaille.app.forms.TableForm`
#}
{% extends theme('base.html') %}
{% import "macro/table.html" as table %}
@ -30,5 +25,5 @@
</h2>
{{ table.search(table_form, "table.groups") }}
</div>
{% include "core/partial/groups.html" %}
{% include "partial/groups.html" %}
{% endblock %}

View file

@ -1,10 +1,3 @@
{# The invitation form page.
Displays the invitation form to users with the invitation permission.
:param form: The invitation form.
:type form: :class:`~canaille.core.endpoints.forms.InvitationForm`
#}
{% extends theme('base.html') %}
{% import 'macro/form.html' as fui %}

View file

@ -1,14 +1,6 @@
{# The invitation acceptation page.
This page is displayed to users who have clicked on invitation links sent by mail (or by other media).
It displays a basic account creation form.
:param form: The account creation form.
:type form: :class:`~canaille.core.endpoints.forms.JoinForm`
#}
{% extends theme('base.html') %}
{% import 'macro/form.html' as fui %}
{% import 'core/partial/profile_field.html' as profile %}
{% import 'partial/profile_field.html' as profile %}
{%- block title -%}
{%- trans %}User creation{% endtrans -%}

View file

@ -1,14 +1,7 @@
{# The login page.
This page displays a form to get the user identifier.
:param form: The login form.
:type form: :class:`~canaille.core.endpoints.forms.LoginForm`
#}
{% extends theme('base.html') %}
{% import 'macro/flask.html' as flask %}
{% import 'macro/form.html' as fui %}
{% import 'core/partial/login_field.html' as login_field %}
{% import 'partial/login_field.html' as login_field %}
{% block container %}
<div class="ui container" hx-boost="false">

View file

@ -1,7 +1,7 @@
{% extends theme('base.html') %}
{% import 'macro/flask.html' as flask %}
{% import 'macro/form.html' as fui %}
{% import 'core/partial/login_field.html' as login_field %}
{% import 'partial/login_field.html' as login_field %}
{% block container %}
<div class="ui container">

View file

@ -1,14 +1,6 @@
{# User account creation page.
This template displays an account creation form.
It is used in the registration page, and in the manual account creation page available for users with *user management* permission.
:param form: The user creation form. Dynamically built according to the user :attr:`~canaille.core.configuration.ACLSettings.READ` and :attr:`~canaille.core.configuration.ACLSettings.WRITE` permissions. The available fields are those appearing in *READ* and *WRITE*, those only appearing in *READ* are read-only.
:type form: :class:`~flask_wtf.FlaskForm`
#}
{% extends theme('base.html') %}
{% import 'macro/form.html' as fui %}
{% import 'core/partial/profile_field.html' as profile %}
{% import 'partial/profile_field.html' as profile %}
{%- block title -%}
{%- trans %}User creation{% endtrans -%}

View file

@ -1,18 +1,6 @@
{# The profile edition template.
Displays a user profile edition form.
:param edited_user: The user that the form will edit.
:type edited_user: :class:`~canaille.core.models.User`
:param profile_form: The user profile edition form. Dynamically built according to the user :attr:`~canaille.core.configuration.ACLSettings.READ` and :attr:`~canaille.core.configuration.ACLSettings.WRITE` permissions. The available fields are those appearing in *READ* and *WRITE*, those only appearing in *READ* are read-only.
:type profile_form: :class:`~flask_wtf.FlaskForm`
:param emails_form: An email edition form. Used when the :attr:`~canaille.app.features.Features.has_email_confirmation` feature is enabled.
:type emails_form: :class:`~canaille.core.endpoints.forms.EmailConfirmationForm`
#}
{% extends theme('base.html') %}
{% import 'macro/form.html' as fui %}
{% import 'core/partial/profile_field.html' as profile %}
{% import 'partial/profile_field.html' as profile %}
{%- block title -%}
{% if not edited_user %}

View file

@ -1,14 +1,3 @@
{# The profile settings template.
Displays the user settings edition form.
:param edited_user: The user that the form will edit.
:type edited_user: :class:`~canaille.core.models.User`
:param form: The user profile edition form. Dynamically built according to the user :attr:`~canaille.core.configuration.ACLSettings.READ` and :attr:`~canaille.core.configuration.ACLSettings.WRITE` permissions. The available fields are those appearing in *READ* and *WRITE*, those only appearing in *READ* are read-only.
:type form: :class:`~flask_wtf.FlaskForm`
:param self_deletion: Whether the editor is allowed to delete the account of the edited user.
:type self_deletion: :class:`bool`
#}
{% extends theme('base.html') %}
{% import 'macro/form.html' as fui %}

View file

@ -1,14 +1,3 @@
{# The password reset template.
Displays a password reset form.
:param form: The password reset form.
:type form: :class:`~canaille.core.endpoints.forms.PasswordResetForm`
:param user: The user associated with the URL.
:type user: :class:`~canaille.core.models.User`
:param hash: The secret link hash.
:type hash: :class:`str`
#}
{% extends theme('base.html') %}
{% import 'macro/form.html' as fui %}
@ -26,8 +15,7 @@ Displays a password reset form.
{% endblock %}
<div class="ui attached clearing segment">
{{ fui.render_form(form, _("Password reset")) }}
{{ fui.render_form(form, _("Password reset"), action=url_for("core.auth.reset", user=user, hash=hash)) }}
</div>
</div>
{% endblock %}

View file

@ -1,18 +1,7 @@
{# The multi-factor authentication initialization template.
Display a QR-code and the OTP secret.
:param user: The user initializing the OTP.
:type user: :class:`~canaille.core.models.User`
:param secret: The OTP secret.
:type secret: :class:`str`
:param qr_image: A QR-code image representing the OTP secret.
:type qr_image: A base64 encoded :class:`str`
#}
{% extends theme('base.html') %}
{% import 'macro/flask.html' as flask %}
{% import 'macro/form.html' as fui %}
{% import 'core/partial/login_field.html' as login_field %}
{% import 'partial/login_field.html' as login_field %}
{% block container %}
<div class="ui container">
@ -29,7 +18,7 @@ Display a QR-code and the OTP secret.
<h2 class="ui center aligned header">
<div class="content">
{{ _("Sign in as %(username)s", username=user.user_name) }}
{{ _("Sign in as %(username)s", username=username) }}
</div>
<div class="sub header">{% trans %}Set up multi-factor authentication.{% endtrans %}</div>
</h2>

View file

@ -1,10 +1,3 @@
{# The users list.
Displays a paginated list of :class:`~canaille.core.models.User`.
:param table_form: The paginated list form.
:type table_form: :class:`~canaille.app.forms.TableForm` of :class:`~canaille.core.models.User`.
#}
{% extends theme('base.html') %}
{% import "macro/table.html" as table %}
@ -39,5 +32,5 @@ Displays a paginated list of :class:`~canaille.core.models.User`.
{{ table.search(table_form, "table.users") }}
</div>
{% include "core/partial/users.html" %}
{% include "partial/users.html" %}
{% endblock %}

View file

@ -1,18 +1,7 @@
{# The multi-factor authentication code verification template.
Displays a form that asks for the multi-factor authentication code.
:param form: The code verification form.
:type form: :class:`~canaille.core.endpoints.forms.TwoFactorForm`
:param username: The username of the user attempting to log-in.
:type username: :class:`str`
:param method: The authentication factor method.
:type method: :class:`str` (*TOTP*, *HOTP*, *EMAIL_OTP*, *SMS_OTP*)
#}
{% extends theme('base.html') %}
{% import 'macro/flask.html' as flask %}
{% import 'macro/form.html' as fui %}
{% import 'core/partial/login_field.html' as login_field %}
{% import 'partial/login_field.html' as login_field %}
{% block container %}
<div class="ui container">

View file

@ -1,74 +0,0 @@
import wtforms.form
import wtforms.validators
from flask import current_app
from canaille.app import models
from canaille.app.i18n import gettext
from canaille.app.i18n import lazy_gettext as _
from canaille.backends import Backend
def unique_user_name(form, field):
if Backend.instance.get(models.User, user_name=field.data) and (
not getattr(form, "user", None) or form.user.user_name != field.data
):
raise wtforms.ValidationError(
_("The user name '{user_name}' already exists").format(user_name=field.data)
)
def unique_email(form, field):
if Backend.instance.get(models.User, emails=field.data) and (
not getattr(form, "user", None) or field.data not in form.user.emails
):
raise wtforms.ValidationError(
_("The email '{email}' is already used").format(email=field.data)
)
def unique_group(form, field):
if Backend.instance.get(models.Group, display_name=field.data):
raise wtforms.ValidationError(
_("The group '{group}' already exists").format(group=field.data)
)
def existing_login(form, field):
if not current_app.config["CANAILLE"][
"HIDE_INVALID_LOGINS"
] and not Backend.instance.get_user_from_login(field.data):
raise wtforms.ValidationError(
_("The login '{login}' does not exist").format(login=field.data)
)
def existing_group_member(form, field):
if field.data is None:
raise wtforms.ValidationError(
gettext("The user you are trying to remove does not exist.")
)
if field.data not in form.group.members:
raise wtforms.ValidationError(
gettext(
"The user '{user}' has already been removed from the group '{group}'"
).format(user=field.data.formatted_name, group=form.group.display_name)
)
def non_empty_groups(form, field):
"""LDAP groups cannot be empty because groupOfNames.member is a MUST
attribute.
https://www.rfc-editor.org/rfc/rfc2256.html#section-7.10
"""
if not form.user:
return
for group in form.user.groups:
if len(group.members) == 1 and group not in field.data:
raise wtforms.ValidationError(
_(
"The group '{group}' cannot be removed, because it must have at least one user left."
).format(group=group.display_name)
)

View file

@ -1,26 +0,0 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}

View file

@ -7,7 +7,7 @@ from . import oauth
from . import tokens
from . import well_known
bp = Blueprint("oidc", __name__)
bp = Blueprint("oidc", __name__, template_folder="../templates")
bp.register_blueprint(authorizations.bp)
bp.register_blueprint(clients.bp)

View file

@ -3,33 +3,33 @@ from flask import abort
from flask import request
from canaille.app import models
from canaille.app.flask import permissions_needed
from canaille.app.flask import render_htmx_template
from canaille.app.flask import user_needed
from canaille.app.forms import TableForm
from canaille.app.templating import render_template
from canaille.app.themes import render_template
bp = Blueprint("authorizations", __name__, url_prefix="/admin/authorization")
@bp.route("/", methods=["GET", "POST"])
@user_needed("manage_oidc")
@permissions_needed("manage_oidc")
def index(user):
table_form = TableForm(models.AuthorizationCode, formdata=request.form)
if request.form and request.form.get("page") and not table_form.validate():
abort(404)
return render_htmx_template(
"oidc/authorization_list.html",
"authorization_list.html",
menuitem="admin",
table_form=table_form,
)
@bp.route("/<authorizationcode:authorization>", methods=["GET", "POST"])
@user_needed("manage_oidc")
@permissions_needed("manage_oidc")
def view(user, authorization):
return render_template(
"oidc/authorization_view.html",
"authorization_view.html",
authorization=authorization,
menuitem="admin",
)

View file

@ -10,11 +10,11 @@ from flask import url_for
from werkzeug.security import gen_salt
from canaille.app import models
from canaille.app.flask import permissions_needed
from canaille.app.flask import render_htmx_template
from canaille.app.flask import user_needed
from canaille.app.forms import TableForm
from canaille.app.i18n import gettext as _
from canaille.app.templating import render_template
from canaille.app.themes import render_template
from canaille.backends import Backend
from .forms import ClientAddForm
@ -23,31 +23,31 @@ bp = Blueprint("clients", __name__, url_prefix="/admin/client")
@bp.route("/", methods=["GET", "POST"])
@user_needed("manage_oidc")
@permissions_needed("manage_oidc")
def index(user):
table_form = TableForm(models.Client, formdata=request.form)
if request.form and request.form.get("page") and not table_form.validate():
abort(404)
return render_htmx_template(
"oidc/client_list.html", menuitem="admin", table_form=table_form
"client_list.html", menuitem="admin", table_form=table_form
)
@bp.route("/add", methods=["GET", "POST"])
@user_needed("manage_oidc")
@permissions_needed("manage_oidc")
def add(user):
form = ClientAddForm(request.form or None)
if not request.form or form.form_control():
return render_template("oidc/client_add.html", form=form, menuitem="admin")
return render_template("client_add.html", form=form, menuitem="admin")
if not form.validate():
flash(
_("The client has not been added. Please check your information."),
"error",
)
return render_template("oidc/client_add.html", form=form, menuitem="admin")
return render_template("client_add.html", form=form, menuitem="admin")
client_id = gen_salt(24)
client_id_issued_at = datetime.datetime.now(datetime.timezone.utc)
@ -87,10 +87,10 @@ def add(user):
@bp.route("/edit/<client:client>", methods=["GET", "POST"])
@user_needed("manage_oidc")
@permissions_needed("manage_oidc")
def edit(user, client):
if request.form.get("action") == "confirm-delete":
return render_template("oidc/modals/delete-client.html", client=client)
return render_template("modals/delete-client.html", client=client)
if request.form and request.form.get("action") == "delete":
return client_delete(client)
@ -110,7 +110,7 @@ def client_edit(client):
if not request.form or form.form_control():
return render_template(
"oidc/client_edit.html", form=form, client=client, menuitem="admin"
"client_edit.html", form=form, client=client, menuitem="admin"
)
if not form.validate():
@ -119,7 +119,7 @@ def client_edit(client):
"error",
)
return render_template(
"oidc/client_edit.html", form=form, client=client, menuitem="admin"
"client_edit.html", form=form, client=client, menuitem="admin"
)
Backend.instance.update(

View file

@ -11,7 +11,7 @@ from flask import url_for
from canaille.app import models
from canaille.app.flask import user_needed
from canaille.app.i18n import gettext as _
from canaille.app.templating import render_template
from canaille.app.themes import render_template
from canaille.backends import Backend
from ..utils import SCOPE_DETAILS
@ -33,7 +33,7 @@ def consents(user):
)
return render_template(
"oidc/consent_list.html",
"consent_list.html",
consents=consents,
menuitem="consents",
scope_details=SCOPE_DETAILS,
@ -58,10 +58,9 @@ def pre_consents(user):
nb_preconsents = len(preconsented)
return render_template(
"oidc/preconsent_list.html",
"preconsent_list.html",
menuitem="consents",
scope_details=SCOPE_DETAILS,
# TODO: do not delegate this var to the templates, or set this explicitly in the templates.
ignored_scopes=["openid"],
preconsented=preconsented,
nb_consents=nb_consents,

View file

@ -18,7 +18,7 @@ class LogoutForm(Form):
answer = wtforms.SubmitField()
def _client_audiences():
def client_audiences():
return [
(client, client.client_name) for client in Backend.instance.query(models.Client)
]
@ -111,7 +111,7 @@ class ClientAddForm(Form):
audience = wtforms.SelectMultipleField(
_("Token audiences"),
validators=[wtforms.validators.Optional()],
choices=_client_audiences,
choices=client_audiences,
validate_choice=False,
coerce=IDToModel("Client"),
)

View file

@ -15,7 +15,6 @@ from flask import request
from flask import session
from flask import url_for
from werkzeug.datastructures import CombinedMultiDict
from werkzeug.exceptions import HTTPException
from canaille import csrf
from canaille.app import models
@ -23,7 +22,7 @@ from canaille.app.flask import set_parameter_in_url_query
from canaille.app.i18n import gettext as _
from canaille.app.session import current_user
from canaille.app.session import logout_user
from canaille.app.templating import render_template
from canaille.app.themes import render_template
from canaille.backends import Backend
from ..oauth import ClientConfigurationEndpoint
@ -43,14 +42,6 @@ from .well_known import openid_configuration
bp = Blueprint("endpoints", __name__, url_prefix="/oauth")
@bp.errorhandler(HTTPException)
def http_error_handler(error):
return {
"error": error.name.lower().replace(" ", "_"),
"error_description": error.description,
}, error.code
@bp.route("/authorize", methods=["GET", "POST"])
def authorize():
current_app.logger.debug(
@ -96,7 +87,7 @@ def authorize_guards(client):
):
return {
"error": "invalid_request",
"error_description": f"prompt '{request.args['prompt']}' value is not supported",
"error_description": f"prompt '{request.args['prompt'] }' value is not supported",
}, 400
@ -152,7 +143,7 @@ def authorize_consent(client, user):
form = AuthorizeForm(request.form or None)
return render_template(
"oidc/authorize.html",
"authorize.html",
user=user,
grant=grant,
client=client,
@ -316,7 +307,7 @@ def end_session():
or (data.get("logout_hint") and data["logout_hint"] != user.user_name)
) and not session.get("end_session_confirmation"):
session["end_session_data"] = data
return render_template("oidc/logout.html", form=form, client=client, menu=False)
return render_template("logout.html", form=form, client=client, menu=False)
if data.get("id_token_hint"):
try:
@ -367,9 +358,7 @@ def end_session():
"end_session_confirmation"
):
session["end_session_data"] = data
return render_template(
"oidc/logout.html", form=form, client=client, menu=False
)
return render_template("logout.html", form=form, client=client, menu=False)
logout_user()

View file

@ -7,11 +7,11 @@ from flask import flash
from flask import request
from canaille.app import models
from canaille.app.flask import permissions_needed
from canaille.app.flask import render_htmx_template
from canaille.app.flask import user_needed
from canaille.app.forms import TableForm
from canaille.app.i18n import gettext as _
from canaille.app.templating import render_template
from canaille.app.themes import render_template
from canaille.backends import Backend
from .forms import TokenRevokationForm
@ -20,25 +20,25 @@ bp = Blueprint("tokens", __name__, url_prefix="/admin/token")
@bp.route("/", methods=["GET", "POST"])
@user_needed("manage_oidc")
@permissions_needed("manage_oidc")
def index(user):
table_form = TableForm(models.Token, formdata=request.form)
if request.form and request.form.get("page") and not table_form.validate():
abort(404)
return render_htmx_template(
"oidc/token_list.html", menuitem="admin", table_form=table_form
"token_list.html", menuitem="admin", table_form=table_form
)
@bp.route("/<token:token>", methods=["GET", "POST"])
@user_needed("manage_oidc")
@permissions_needed("manage_oidc")
def view(user, token):
form = TokenRevokationForm(request.form or None)
if request.form and form.validate():
if request.form.get("action") == "confirm-revoke":
return render_template("oidc/modals/revoke-token.html", token=token)
return render_template("modals/revoke-token.html", token=token)
elif request.form.get("action") == "revoke":
token.revokation_date = datetime.datetime.now(datetime.timezone.utc)
@ -53,7 +53,7 @@ def view(user, token):
abort(400, f"bad form action: {request.form.get('action')}")
return render_template(
"oidc/token_view.html",
"token_view.html",
token=token,
menuitem="admin",
form=form,

View file

@ -394,18 +394,20 @@ class IntrospectionEndpoint(_IntrospectionEndpoint):
def introspect_token(self, token):
audience = [aud.client_id for aud in token.audience]
return {
response = {
"active": True,
"client_id": token.client.client_id,
"token_type": token.type,
"username": token.subject.formatted_name,
"scope": token.get_scope(),
"sub": token.subject.user_name,
"aud": audience,
"iss": get_issuer(),
"exp": token.get_expires_at(),
"iat": token.get_issued_at(),
}
if token.subject:
response["username"] = token.subject.formatted_name
response["sub"] = token.subject.user_name
return response
class ClientManagementMixin:

View file

@ -1,10 +1,3 @@
{# The list of authorizations.
Displays a paginated list of :class:`~canaille.oidc.basemodels.AuthorizationCode`.
:param table_form: The paginated list form.
:type table_form: :class:`~canaille.app.forms.TableForm` of :class:`~canaille.oidc.basemodels.AuthorizationCode`.
#}
{% extends theme('base.html') %}
{% import "macro/table.html" as table %}
@ -45,5 +38,5 @@ Displays a paginated list of :class:`~canaille.oidc.basemodels.AuthorizationCode
{{ table.search(table_form, "table.codes") }}
</div>
{% include "oidc/partial/authorization_list.html" %}
{% include "partial/authorization_list.html" %}
{% endblock %}

View file

@ -1,10 +1,3 @@
{# Authorization details template.
Displays details about an :class:`~canaille.oidc.basemodels.AuthorizationCode`.
:param authorization: The detailed authorization.
:type authorization: :class:`~canaille.oidc.basemodels.AuthorizationCode`.
#}
{% extends theme('base.html') %}
{%- block title -%}

View file

@ -1,13 +1,3 @@
{# Consent request template.
This templates is displayed to users accessing a new application that requests to access to personal information.
It lists the personal information requested by the application, and offers the user to accept or decline.
:param user: The user whose consent is asked.
:type user: :class:`~canaille.core.models.User`
:param grant: The OIDC grant.
:type grant: An Authlib ``Grant``
#}
{% extends theme('base.html') %}
{% import 'macro/form.html' as fui %}

View file

@ -1,10 +1,3 @@
{# The client addition template.
Displays a form to create a new :class:`~canaille.oidc.basemodels.Client`.
:param form: The client creation form.
:type form: :class:`~canaille.oidc.endpoints.forms.ClientAddForm`.
#}
{% extends theme('base.html') %}
{% import 'macro/form.html' as fui %}

View file

@ -1,12 +1,3 @@
{# The client edition template.
Displays a form to edit a :class:`~canaille.oidc.basemodels.Client`.
:param form: The client creation form.
:type form: :class:`~canaille.oidc.endpoints.forms.ClientAddForm`.
:param client: The edited client.
:type client: :class:`~canaille.oidc.basemodels.Client`
#}
{% extends theme('base.html') %}
{% import 'macro/form.html' as fui %}

View file

@ -1,10 +1,3 @@
{# The list of OIDC clients.
Displays a paginated list of :class:`~canaille.oidc.basemodels.Client`.
:param table_form: The paginated list form.
:type table_form: :class:`~canaille.app.forms.TableForm` of :class:`~canaille.oidc.basemodels.Client`.
#}
{% extends theme('base.html') %}
{% import "macro/table.html" as table %}
@ -45,5 +38,5 @@ Displays a paginated list of :class:`~canaille.oidc.basemodels.Client`.
{{ table.search(table_form, "table.clients") }}
</div>
{% include "oidc/partial/client_list.html" %}
{% include "partial/client_list.html" %}
{% endblock %}

Some files were not shown because too many files have changed in this diff Show more