Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Postgres backend: handle connection loss #66

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

asafc
Copy link

@asafc asafc commented Jun 12, 2022

This bugfix handle a situation where broadcaster does not detect the Postgres backend terminating the connection.

A user of Broadcaster will not be able to handle disconnects in their code.

i.e: the next loop will not exit:

async def chatroom_ws_sender(websocket):
    async with broadcast.subscribe(channel="chatroom") as subscriber:
        async for event in subscriber:
            await websocket.send_text(event.message)

when either:

  1. the Postgres db is down
  2. Postgres terminated an idle session (long lived session) due to either idle_session_timeout or idle_in_transaction_session_timeout (see here)

@dariosky
Copy link

Hi @asafc - this was a good start.
It is missing the handling of the disconnection though. I ended up playing with this for a bit and change both the base and the postgres backend to reconnect and resubscribe to the channels so it works for my use-case.
Let me know if you want me to add to this merge-request.

@roekatz
Copy link

roekatz commented Mar 26, 2023

Hi @dariosky,
I was involved in this fix on our fork https://github.com/permitio/broadcaster.

I don't believe automatically reconnecting is a good idea: when connection is loss messages could be missed,
therefor the caller should be notified - any initialization required to catch up with potentially missing updates.

On our use case - disconnecting (marked by ending the iterator) is more than enough (then the caller can reconnect).
It's also possible to have some on_reconnect callback, so caller could be notified while connection is re-established - but IMO it doesn't really add a lot of value :)

WDYT?

@dariosky
Copy link

Having an on_reconnect parameter is a good idea - so the caller can handle the eventual gap-filling...
In my case I didn't need it - I had just a reconnect on-error loop and was keeping the memory of the subscriptions to restore them when the reconnection happened...

@roekatz
Copy link

roekatz commented Mar 27, 2023

Not sure I understand - have you pushed that fix? (or plan on pushing it?)

@dariosky
Copy link

I didn't (I asked above if it can be useful) - let me add it here :)

@dariosky
Copy link

See here: permitio#4

@alex-oleshkevich alex-oleshkevich mentioned this pull request Apr 3, 2024
19 tasks
@logankaser logankaser mentioned this pull request Jun 12, 2024
for queue in list(self._subscribers.get(event.channel, [])):
await queue.put(event)

# Ubsubscribe all
for queue in sum([list(qs) for qs in self._subscribers.values()], []):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

# Unsubscribe all
for queue_set in self._subscribers.values():
    for queue in queue_set:
        await queue.put(None)

Seems like a lot less wasteful / clear what's going on.
As far as I know, you can loop over sets directly in python, no need
to convert to a list.

@alex-oleshkevich
Copy link
Member

This requires extra testing. Will not include in 0.3

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants