Use private/public keys to sign JWTs

This commit is contained in:
Éloi Rivard 2020-08-28 16:07:39 +02:00
parent 1f225e92bf
commit 0ae8a5a0f5
8 changed files with 76 additions and 25 deletions

3
.gitignore vendored
View file

@ -16,3 +16,6 @@ dist
python-ldap-test* python-ldap-test*
conf/oauth-authorization-server.json conf/oauth-authorization-server.json
conf/openid-configuration.json conf/openid-configuration.json
conf/*.pem
conf/*.pub
conf/*.key

View file

@ -25,9 +25,10 @@ USER_FILTER = "(|(uid={login})(cn={login}))"
ADMIN_FILTER = "cn=Jane Doe" ADMIN_FILTER = "cn=Jane Doe"
[JWT] [JWT]
KEY = "secret-key" PUBLIC_KEY = "conf/public.pem"
ALG = "HS256" PRIVATE_KEY = "conf/private.pem"
ISS = "http://mydomain.tld" KTY = "RSA"
ALG = "RS256"
EXP = 3600 EXP = 3600
[JWT.MAPPING] [JWT.MAPPING]

View file

@ -9,7 +9,7 @@
["client_secret_basic", "private_key_jwt", ["client_secret_basic", "private_key_jwt",
"client_secret_post", "none"], "client_secret_post", "none"],
"token_endpoint_auth_signing_alg_values_supported": "token_endpoint_auth_signing_alg_values_supported":
["RS256", "ES256"], ["RS256"],
"userinfo_endpoint": "userinfo_endpoint":
"https://mydomain.tld/oauth/userinfo", "https://mydomain.tld/oauth/userinfo",
"check_session_iframe": "check_session_iframe":

View file

@ -3,6 +3,9 @@ import ldap.ldapobject
import os import os
import pytest import pytest
import slapdtest import slapdtest
from cryptography.hazmat.primitives import serialization as crypto_serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend as crypto_default_backend
from flask_webtest import TestApp from flask_webtest import TestApp
from werkzeug.security import gen_salt from werkzeug.security import gen_salt
from web import create_app from web import create_app
@ -28,6 +31,37 @@ class CustomSlapdObject(slapdtest.SlapdObject):
return config return config
@pytest.fixture(scope="session")
def keypair():
key = rsa.generate_private_key(
backend=crypto_default_backend(), public_exponent=65537, key_size=2048
)
private_key = key.private_bytes(
crypto_serialization.Encoding.PEM,
crypto_serialization.PrivateFormat.PKCS8,
crypto_serialization.NoEncryption(),
)
public_key = key.public_key().public_bytes(
crypto_serialization.Encoding.OpenSSH, crypto_serialization.PublicFormat.OpenSSH
)
return private_key, public_key
@pytest.fixture
def keypair_path(keypair, tmp_path):
private_key, public_key = keypair
private_key_path = os.path.join(tmp_path, "private.pem")
with open(private_key_path, "wb") as fd:
fd.write(private_key)
public_key_path = os.path.join(tmp_path, "public.pem")
with open(public_key_path, "wb") as fd:
fd.write(public_key)
return private_key_path, public_key_path
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
def slapd_server(): def slapd_server():
slapd = CustomSlapdObject() slapd = CustomSlapdObject()
@ -75,15 +109,15 @@ def slapd_connection(slapd_server):
@pytest.fixture @pytest.fixture
def app(slapd_server): def app(slapd_server, keypair_path):
os.environ["AUTHLIB_INSECURE_TRANSPORT"] = "true" os.environ["AUTHLIB_INSECURE_TRANSPORT"] = "true"
private_key_path, public_key_path = keypair_path
app = create_app( app = create_app(
{ {
"SECRET_KEY": gen_salt(24), "SECRET_KEY": gen_salt(24),
"OAUTH2_METADATA_FILE": "conf/oauth-authorization-server.sample.json", "OAUTH2_METADATA_FILE": "conf/oauth-authorization-server.sample.json",
"OIDC_METADATA_FILE": "conf/openid-configuration.sample.json", "OIDC_METADATA_FILE": "conf/openid-configuration.sample.json",
"LDAP": { "LDAP": {
"ROOT_DN": slapd_server.suffix, "ROOT_DN": slapd_server.suffix,
"URI": slapd_server.ldap_uri, "URI": slapd_server.ldap_uri,
@ -93,9 +127,10 @@ def app(slapd_server):
"ADMIN_FILTER": "uid=admin", "ADMIN_FILTER": "uid=admin",
}, },
"JWT": { "JWT": {
"KEY": "secret-key", "PUBLIC_KEY": public_key_path,
"ALG": "HS256", "PRIVATE_KEY": private_key_path,
"ISS": "https://mydomain.tld", "ALG": "RS256",
"KTY": "RSA",
"EXP": 3600, "EXP": 3600,
"MAPPING": { "MAPPING": {
"SUB": "uid", "SUB": "uid",

View file

@ -42,7 +42,7 @@ def test_oauth_hybrid(testclient, slapd_connection, user, client):
assert {"foo": "bar"} == res.json assert {"foo": "bar"} == res.json
def test_oidc_hybrid(testclient, slapd_connection, logged_user, client): def test_oidc_hybrid(testclient, slapd_connection, logged_user, client, keypair):
res = testclient.get( res = testclient.get(
"/oauth/authorize", "/oauth/authorize",
params=dict( params=dict(
@ -69,7 +69,7 @@ def test_oidc_hybrid(testclient, slapd_connection, logged_user, client):
assert token is not None assert token is not None
id_token = params["id_token"][0] id_token = params["id_token"][0]
claims = jwt.decode(id_token, "secret-key") claims = jwt.decode(id_token, keypair[1])
assert logged_user.uid[0] == claims["sub"] assert logged_user.uid[0] == claims["sub"]
assert logged_user.cn[0] == claims["name"] assert logged_user.cn[0] == claims["name"]
assert [client.oauthClientID] == claims["aud"] assert [client.oauthClientID] == claims["aud"]

View file

@ -47,7 +47,7 @@ def test_oauth_implicit(testclient, slapd_connection, user, client):
client.save(slapd_connection) client.save(slapd_connection)
def test_oidc_implicit(testclient, slapd_connection, user, client): def test_oidc_implicit(testclient, keypair, slapd_connection, user, client):
client.oauthGrantType = ["token id_token"] client.oauthGrantType = ["token id_token"]
client.oauthTokenEndpointAuthMethod = "none" client.oauthTokenEndpointAuthMethod = "none"
@ -83,7 +83,7 @@ def test_oidc_implicit(testclient, slapd_connection, user, client):
assert token is not None assert token is not None
id_token = params["id_token"][0] id_token = params["id_token"][0]
claims = jwt.decode(id_token, "secret-key") claims = jwt.decode(id_token, keypair[1])
assert user.uid[0] == claims["sub"] assert user.uid[0] == claims["sub"]
assert user.cn[0] == claims["name"] assert user.cn[0] == claims["name"]
assert [client.oauthClientID] == claims["aud"] assert [client.oauthClientID] == claims["aud"]

View file

@ -1,4 +1,4 @@
from authlib.common.encoding import urlsafe_b64encode from authlib.jose import jwk
from authlib.oauth2 import OAuth2Error from authlib.oauth2 import OAuth2Error
from flask import Blueprint, request, session, redirect from flask import Blueprint, request, session, redirect
from flask import render_template, jsonify, flash, current_app from flask import render_template, jsonify, flash, current_app
@ -71,8 +71,19 @@ def revoke_token():
@bp.route("/jwks.json") @bp.route("/jwks.json")
def jwks(): def jwks():
# TODO: Do not share secrets here! with open(current_app.config["JWT"]["PUBLIC_KEY"]) as fd:
key = urlsafe_b64encode(current_app.config["JWT"]["KEY"].encode("utf-8")).decode( pubkey = fd.read()
"utf-8"
obj = jwk.dumps(pubkey, current_app.config["JWT"]["KTY"])
return jsonify(
{
"keys": [
{
"kid": None,
"use": "sig",
"alg": current_app.config["JWT"]["ALG"],
**obj,
}
]
}
) )
return jsonify({"keys": [{"kid": None, "kty": "oct", "k": key}]})

View file

@ -28,12 +28,13 @@ def exists_nonce(nonce, req):
def get_jwt_config(grant): def get_jwt_config(grant):
return { with open(current_app.config["JWT"]["PRIVATE_KEY"]) as pk:
"key": current_app.config["JWT"]["KEY"], return {
"alg": current_app.config["JWT"]["ALG"], "key": pk.read(),
"iss": current_app.config["JWT"]["ISS"], "alg": current_app.config["JWT"]["ALG"],
"exp": current_app.config["JWT"]["EXP"], "iss": authorization.metadata["issuer"],
} "exp": current_app.config["JWT"]["EXP"],
}
def generate_user_info(user, scope): def generate_user_info(user, scope):
@ -253,7 +254,7 @@ class IntrospectionEndpoint(_IntrospectionEndpoint):
"scope": token.get_scope(), "scope": token.get_scope(),
"sub": token.oauthSubject, "sub": token.oauthSubject,
"aud": token.oauthClientID, "aud": token.oauthClientID,
"iss": current_app.config["JWT"]["ISS"], "iss": authorization.metadata["issuer"],
"exp": token.get_expires_at(), "exp": token.get_expires_at(),
"iat": token.get_issued_at(), "iat": token.get_issued_at(),
} }