diff --git a/library/src/main/java/org/xmtp/android/library/Conversations.kt b/library/src/main/java/org/xmtp/android/library/Conversations.kt index 39303c853..3ef969b96 100644 --- a/library/src/main/java/org/xmtp/android/library/Conversations.kt +++ b/library/src/main/java/org/xmtp/android/library/Conversations.kt @@ -354,29 +354,44 @@ data class Conversations( } fun streamAllMessages(): Flow = flow { - val topics: MutableList = - mutableListOf( + while (true) { + val topics = mutableListOf( Topic.userInvite(client.address).description, Topic.userIntro(client.address).description ) - for (conversation in list()) { - topics.add(conversation.topic) - } - client.subscribe(topics).collect { envelope -> - var conversation = conversationsByTopic[envelope.contentTopic] - var decoded: DecodedMessage? = null - if (conversation != null) { - decoded = conversation.decodeOrNull(envelope) - } else if (envelope.contentTopic.startsWith("/xmtp/0/invite-")) { - conversation = fromInvite(envelope = envelope) - conversationsByTopic[conversation.topic] = conversation - } else if (envelope.contentTopic.startsWith("/xmtp/0/intro-")) { - conversation = fromIntro(envelope = envelope) - conversationsByTopic[conversation.topic] = conversation - decoded = conversation.decodeOrNull(envelope) + + for (conversation in list()) { + topics.add(conversation.topic) } - if (decoded != null) { - emit(decoded) + + try { + client.subscribe(topics = topics).collect { envelope -> + when { + conversationsByTopic.containsKey(envelope.contentTopic) -> { + val conversation = conversationsByTopic[envelope.contentTopic] + val decoded = conversation?.decode(envelope) + decoded?.let { emit(it) } + } + + envelope.contentTopic.startsWith("/xmtp/0/invite-") -> { + val conversation = fromInvite(envelope) + conversationsByTopic[conversation.topic] = conversation + // Break so we can resubscribe with the new conversation + return@collect + } + + envelope.contentTopic.startsWith("/xmtp/0/intro-") -> { + val conversation = fromIntro(envelope) + conversationsByTopic[conversation.topic] = conversation + val decoded = conversation.decode(envelope) + emit(decoded) + // Break so we can resubscribe with the new conversation + return@collect + } + } + } + } catch (error: Exception) { + throw error } } }