-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CH-167 Async support on Django user attach middleware #789
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
import asyncio | ||
from django.contrib.auth.models import User, Group | ||
|
||
from cloudharness_django.models import Team, Member | ||
|
@@ -52,78 +53,78 @@ def update_team(self, group): | |
self.auth_client.update_group(group.team.kc_id, group.name) | ||
return group | ||
|
||
def add_user_to_team(self, user, team_name): | ||
async def add_user_to_team(self, user, team_name): | ||
# add a user from the group/team | ||
group = Group.objects.get(name=team_name) | ||
group = Group.objects.aget(name=team_name) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. await missing |
||
kc_group_id = group.team.kc_id | ||
kc_user_id = user.member.kc_id | ||
self.auth_client.group_user_add(kc_user_id, kc_group_id) | ||
|
||
def rm_user_from_team(self, user, team_name): | ||
async def rm_user_from_team(self, user, team_name): | ||
# delete a user from the group/team | ||
group = Group.objects.get(name=team_name) | ||
group = await Group.objects.aget(name=team_name) | ||
kc_group_id = group.team.kc_id | ||
kc_user_id = user.member.kc_id | ||
self.auth_client.group_user_remove(kc_user_id, kc_group_id) | ||
|
||
def sync_kc_group(self, kc_group): | ||
async def sync_kc_group(self, kc_group): | ||
# sync the kc group with the django group | ||
try: | ||
team = Team.objects.get(kc_id=kc_group["id"]) | ||
group, created = Group.objects.get_or_create(team=team) | ||
team = await Team.objects.aget(kc_id=kc_group["id"]) | ||
group, created = await Group.objects.aget_or_create(team=team) | ||
group.name = kc_group["name"] | ||
except Team.DoesNotExist: | ||
group, created = Group.objects.get_or_create(name=kc_group["name"]) | ||
group, created = await Group.objects.aget_or_create(name=kc_group["name"]) | ||
try: | ||
# check if group has a team | ||
team = group.team | ||
except Exception as e: | ||
# create the team | ||
superusers = User.objects.filter(is_superuser=True) | ||
if superusers and len(superusers) > 0: | ||
team = Team.objects.create( | ||
owner=superusers[0], # one of the superusers will be the default team owner | ||
try: | ||
superuser = User.objects.filter(is_superuser=True).afirst() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. await missing |
||
|
||
team = await Team.objects.acreate( | ||
owner=superuser, # one of the superusers will be the default team owner | ||
kc_id=kc_group["id"], | ||
group=group) | ||
team.save() | ||
group.save() | ||
await team.asave() | ||
except User.DoesNotExist as ex: | ||
raise Exception("There is no superuser") from ex | ||
await group.asave() | ||
|
||
def sync_kc_groups(self, kc_groups=None): | ||
async def sync_kc_groups(self, kc_groups=None): | ||
# sync all groups | ||
if not kc_groups: | ||
kc_groups = self.auth_client.get_groups() | ||
for kc_group in kc_groups: | ||
self.sync_kc_group(kc_group) | ||
await asyncio.gather(self.sync_kc_group(kc_group) for kc_group in kc_groups) | ||
|
||
def sync_kc_user(self, kc_user, is_superuser=False, delete=False): | ||
async def sync_kc_user(self, kc_user, is_superuser=False, delete=False): | ||
# sync the kc user with the django user | ||
|
||
user, created = User.objects.get_or_create(username=kc_user["username"]) | ||
user, created = await User.objects.aget_or_create(username=kc_user["username"]) | ||
|
||
member, created = Member.objects.get_or_create(user=user) | ||
member.kc_id = kc_user["id"] | ||
member.save() | ||
await Member.objects.aget_or_create(user=user, kc_id=kc_user["id"]) | ||
user = self._map_kc_user(user, kc_user, is_superuser, delete) | ||
user.save() | ||
return user | ||
|
||
def sync_kc_user_groups(self, kc_user): | ||
async def sync_kc_user_groups(self, kc_user): | ||
# Sync the user usergroups and memberships | ||
user = User.objects.get(username=kc_user["email"]) | ||
user = await User.objects.aget(username=kc_user["email"]) | ||
user_groups = [] | ||
for kc_group in kc_user["userGroups"]: | ||
user_groups += [Group.objects.get(name=kc_group["name"])] | ||
user.groups.set(user_groups) | ||
user.save() | ||
user_groups += [await Group.objects.aget(name=kc_group["name"])] | ||
await user.groups.aset(user_groups) | ||
user.asave() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Required Another missing await here |
||
|
||
try: | ||
if user.member.kc_id != kc_user["id"]: | ||
user.member.kc_id = kc_user["id"] | ||
except Member.DoesNotExist: | ||
member = Member(user=user, kc_id=kc_user["id"]) | ||
member.save() | ||
await member.asave() | ||
|
||
def sync_kc_users_groups(self): | ||
async def sync_kc_users_groups(self): | ||
# cache all admin users to minimize KC rest api calls | ||
all_admin_users = self.auth_client.get_client_role_members( | ||
self.auth_service.get_client_name(), | ||
|
@@ -134,11 +135,10 @@ def sync_kc_users_groups(self): | |
for kc_user in self.auth_client.get_users(): | ||
# check if user in all_admin_users | ||
is_superuser = any([admin_user for admin_user in all_admin_users if admin_user["email"] == kc_user["email"]]) | ||
self.sync_kc_user(kc_user, is_superuser) | ||
await self.sync_kc_user(kc_user, is_superuser) | ||
|
||
# sync the groups | ||
self.sync_kc_groups() | ||
await self.sync_kc_groups() | ||
|
||
# sync the user groups and memberships | ||
for kc_user in self.auth_client.get_users(): | ||
self.sync_kc_user_groups(kc_user) | ||
await asyncio.gather(self.sync_kc_user_groups(kc_user) for kc_user in self.auth_client.get_users()) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,7 +4,8 @@ | |
import time | ||
from typing import List, Generator | ||
import logging | ||
|
||
import asyncio | ||
from asgiref.sync import iscoroutinefunction | ||
from time import sleep | ||
from cloudharness import json, dumps | ||
|
||
|
@@ -241,14 +242,18 @@ def close(self): | |
# for now no cleanup tasks to do | ||
pass | ||
|
||
def _consume_task(self, app=None, group_id=None, handler=None): | ||
async def _consume_task(self, app=None, group_id=None, handler=None): | ||
|
||
log.info(f'Kafka consumer thread started, listening for messages in queue: {self.topic_id}') | ||
while True: | ||
try: | ||
self.consumer = self._get_consumer(group_id) | ||
for message in self.consumer: | ||
try: | ||
handler(event_client=self, app=app, message=message.value) | ||
if iscoroutinefunction(handler): | ||
await handler(event_client=self, app=app, message=message.value) | ||
else: | ||
handler(event_client=self, app=app, message=message.value) | ||
except Exception as e: | ||
log.error(f"Error during execution of the consumer Topic {self.topic_id} --> {e}", exc_info=True) | ||
self.consumer.close() | ||
|
@@ -262,28 +267,25 @@ def async_consume(self, app=None, handler=None, group_id='default'): | |
log.debug('get current object from app') | ||
app = app._get_current_object() | ||
self._consumer_thread = threading.Thread( | ||
target=self._consume_task, | ||
kwargs={'app': app, | ||
'group_id': group_id, | ||
'handler': handler}) | ||
target=asyncio.run(self._consume_task(app, group_id, handler)) | ||
) | ||
self._consumer_thread.daemon = True | ||
self._consumer_thread.start() | ||
log.debug('thread started') | ||
|
||
|
||
if __name__ == "__main__": | ||
# creat the required os env variables | ||
os.environ['CLOUDHARNESS_EVENTS_CLIENT_ID'] = env.get_cloudharness_events_client_id() | ||
os.environ['CLOUDHARNESS_EVENTS_SERVICE'] = env.get_cloudharness_events_service() | ||
|
||
# instantiate the client | ||
client = EventClient('test-sync-op-results-qcwbc') | ||
|
||
# create a topic from env variables | ||
# print(client.create_topic()) | ||
# publish to the prev created topic | ||
# print(client.produce({"message": "In God we trust, all others bring data..."})) | ||
# read from the topic | ||
print(client.consume_all('my-group')) | ||
# delete the topic | ||
# print(client.delete_topic()) | ||
if __name__ == "__main__": | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Query |
||
# creat the required os env variables | ||
os.environ['CLOUDHARNESS_EVENTS_CLIENT_ID'] = env.get_cloudharness_events_client_id() | ||
os.environ['CLOUDHARNESS_EVENTS_SERVICE'] = env.get_cloudharness_events_service() | ||
|
||
# instantiate the client | ||
client = EventClient('test-sync-op-results-qcwbc') | ||
|
||
# create a topic from env variables | ||
# print(client.create_topic()) | ||
# publish to the prev created topic | ||
# print(client.produce({"message": "In God we trust, all others bring data..."})) | ||
# read from the topic | ||
print(client.consume_all('my-group')) | ||
# delete the topic | ||
# print(client.delete_topic()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Query
Do these sync_keycloak methods need to be coroutines? They don't await anything, so looks like they could be normal methods, right?