Skip to content

Commit

Permalink
add streaming messages to groups
Browse files Browse the repository at this point in the history
  • Loading branch information
nplasterer committed Jan 26, 2024
1 parent 093c5e4 commit 29c9e07
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ class GroupTest {
group.send("howdy")
group.send("gm")
runBlocking { group.sync() }
assertEquals(group.messages().last().body, "gm")
assertEquals(group.messages().last().body, "howdy")
assertEquals(group.messages().size, 2)

val sameGroup = alixClient.conversations.listGroups().last()
assertEquals(sameGroup.messages().size, 2)
assertEquals(sameGroup.messages().last().body, "howdy")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ sealed class Conversation {
return when (this) {
is V1 -> conversationV1.streamMessages()
is V2 -> conversationV2.streamMessages()
is Group -> flowOf(group.messages().last()) // TODO fix this
is Group -> group.streamMessages()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ data class Conversations(

fun listGroups(): List<Group> {
return runBlocking {
syncGroups()
libXMTPConversations?.list(opts = FfiListConversationsOptions(null, null, null))?.map {
Group(client, it)
}
Expand Down
25 changes: 21 additions & 4 deletions library/src/main/java/org/xmtp/android/library/Group.kt
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package org.xmtp.android.library

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.xmtp.android.library.codecs.ContentCodec
import org.xmtp.android.library.codecs.EncodedContent
import org.xmtp.android.library.codecs.compress
import org.xmtp.android.library.libxmtp.Message
import org.xmtp.android.library.messages.EnvelopeBuilder
import org.xmtp.android.library.messages.MessageBuilder
import org.xmtp.android.library.messages.MessageV2Builder
import org.xmtp.android.library.libxmtp.MessageEmitter
import uniffi.xmtpv3.FfiGroup
import uniffi.xmtpv3.FfiListMessagesOptions
import java.util.Date
Expand Down Expand Up @@ -69,10 +71,25 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) {
)
).map {
Message(client, it).decode()
}.drop(1).reversed()// The first message is something I can't decode because it's about adding members
}.drop(1)
.reversed()// The first message is something I can't decode because it's about adding members
}
}

fun streamMessages(): Flow<DecodedMessage> = flow {
val messageEmitter = MessageEmitter()

coroutineScope {
launch {
messageEmitter.messages.collect { message ->
emit(Message(client, message).decode())
}
}
}

libXMTPGroup.stream(messageEmitter.callback)
}

fun addMembers(addresses: List<String>) {
runBlocking { libXMTPGroup.addMembers(addresses) }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.xmtp.android.library.libxmtp

import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import uniffi.xmtpv3.FfiMessage
import uniffi.xmtpv3.FfiMessageCallback

class MessageEmitter {
private val _messages = MutableSharedFlow<FfiMessage>()
val messages = _messages.asSharedFlow()

val callback: FfiMessageCallback = object : FfiMessageCallback {
override fun onMessage(message: FfiMessage) {
_messages.tryEmit(message)
}
}
}

0 comments on commit 29c9e07

Please sign in to comment.