Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscribe2 #226

Closed
wants to merge 11 commits into from
Closed

Subscribe2 #226

wants to merge 11 commits into from

Conversation

nplasterer
Copy link
Contributor

@nplasterer nplasterer commented Jan 29, 2024

WIP
Improvements to subscribeAllMessages use the subscription to update the stream instead of breaking and resubscribing.

All the tests pass but I think we just don't have great testing around this.

@nplasterer nplasterer self-assigned this Jan 29, 2024
@nplasterer nplasterer requested a review from neekolas January 31, 2024 05:56
@nplasterer nplasterer marked this pull request as ready for review January 31, 2024 05:56
@nplasterer nplasterer requested a review from a team as a code owner January 31, 2024 05:56

while(true) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure where this while loop should actually live. 🤔

if let conversation = conversationsByTopic[envelope.contentTopic] {
let decoded = try conversation.decode(envelope)
continuation.yield(decoded)
} else if envelope.contentTopic.hasPrefix("/xmtp/0/invite-") {
let conversation = try fromInvite(envelope: envelope)
conversationsByTopic[conversation.topic] = conversation
break // Break so we can resubscribe with the new conversation
topics.append(conversation.topic)
try await subscription.update(req: client.makeSubscribeRequest(topics: topics).toFFI)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this update method want the entire list of topics or just the new topics?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is also blocking from the for loop continuing..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It takes the entire list of topics


func subscribe(topics: [String]) -> AsyncThrowingStream<Envelope, Error> {
func subscribe(topics: [String]) -> AsyncThrowingStream<(envelope: Envelope, subscription: LibXMTP.FfiV2Subscription), Error> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A Touple probably isn't the best idea to solve this. But the simplest for now.

do {
for try await envelope in client.subscribe(topics: topics) {
for try await (envelope, subscription) in client.subscribe(topics: topics) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried a scenario where we returned the subscription independent of this for loop but the await function never completed. It may be my lack of knowledge of iOS threads/streams. But you can see that work in the previous commit history.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zombieobject I've got some iOS threading questions for you next week 🙏

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nplasterer Thanks for looping me in, I'll have a look. 👨🏼‍🚒

@nplasterer
Copy link
Contributor Author

@fabriguespe the tests definitely pass but I'm suspicious. Do you have an example you're running to test this? Can we try it on this branch?

@nplasterer
Copy link
Contributor Author

Okay I was able to reproduce that this isn't fixing the stream entirely. I've got an issue with subcription.update blocking the thread from continueing and when I put it in a task it doesn't actually update. You can see the failure in this test

https://github.com/xmtp/xmtp-ios/blob/main/Tests/XMTPTests/IntegrationTests.swift#L248-L277

@nplasterer
Copy link
Contributor Author

This may be a red herring. If everyone thinks this improvement is futile I'm happy to abandon this effort.

Copy link
Contributor

@neekolas neekolas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approved for whenever you think this is ready to go

@nplasterer
Copy link
Contributor Author

@zombieobject and I paired on this today and decided that this probably isn't worth our time right now to fix. I think the real fix will likely be in the rust layer changing the current streaming interface to a callback interface like the V3 work does. Since this works fine for now and we have other high priorities going to pause this work.

@nplasterer nplasterer closed this Mar 13, 2024
@nplasterer nplasterer deleted the np/update-subscribe2 branch March 13, 2024 22:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants