diff --git a/example/src/main/java/org/xmtp/android/example/MainViewModel.kt b/example/src/main/java/org/xmtp/android/example/MainViewModel.kt index 882e3dbb3..47b6e01cb 100644 --- a/example/src/main/java/org/xmtp/android/example/MainViewModel.kt +++ b/example/src/main/java/org/xmtp/android/example/MainViewModel.kt @@ -77,7 +77,7 @@ class MainViewModel : ViewModel() { val stream: StateFlow = stateFlow(viewModelScope, null) { subscriptionCount -> if (ClientManager.clientState.value is ClientManager.ClientState.Ready) { - ClientManager.client.conversations.stream() + ClientManager.client.conversations.streamAll() .flowWhileShared( subscriptionCount, SharingStarted.WhileSubscribed(1000L) diff --git a/library/src/androidTest/java/org/xmtp/android/library/GroupTest.kt b/library/src/androidTest/java/org/xmtp/android/library/GroupTest.kt index 6a66565ee..45178da73 100644 --- a/library/src/androidTest/java/org/xmtp/android/library/GroupTest.kt +++ b/library/src/androidTest/java/org/xmtp/android/library/GroupTest.kt @@ -2,6 +2,8 @@ package org.xmtp.android.library import androidx.test.ext.junit.runners.AndroidJUnit4 import androidx.test.platform.app.InstrumentationRegistry +import app.cash.turbine.test +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.runBlocking import org.junit.Assert.assertEquals import org.junit.Assert.assertThrows @@ -197,4 +199,56 @@ class GroupTest { assertEquals(ReactionAction.Added, content?.action) assertEquals(ReactionSchema.Unicode, content?.schema) } + + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun testCanStreamGroupMessages() = kotlinx.coroutines.test.runTest { + val group = boClient.conversations.newGroup(listOf(alix.walletAddress.lowercase())) + + group.streamMessages().test { + group.send("hi") + assertEquals("hi", awaitItem().body) + group.send("hi again") + assertEquals("hi again", awaitItem().body) + } + } + + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun testCanStreamDecryptedGroupMessages() = kotlinx.coroutines.test.runTest { + val group = boClient.conversations.newGroup(listOf(alix.walletAddress)) + + group.streamDecryptedMessages().test { + group.send("hi") + assertEquals("hi", awaitItem().encodedContent.content.toStringUtf8()) + group.send("hi again") + assertEquals("hi again", awaitItem().encodedContent.content.toStringUtf8()) + } + } + + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun testCanStreamGroups() = kotlinx.coroutines.test.runTest { + boClient.conversations.streamGroups().test { + val group = + alixClient.conversations.newGroup(listOf(bo.walletAddress)) + assertEquals(group.id.toHex(), awaitItem().topic) + val group2 = + caroClient.conversations.newGroup(listOf(bo.walletAddress)) + assertEquals(group2.id.toHex(), awaitItem().topic) + } + } + + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun testCanStreamGroupsAndConversations() = kotlinx.coroutines.test.runTest { + boClient.conversations.streamAll().test { + val group = + caroClient.conversations.newGroup(listOf(bo.walletAddress)) + assertEquals(group.id.toHex(), awaitItem().topic) + val conversation = + boClient.conversations.newConversation(alix.walletAddress) + assertEquals(conversation.topic, awaitItem().topic) + } + } } diff --git a/library/src/main/java/org/xmtp/android/library/Conversation.kt b/library/src/main/java/org/xmtp/android/library/Conversation.kt index 08d8fe8af..534e42a0f 100644 --- a/library/src/main/java/org/xmtp/android/library/Conversation.kt +++ b/library/src/main/java/org/xmtp/android/library/Conversation.kt @@ -318,7 +318,7 @@ sealed class Conversation { return when (this) { is V1 -> conversationV1.streamMessages() is V2 -> conversationV2.streamMessages() - is Group -> throw XMTPException("Coming follow up PR") + is Group -> group.streamMessages() } } @@ -326,7 +326,7 @@ sealed class Conversation { return when (this) { is V1 -> conversationV1.streamDecryptedMessages() is V2 -> conversationV2.streamDecryptedMessages() - is Group -> throw XMTPException("Coming follow up PR") + is Group -> group.streamDecryptedMessages() } } diff --git a/library/src/main/java/org/xmtp/android/library/Conversations.kt b/library/src/main/java/org/xmtp/android/library/Conversations.kt index 9bd572c89..1c146ac8c 100644 --- a/library/src/main/java/org/xmtp/android/library/Conversations.kt +++ b/library/src/main/java/org/xmtp/android/library/Conversations.kt @@ -3,9 +3,12 @@ package org.xmtp.android.library import android.util.Log import io.grpc.StatusException import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.channels.awaitClose import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.callbackFlow import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.merge import kotlinx.coroutines.runBlocking import org.xmtp.android.library.GRPCApiClient.Companion.makeQueryRequest import org.xmtp.android.library.GRPCApiClient.Companion.makeSubscribeRequest @@ -32,7 +35,9 @@ import org.xmtp.android.library.messages.walletAddress import org.xmtp.proto.keystore.api.v1.Keystore.TopicMap.TopicData import org.xmtp.proto.message.contents.Contact import org.xmtp.proto.message.contents.Invitation +import uniffi.xmtpv3.FfiConversationCallback import uniffi.xmtpv3.FfiConversations +import uniffi.xmtpv3.FfiGroup import uniffi.xmtpv3.FfiListConversationsOptions import java.util.Date import kotlin.time.Duration.Companion.nanoseconds @@ -477,7 +482,6 @@ data class Conversations( client.subscribeTopic( listOf(Topic.userIntro(client.address), Topic.userInvite(client.address)), ).collect { envelope -> - if (envelope.contentTopic == Topic.userIntro(client.address).description) { val conversationV1 = fromIntro(envelope = envelope) if (!streamedConversationTopics.contains(conversationV1.topic)) { @@ -496,6 +500,21 @@ data class Conversations( } } + fun streamAll(): Flow { + return merge(streamGroups(), stream()) + } + + fun streamGroups(): Flow = callbackFlow { + val groupCallback = object : FfiConversationCallback { + override fun onConversation(conversation: FfiGroup) { + trySend(Conversation.Group(Group(client, conversation))) + } + } + val stream = libXMTPConversations?.stream(groupCallback) + ?: throw XMTPException("Client does not support Groups") + awaitClose { stream.end() } + } + /** * Get the stream of all messages of the current [Client] * @return Flow object of [DecodedMessage] that represents all the messages of the diff --git a/library/src/main/java/org/xmtp/android/library/Group.kt b/library/src/main/java/org/xmtp/android/library/Group.kt index e7671af11..3c9b5a9a5 100644 --- a/library/src/main/java/org/xmtp/android/library/Group.kt +++ b/library/src/main/java/org/xmtp/android/library/Group.kt @@ -1,5 +1,8 @@ package org.xmtp.android.library +import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.callbackFlow import kotlinx.coroutines.runBlocking import org.xmtp.android.library.codecs.ContentCodec import org.xmtp.android.library.codecs.EncodedContent @@ -10,6 +13,8 @@ import org.xmtp.android.library.messages.PagingInfoSortDirection import org.xmtp.proto.message.api.v1.MessageApiOuterClass import uniffi.xmtpv3.FfiGroup import uniffi.xmtpv3.FfiListMessagesOptions +import uniffi.xmtpv3.FfiMessage +import uniffi.xmtpv3.FfiMessageCallback import java.util.Date import kotlin.time.Duration.Companion.nanoseconds import kotlin.time.DurationUnit @@ -34,7 +39,7 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) { runBlocking { libXMTPGroup.send(contentBytes = encodedContent.toByteArray()) } - return id.toString() + return id.toHex() } fun prepareMessage(content: T, options: SendOptions?): EncodedContent { @@ -136,4 +141,26 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) { libXMTPGroup.listMembers().map { it.accountAddress } } } + + fun streamMessages(): Flow = callbackFlow { + val messageCallback = object : FfiMessageCallback { + override fun onMessage(message: FfiMessage) { + trySend(Message(client, message).decode()) + } + } + + val stream = libXMTPGroup.stream(messageCallback) + awaitClose { stream.end() } + } + + fun streamDecryptedMessages(): Flow = callbackFlow { + val messageCallback = object : FfiMessageCallback { + override fun onMessage(message: FfiMessage) { + trySend(decrypt(Message(client, message))) + } + } + + val stream = libXMTPGroup.stream(messageCallback) + awaitClose { stream.end() } + } }