-
Notifications
You must be signed in to change notification settings - Fork 1
/
oidc.py
112 lines (91 loc) · 3.62 KB
/
oidc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import json
import requests
import time
from django.views.generic import View
from rest_framework_simplejwt.tokens import RefreshToken
from mozilla_django_oidc.auth import OIDCAuthenticationBackend
from authlib.jose import jwt
from django.conf import settings
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from django.http import JsonResponse
from django.contrib.auth import get_user_model
UserModel = get_user_model()
def status(request):
return {"OIDC_ENABLED": settings.OIDC_ENABLED}
def _updateUser(user, claims):
user.email = claims.get("email")
user.first_name = claims.get("given_name")
user.last_name = claims.get("family_name")
return user
def _read_private_key(keyfile):
with open(keyfile, "r") as f:
data = json.load(f)
return {
"client_id": data["clientId"],
"key_id": data["keyId"],
"private_key": data["key"],
}
class FrontendAuthentication(View):
def __init__(self):
super().__init__()
self.private_key = _read_private_key(settings.OIDC_PRIVATE_KEYFILE)
def _get_jwt_token(self):
return jwt.encode(
{"alg": "RS256", "kid": self.private_key["key_id"]},
{
"iss": self.private_key["client_id"],
"sub": self.private_key["client_id"],
"aud": settings.OIDC_OP_BASE_URL,
"exp": int(time.time() + 3600),
"iat": int(time.time()),
},
self.private_key["private_key"])
def _resolve_user_data(self, token: str):
resp = requests.post(
settings.OIDC_INTROSPECT_URL,
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
"client_assertion": self._get_jwt_token(),
"token": token,
},
)
resp.raise_for_status()
return json.loads(resp.content)
@method_decorator(csrf_exempt)
def dispatch(self, request, *args, **kwargs):
return super().dispatch(request, *args, **kwargs)
def post(self, request):
# Handle JSON error
# TODO: Test missing id token
# TODO: Localize error response
# TODO: Test same token multiple times
# TODO: Test user does not exist
# TODO: Test user exists, but unauthrized
# TODO: Test user exists and authorized
token_data = json.loads(request.body.decode("utf-8"))
if "token" not in token_data:
return JsonResponse({"error": "No token provided"}, status=400)
user_data = self._resolve_user_data(token_data["token"])
try:
user = UserModel.objects.get(username=user_data["email"])
except UserModel.DoesNotExist:
user = UserModel.objects.create_user(username=user_data["email"])
_updateUser(user, user_data)
user.save()
token = RefreshToken.for_user(user)
return JsonResponse({"access": str(token.access_token), "refresh": str(token)})
class PermissionBackend(OIDCAuthenticationBackend):
def authenticate_header(self, request):
# TODO: Test if header exists
token = request.headers["Authorization"].replace("Bearer ", "")
return token
def create_user(self, claims):
user = self.UserModel.objects.create_user(username=claims.get("email"))
_updateUser(user, claims)
user.save()
return user
def update_user(self, user, claims):
_updateUser(user, claims).save()
return user