Skip to content

Commit

Permalink
Rewritten token resolver as class
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrey Rusakov committed Sep 26, 2024
1 parent ff8a248 commit 1764745
Showing 1 changed file with 70 additions and 63 deletions.
133 changes: 70 additions & 63 deletions oidc.py
Original file line number Diff line number Diff line change
@@ -1,101 +1,108 @@
import os
import json
import requests
from requests.auth import HTTPBasicAuth
import secrets
import time

from django.core import serializers
from django.views.generic import View
from rest_framework_simplejwt.tokens import RefreshToken
from mozilla_django_oidc.auth import OIDCAuthenticationBackend
from authlib.jose import jwt
from django.http import HttpResponse
from django.conf import settings
from django.shortcuts import render
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from django.contrib.auth import login
from django.http import JsonResponse, HttpResponse
from django.http import JsonResponse
from django.contrib.auth import get_user_model

UserModel = get_user_model()

KEY_FILE = "/home/arusakov/devel/c2c/extract/geoshop-back/user_key.json"
ZITADEL_OIDC_DOMAIN = "https://geoshop-demo-syazg0.zitadel.cloud"
ZITADEL_INTROSPECT_URL = f"{ZITADEL_OIDC_DOMAIN}/oauth/v2/introspect"

def status(request):
return {
"OIDC_ENABLED": settings.OIDC_ENABLED,
}
return {"OIDC_ENABLED": settings.OIDC_ENABLED}


def updateUser(user, claims):
def _updateUser(user, claims):
user.email = claims.get("email")
user.first_name = claims.get("given_name")
user.last_name = claims.get("family_name")
return user

def private_key():
with open(KEY_FILE, "r") as f:

def _read_private_key(keyfile):
with open(keyfile, "r") as f:
data = json.load(f)
return {
"client_id": data["clientId"],
"key_id": data["keyId"],
"private_key": data["key"],
}

def get_jwt_token(oidc_domain: str):
prk = private_key()
payload = {
"iss": prk["client_id"],
"sub": prk["client_id"],
"aud": oidc_domain,
"exp": int(time.time() + 3600),
"iat": int(time.time()),
}
return jwt.encode(
{"alg": "RS256", "kid": prk["key_id"]}, payload, prk["private_key"]
)

@csrf_exempt
def validate_token(request):
# Handle JSON error
# TODO: Test missing id token
# TODO: Localize error response
# TODO: Test same token multiple times
data = json.loads(request.body.decode("utf-8"))
if "token" not in data:
return JsonResponse({"error": "No token provided"}, status=400)

resp = requests.post(
ZITADEL_INTROSPECT_URL,
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
"client_assertion": get_jwt_token(ZITADEL_OIDC_DOMAIN),
"token": data["token"],
},
)
resp.raise_for_status()
user_data = json.loads(resp.content)

UserModel = get_user_model()
user = UserModel.objects.get(username=user_data["email"])
if not user:
user = UserModel.objects.create_user(username=user_data["email"])
updateUser(user, user_data)
user.save()

return JsonResponse({"user": {"id": user.id, "username": user.username}})

class FrontendAuthentication(View):

def __init__(self):
super().__init__()
self.private_key = _read_private_key(settings.OIDC_PRIVATE_KEYFILE)

def _get_jwt_token(self):
return jwt.encode(
{"alg": "RS256", "kid": self.private_key["key_id"]},
{
"iss": self.private_key["client_id"],
"sub": self.private_key["client_id"],
"aud": settings.OIDC_OP_BASE_URL,
"exp": int(time.time() + 3600),
"iat": int(time.time()),
},
self.private_key["private_key"])

def _resolve_user_data(self, token: str):
resp = requests.post(
settings.OIDC_INTROSPECT_URL,
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
"client_assertion": self._get_jwt_token(),
"token": token,
},
)
resp.raise_for_status()
return json.loads(resp.content)

@method_decorator(csrf_exempt)
def dispatch(self, request, *args, **kwargs):
return super().dispatch(request, *args, **kwargs)

def post(self, request):
# Handle JSON error
# TODO: Test missing id token
# TODO: Localize error response
# TODO: Test same token multiple times
token_data = json.loads(request.body.decode("utf-8"))
if "token" not in token_data:
return JsonResponse({"error": "No token provided"}, status=400)

user_data = self._resolve_user_data(token_data["token"])
user = UserModel.objects.get(username=user_data["email"])
if not user:
user = UserModel.objects.create_user(username=user_data["email"])
_updateUser(user, user_data)
user.save()

token = RefreshToken.for_user(user)
return JsonResponse({"access": str(token.access_token), "refresh": str(token)})


class PermissionBackend(OIDCAuthenticationBackend):

def authenticate_header(self, request):
# TODO: Test if header exists
token = request.headers["Authorization"].replace("Bearer ", "")
return token

def create_user(self, claims):
user = self.UserModel.objects.create_user(username=claims.get("email"))
updateUser(user, claims)
_updateUser(user, claims)
user.save()
return user

def update_user(self, user, claims):
updateUser(user, claims).save()
_updateUser(user, claims).save()
return user

0 comments on commit 1764745

Please sign in to comment.