Skip to content

Commit

Permalink
CH-167 Async support on Django related events
Browse files Browse the repository at this point in the history
  • Loading branch information
filippomc committed Dec 17, 2024
1 parent 8ab3502 commit b9e3a1d
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from django.contrib import admin
from django.contrib.auth.admin import UserAdmin, GroupAdmin
from django.contrib.auth.models import User, Group

import asyncio
from admin_extra_buttons.api import ExtraButtonsMixin, button
from .models import Member
from cloudharness_django.services import get_user_service
Expand All @@ -17,6 +17,20 @@ class MemberAdmin(admin.StackedInline):
model = Member


def run_coroutine(coroutine):
try:
loop = asyncio.get_running_loop()
except RuntimeError: # No running event loop
loop = None

if loop and loop.is_running():
# If the event loop is already running, create a task
return asyncio.create_task(coroutine)
else:
# If no event loop is running, run the coroutine using asyncio.run
return asyncio.run(coroutine)


class CHUserAdmin(ExtraButtonsMixin, UserAdmin):

inlines = [MemberAdmin]
Expand All @@ -31,8 +45,8 @@ def has_delete_permission(self, request, obj=None):
return settings.DEBUG or settings.USER_CHANGE_ENABLED

@button()
def sync_keycloak(self, request):
get_user_service().sync_kc_users_groups()
async def sync_keycloak(self, request):
run_coroutine(get_user_service().sync_kc_users_groups())
self.message_user(request, 'Keycloak users & groups synced.')


Expand All @@ -48,8 +62,8 @@ def has_delete_permission(self, request, obj=None):
return settings.DEBUG

@button()
def sync_keycloak(self, request):
get_user_service().sync_kc_users_groups()
async def sync_keycloak(self, request):
run_coroutine(get_user_service().sync_kc_users_groups())
self.message_user(request, 'Keycloak users & groups synced.')


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from django.contrib.auth.models import User
from asgiref.sync import iscoroutinefunction
from django.utils.decorators import sync_and_async_middleware
from asgiref.sync import async_to_sync, iscoroutinefunction

from keycloak.exceptions import KeycloakGetError

Expand All @@ -13,32 +14,7 @@
from cloudharness import log


def _get_user():
bearer = get_authentication_token()
if bearer:
# found bearer token get the Django user
try:
token = bearer.split(" ")[-1]
payload = jwt.decode(token, algorithms=["RS256"], options={"verify_signature": False}, audience="web-client")
kc_id = payload["sub"]
try:
user = User.objects.get(member__kc_id=kc_id)
except User.DoesNotExist:
user = get_user_service().sync_kc_user(get_auth_service().get_auth_client().get_current_user())
return user
except KeycloakGetError:
# KC user not found
return None
except InvalidToken:
return None
except Exception as e:
log.exception("User mapping error, %s", payload["email"])
return None

return None


async def _aget_user():
async def _get_user():
bearer = get_authentication_token()
if bearer:
# found bearer token get the Django user
Expand All @@ -49,7 +25,7 @@ async def _aget_user():
try:
user = await User.objects.aget(member__kc_id=kc_id)
except User.DoesNotExist:
user = await get_user_service().async_kc_user(get_auth_service().get_auth_client().get_current_user())
user = await get_user_service().sync_kc_user(get_auth_service().get_auth_client().get_current_user())
return user
except KeycloakGetError:
# KC user not found
Expand All @@ -69,7 +45,7 @@ def BearerTokenMiddleware(get_response=None):
if iscoroutinefunction(get_response):
async def middleware(request):
if (not request.path.startswith("/static")) and getattr(getattr(request, "user", {}), "is_anonymous", True):
user = await _aget_user()
user = await _get_user()
if user:
# auto login, set the user
request.user = user
Expand All @@ -80,7 +56,7 @@ async def middleware(request):
else:
def middleware(request):
if (not request.path.startswith("/static")) and getattr(getattr(request, "user", {}), "is_anonymous", True):
user = _get_user()
user = async_to_sync(_get_user)()
if user:
# auto login, set the user
request.user = user
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
def get_auth_service():
global _auth_service
if not _auth_service:
raise KeycloakOIDCAuthServiceNotInitError("Auth Service not initialized")
init_services()
return _auth_service


def get_user_service():
global _user_service
if not _user_service:
raise KeycloakOIDUserServiceNotInitError("User Service not initialized")
init_services()
return _user_service


Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from cloudharness.applications import ConfigurationCallException

import asyncio
from django.conf import settings
from kafka.errors import TopicAlreadyExistsError

