From 29c9e0765f0740e06e2d8f5d93419dd96105c23c Mon Sep 17 00:00:00 2001 From: Naomi Plasterer Date: Fri, 26 Jan 2024 07:40:22 -0800 Subject: [PATCH] add streaming messages to groups --- .../org/xmtp/android/library/GroupTest.kt | 6 ++++- .../org/xmtp/android/library/Conversation.kt | 2 +- .../org/xmtp/android/library/Conversations.kt | 1 + .../java/org/xmtp/android/library/Group.kt | 25 ++++++++++++++++--- .../android/library/libxmtp/MessageEmitter.kt | 17 +++++++++++++ 5 files changed, 45 insertions(+), 6 deletions(-) create mode 100644 library/src/main/java/org/xmtp/android/library/libxmtp/MessageEmitter.kt diff --git a/library/src/androidTest/java/org/xmtp/android/library/GroupTest.kt b/library/src/androidTest/java/org/xmtp/android/library/GroupTest.kt index 56bb764ac..b9bbb6ed1 100644 --- a/library/src/androidTest/java/org/xmtp/android/library/GroupTest.kt +++ b/library/src/androidTest/java/org/xmtp/android/library/GroupTest.kt @@ -88,7 +88,11 @@ class GroupTest { group.send("howdy") group.send("gm") runBlocking { group.sync() } - assertEquals(group.messages().last().body, "gm") + assertEquals(group.messages().last().body, "howdy") assertEquals(group.messages().size, 2) + + val sameGroup = alixClient.conversations.listGroups().last() + assertEquals(sameGroup.messages().size, 2) + assertEquals(sameGroup.messages().last().body, "howdy") } } \ No newline at end of file diff --git a/library/src/main/java/org/xmtp/android/library/Conversation.kt b/library/src/main/java/org/xmtp/android/library/Conversation.kt index c70d2493a..e678c188a 100644 --- a/library/src/main/java/org/xmtp/android/library/Conversation.kt +++ b/library/src/main/java/org/xmtp/android/library/Conversation.kt @@ -290,7 +290,7 @@ sealed class Conversation { return when (this) { is V1 -> conversationV1.streamMessages() is V2 -> conversationV2.streamMessages() - is Group -> flowOf(group.messages().last()) // TODO fix this + is Group -> group.streamMessages() } } diff --git a/library/src/main/java/org/xmtp/android/library/Conversations.kt b/library/src/main/java/org/xmtp/android/library/Conversations.kt index caeafcfea..013adc638 100644 --- a/library/src/main/java/org/xmtp/android/library/Conversations.kt +++ b/library/src/main/java/org/xmtp/android/library/Conversations.kt @@ -98,6 +98,7 @@ data class Conversations( fun listGroups(): List { return runBlocking { + syncGroups() libXMTPConversations?.list(opts = FfiListConversationsOptions(null, null, null))?.map { Group(client, it) } diff --git a/library/src/main/java/org/xmtp/android/library/Group.kt b/library/src/main/java/org/xmtp/android/library/Group.kt index f20ebc1ba..fbcf95b76 100644 --- a/library/src/main/java/org/xmtp/android/library/Group.kt +++ b/library/src/main/java/org/xmtp/android/library/Group.kt @@ -1,13 +1,15 @@ package org.xmtp.android.library +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.xmtp.android.library.codecs.ContentCodec import org.xmtp.android.library.codecs.EncodedContent import org.xmtp.android.library.codecs.compress import org.xmtp.android.library.libxmtp.Message -import org.xmtp.android.library.messages.EnvelopeBuilder -import org.xmtp.android.library.messages.MessageBuilder -import org.xmtp.android.library.messages.MessageV2Builder +import org.xmtp.android.library.libxmtp.MessageEmitter import uniffi.xmtpv3.FfiGroup import uniffi.xmtpv3.FfiListMessagesOptions import java.util.Date @@ -69,10 +71,25 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) { ) ).map { Message(client, it).decode() - }.drop(1).reversed()// The first message is something I can't decode because it's about adding members + }.drop(1) + .reversed()// The first message is something I can't decode because it's about adding members } } + fun streamMessages(): Flow = flow { + val messageEmitter = MessageEmitter() + + coroutineScope { + launch { + messageEmitter.messages.collect { message -> + emit(Message(client, message).decode()) + } + } + } + + libXMTPGroup.stream(messageEmitter.callback) + } + fun addMembers(addresses: List) { runBlocking { libXMTPGroup.addMembers(addresses) } } diff --git a/library/src/main/java/org/xmtp/android/library/libxmtp/MessageEmitter.kt b/library/src/main/java/org/xmtp/android/library/libxmtp/MessageEmitter.kt new file mode 100644 index 000000000..17a65b9f4 --- /dev/null +++ b/library/src/main/java/org/xmtp/android/library/libxmtp/MessageEmitter.kt @@ -0,0 +1,17 @@ +package org.xmtp.android.library.libxmtp + +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.asSharedFlow +import uniffi.xmtpv3.FfiMessage +import uniffi.xmtpv3.FfiMessageCallback + +class MessageEmitter { + private val _messages = MutableSharedFlow() + val messages = _messages.asSharedFlow() + + val callback: FfiMessageCallback = object : FfiMessageCallback { + override fun onMessage(message: FfiMessage) { + _messages.tryEmit(message) + } + } +} \ No newline at end of file