diff --git a/oidc.py b/oidc.py index 46f60bed..f808d5bb 100644 --- a/oidc.py +++ b/oidc.py @@ -1,41 +1,32 @@ -import os import json import requests -from requests.auth import HTTPBasicAuth -import secrets import time -from django.core import serializers +from django.views.generic import View +from rest_framework_simplejwt.tokens import RefreshToken from mozilla_django_oidc.auth import OIDCAuthenticationBackend from authlib.jose import jwt -from django.http import HttpResponse from django.conf import settings -from django.shortcuts import render +from django.utils.decorators import method_decorator from django.views.decorators.csrf import csrf_exempt -from django.contrib.auth import login -from django.http import JsonResponse, HttpResponse +from django.http import JsonResponse from django.contrib.auth import get_user_model UserModel = get_user_model() -KEY_FILE = "/home/arusakov/devel/c2c/extract/geoshop-back/user_key.json" -ZITADEL_OIDC_DOMAIN = "https://geoshop-demo-syazg0.zitadel.cloud" -ZITADEL_INTROSPECT_URL = f"{ZITADEL_OIDC_DOMAIN}/oauth/v2/introspect" - def status(request): - return { - "OIDC_ENABLED": settings.OIDC_ENABLED, - } + return {"OIDC_ENABLED": settings.OIDC_ENABLED} -def updateUser(user, claims): +def _updateUser(user, claims): user.email = claims.get("email") user.first_name = claims.get("given_name") user.last_name = claims.get("family_name") return user -def private_key(): - with open(KEY_FILE, "r") as f: + +def _read_private_key(keyfile): + with open(keyfile, "r") as f: data = json.load(f) return { "client_id": data["clientId"], @@ -43,59 +34,75 @@ def private_key(): "private_key": data["key"], } -def get_jwt_token(oidc_domain: str): - prk = private_key() - payload = { - "iss": prk["client_id"], - "sub": prk["client_id"], - "aud": oidc_domain, - "exp": int(time.time() + 3600), - "iat": int(time.time()), - } - return jwt.encode( - {"alg": "RS256", "kid": prk["key_id"]}, payload, prk["private_key"] - ) - -@csrf_exempt -def validate_token(request): - # Handle JSON error - # TODO: Test missing id token - # TODO: Localize error response - # TODO: Test same token multiple times - data = json.loads(request.body.decode("utf-8")) - if "token" not in data: - return JsonResponse({"error": "No token provided"}, status=400) - - resp = requests.post( - ZITADEL_INTROSPECT_URL, - headers={"Content-Type": "application/x-www-form-urlencoded"}, - data={ - "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", - "client_assertion": get_jwt_token(ZITADEL_OIDC_DOMAIN), - "token": data["token"], - }, - ) - resp.raise_for_status() - user_data = json.loads(resp.content) - - UserModel = get_user_model() - user = UserModel.objects.get(username=user_data["email"]) - if not user: - user = UserModel.objects.create_user(username=user_data["email"]) - updateUser(user, user_data) - user.save() - - return JsonResponse({"user": {"id": user.id, "username": user.username}}) + +class FrontendAuthentication(View): + + def __init__(self): + super().__init__() + self.private_key = _read_private_key(settings.OIDC_PRIVATE_KEYFILE) + + def _get_jwt_token(self): + return jwt.encode( + {"alg": "RS256", "kid": self.private_key["key_id"]}, + { + "iss": self.private_key["client_id"], + "sub": self.private_key["client_id"], + "aud": settings.OIDC_OP_BASE_URL, + "exp": int(time.time() + 3600), + "iat": int(time.time()), + }, + self.private_key["private_key"]) + + def _resolve_user_data(self, token: str): + resp = requests.post( + settings.OIDC_INTROSPECT_URL, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + data={ + "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", + "client_assertion": self._get_jwt_token(), + "token": token, + }, + ) + resp.raise_for_status() + return json.loads(resp.content) + + @method_decorator(csrf_exempt) + def dispatch(self, request, *args, **kwargs): + return super().dispatch(request, *args, **kwargs) + + def post(self, request): + # Handle JSON error + # TODO: Test missing id token + # TODO: Localize error response + # TODO: Test same token multiple times + token_data = json.loads(request.body.decode("utf-8")) + if "token" not in token_data: + return JsonResponse({"error": "No token provided"}, status=400) + + user_data = self._resolve_user_data(token_data["token"]) + user = UserModel.objects.get(username=user_data["email"]) + if not user: + user = UserModel.objects.create_user(username=user_data["email"]) + _updateUser(user, user_data) + user.save() + + token = RefreshToken.for_user(user) + return JsonResponse({"access": str(token.access_token), "refresh": str(token)}) class PermissionBackend(OIDCAuthenticationBackend): + def authenticate_header(self, request): + # TODO: Test if header exists + token = request.headers["Authorization"].replace("Bearer ", "") + return token + def create_user(self, claims): user = self.UserModel.objects.create_user(username=claims.get("email")) - updateUser(user, claims) + _updateUser(user, claims) user.save() return user def update_user(self, user, claims): - updateUser(user, claims).save() + _updateUser(user, claims).save() return user