Expand All @@ -18,7 +18,7 @@ def __init__(self, kafka_group_id):
self.topics_initialized = False

@staticmethod
def event_handler(app, event_client, message):
async def event_handler(app, event_client, message):
resource = message["resource-type"]
operation = message["operation-type"]
resource_path = message["resource-path"].split("/")
Expand All @@ -32,20 +32,20 @@ def event_handler(app, event_client, message):

if resource == "GROUP":
kc_group = auth_client.get_group(resource_path[1])
user_service.sync_kc_group(kc_group)
await user_service.sync_kc_group(kc_group)
if resource == "USER":
kc_user = auth_client.get_user(resource_path[1])
user_service.sync_kc_user(kc_user, delete=operation == "DELETE")
await user_service.sync_kc_user(kc_user, delete=operation == "DELETE")
if resource == "CLIENT_ROLE_MAPPING":
# adding/deleting user client roles
# set/user user is_superuser
kc_user = auth_client.get_user(resource_path[1])
user_service.sync_kc_user(kc_user)
await user_service.sync_kc_user(kc_user)
if resource == "GROUP_MEMBERSHIP":
# adding / deleting users from groups, update the user
# updating the user will also update the user groups
kc_user = auth_client.get_user(resource_path[1])
user_service.sync_kc_user(kc_user)
await user_service.sync_kc_user(kc_user)
except Exception as e:
log.error(e)
raise e
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from django.contrib.auth.models import User, Group

from cloudharness_django.models import Team, Member
Expand Down Expand Up @@ -52,62 +53,52 @@ def update_team(self, group):
self.auth_client.update_group(group.team.kc_id, group.name)
return group

def add_user_to_team(self, user, team_name):
async def add_user_to_team(self, user, team_name):
# add a user from the group/team
group = Group.objects.get(name=team_name)
group = Group.objects.aget(name=team_name)
kc_group_id = group.team.kc_id
kc_user_id = user.member.kc_id
self.auth_client.group_user_add(kc_user_id, kc_group_id)

def rm_user_from_team(self, user, team_name):
async def rm_user_from_team(self, user, team_name):
# delete a user from the group/team
group = Group.objects.get(name=team_name)
group = await Group.objects.aget(name=team_name)
kc_group_id = group.team.kc_id
kc_user_id = user.member.kc_id
self.auth_client.group_user_remove(kc_user_id, kc_group_id)

