Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CH-167 Async support on Django user attach middleware #789

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from django.contrib import admin
from django.contrib.auth.admin import UserAdmin, GroupAdmin
from django.contrib.auth.models import User, Group

import asyncio
from admin_extra_buttons.api import ExtraButtonsMixin, button
from .models import Member
from cloudharness_django.services import get_user_service
Expand All @@ -17,6 +17,20 @@ class MemberAdmin(admin.StackedInline):
model = Member


def run_coroutine(coroutine):
try:
loop = asyncio.get_running_loop()
except RuntimeError: # No running event loop
loop = None

if loop and loop.is_running():
# If the event loop is already running, create a task
return asyncio.create_task(coroutine)
else:
# If no event loop is running, run the coroutine using asyncio.run
return asyncio.run(coroutine)


class CHUserAdmin(ExtraButtonsMixin, UserAdmin):

inlines = [MemberAdmin]
Expand All @@ -31,8 +45,8 @@ def has_delete_permission(self, request, obj=None):
return settings.DEBUG or settings.USER_CHANGE_ENABLED

@button()
def sync_keycloak(self, request):
get_user_service().sync_kc_users_groups()
async def sync_keycloak(self, request):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Query

Do these sync_keycloak methods need to be coroutines? They don't await anything, so looks like they could be normal methods, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes probably a leftover

run_coroutine(get_user_service().sync_kc_users_groups())
self.message_user(request, 'Keycloak users & groups synced.')


Expand All @@ -48,8 +62,8 @@ def has_delete_permission(self, request, obj=None):
return settings.DEBUG

@button()
def sync_keycloak(self, request):
get_user_service().sync_kc_users_groups()
async def sync_keycloak(self, request):
run_coroutine(get_user_service().sync_kc_users_groups())
self.message_user(request, 'Keycloak users & groups synced.')


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
import jwt

from django.contrib.auth.models import User
from asgiref.sync import iscoroutinefunction
from django.utils.decorators import sync_and_async_middleware
from asgiref.sync import async_to_sync, iscoroutinefunction

from keycloak.exceptions import KeycloakGetError

Expand All @@ -11,7 +14,7 @@
from cloudharness import log


def _get_user():
async def _get_user():
bearer = get_authentication_token()
if bearer:
# found bearer token get the Django user
Expand All @@ -20,9 +23,9 @@ def _get_user():
payload = jwt.decode(token, algorithms=["RS256"], options={"verify_signature": False}, audience="web-client")
kc_id = payload["sub"]
try:
user = User.objects.get(member__kc_id=kc_id)
user = await User.objects.aget(member__kc_id=kc_id)
except User.DoesNotExist:
user = get_user_service().sync_kc_user(get_auth_service().get_auth_client().get_current_user())
user = await get_user_service().sync_kc_user(get_auth_service().get_auth_client().get_current_user())
return user
except KeycloakGetError:
# KC user not found
Expand All @@ -36,20 +39,31 @@ def _get_user():
return None


class BearerTokenMiddleware:
def __init__(self, get_response=None):
# One-time configuration and initialization.
self.get_response = get_response
@sync_and_async_middleware
def BearerTokenMiddleware(get_response=None):
# One-time configuration and initialization.
if iscoroutinefunction(get_response):
async def middleware(request):
if (not request.path.startswith("/static")) and getattr(getattr(request, "user", {}), "is_anonymous", True):
user = await _get_user()
if user:
# auto login, set the user
request.user = user
request._cached_user = user

def __call__(self, request):
if getattr(getattr(request, "user", {}), "is_anonymous", True):
user = _get_user()
if user:
# auto login, set the user
request.user = user
request._cached_user = user
response = await get_response(request)
return response
else:
def middleware(request):
if (not request.path.startswith("/static")) and getattr(getattr(request, "user", {}), "is_anonymous", True):
user = async_to_sync(_get_user)()
if user:
# auto login, set the user
request.user = user
request._cached_user = user

return self.get_response(request)
return get_response(request)
return middleware


class BearerTokenAuthentication:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
def get_auth_service():
global _auth_service
if not _auth_service:
raise KeycloakOIDCAuthServiceNotInitError("Auth Service not initialized")
init_services()
return _auth_service


def get_user_service():
global _user_service
if not _user_service:
raise KeycloakOIDUserServiceNotInitError("User Service not initialized")
init_services()
return _user_service


Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from cloudharness.applications import ConfigurationCallException

import asyncio
from django.conf import settings
from kafka.errors import TopicAlreadyExistsError

