Skip to content

Commit

Permalink
error when subscription is lost (#122)
Browse files Browse the repository at this point in the history
  • Loading branch information
nplasterer authored Sep 28, 2023
1 parent a562956 commit 5cd45da
Showing 1 changed file with 34 additions and 19 deletions.
53 changes: 34 additions & 19 deletions library/src/main/java/org/xmtp/android/library/Conversations.kt
Original file line number Diff line number Diff line change
Expand Up @@ -354,29 +354,44 @@ data class Conversations(
}

fun streamAllMessages(): Flow<DecodedMessage> = flow {
val topics: MutableList<String> =
mutableListOf(
while (true) {
val topics = mutableListOf(
Topic.userInvite(client.address).description,
Topic.userIntro(client.address).description
)
for (conversation in list()) {
topics.add(conversation.topic)
}
client.subscribe(topics).collect { envelope ->
var conversation = conversationsByTopic[envelope.contentTopic]
var decoded: DecodedMessage? = null
if (conversation != null) {
decoded = conversation.decodeOrNull(envelope)
} else if (envelope.contentTopic.startsWith("/xmtp/0/invite-")) {
conversation = fromInvite(envelope = envelope)
conversationsByTopic[conversation.topic] = conversation
} else if (envelope.contentTopic.startsWith("/xmtp/0/intro-")) {
conversation = fromIntro(envelope = envelope)
conversationsByTopic[conversation.topic] = conversation
decoded = conversation.decodeOrNull(envelope)

for (conversation in list()) {
topics.add(conversation.topic)
}
if (decoded != null) {
emit(decoded)

try {
client.subscribe(topics = topics).collect { envelope ->
when {
conversationsByTopic.containsKey(envelope.contentTopic) -> {
val conversation = conversationsByTopic[envelope.contentTopic]
val decoded = conversation?.decode(envelope)
decoded?.let { emit(it) }
}

envelope.contentTopic.startsWith("/xmtp/0/invite-") -> {
val conversation = fromInvite(envelope)
conversationsByTopic[conversation.topic] = conversation
// Break so we can resubscribe with the new conversation
return@collect
}

envelope.contentTopic.startsWith("/xmtp/0/intro-") -> {
val conversation = fromIntro(envelope)
conversationsByTopic[conversation.topic] = conversation
val decoded = conversation.decode(envelope)
emit(decoded)
// Break so we can resubscribe with the new conversation
return@collect
}
}
}
} catch (error: Exception) {
throw error
}
}
}
Expand Down

0 comments on commit 5cd45da

Please sign in to comment.