Skip to content

Commit

Permalink
Revert "remove all the streams work and move to another PR"
Browse files Browse the repository at this point in the history
This reverts commit 76bfcc7.
  • Loading branch information
nplasterer committed Jan 30, 2024
1 parent 76bfcc7 commit cbfdce1
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 5 deletions.
37 changes: 37 additions & 0 deletions library/src/androidTest/java/org/xmtp/android/library/GroupTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package org.xmtp.android.library

import androidx.test.ext.junit.runners.AndroidJUnit4
import androidx.test.platform.app.InstrumentationRegistry
import app.cash.turbine.test
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.runBlocking
import org.junit.Assert.assertEquals
import org.junit.Before
Expand All @@ -16,6 +18,7 @@ import org.xmtp.android.library.messages.PrivateKey
import org.xmtp.android.library.messages.PrivateKeyBuilder
import org.xmtp.android.library.messages.walletAddress

@OptIn(ExperimentalCoroutinesApi::class)
@RunWith(AndroidJUnit4::class)
class GroupTest {
lateinit var fakeApiClient: FakeApiClient
Expand Down Expand Up @@ -174,4 +177,38 @@ class GroupTest {
assertEquals(ReactionAction.Added, content?.action)
assertEquals(ReactionSchema.Unicode, content?.schema)
}

@Test
fun testCanStreamGroupMessages() = kotlinx.coroutines.test.runTest {
val group = boClient.conversations.newGroup(listOf(alix.walletAddress.lowercase()))

group.streamMessages().test {
group.send("hi")
assertEquals("hi", awaitItem().body)
awaitComplete()
}
}

@Test
fun testCanStreamGroups() = kotlinx.coroutines.test.runTest {
boClient.conversations.streamGroups().test {
val conversation =
boClient.conversations.newGroup(listOf(alix.walletAddress.lowercase()))
conversation.send(content = "hi")
assertEquals("hi", awaitItem().messages().first().body)
awaitComplete()
}
}

@Test
fun testCanStreamGroupsAndConversations() = kotlinx.coroutines.test.runTest {
boClient.conversations.stream(includeGroups = true).test {
val group =
boClient.conversations.newGroup(listOf(alix.walletAddress.lowercase()))
val conversation =
boClient.conversations.newConversation(alix.walletAddress.lowercase())
assertEquals("hi", awaitItem().messages().first().body)
awaitComplete()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -318,15 +318,15 @@ sealed class Conversation {
return when (this) {
is V1 -> conversationV1.streamMessages()
is V2 -> conversationV2.streamMessages()
is Group -> throw XMTPException("Coming follow up PR")
is Group -> group.streamMessages()
}
}

fun streamDecryptedMessages(): Flow<DecryptedMessage> {
return when (this) {
is V1 -> conversationV1.streamDecryptedMessages()
is V2 -> conversationV2.streamDecryptedMessages()
is Group -> throw XMTPException("Coming follow up PR")
is Group -> group.streamDecryptedMessages()
}
}

Expand Down
37 changes: 34 additions & 3 deletions library/src/main/java/org/xmtp/android/library/Conversations.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package org.xmtp.android.library
import android.util.Log
import io.grpc.StatusException
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.xmtp.android.library.GRPCApiClient.Companion.makeQueryRequest
import org.xmtp.android.library.GRPCApiClient.Companion.makeSubscribeRequest
Expand Down Expand Up @@ -34,6 +36,7 @@ import org.xmtp.proto.message.contents.Contact
import org.xmtp.proto.message.contents.Invitation
import uniffi.xmtpv3.FfiConversations
import uniffi.xmtpv3.FfiListConversationsOptions
import uniffi.xmtpv3.org.xmtp.android.library.libxmtp.GroupEmitter
import java.util.Date
import kotlin.time.Duration.Companion.nanoseconds
import kotlin.time.DurationUnit
Expand Down Expand Up @@ -90,8 +93,8 @@ data class Conversations(
if (accountAddresses.isEmpty()) {
throw XMTPException("Cannot start an empty group chat.")
}
if (accountAddresses.size == 1 &&
accountAddresses.first().lowercase() == client.address.lowercase()
if (accountAddresses.size == 1 && accountAddresses.first()
.lowercase() == client.address.lowercase()
) {
throw XMTPException("Recipient is sender")
}
Expand Down Expand Up @@ -473,7 +476,21 @@ data class Conversations(
* of the information of those conversations according to the topics
* @return Stream of data information for the conversations
*/
fun stream(): Flow<Conversation> = flow {
fun stream(includeGroups: Boolean = false): Flow<Conversation> = flow {
if (includeGroups) {
val groupEmitter = GroupEmitter()

coroutineScope {
launch {
groupEmitter.groups.collect { group ->
emit(Conversation.Group(Group(client, group)))
}
}
}

libXMTPConversations?.stream(groupEmitter.callback)
}

val streamedConversationTopics: MutableSet<String> = mutableSetOf()
client.subscribeTopic(
listOf(Topic.userIntro(client.address), Topic.userInvite(client.address)),
Expand All @@ -497,6 +514,20 @@ data class Conversations(
}
}

fun streamGroups(): Flow<Group> = flow {
val groupEmitter = GroupEmitter()

coroutineScope {
launch {
groupEmitter.groups.collect { group ->
emit(Group(client, group))
}
}
}

libXMTPConversations?.stream(groupEmitter.callback)
}

/**
* Get the stream of all messages of the current [Client]
* @return Flow object of [DecodedMessage] that represents all the messages of the
Expand Down
33 changes: 33 additions & 0 deletions library/src/main/java/org/xmtp/android/library/Group.kt
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package org.xmtp.android.library

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.xmtp.android.library.codecs.ContentCodec
import org.xmtp.android.library.codecs.EncodedContent
import org.xmtp.android.library.codecs.compress
import org.xmtp.android.library.libxmtp.Message
import org.xmtp.android.library.libxmtp.MessageEmitter
import org.xmtp.android.library.messages.DecryptedMessage
import org.xmtp.android.library.messages.PagingInfoSortDirection
import org.xmtp.proto.message.api.v1.MessageApiOuterClass
Expand Down Expand Up @@ -123,6 +128,34 @@ class Group(val client: Client, private val libXMTPGroup: FfiGroup) {
)
}

fun streamMessages(): Flow<DecodedMessage> = flow {
val messageEmitter = MessageEmitter()

coroutineScope {
launch {
messageEmitter.messages.collect { message ->
emit(Message(client, message).decode())
}
}
}

libXMTPGroup.stream(messageEmitter.callback)
}

fun streamDecryptedMessages(): Flow<DecryptedMessage> = flow {
val messageEmitter = MessageEmitter()

coroutineScope {
launch {
messageEmitter.messages.collect { message ->
emit(decrypt(Message(client, message)))
}
}
}

libXMTPGroup.stream(messageEmitter.callback)
}

fun addMembers(addresses: List<String>) {
runBlocking { libXMTPGroup.addMembers(addresses) }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package uniffi.xmtpv3.org.xmtp.android.library.libxmtp

import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import uniffi.xmtpv3.FfiConversationCallback
import uniffi.xmtpv3.FfiGroup

class GroupEmitter {
private val _groups = MutableSharedFlow<FfiGroup>()
val groups = _groups.asSharedFlow()

val callback: FfiConversationCallback = object : FfiConversationCallback {
override fun onConversation(conversation: FfiGroup) {
_groups.tryEmit(conversation)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.xmtp.android.library.libxmtp

import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import uniffi.xmtpv3.FfiMessage
import uniffi.xmtpv3.FfiMessageCallback

class MessageEmitter {
private val _messages = MutableSharedFlow<FfiMessage>()
val messages = _messages.asSharedFlow()

val callback: FfiMessageCallback = object : FfiMessageCallback {
override fun onMessage(message: FfiMessage) {
_messages.tryEmit(message)
}
}
}

0 comments on commit cbfdce1

Please sign in to comment.