Expand All @@ -18,7 +18,7 @@ def __init__(self, kafka_group_id):
self.topics_initialized = False

@staticmethod
def event_handler(app, event_client, message):
async def event_handler(app, event_client, message):
resource = message["resource-type"]
operation = message["operation-type"]
resource_path = message["resource-path"].split("/")
Expand All @@ -32,20 +32,20 @@ def event_handler(app, event_client, message):

if resource == "GROUP":
kc_group = auth_client.get_group(resource_path[1])
user_service.sync_kc_group(kc_group)
await user_service.sync_kc_group(kc_group)
if resource == "USER":
kc_user = auth_client.get_user(resource_path[1])
user_service.sync_kc_user(kc_user, delete=operation == "DELETE")
await user_service.sync_kc_user(kc_user, delete=operation == "DELETE")
if resource == "CLIENT_ROLE_MAPPING":
# adding/deleting user client roles
# set/user user is_superuser
kc_user = auth_client.get_user(resource_path[1])
user_service.sync_kc_user(kc_user)
await user_service.sync_kc_user(kc_user)
if resource == "GROUP_MEMBERSHIP":
# adding / deleting users from groups, update the user
# updating the user will also update the user groups
kc_user = auth_client.get_user(resource_path[1])
user_service.sync_kc_user(kc_user)
await user_service.sync_kc_user(kc_user)
except Exception as e:
log.error(e)
raise e
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from django.contrib.auth.models import User, Group

from cloudharness_django.models import Team, Member
Expand Down Expand Up @@ -52,78 +53,78 @@ def update_team(self, group):
self.auth_client.update_group(group.team.kc_id, group.name)
return group

