diff --git a/fastapi_websocket_pubsub/event_broadcaster.py b/fastapi_websocket_pubsub/event_broadcaster.py index 966ce7c..4aa2d55 100644 --- a/fastapi_websocket_pubsub/event_broadcaster.py +++ b/fastapi_websocket_pubsub/event_broadcaster.py @@ -222,9 +222,6 @@ async def __aenter__(self): return await self._context_manager.__aenter__() async def __aexit__(self, exc_type, exc, tb): - if self.listening_broadcast_channel is not None: - await self.listening_broadcast_channel.disconnect() - self.listening_broadcast_channel = None await self._context_manager.__aexit__(exc_type, exc, tb) async def start_reader_task(self): @@ -268,40 +265,45 @@ async def __read_notifications__(self): read incoming broadcasts and posting them to the intreal notifier """ logger.debug("Starting broadcaster listener") - # Subscribe to our channel - async with self.listening_broadcast_channel.subscribe( - channel=self._channel - ) as subscriber: - async for event in subscriber: - try: - notification = BroadcastNotification.parse_raw(event.message) - # Avoid re-publishing our own broadcasts - if notification.notifier_id != self._id: - logger.debug( - "Handling incoming broadcast event: {}".format( - { - "topics": notification.topics, - "src": notification.notifier_id, - } + try: + # Subscribe to our channel + async with self.listening_broadcast_channel.subscribe( + channel=self._channel + ) as subscriber: + async for event in subscriber: + try: + notification = BroadcastNotification.parse_raw(event.message) + # Avoid re-publishing our own broadcasts + if notification.notifier_id != self._id: + logger.debug( + "Handling incoming broadcast event: {}".format( + { + "topics": notification.topics, + "src": notification.notifier_id, + } + ) ) - ) - # Notify subscribers of message received from broadcast - task = asyncio.create_task( - self._notifier.notify( - notification.topics, - notification.data, - notifier_id=self._id, + # Notify subscribers of message received from broadcast + task = asyncio.create_task( + self._notifier.notify( + notification.topics, + notification.data, + notifier_id=self._id, + ) ) - ) - self._tasks.add(task) + self._tasks.add(task) - def cleanup(task): - self._tasks.remove(task) + def cleanup(task): + self._tasks.remove(task) - task.add_done_callback(cleanup) - except: - logger.exception("Failed handling incoming broadcast") - logger.info( - "No more events to read from subscriber (underlying connection closed)" - ) + task.add_done_callback(cleanup) + except: + logger.exception("Failed handling incoming broadcast") + logger.info( + "No more events to read from subscriber (underlying connection closed)" + ) + finally: + if self.listening_broadcast_channel is not None: + await self.listening_broadcast_channel.disconnect() + self.listening_broadcast_channel = None