Skip to content

Commit

Permalink
Fix EventBroadcaster change to keep original behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
roekatz committed Jun 27, 2024
1 parent 95fded0 commit 212aafa
Showing 1 changed file with 37 additions and 35 deletions.
72 changes: 37 additions & 35 deletions fastapi_websocket_pubsub/event_broadcaster.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,6 @@ async def __aenter__(self):
return await self._context_manager.__aenter__()

async def __aexit__(self, exc_type, exc, tb):
if self.listening_broadcast_channel is not None:
await self.listening_broadcast_channel.disconnect()
self.listening_broadcast_channel = None
await self._context_manager.__aexit__(exc_type, exc, tb)

async def start_reader_task(self):
Expand Down Expand Up @@ -268,40 +265,45 @@ async def __read_notifications__(self):
read incoming broadcasts and posting them to the intreal notifier
"""
logger.debug("Starting broadcaster listener")
# Subscribe to our channel
async with self.listening_broadcast_channel.subscribe(
channel=self._channel
) as subscriber:
async for event in subscriber:
try:
notification = BroadcastNotification.parse_raw(event.message)
# Avoid re-publishing our own broadcasts
if notification.notifier_id != self._id:
logger.debug(
"Handling incoming broadcast event: {}".format(
{
"topics": notification.topics,
"src": notification.notifier_id,
}
try:
# Subscribe to our channel
async with self.listening_broadcast_channel.subscribe(
channel=self._channel
) as subscriber:
async for event in subscriber:
try:
notification = BroadcastNotification.parse_raw(event.message)
# Avoid re-publishing our own broadcasts
if notification.notifier_id != self._id:
logger.debug(
"Handling incoming broadcast event: {}".format(
{
"topics": notification.topics,
"src": notification.notifier_id,
}
)
)
)
# Notify subscribers of message received from broadcast
task = asyncio.create_task(
self._notifier.notify(
notification.topics,
notification.data,
notifier_id=self._id,
# Notify subscribers of message received from broadcast
task = asyncio.create_task(
self._notifier.notify(
notification.topics,
notification.data,
notifier_id=self._id,
)
)
)

self._tasks.add(task)
self._tasks.add(task)

def cleanup(task):
self._tasks.remove(task)
def cleanup(task):
self._tasks.remove(task)

task.add_done_callback(cleanup)
except:
logger.exception("Failed handling incoming broadcast")
logger.info(
"No more events to read from subscriber (underlying connection closed)"
)
task.add_done_callback(cleanup)
except:
logger.exception("Failed handling incoming broadcast")
logger.info(
"No more events to read from subscriber (underlying connection closed)"
)
finally:
if self.listening_broadcast_channel is not None:
await self.listening_broadcast_channel.disconnect()
self.listening_broadcast_channel = None

0 comments on commit 212aafa

Please sign in to comment.