def add_user_to_team(self, user, team_name):
async def add_user_to_team(self, user, team_name):
# add a user from the group/team
group = Group.objects.get(name=team_name)
group = Group.objects.aget(name=team_name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

await missing

kc_group_id = group.team.kc_id
kc_user_id = user.member.kc_id
self.auth_client.group_user_add(kc_user_id, kc_group_id)

def rm_user_from_team(self, user, team_name):
async def rm_user_from_team(self, user, team_name):
# delete a user from the group/team
group = Group.objects.get(name=team_name)
group = await Group.objects.aget(name=team_name)
kc_group_id = group.team.kc_id
kc_user_id = user.member.kc_id
self.auth_client.group_user_remove(kc_user_id, kc_group_id)

def sync_kc_group(self, kc_group):
async def sync_kc_group(self, kc_group):
# sync the kc group with the django group
try:
team = Team.objects.get(kc_id=kc_group["id"])
group, created = Group.objects.get_or_create(team=team)
team = await Team.objects.aget(kc_id=kc_group["id"])
group, created = await Group.objects.aget_or_create(team=team)
group.name = kc_group["name"]
except Team.DoesNotExist:
group, created = Group.objects.get_or_create(name=kc_group["name"])
group, created = await Group.objects.aget_or_create(name=kc_group["name"])
try:
# check if group has a team
team = group.team
except Exception as e:
# create the team
superusers = User.objects.filter(is_superuser=True)
if superusers and len(superusers) > 0:
team = Team.objects.create(
owner=superusers[0], # one of the superusers will be the default team owner
try:
superuser = User.objects.filter(is_superuser=True).afirst()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

await missing


team = await Team.objects.acreate(
owner=superuser, # one of the superusers will be the default team owner
kc_id=kc_group["id"],
group=group)
team.save()
group.save()
await team.asave()
except User.DoesNotExist as ex:
raise Exception("There is no superuser") from ex
await group.asave()

def sync_kc_groups(self, kc_groups=None):
async def sync_kc_groups(self, kc_groups=None):
# sync all groups
if not kc_groups:
kc_groups = self.auth_client.get_groups()
for kc_group in kc_groups:
self.sync_kc_group(kc_group)
await asyncio.gather(self.sync_kc_group(kc_group) for kc_group in kc_groups)

def sync_kc_user(self, kc_user, is_superuser=False, delete=False):
async def sync_kc_user(self, kc_user, is_superuser=False, delete=False):
# sync the kc user with the django user

user, created = User.objects.get_or_create(username=kc_user["username"])
user, created = await User.objects.aget_or_create(username=kc_user["username"])

member, created = Member.objects.get_or_create(user=user)
member.kc_id = kc_user["id"]
member.save()
await Member.objects.aget_or_create(user=user, kc_id=kc_user["id"])
user = self._map_kc_user(user, kc_user, is_superuser, delete)
user.save()
return user

def sync_kc_user_groups(self, kc_user):
async def sync_kc_user_groups(self, kc_user):
# Sync the user usergroups and memberships
user = User.objects.get(username=kc_user["email"])
user = await User.objects.aget(username=kc_user["email"])
user_groups = []
for kc_group in kc_user["userGroups"]:
user_groups += [Group.objects.get(name=kc_group["name"])]
user.groups.set(user_groups)
user.save()
user_groups += [await Group.objects.aget(name=kc_group["name"])]
await user.groups.aset(user_groups)
user.asave()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Required

Another missing await here


try:
if user.member.kc_id != kc_user["id"]:
user.member.kc_id = kc_user["id"]
except Member.DoesNotExist:
member = Member(user=user, kc_id=kc_user["id"])
member.save()
await member.asave()

def sync_kc_users_groups(self):
async def sync_kc_users_groups(self):
# cache all admin users to minimize KC rest api calls
all_admin_users = self.auth_client.get_client_role_members(
self.auth_service.get_client_name(),
Expand All @@ -134,11 +135,10 @@ def sync_kc_users_groups(self):
for kc_user in self.auth_client.get_users():
# check if user in all_admin_users
is_superuser = any([admin_user for admin_user in all_admin_users if admin_user["email"] == kc_user["email"]])
self.sync_kc_user(kc_user, is_superuser)
await self.sync_kc_user(kc_user, is_superuser)

# sync the groups
self.sync_kc_groups()
await self.sync_kc_groups()

# sync the user groups and memberships
for kc_user in self.auth_client.get_users():
self.sync_kc_user_groups(kc_user)
await asyncio.gather(self.sync_kc_user_groups(kc_user) for kc_user in self.auth_client.get_users())
50 changes: 26 additions & 24 deletions libraries/cloudharness-common/cloudharness/events/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
import time
from typing import List, Generator
import logging

import asyncio
from asgiref.sync import iscoroutinefunction
from time import sleep
from cloudharness import json, dumps

Expand Down Expand Up @@ -241,14 +242,18 @@ def close(self):
# for now no cleanup tasks to do
pass

def _consume_task(self, app=None, group_id=None, handler=None):
async def _consume_task(self, app=None, group_id=None, handler=None):

log.info(f'Kafka consumer thread started, listening for messages in queue: {self.topic_id}')
while True:
try:
self.consumer = self._get_consumer(group_id)
for message in self.consumer:
try:
handler(event_client=self, app=app, message=message.value)
if iscoroutinefunction(handler):
await handler(event_client=self, app=app, message=message.value)
else:
handler(event_client=self, app=app, message=message.value)
except Exception as e:
log.error(f"Error during execution of the consumer Topic {self.topic_id} --> {e}", exc_info=True)
self.consumer.close()
Expand All @@ -262,28 +267,25 @@ def async_consume(self, app=None, handler=None, group_id='default'):
log.debug('get current object from app')
app = app._get_current_object()
self._consumer_thread = threading.Thread(
target=self._consume_task,
kwargs={'app': app,
'group_id': group_id,
'handler': handler})
target=asyncio.run(self._consume_task(app, group_id, handler))
)
self._consumer_thread.daemon = True
self._consumer_thread.start()
log.debug('thread started')


if __name__ == "__main__":
# creat the required os env variables
os.environ['CLOUDHARNESS_EVENTS_CLIENT_ID'] = env.get_cloudharness_events_client_id()
os.environ['CLOUDHARNESS_EVENTS_SERVICE'] = env.get_cloudharness_events_service()

# instantiate the client
client = EventClient('test-sync-op-results-qcwbc')

# create a topic from env variables
# print(client.create_topic())
# publish to the prev created topic
# print(client.produce({"message": "In God we trust, all others bring data..."}))
# read from the topic
print(client.consume_all('my-group'))
# delete the topic
# print(client.delete_topic())
if __name__ == "__main__":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Query
This has all been indented to be inline with the async_consume body (i.e. this will be part of the async_consume method, is that intended, or should the indentation be removed to be at the module level?

# creat the required os env variables
os.environ['CLOUDHARNESS_EVENTS_CLIENT_ID'] = env.get_cloudharness_events_client_id()
os.environ['CLOUDHARNESS_EVENTS_SERVICE'] = env.get_cloudharness_events_service()

# instantiate the client
client = EventClient('test-sync-op-results-qcwbc')

# create a topic from env variables
# print(client.create_topic())
# publish to the prev created topic
# print(client.produce({"message": "In God we trust, all others bring data..."}))
# read from the topic
print(client.consume_all('my-group'))
# delete the topic
# print(client.delete_topic())
Loading