-
Notifications
You must be signed in to change notification settings - Fork 23
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
make v2 streaming like v3 #868
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
pub async fn update(&self, req: FfiV2SubscribeRequest) -> Result<(), GenericError> { | ||
let mut sub = self.inner_subscription.lock().await; | ||
sub.update(req.into()).await?; | ||
self.tx.send(req).await.map_err(|_| GenericError::Generic { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the pro/con of sending it through via mpsc, compared to calling it on the subscription directly like in the previous implementation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good question, other than the usual edge-casey footguns with mpsc,
I think the biggest downside is the extra memory overhead for the channel itself, which in this case is bounded to 10 updates (chosen pretty arbitrarily). Since it is bounded, if update
is called continually like in a loop we might lose some updates after reaching the limit on the channel. This shouldn't really be an issue, though, since the caller should be putting the topics in a Vec<>
to the channel FfiV2SubscribeRequest
type anyway. The limit is needed to avoid a infinitely growing channel which, if unbounded, would lead to high memory use if update
is misused.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes sense - I guess more specifically, what would be the drawback of storing the subscription directly on FFIV2Subscription, and directly updating it here as in the old implementation, versus the new approach of storing the tx and sending a message over it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By storing the subscription locally in the task, we can avoid using a Mutex
to access the subscription. This has a few benefits, and it more generally/slightly resembles the actor
model of concurrency rather than locking:
- It's easier to reason about what's happening to the subscription, because all the events are handled in one event loop the task is spawning
- we avoid any footguns we might run into with
Mutex's
, which lets us avoid a class of errors that occurs withMutexes
, which makesupdate
more difficult to misuse outside of libxmtp. We still useMutexes
a bunch which is fine, esp considering this is with uniffi, but more mutexes means more potential for deadlocks and mutex contention. Mutexes are more performant, though, but I doubt using a channel vs mutex will make a huge diff in that direction here - if we want to modify the subscription in other ways, can extend it by just adding new events to the loop on the same channel with an
enum
or something, instead of adding another mutex for that resource
I guess its a bit arbitrary, when writing streams I do default to this strategy rather than Mutex
's, unless absolutely necessary for performance or to satisfy the borrow checker w.r.t uniffi or the like
fixes #859