def sync_kc_group(self, kc_group):
async def sync_kc_group(self, kc_group):
# sync the kc group with the django group
try:
team = Team.objects.get(kc_id=kc_group["id"])
group, created = Group.objects.get_or_create(team=team)
team = await Team.objects.aget(kc_id=kc_group["id"])
group, created = await Group.objects.aget_or_create(team=team)
group.name = kc_group["name"]
except Team.DoesNotExist:
group, created = Group.objects.get_or_create(name=kc_group["name"])
group, created = await Group.objects.aget_or_create(name=kc_group["name"])
try:
# check if group has a team
team = group.team
except Exception as e:
# create the team
superusers = User.objects.filter(is_superuser=True)
if superusers and len(superusers) > 0:
team = Team.objects.create(
owner=superusers[0], # one of the superusers will be the default team owner
try:
superuser = User.objects.filter(is_superuser=True).afirst()

team = await Team.objects.acreate(
owner=superuser, # one of the superusers will be the default team owner
kc_id=kc_group["id"],
group=group)
team.save()
group.save()
await team.asave()
except User.DoesNotExist as ex:
raise Exception("There is no superuser") from ex
await group.asave()

def sync_kc_groups(self, kc_groups=None):
async def sync_kc_groups(self, kc_groups=None):
# sync all groups
if not kc_groups:
kc_groups = self.auth_client.get_groups()
for kc_group in kc_groups:
self.sync_kc_group(kc_group)

def sync_kc_user(self, kc_user, is_superuser=False, delete=False):
# sync the kc user with the django user

user, created = User.objects.get_or_create(username=kc_user["username"])

member, created = Member.objects.get_or_create(user=user)
member.kc_id = kc_user["id"]
member.save()
user = self._map_kc_user(user, kc_user, is_superuser, delete)
user.save()
return user
await asyncio.gather(self.sync_kc_group(kc_group) for kc_group in kc_groups)

async def async_kc_user(self, kc_user, is_superuser=False, delete=False):
async def sync_kc_user(self, kc_user, is_superuser=False, delete=False):
# sync the kc user with the django user

user, created = await User.objects.aget_or_create(username=kc_user["username"])
Expand All @@ -117,23 +108,23 @@ async def async_kc_user(self, kc_user, is_superuser=False, delete=False):
user.save()
return user

def sync_kc_user_groups(self, kc_user):
async def sync_kc_user_groups(self, kc_user):
# Sync the user usergroups and memberships
user = User.objects.get(username=kc_user["email"])
user = await User.objects.aget(username=kc_user["email"])
user_groups = []
for kc_group in kc_user["userGroups"]:
user_groups += [Group.objects.get(name=kc_group["name"])]
user.groups.set(user_groups)
user.save()
user_groups += [await Group.objects.aget(name=kc_group["name"])]
await user.groups.aset(user_groups)
user.asave()

try:
if user.member.kc_id != kc_user["id"]:
user.member.kc_id = kc_user["id"]
except Member.DoesNotExist:
member = Member(user=user, kc_id=kc_user["id"])
member.save()
await member.asave()

def sync_kc_users_groups(self):
async def sync_kc_users_groups(self):
# cache all admin users to minimize KC rest api calls
all_admin_users = self.auth_client.get_client_role_members(
self.auth_service.get_client_name(),
Expand All @@ -144,11 +135,10 @@ def sync_kc_users_groups(self):
for kc_user in self.auth_client.get_users():
# check if user in all_admin_users
is_superuser = any([admin_user for admin_user in all_admin_users if admin_user["email"] == kc_user["email"]])
self.sync_kc_user(kc_user, is_superuser)
await self.sync_kc_user(kc_user, is_superuser)

# sync the groups
self.sync_kc_groups()
await self.sync_kc_groups()

# sync the user groups and memberships
for kc_user in self.auth_client.get_users():
self.sync_kc_user_groups(kc_user)
await asyncio.gather(self.sync_kc_user_groups(kc_user) for kc_user in self.auth_client.get_users())
50 changes: 26 additions & 24 deletions libraries/cloudharness-common/cloudharness/events/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
import time
from typing import List, Generator
import logging

import asyncio
from asgiref.sync import iscoroutinefunction
from time import sleep
from cloudharness import json, dumps

Expand Down Expand Up @@ -241,14 +242,18 @@ def close(self):
# for now no cleanup tasks to do
pass

def _consume_task(self, app=None, group_id=None, handler=None):
async def _consume_task(self, app=None, group_id=None, handler=None):

log.info(f'Kafka consumer thread started, listening for messages in queue: {self.topic_id}')
while True:
try:
self.consumer = self._get_consumer(group_id)
for message in self.consumer:
try:
handler(event_client=self, app=app, message=message.value)
if iscoroutinefunction(handler):
await handler(event_client=self, app=app, message=message.value)
else:
handler(event_client=self, app=app, message=message.value)
except Exception as e:
log.error(f"Error during execution of the consumer Topic {self.topic_id} --> {e}", exc_info=True)
self.consumer.close()
Expand All @@ -262,28 +267,25 @@ def async_consume(self, app=None, handler=None, group_id='default'):
log.debug('get current object from app')
app = app._get_current_object()
self._consumer_thread = threading.Thread(
target=self._consume_task,
kwargs={'app': app,
'group_id': group_id,
'handler': handler})
target=asyncio.run(self._consume_task(app, group_id, handler))
)
self._consumer_thread.daemon = True
self._consumer_thread.start()
log.debug('thread started')


if __name__ == "__main__":
# creat the required os env variables
os.environ['CLOUDHARNESS_EVENTS_CLIENT_ID'] = env.get_cloudharness_events_client_id()
os.environ['CLOUDHARNESS_EVENTS_SERVICE'] = env.get_cloudharness_events_service()

# instantiate the client
client = EventClient('test-sync-op-results-qcwbc')

# create a topic from env variables
# print(client.create_topic())
# publish to the prev created topic
# print(client.produce({"message": "In God we trust, all others bring data..."}))
# read from the topic
print(client.consume_all('my-group'))
# delete the topic
# print(client.delete_topic())
if __name__ == "__main__":
# creat the required os env variables
os.environ['CLOUDHARNESS_EVENTS_CLIENT_ID'] = env.get_cloudharness_events_client_id()
os.environ['CLOUDHARNESS_EVENTS_SERVICE'] = env.get_cloudharness_events_service()

# instantiate the client
client = EventClient('test-sync-op-results-qcwbc')

# create a topic from env variables
# print(client.create_topic())
# publish to the prev created topic
# print(client.produce({"message": "In God we trust, all others bring data..."}))
# read from the topic
print(client.consume_all('my-group'))
# delete the topic
# print(client.delete_topic())

0 comments on commit b9e3a1d

Please sign in to comment.